use 5.016; use strict; use warnings; use ZMQ::FFI; use ZMQ::FFI::Constants qw(ZMQ_PUSH ZMQ_PULL); use POSIX qw(:sys_wait_h _POSIX_PIPE_BUF setsid _exit); use IO::Select; my ($zmq_admin_ctx, $zmq_admin_socket); my ($terminate, $sigchild); $SIG{__DIE__} = sub { return if (defined($^S) && $^S); # in eval return if !defined($^S); # parsing say "DIE ($_[0])"; return; }; $SIG{TERM} = sub { local $!; # don't overwrite $! say "$$ SIGTERM caught - terminating"; $terminate = 1; }; my $pid = fork; if (!defined $pid) { die "Failed to fork - $!"; } elsif ($pid) { # parent say "Started pull child $pid"; my ($reader, $writer); if (!pipe($reader, $writer)) { die "Failed pipe - $!"; } $reader->autoflush(1); $writer->autoflush(1); my $pid2 = fork; if (!defined($pid2)) { die "Failed to fork - $!"; } elsif ($pid2) { say "Started push child $pid2"; close $reader; sub got_sigchild { local $!; # don't overwrite $! $sigchild++; say "SIGCHLD Child has died"; }; $SIG{CHLD} = \&got_sigchild; while (1) { if ($terminate) { close_admin_zmq(); say "Waiting on $pid"; my $s = waitpid($pid, 0); say "waitpid returned $s"; exit(0); } elsif ($sigchild) { say "Waiting on $pid2"; my $s = waitpid($pid2, 0); say "waitpid returned $s"; say "waiting 5s and killing pull child"; sleep 5; kill 'TERM', $pid; exit(0); } } } else { close $writer; zmq_child_push($reader); } } else { zmq_child_pull(); } sub zmq_child_push { my ($reader) = @_; my $select = IO::Select->new(); $select->add(fileno $reader); sleep 2; ($zmq_admin_ctx, $zmq_admin_socket) = open_admin_zmq(); while (1) { my @ready = $select->can_read(2); my $buf; if (scalar(@ready)) { my $read = sysread($reader, $buf, 64*1024, length($buf)); say "Read $read"; } else { # timeout eval { say "Sending"; $zmq_admin_socket->send("hello"); }; say "send failed - $@" if $@; sleep 10; say "push child dying"; die "FRED"; } } } sub zmq_child_pull { my $context = ZMQ::FFI->new(); my $receiver = $context->socket(ZMQ_PULL); $receiver->bind('tcp://*:9990'); my $string; while (1) { if ($terminate) { say "child got terminate"; last; } eval { $string = $receiver->recv(); }; if (my $ev = $@) { say "Failed to recv on zmq socket - $@"; next; } say "0MQ Received: $string"; } $receiver->close; $context->destroy; say "child exiting"; exit(1); } sub open_admin_zmq { say "open_admin_zmq"; my $ctx = ZMQ::FFI->new(); # Socket to send messages on my $sender = $ctx->socket(ZMQ_PUSH); $sender->connect('tcp://localhost:9990'); return ($ctx, $sender); } sub close_admin_zmq { if ($zmq_admin_socket) { say "close_admin_zmq"; $zmq_admin_socket->disconnect('tcp://localhost:9990'); $zmq_admin_socket->close; $zmq_admin_ctx->destroy; } return; }