PerlMonks  

Child process inter communication

by smarthacker67 (Beadle)
on Jun 13, 2017 at 07:04 UTC ( [id://1192662]=perlquestion )

smarthacker67 has asked for the wisdom of the Perl Monks concerning the following question:

Hi,

I need help with child-process-to-child-process communication. I have forked multiple children, say PID 1, PID 2 and PID 3. A command arrives on PID 1 saying it should go to PID 3. How can I send it from PID 1 to PID 3?

I have searched a lot about Perl pipes and IPC but couldn't get it to work. Basically, I can read the command on PID 1, but I want to send it to a particular PID XX. How do I select that PID?

Thanks in advance.

Replies are listed 'Best First'.
Re: Child process inter communication
by zentara (Archbishop) on Jun 13, 2017 at 13:11 UTC
    Hi, the following code has been posted before, in nodes whose numbers I have lost, but here is the way to do it. Also see Fork and wait question. You can set up two-way pipes too, so you can have bi-directional communication between parent and child.
    #!/usr/bin/perl
    use warnings;
    use strict;

    # piping all child output to parent
    # by Zaxo of perlmonks

    # we'll define some subroutines to act as children
    # and open a single pipe.
    my @kids = (
        sub { print "First!",  $/; },
        sub { print "Second!", $/; },
        sub { print "Third!",  $/; },
    );
    pipe my ( $in, $out );

    # Now we start breeding children. Each will close the $in handle,
    # select the $out handle as the default, set up autoflush on it,
    # and then call their sub. We definitely need to prevent zombies,
    # since the kids will all finish before the parent does, so we
    # start by declaring a hash where we will keep their pid's.
    my %kid;
    for (@kids) {
        my $cpid = fork;
        defined $cpid or warn("Couldn't fork"), next;
        $cpid or do {    # child
            close $in or die $!;
            select $out;
            $| = 1;
            $_->();
            exit 0;
        };
        $kid{$cpid} = undef;
    }

    # The select statement is the one-arg kind, which windows should
    # handle ok. Back in the parent, now, we have an unwanted $out handle,
    # a single $in handle that the kids are all fighting to talk on, and
    # a hash to remind us of the kids' names. Wrapping up, we close $out,
    # listen to the kids in turn, decorate their messages to show
    # who's repeating them, and finally call wait enough times to
    # bury them all safely. We must be careful not to die before that.
    close $out or warn $!;
    s/.$/??? says the child/, print while <$in>;
    delete $kid{ +wait } while %kid;

    # Many-to-one pipes like this will keep their messages' integrity
    # in flush-sized chunks. So long as they are shorter than
    # an I/O buffer and end with $/, they will be read as distinct.
    # You may want to have the kids tag their messages to
    # identify the source.
    # I've tested this on Windows XP and Linux. It works on both.
    or simpler, pipe all output to parent
    #!/usr/bin/perl
    use strict;
    use warnings;

    my $pid = open(CHILD, "-|");
    if ($pid) {    # parent
        print "parent got:\n";
        print while (<CHILD>);
        close CHILD;    # this waits on child
    }
    elsif (defined $pid) {    # child ($pid is 0 here; undef means the fork failed)
        print "kid here!\n";
        exec '/bin/date' or die "Can't exec date $!\n";
    }
    else {
        die "fork error: $!\n";
    }
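
    The two-way variant mentioned above can be sketched with socketpair, which perlipc describes for bidirectional communication. This is only a minimal illustration, not code from the original nodes: the helper name round_trip and the message format are made up for the example.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use Socket;
use IO::Handle;

STDOUT->autoflush(1);    # keep buffered output from being duplicated by fork

# Send one line to a freshly forked child and return the child's reply.
# round_trip() is a hypothetical helper name used only for this sketch.
sub round_trip {
    my ($message) = @_;

    # One connected pair of sockets; each end can be both read and written.
    socketpair(my $to_child, my $to_parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die "socketpair: $!";
    $_->autoflush(1) for $to_child, $to_parent;

    my $pid = fork;
    die "fork: $!" unless defined $pid;

    if ($pid == 0) {                       # child keeps $to_parent
        close $to_child;
        my $line = <$to_parent>;
        $line = '' unless defined $line;
        chomp $line;
        print {$to_parent} "child got: $line\n";
        close $to_parent;
        exit 0;
    }

    close $to_parent;                      # parent keeps $to_child
    print {$to_child} "$message\n";
    my $reply = <$to_child>;
    close $to_child;
    waitpid $pid, 0;                       # reap the child
    chomp $reply if defined $reply;
    return $reply;
}

print round_trip('hello'), "\n";
```

    Each side closes the end it does not use, so a read returns end-of-file once the peer is gone instead of blocking forever.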

    I'm not really a human, but I play one on earth. ..... an animated JAPH
Re: Child process inter communication
by Corion (Patriarch) on Jun 13, 2017 at 09:04 UTC

    What do you mean by "select such PID"?

    Are you asking about a good algorithm to do load scheduling? Or do you want to know how to pass information from your master process to child process XX?

    Personally, I would look into queues, and invert the problem. Have the master post jobs into the queue and the workers take jobs from that queue. That way you don't have idle workers.

    Such a setup can range from directories into which jobs are placed, and from which the clients claim them via rename, to various queue modules. Depending on what you need from the queue (persistence, deliver at most once, deliver at least once), you could use Forks::Queue or Thread::Queue or Queue::DBI or IPC::DirQueue, or ZMQ or MQSeries::Queue.
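
    As a rough illustration of the simplest end of that range, here is a sketch of a directory-based queue in which workers claim jobs with rename. The directory layout (tmp, new, work) and the helper names post_job and claim_job are assumptions made for this example; none of the modules listed above is used, and a real setup would need error handling and recovery of abandoned jobs.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use File::Temp qw(tempdir);

$| = 1;    # keep buffered output from being duplicated by fork

# Hypothetical layout: jobs are written in "$root/tmp", published by a
# rename into "$root/new", and claimed by a rename into "$root/work".
# rename within one filesystem is atomic, so only one worker wins a job.
sub post_job {
    my ($root, $name, $payload) = @_;
    my $tmp = "$root/tmp/$name";
    open my $fh, '>', $tmp or die "open $tmp: $!";
    print {$fh} $payload;
    close $fh or die "close $tmp: $!";
    rename $tmp, "$root/new/$name" or die "rename $tmp: $!";
    return;
}

# Returns (name, payload) for one claimed job, or an empty list.
sub claim_job {
    my ($root, $worker) = @_;
    opendir my $dh, "$root/new" or die "opendir $root/new: $!";
    my @jobs = sort grep { !/^\./ } readdir $dh;
    closedir $dh;
    for my $name (@jobs) {
        my $claimed = "$root/work/$worker.$name";
        next unless rename "$root/new/$name", $claimed;   # lost the race
        open my $fh, '<', $claimed or die "open $claimed: $!";
        my $payload = do { local $/; <$fh> };
        close $fh;
        unlink $claimed;
        return ($name, $payload);
    }
    return;    # nothing left to claim
}

my $root = tempdir(CLEANUP => 1);
mkdir "$root/$_" or die "mkdir $root/$_: $!" for qw(tmp new work);

# The master posts the jobs ...
post_job($root, "job$_", "payload $_") for 1 .. 4;

# ... and the workers take them until the queue is empty.
my @pids;
for my $worker (1 .. 2) {
    my $pid = fork;
    die "fork: $!" unless defined $pid;
    if ($pid == 0) {
        while (my ($name, $payload) = claim_job($root, $worker)) {
            print "worker $worker ($$) took $name: $payload\n";
        }
        exit 0;
    }
    push @pids, $pid;
}
waitpid $_, 0 for @pids;
```

    Which worker ends up with which job depends on scheduling; the point is only that each job is taken exactly once.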

Re: Child process inter communication
by hippo (Bishop) on Jun 13, 2017 at 08:56 UTC

    TIMTOWTDI. There are so many approaches for this and many of them are discussed in perlipc - have you read it? Did you try the examples in it? In what way were you unable to apply these examples to your specific problem?

Re: Child process inter communication
by tybalt89 (Monsignor) on Jun 14, 2017 at 01:42 UTC

    Here's my guess.

    #!/usr/bin/perl

    # http://perlmonks.org/?node_id=1192662

    use strict;
    use warnings;
    use IO::Select;

    my $childcount = 3;
    my $hasterminal = 1;
    my %who;
    my %pipes;

    for my $from (1 .. $childcount) {
        for my $to (1 .. $from - 1, $from + 1 .. $childcount) {
            pipe( my $readhandle, my $writehandle) or die "$! on pipe";
            $writehandle->autoflush;
            $pipes{$from}{$to}{rh} = $readhandle;
            $pipes{$from}{$to}{wh} = $writehandle;
        }
    }

    for my $me (1 .. $childcount) {
        if( my $pid = fork ) {    # parent
            $who{$pid} = $me;
        }
        elsif( defined $pid ) {    # child
            my $sel = IO::Select->new;
            $me == $hasterminal and $sel->add(*STDIN);
            for my $from (1 .. $me - 1, $me + 1 .. $childcount) {
                $sel->add($pipes{$from}{$me}{rh});
                close $pipes{$from}{$me}{wh};
            }
            while(1) {
                for my $handle ($sel->can_read) {
                    defined( my $command = <$handle> ) or exit;
                    print "$me got $command";
                    $command =~ /^(\d+)\s+(.*\n)/ and $1 != $me and
                        print { $pipes{$me}{$1}{wh} } $2;
                }
            }
        }
        else {
            die "fork failed with $!";
        }
    }

    use Data::Dump 'pp'; pp \%who;

    my $pid = wait;    # on first exit, kill rest
    print "$who{$pid} exited\n";
    kill 15, keys %who;
    print "$who{$pid} exited\n" while ($pid = wait) > 0;

    $hasterminal determines which child listens to the terminal for commands.
    Commands with a leading number are forwarded to that child.
    If you type in

    2 3 foobar

    you get back

    1 got 2 3 foobar
    2 got 3 foobar
    3 got foobar

    showing that child 1 got the message, then forwarded it to child 2, who then forwarded it to child 3.

    Any child can send directly to any other child using the pipes set up in the hash %pipes before the children are forked.

    Note that this is just a prototype and should not be considered production code (there's a bit of cheating going on in it :)

Re: Child process inter communication
by smarthacker67 (Beadle) on Jun 14, 2017 at 06:58 UTC

    Thanks for all your replies. I should have mentioned that in my case the children have already been forked, and I now wish to communicate with a specific one: I need child-to-child communication. Kindly check the comment section in the code below. Suggestions for an alternate approach are most welcome as well.

    sub make_new_child {
        # Make new child...
        my $pid;
        my $sigset;

        # block signal for fork
        $sigset = POSIX::SigSet->new(SIGINT);
        sigprocmask(SIG_BLOCK, $sigset)
            or die "Can't block SIGINT for fork: $!\n";

        die "fork: $!" unless defined ($pid = fork);

        if ($pid) {
            # Parent records the child's birth and returns.
            sigprocmask(SIG_UNBLOCK, $sigset)
                or die "Can't unblock SIGINT for fork: $!\n";
            print "PID $pid\n" if $Debug;
            $children{$pid} = 1;
            $children++;
            return;
        } else {
            print "Child....\n" if $Debug;
            $SIG{INT} = 'DEFAULT';    # make SIGINT kill us as it did before
            my $request = "";         # full long big request
            my $nreads = 0;           # number of read loops
            my $tmout = 150;          # timeout in second

            # unblock signals
            sigprocmask(SIG_UNBLOCK, $sigset)
                or die "Can't unblock SIGINT for fork: $!\n";

            # handle connections until we've reached $MAX_CLIENTS_PER_CHILD
            for (my $i=0; $i < $MAX_CLIENTS_PER_CHILD; $i++) {
                # my $device="";
                print "New connection....\n" if $Debug;
                my $client = $Socket->accept() or last;
                $client->autoflush(1);
                ($Debug && $client) ? print "Accepted....\n" : print "No Signal Recieved\n";

                # do something with the connection
                my $sel = IO::Select->new($client);
                while ( $sel->can_read($tmout) ) {
                    my $n = sysread($client, $request, 9999,length($request));
                    last if ($n==0);
                    $nreads++;
                    my $display = "";
                    if ( $nreads == 0 ) {
                        $display = '[timeout]';
                    } elsif ( length $request == 0 ) {
                        $display = '[empty]';
                    } else {
                        $display = $request;
                    }
                    $request="";
                    print "$display\n" if $Debug;
                    my @data = split(/,/,$display);
                    if($data[0] eq '!!'){
                        # A command
                        ####
                        #### Received the command here to send it to child PID 123
                        ###### child PID 123 has been already forked and free
                        ###### to serve this request
                        print "A command, Lets send\n";
                        my $to_pid = $data[1];
                        my $command = $data[2];
                        #_____________________________________________________________________
                        #
                        # Lets send command to $to_pid from here
                        #_____________________________________________________________________
                        # qx (echo e)
                        print CHLD_IN "$command $pid sent from $pid \n";
                        #print FHY ("OK, Lets Send coand\n");
                        sleep(3);
                    } else {
                    }
                }
            }
            exit(0);
        }
    }

        Update: Added size method to Inbox. Updated recv method to safeguard from autovivification. Renamed quit method to end for consistency with other helper classes. Finally, changed 'quit' to 'END' inside the demo script.

        Update: Added next statement in demo (2nd source) to not delay unless the inbox is empty.

        Regarding MCE::Shared, the following is an Inbox module suitable for sharing. Once shared, the object enables inter-process communication between workers, whether they are threads or child processes.

        package Foo::Inbox;

        use strict;
        use warnings;

        our $VERSION = '0.003';

        # $inbox = Foo::Inbox->new();

        sub new {
            bless [ {}, [] ], shift;
        }

        # $scalar = $inbox->size( [ $key ] );
        # %pairs  = $inbox->size();

        sub size {
            my ( $self, $key ) = @_;
            if ( defined $key ) {
                exists $self->[0]{$key} ? scalar @{ $self->[0]{$key} } : 0;
            }
            elsif ( wantarray ) {
                local $_;
                map { $_ => scalar @{ $self->[0]{$_} } } keys %{ $self->[0] };
            }
            else {
                my $size = 0;
                foreach my $key ( keys %{ $self->[0] } ) {
                    $size += scalar @{ $self->[0]{$key} };
                }
                $size;
            }
        }

        # $inbox->send( $from, $to, $arg1, ... );
        # $inbox->send( $from, \@list, $arg1, ... );

        sub send {
            my ( $self, $from, $to ) = ( shift, shift, shift );
            my $mesg = [ $from, [ @_ ] ];
            if ( ref $to eq 'ARRAY' ) {
                push @{ $self->[0]{$_ } }, $mesg for @{ $to };
            }
            else {
                push @{ $self->[0]{$to} }, $mesg;
            }
            return;
        }

        # $inbox->recv( $from );

        sub recv {
            my ( $self, $from ) = @_;
            return @{ $self->[1] } unless exists $self->[0]{ $from };
            @{ shift @{ $self->[0]{ $from } } // $self->[1] };
        }

        # $inbox->end();

        sub end {
            $_[0]->[1] = [ 'manager', [ 'END' ] ];
            return;
        }

        1;

        A worker may send a data structure or a list (not shown here). When it does, MCE::Shared handles serialization automatically, via Sereal 3.015+ if available, otherwise Storable.

        use strict;
        use warnings;

        use Foo::Inbox;
        use MCE::Hobo;
        use MCE::Shared;
        use List::Util 'shuffle';
        use Time::HiRes 'sleep';

        my $inbox = MCE::Shared->share( Foo::Inbox->new() );
        my @names = shuffle qw/ Barny Betty Fred Wilma /;
        my $index = 0;

        $| = 1;

        sub foo {
            my $name = shift;
            my $count = 0;

            # remove my name from the list
            @names = grep { $_ ne $name } shuffle @names;

            # send greeting to names on the list
            $inbox->send($name, \@names, 'Hello');

            while ( 1 ) {
                if ( my ($from, $data) = $inbox->recv($name) ) {
                    # so soon, alrighty then ;-)
                    last if $data->[0] eq 'END';

                    # display the message received
                    printf "%-5s received %s from %s\n", $name, $data->[0], $from;

                    # forward the message to another worker
                    $inbox->send($name, $names[ ++$index % @names ], $data->[0])
                        if ( $from eq 'manager' );

                    next;
                }
                sleep 0.01;
            }
        }

        MCE::Hobo->create(\&foo, $_) for @names;

        # Enter message or type quit to terminate the script.
        while ( my $msg = <STDIN> ) {
            chomp $msg;
            next unless ( length $msg );
            $inbox->end(), last() if ( $msg eq 'quit' );
            $inbox->send('manager', $names[ ++$index % @names ], $msg);
        }

        MCE::Hobo->waitall;

        I've entered in the terminal words from the song Hello by Adele: Hello, it's me -- Hello, can you hear me?. Then entered quit to exit the application.

        Betty received Hello from Barny
        Wilma received Hello from Barny
        Fred  received Hello from Barny
        Wilma received Hello from Betty
        Barny received Hello from Betty
        Betty received Hello from Wilma
        Fred  received Hello from Betty
        Betty received Hello from Fred
        Fred  received Hello from Wilma
        Wilma received Hello from Fred
        Barny received Hello from Wilma
        Barny received Hello from Fred
        Hello, it's me
        Betty received Hello, it's me from manager
        Barny received Hello, it's me from Betty
        Hello, can you hear me?
        Wilma received Hello, can you hear me? from manager
        Fred  received Hello, can you hear me? from Wilma
        quit

        A subsequent version of Inbox might provide blocking capabilities so that workers need not poll. Doing so means having to make many pipes or fifos, and I'm not sure about that. Imagine a future server having 400 logical cores. Making all those pipes or fifos seems excessive, imho.

        Update: I've benchmarked Inbox to determine whether pipes or fifos are needed. Adding the next statement enabled the demo script to run reasonably fast if by chance the inbox is filled with messages, or when many workers send simultaneously to the same recipient. It's also possible for two or more workers to read from the same inbox in parallel if needed.

        Regards, Mario

      Here's my second guess, using named pipes (fifos):

      #!/usr/bin/perl

      # http://perlmonks.org/?node_id=1192662

      use strict;
      use warnings;
      use IO::Select;
      use POSIX qw(mkfifo);

      my $childcount = 3;
      my $hasterminal = 1;
      my %who;
      my $fifodir = '/tmp';

      for my $me (1 .. $childcount) {
          if( my $pid = fork ) {    # parent
              $who{$pid} = $me;
          }
          elsif( defined $pid ) {    # child
              my $mypath = "$fifodir/$$.fifo";
              mkfifo( $mypath, 0700 ) or die "$! on making mypath";
              open my $fifo, '+<', $mypath or die "$! opening $mypath";
              my $sel = IO::Select->new($fifo);
              $me == $hasterminal and $sel->add(*STDIN);
              while(1) {
                  for my $handle ($sel->can_read) {
                      defined( my $command = <$handle> ) or exit;
                      print "$$ got $command";
                      if( $command =~ /^(\d+)\s+(.*\n)/ ) {
                          my $otherpath = "$fifodir/$1.fifo";
                          if( -p $otherpath and open my $otherchild, '>', $otherpath ) {
                              print $otherchild $2;
                              close $otherchild;
                          }
                          else {
                              warn "child $1 does not have a fifo\n";
                          }
                      }
                  }
              }
          }
          else {
              die "fork failed with $!";
          }
      }

      use Data::Dump 'pp'; pp \%who;

      my $pid = wait;    # on first exit, kill rest
      print "$who{$pid} exited\n";
      kill 15, keys %who;
      unlink map "$fifodir/$_.fifo", keys %who;
      print "$who{$pid} exited\n" while ($pid = wait) > 0;

      Enter a pid followed by white space and a message, and the message will be forwarded to that pid.

      Example - enter:

      25910 somemessage

      to get

      25908 got 25910 somemessage
      25910 got somemessage

      Of course, using your actual pids.

        That is cool tybalt89. I understand it now after reading this line. Ah, it's the first child who is listening on STDIN ;-)

        $me == $hasterminal and $sel->add(*STDIN);

        Regards, Mario

Re: Child process inter communication
by marioroy (Prior) on Jun 18, 2017 at 23:25 UTC

    Update: Added Foo::Inbox4 and removed Foo::Inbox3. I've become so accustomed to the automatic serialization of complex data structures in MCE::Shared that I didn't realize it has to be done manually for Thread::Queue.

    Corion mentioned queues. The following provides two queue implementations based on Foo::Inbox.

    Foo::Inbox2 using MCE::Shared->queue

    Foo::Inbox4 using Thread::Queue

    A demo and benchmark will follow in the immediate post(s).

    Regards, Mario

      For benchmarking 50,000 messages the following two scripts were made, based on the one found here.

      $ diff hobo_test.pl threads_test.pl
      5,6c5,6
      < use MCE::Hobo;
      < use Foo::Inbox2;
      ---
      > use threads;
      > use Foo::Inbox4;
      11c11
      < my $inbox = Foo::Inbox2->new( @names );
      ---
      > my $inbox = Foo::Inbox4->new( @names );
      38,39c38,39
      < MCE::Hobo->create(\&foo, $_) for @names;
      < MCE::Hobo->waitall;
      ---
      > threads->create(\&foo, $_) for @names;
      > $_->join() for threads->list();

      Foo::Inbox2 via MCE::Hobo + MCE::Shared->queue 50k test

      Foo::Inbox4 via threads + Thread::Queue 50k test

      Update: Added results for MCE::Inbox which will ship with MCE::Shared 1.827 on release day. It is Inbox2 with optimizations.

      Results from Mac OS X and Cent OS 7.

      # Mac OS X ( Perl v5.18.2 )

      $ perl foo1.pl | wc -l     # Foo::Inbox
      duration: 0.690 seconds
      50000

      $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
      duration: 0.721 seconds
      50000

      $ perl foo2.pl | wc -l     # MCE::Shared->queue
      duration: 0.789 seconds
      50000

      $ perl foo4.pl | wc -l     # Thread::Queue
      duration: 5.735 seconds
      50000

      # CentOS 7 VM ( Perl v5.16.3 )

      $ perl foo1.pl | wc -l     # Foo::Inbox
      duration: 0.834 seconds
      50000

      $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
      duration: 0.726 seconds
      50000

      $ perl foo2.pl | wc -l     # MCE::Shared->queue
      duration: 0.945 seconds
      50000

      $ perl foo4.pl | wc -l     # Thread::Queue
      duration: 3.020 seconds
      50000

      Results from Cygwin and Strawberry Perl.

      # Windows ( Cygwin Perl v5.22.3 )

      $ perl foo1.pl | wc -l     # Foo::Inbox
      duration: 1.825 seconds
      50000

      $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
      duration: 2.059 seconds
      50000

      $ perl foo2.pl | wc -l     # MCE::Shared->queue
      duration: 2.387 seconds
      50000

      $ perl foo4.pl | wc -l     # Thread::Queue
      duration: 24.086 seconds
      50000

      # Windows ( Strawberry Perl v5.22.2.1 )

      $ perl foo1.pl > nul       # Foo::Inbox
      duration: 1.570 seconds

      $ perl inbox.pl > nul      # MCE::Inbox w/ blocking capability
      duration: 1.664 seconds

      $ perl foo2.pl > nul       # MCE::Shared->queue
      duration: 2.120 seconds

      $ perl foo4.pl > nul       # Thread::Queue
      duration: 2.886 seconds

      Results from FreeBSD and Solaris.

      # TrueOS 10.0 ( FreeBSD, Perl 5.16.3 )

      $ perl foo1.pl | wc -l     # Foo::Inbox
      duration: 0.910 seconds
      50000

      $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
      duration: 0.875 seconds
      50000

      $ perl foo2.pl | wc -l     # MCE::Shared->queue
      duration: 1.107 seconds
      50000

      $ perl foo4.pl | wc -l     # Thread::Queue
      duration: 0.797 seconds
      50000

      # Solaris 11.2 ( Perl 5.22.2 )

      $ perl foo1.pl | wc -l     # Foo::Inbox
      duration: 1.319 seconds
      50000

      $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
      duration: 1.344 seconds
      50000

      $ perl foo2.pl | wc -l     # MCE::Shared->queue
      duration: 1.525 seconds
      50000

      $ perl foo4.pl | wc -l     # Thread::Queue
      duration: 1.822 seconds
      50000

      From this testing, threads + Thread::Queue runs better on FreeBSD and Solaris than on the other platforms tested.

      Foo::Inbox lacks blocking capability, thus has lower latency. However, it may run much slower when the Inbox isn't busy.

      Regards, Mario

        Please note that the results in this post don't count. I've overclocked a system on purpose, mainly to simulate a fast machine of the future. On an 8-core box (16 logical cores in total), I ran the same tests to better understand how the three implementations are affected. The box is overclocked to 4.0 GHz. What interests me is not the raw speed but rather the impact of running many workers.

        Baseline testing involves 4 workers and runs about 2x faster than my laptop. No surprises there due to running at 4 GHz and not from a Linux VM.

        Update: Added results for MCE::Inbox which will ship with MCE::Shared 1.827 on release day. It is Inbox2 with optimizations.

        # 4 GHz, CentOS 7 ( Perl v5.16.3 )

        $ perl foo1.pl | wc -l     # Foo::Inbox
        duration: 0.441 seconds
        50000

        $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
        duration: 0.422 seconds
        50000

        $ perl foo2.pl | wc -l     # MCE::Shared->queue
        duration: 0.645 seconds
        50000

        $ perl foo4.pl | wc -l     # Thread::Queue
        duration: 1.544 seconds
        50000

        Next is 128 workers retrieving 1.6 million messages.

        # 4 GHz, CentOS 7 ( Perl v5.16.3 )

        $ perl foo1.pl | wc -l     # Foo::Inbox
        duration: 12.685 seconds
        1600000

        $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
        duration: 15.939 seconds
        1600000

        $ perl foo2.pl | wc -l     # MCE::Shared->queue
        duration: 21.533 seconds
        1600000

        $ perl foo4.pl | wc -l     # Thread::Queue
        duration: 90.015 seconds
        1600000

        Unfortunately, threads may not scale linearly, especially when involving locking. In top, I see the process peak at 306% max while running. It's far from the 1600% mark hardware limit. On the other hand, MCE::Inbox scales nicely considering it does blocking as well.

        For the 1.6 million test, I added a loop to generate more names, 128 total.

        my @names = shuffle qw/ Barny Betty Fred Wilma /;

        foreach my $i ( 1 .. 5 ) {
            push @names, map { $_.$i } @names;
            print scalar(@names), "\n";
        }

        __END__
        8
        16
        32
        64
        128

        There was one other change: I replaced 4167 with 98, because there are now 128 workers, not 4.

        ...
        # send greeting again
        $inbox->send($name, \@names, 'Hello') if $count < 98;

        # eventually stop benchmarking
        last if ++$count == 12500;
        ...

        Regards, Mario

      The demo scripts are based on the one found here. One makes use of MCE::Hobo + Inbox2; the other uses threads + Inbox4.

      $ diff hobo_demo.pl threads_demo.pl
      5,6c5,6
      < use MCE::Hobo;
      < use Foo::Inbox2;
      ---
      > use threads;
      > use Foo::Inbox4;
      10c10
      < my $inbox = Foo::Inbox2->new( @names );
      ---
      > my $inbox = Foo::Inbox4->new( @names );
      34c34
      < MCE::Hobo->create(\&foo, $_) for @names;
      ---
      > threads->create(\&foo, $_) for @names;
      44c44
      < MCE::Hobo->waitall;
      ---
      > $_->join() for threads->list();

      Foo::Inbox2 via MCE::Hobo + MCE::Shared->queue example

      Foo::Inbox4 via threads + Thread::Queue example

      Regards, Mario
