PerlMonks  

Re: Child process inter communication

by smarthacker67 (Beadle)
on Jun 14, 2017 at 06:58 UTC ( [id://1192757]=note )


in reply to Child process inter communication

Thanks for all your replies. I should have mentioned that in my case the children have already been forked, and I now want to communicate with a specific one: I need child-to-child communication. Please check the comment section in the code below. Suggestions for an alternate approach are most welcome as well.

sub make_new_child {
    # Make new child...
    my $pid;
    my $sigset;

    # block signal for fork
    $sigset = POSIX::SigSet->new(SIGINT);
    sigprocmask(SIG_BLOCK, $sigset)
        or die "Can't block SIGINT for fork: $!\n";

    die "fork: $!" unless defined ($pid = fork);

    if ($pid) {
        # Parent records the child's birth and returns.
        sigprocmask(SIG_UNBLOCK, $sigset)
            or die "Can't unblock SIGINT for fork: $!\n";
        print "PID $pid\n" if $Debug;
        $children{$pid} = 1;
        $children++;
        return;
    }
    else {
        print "Child....\n" if $Debug;
        $SIG{INT} = 'DEFAULT';   # make SIGINT kill us as it did before
        my $request = "";        # full long big request
        my $nreads  = 0;         # number of read loops
        my $tmout   = 150;       # timeout in second

        # unblock signals
        sigprocmask(SIG_UNBLOCK, $sigset)
            or die "Can't unblock SIGINT for fork: $!\n";

        # handle connections until we've reached $MAX_CLIENTS_PER_CHILD
        for (my $i=0; $i < $MAX_CLIENTS_PER_CHILD; $i++) {
            # my $device="";
            print "New connection....\n" if $Debug;
            my $client = $Socket->accept() or last;
            $client->autoflush(1);
            ($Debug && $client) ? print "Accepted....\n" : print "No Signal Recieved\n";

            # do something with the connection
            my $sel = IO::Select->new($client);
            while ( $sel->can_read($tmout) ) {
                my $n = sysread($client, $request, 9999, length($request));
                last if ($n==0);
                $nreads++;
                my $display = "";
                if ( $nreads == 0 ) {
                    $display = '[timeout]';
                }
                elsif ( length $request == 0 ) {
                    $display = '[empty]';
                }
                else {
                    $display = $request;
                }
                $request="";
                print "$display\n" if $Debug;
                my @data = split(/,/,$display);
                if($data[0] eq '!!'){   # A command
                    ####
                    #### Received the command here to send it to child PID 123
                    ###### child PID 123 has been already forked and free
                    ###### to serve this request
                    print "A command, Lets send\n";
                    my $to_pid = $data[1];
                    my $command = $data[2];
                    #_____________________________________________________________________
                    #
                    # Lets send command to $to_pid from here
                    #_____________________________________________________________________
                    # qx (echo e)
                    print CHLD_IN "$command $pid sent from $pid \n";
                    #print FHY ("OK, Lets Send coand\n");
                    sleep(3);
                }
                else {
                }
            }
        }
        exit(0);
    }
}

Replies are listed 'Best First'.
Re^2: Child process inter communication
by zentara (Archbishop) on Jun 14, 2017 at 12:48 UTC

      Update: Added size method to Inbox. Updated recv method to safeguard from autovivification. Renamed quit method to end for consistency with other helper classes. Finally, changed 'quit' to 'END' inside the demo script.

      Update: Added next statement in demo (2nd source) to not delay unless the inbox is empty.

      Regarding MCE::Shared, the following is an Inbox module suitable for sharing. Once shared, the object enables inter-process communication between workers, supporting both threads and child processes.

      package Foo::Inbox;

      use strict;
      use warnings;

      our $VERSION = '0.003';

      # $inbox = Foo::Inbox->new();

      sub new {
          bless [ {}, [] ], shift;
      }

      # $scalar = $inbox->size( [ $key ] );
      # %pairs  = $inbox->size();

      sub size {
          my ( $self, $key ) = @_;

          if ( defined $key ) {
              exists $self->[0]{$key} ? scalar @{ $self->[0]{$key} } : 0;
          }
          elsif ( wantarray ) {
              local $_;
              map { $_ => scalar @{ $self->[0]{$_} } } keys %{ $self->[0] };
          }
          else {
              my $size = 0;
              foreach my $key ( keys %{ $self->[0] } ) {
                  $size += scalar @{ $self->[0]{$key} };
              }
              $size;
          }
      }

      # $inbox->send( $from, $to, $arg1, ... );
      # $inbox->send( $from, \@list, $arg1, ... );

      sub send {
          my ( $self, $from, $to ) = ( shift, shift, shift );
          my $mesg = [ $from, [ @_ ] ];

          if ( ref $to eq 'ARRAY' ) {
              push @{ $self->[0]{$_ } }, $mesg for @{ $to };
          }
          else {
              push @{ $self->[0]{$to} }, $mesg;
          }

          return;
      }

      # $inbox->recv( $from );

      sub recv {
          my ( $self, $from ) = @_;

          return @{ $self->[1] } unless exists $self->[0]{ $from };

          @{ shift @{ $self->[0]{ $from } } // $self->[1] };
      }

      # $inbox->end();

      sub end {
          $_[0]->[1] = [ 'manager', [ 'END' ] ];
          return;
      }

      1;

      A worker may send a data structure or a list (not shown here). When it does, MCE::Shared handles serialization automatically via Sereal 3.015+ if available, or Storable otherwise.
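      To make the serialization step concrete, here is a minimal sketch of what round-tripping such a message could look like with the core Storable module. It does not use MCE::Shared at all; pack_message and unpack_message are hypothetical helper names, and the only thing borrowed from the post is the message shape Foo::Inbox stores, [ $from, [ @args ] ].

```perl
use strict;
use warnings;
use Storable qw( freeze thaw );

# Hypothetical helpers, not part of Foo::Inbox or MCE::Shared.
# The message shape [ $from, [ @args ] ] mirrors what Foo::Inbox stores.

sub pack_message {
    my ( $from, @args ) = @_;
    return freeze( [ $from, [ @args ] ] );    # serialize to a byte string
}

sub unpack_message {
    my ( $frozen ) = @_;
    my ( $from, $data ) = @{ thaw($frozen) }; # rebuild the nested structure
    return ( $from, $data );
}

# A nested data structure plus a plain string travel together.
my $frozen = pack_message( 'Fred', { job => 42, tags => [ 'a', 'b' ] }, 'Hello' );
my ( $from, $data ) = unpack_message($frozen);

printf "%s sent job %d with greeting %s\n", $from, $data->[0]{job}, $data->[1];
```

      The receiver gets a deep copy, so changes made on one side are not visible on the other unless they are sent again.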

      use strict;
      use warnings;

      use Foo::Inbox;
      use MCE::Hobo;
      use MCE::Shared;
      use List::Util 'shuffle';
      use Time::HiRes 'sleep';

      my $inbox = MCE::Shared->share( Foo::Inbox->new() );
      my @names = shuffle qw/ Barny Betty Fred Wilma /;
      my $index = 0;

      $| = 1;

      sub foo {
          my $name = shift;
          my $count = 0;

          # remove my name from the list
          @names = grep { $_ ne $name } shuffle @names;

          # send greeting to names on the list
          $inbox->send($name, \@names, 'Hello');

          while ( 1 ) {
              if ( my ($from, $data) = $inbox->recv($name) ) {
                  # so soon, alrighty then ;-)
                  last if $data->[0] eq 'END';

                  # display the message received
                  printf "%-5s received %s from %s\n", $name, $data->[0], $from;

                  # forward the message to another worker
                  $inbox->send($name, $names[ ++$index % @names ], $data->[0])
                      if ( $from eq 'manager' );

                  next;
              }
              sleep 0.01;
          }
      }

      MCE::Hobo->create(\&foo, $_) for @names;

      # Enter message or type quit to terminate the script.
      while ( my $msg = <STDIN> ) {
          chomp $msg;
          next unless ( length $msg );
          $inbox->end(), last() if ( $msg eq 'quit' );
          $inbox->send('manager', $names[ ++$index % @names ], $msg);
      }

      MCE::Hobo->waitall;

      I've entered in the terminal words from the song Hello by Adele: Hello, it's me -- Hello, can you hear me?. Then entered quit to exit the application.

      Betty received Hello from Barny
      Wilma received Hello from Barny
      Fred received Hello from Barny
      Wilma received Hello from Betty
      Barny received Hello from Betty
      Betty received Hello from Wilma
      Fred received Hello from Betty
      Betty received Hello from Fred
      Fred received Hello from Wilma
      Wilma received Hello from Fred
      Barny received Hello from Wilma
      Barny received Hello from Fred
      Hello, it's me
      Betty received Hello, it's me from manager
      Barny received Hello, it's me from Betty
      Hello, can you hear me?
      Wilma received Hello, can you hear me? from manager
      Fred received Hello, can you hear me? from Wilma
      quit

      A subsequent version of Inbox might provide blocking capabilities so that workers need not poll. Doing so would mean making many pipes or fifos, and I'm not sure that is worthwhile. Imagine a future server having 400 logical cores. Making all those pipes or fifos seems excessive, imho.
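      For a sense of what the blocking alternative costs, here is a minimal sketch in plain Perl, independent of Foo::Inbox and MCE::Shared. It gives a single worker its own pair of pipes and lets it block in readline instead of sleeping. The helper name run_blocking_worker and the "got ..." reply format are assumptions made for illustration, and the messages are kept small so that the parent can write everything before reading replies without filling a pipe buffer.

```perl
use strict;
use warnings;
use IO::Handle;
use POSIX qw( _exit );

# Hypothetical helper (not part of Foo::Inbox or MCE::Shared): fork one
# worker that blocks on its own pipe, send it @messages, collect replies.
sub run_blocking_worker {
    my ( @messages ) = @_;

    # One pipe per direction; this is the per-worker cost discussed above.
    pipe( my $inbox_r, my $inbox_w ) or die "pipe: $!";
    pipe( my $reply_r, my $reply_w ) or die "pipe: $!";

    my $pid = fork;
    die "fork: $!" unless defined $pid;

    if ( $pid == 0 ) {
        # Child: close the ends it does not use.
        close $inbox_w;
        close $reply_r;
        $reply_w->autoflush(1);

        # readline blocks in the kernel until a line or EOF arrives,
        # so there is no sleep/poll loop here.
        while ( defined( my $line = <$inbox_r> ) ) {
            print {$reply_w} "got $line";
        }
        _exit(0);    # skip END blocks and buffers inherited from the parent
    }

    # Parent: close the ends it does not use.
    close $inbox_r;
    close $reply_w;
    $inbox_w->autoflush(1);

    print {$inbox_w} "$_\n" for @messages;
    close $inbox_w;              # EOF tells the worker to finish

    my @replies = <$reply_r>;    # read until the worker closes its end
    close $reply_r;
    waitpid $pid, 0;

    chomp @replies;
    return @replies;
}

print "$_\n" for run_blocking_worker( 'Hello', 'END' );
```

      Scaling this to N workers that can all address each other means at least one such pipe (or fifo) per worker, which is the resource cost weighed above against a short polling delay.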

      Update: I've benchmarked Inbox to determine whether pipes or fifos are needed. Adding the next statement enabled the demo script to run reasonably fast when the inbox is filled with messages or when many workers send simultaneously to the same recipient. It's also possible for two or more workers to read from the same inbox in parallel, if needed.

      Regards, Mario

        Update: Added next statement to not delay unless the inbox is empty.

        The following is a demonstration using Parallel::ForkManager + MCE::Shared + Foo::Inbox.

        use strict;
        use warnings;

        use Foo::Inbox;
        use Parallel::ForkManager;
        use MCE::Shared;
        use List::Util 'shuffle';
        use Time::HiRes 'sleep';

        my $inbox = MCE::Shared->share( Foo::Inbox->new() );
        my @names = shuffle qw/ Barny Betty Fred Wilma /;
        my $index = 0;

        $| = 1;

        sub foo {
            my $name = shift;
            my $count = 0;

            # remove my name from the list
            @names = grep { $_ ne $name } shuffle @names;

            # send greeting to names on the list
            $inbox->send($name, \@names, 'Hello');

            while ( 1 ) {
                if ( my ($from, $data) = $inbox->recv($name) ) {
                    # so soon, alrighty then ;-)
                    last if $data->[0] eq 'END';

                    # display the message received
                    printf "%-5s received %s from %s\n", $name, $data->[0], $from;

                    # forward the message to another worker
                    $inbox->send($name, $names[ ++$index % @names ], $data->[0])
                        if ( $from eq 'manager' );

                    next;
                }
                sleep 0.01;
            }
        }

        my $pm = Parallel::ForkManager->new(scalar @names);

        $pm->set_waitpid_blocking_sleep(0);

        foreach my $name ( @names ) {
            $pm->start($name) and next;

            # Have the shared-manager assign a data channel for IPC.
            # This is done automatically for MCE, MCE::Hobo, and threads.
            MCE::Shared->init();

            # Enter the foo routine.
            foo($name);

            $pm->finish($name);
        }

        # Enter message or type quit to terminate the script.
        while ( my $msg = <STDIN> ) {
            chomp $msg;
            next unless ( length $msg );
            $inbox->end(), last() if ( $msg eq 'quit' );
            $inbox->send('manager', $names[ ++$index % @names ], $msg);
        }

        $pm->wait_all_children;

        MCE::Signal takes care of signal handling behind the scenes if one were to press Ctrl-C (SIGINT). Ditto for the process receiving SIGQUIT or SIGTERM. What is nice about this arrangement is that MCE::Shared and MCE::Signal (loaded by MCE::Shared::Server) complement Parallel::ForkManager quite nicely ;-).

        For SIGPIPE to work reliably, STDOUT requires $| = 1. Then the following is possible on any Unix platform. The script terminates gracefully, including the tempdir removal inside Parallel::ForkManager.

        perl script.pl | head -5

        Regards, Mario

        Update: Changed 'quit' to 'END' in the example.

        Before benchmarking, I thought that 10k messages per second would be nice for a pure-Perl solution. Well, on the Windows platform it can deplete the inbox at a rate of 20k messages per second. On the Mac and a Linux VM, reaching 50k per second is no problem. For that reason, I will introduce MCE::Shared::Inbox with the next MCE::Shared release.

        Disclaimer. Inbox may not be the best solution out there, which is fine. What it will do is run reasonably well on multiple platforms, including Windows, and support threads and processes. Inbox itself does not construct any pipes / fifos, mutexes, or any temp files or directories. Instead it rides on MCE::Shared's parallel data channels. Basically, Inbox neither creates nor consumes additional resources at the OS level.

        That said, I'm not worried if one were to spawn 400 workers in the future on a server having 400 logical cores and do inter-child-or-thread communication using Inbox. MCE::Shared provides a 12-channel barrier to safeguard the OS Kernel. The effect is preserving the fluid-like OS experience while running.

        Running:

        on unix: perl demo.pl | wc -l
        windows: perl demo.pl > nul

        The Foo::Inbox package is found here.

        use strict;
        use warnings;

        use Foo::Inbox;
        use MCE::Hobo;
        use MCE::Shared;
        use List::Util qw( shuffle );
        use Time::HiRes qw( sleep time );

        my $inbox = MCE::Shared->share( Foo::Inbox->new() );
        my @names = shuffle qw/ Barny Betty Fred Wilma /;
        my $start = time;

        $| = 1;

        sub foo {
            my $name = shift;
            my $count = 0;

            # remove my name from the list
            @names = grep { $_ ne $name } shuffle @names;

            # send greeting
            $inbox->send($name, \@names, 'Hello');

            while ( 1 ) {
                if ( my ($from, $data) = $inbox->recv($name) ) {
                    last if $data->[0] eq 'END';

                    # display the message received
                    printf "%-5s received %s from %s\n", $name, $data->[0], $from;

                    # send greeting again
                    $inbox->send($name, \@names, 'Hello') if $count < 4167;

                    # eventually stop benchmarking
                    last if ++$count == 12500;

                    # don't delay, fetch next message
                    next;
                }

                # timer delay if empty
                sleep 0.01;
            }
        }

        MCE::Hobo->create(\&foo, $_) for @names;
        MCE::Hobo->waitall;

        printf {*STDERR} "duration: %0.03f seconds\n", time - $start;

        For long jobs, the timer delay is not likely to be a problem, certainly not if batching. Later, I may integrate signal handling for notification. But then, signal handling isn't reliable on the Windows platform. So I am giving the hybrid approach a try: run at maximum performance for a busy Inbox and delay otherwise.
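        The hybrid loop can be isolated from MCE::Shared to show the idea on its own. The following is a minimal sketch, assuming a hypothetical helper drain_hybrid that takes two callbacks: one returning the next message (or undef when the inbox is empty) and one handling a message. The $max_idle cutoff is an addition that only exists so the sketch terminates; the demo scripts above stop on an 'END' message instead.

```perl
use strict;
use warnings;
use Time::HiRes qw( sleep );

# Hypothetical helper, not part of Foo::Inbox: $fetch returns the next
# message or undef when the inbox is empty; $handle processes one message.
# Gives up after $max_idle consecutive empty polls so the sketch terminates.
sub drain_hybrid {
    my ( $fetch, $handle, $max_idle, $delay ) = @_;
    my ( $handled, $idle ) = ( 0, 0 );

    while ( $idle < $max_idle ) {
        if ( defined( my $msg = $fetch->() ) ) {
            $handle->($msg);
            $handled++;
            $idle = 0;
            next;           # busy inbox: fetch again without delay
        }
        $idle++;
        sleep $delay;       # empty inbox: back off briefly
    }

    return $handled;
}

# A plain array stands in for the shared inbox here.
my @queue = qw( Hello Hello Hello );
my $n = drain_hybrid( sub { shift @queue }, sub { print "received $_[0]\n" }, 3, 0.01 );
print "handled $n messages\n";
```

        With a full queue the loop never sleeps, so throughput is bounded by the fetch itself; the delay is only paid once the queue runs dry.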

        Regards, Mario

Re^2: Child process inter communication
by tybalt89 (Monsignor) on Jun 14, 2017 at 21:04 UTC

    Here's my second guess, using named pipes (fifos):

    #!/usr/bin/perl

    # http://perlmonks.org/?node_id=1192662

    use strict;
    use warnings;
    use IO::Select;
    use POSIX qw(mkfifo);

    my $childcount = 3;
    my $hasterminal = 1;
    my %who;
    my $fifodir = '/tmp';

    for my $me (1 .. $childcount) {
        if( my $pid = fork ) {              # parent
            $who{$pid} = $me;
        }
        elsif( defined $pid ) {             # child
            my $mypath = "$fifodir/$$.fifo";
            mkfifo( $mypath, 0700 ) or die "$! on making mypath";
            open my $fifo, '+<', $mypath or die "$! opening $mypath";
            my $sel = IO::Select->new($fifo);
            $me == $hasterminal and $sel->add(*STDIN);
            while(1) {
                for my $handle ($sel->can_read) {
                    defined( my $command = <$handle> ) or exit;
                    print "$$ got $command";
                    if( $command =~ /^(\d+)\s+(.*\n)/ ) {
                        my $otherpath = "$fifodir/$1.fifo";
                        if( -p $otherpath and open my $otherchild, '>', $otherpath ) {
                            print $otherchild $2;
                            close $otherchild;
                        }
                        else {
                            warn "child $1 does not have a fifo\n";
                        }
                    }
                }
            }
        }
        else {
            die "fork failed with $!";
        }
    }

    use Data::Dump 'pp'; pp \%who;

    my $pid = wait; # on first exit, kill rest
    print "$who{$pid} exited\n";
    kill 15, keys %who;
    unlink map "$fifodir/$_.fifo", keys %who;
    print "$who{$pid} exited\n" while ($pid = wait) > 0;

    Enter a pid followed by white space and a message, and it will forward the message to that pid.

    Example - enter:

    25910 somemessage

    to get

    25908 got 25910 somemessage
    25910 got somemessage

    Of course, using your actual pids.

      That is cool tybalt89. I understand it now after reading this line. Ah, it's the first child who is listening on STDIN ;-)

      $me == $hasterminal and $sel->add(*STDIN);

      Regards, Mario
