
Re^3: Child process inter communication

by marioroy (Prior)
on Jun 16, 2017 at 06:09 UTC


in reply to Re^2: Child process inter communication
in thread Child process inter communication

Update: Added size method to Inbox. Updated recv method to safeguard from autovivification. Renamed quit method to end for consistency with other helper classes. Finally, changed 'quit' to 'END' inside the demo script.

Update: Added next statement in demo (2nd source) to not delay unless the inbox is empty.

Regarding MCE::Shared, the following is an Inbox module suitable for sharing. Once shared, the object enables communication between workers, whether they are threads or child processes.

    package Foo::Inbox;

    use strict;
    use warnings;

    our $VERSION = '0.003';

    # $inbox = Foo::Inbox->new();

    sub new {
        bless [ {}, [] ], shift;
    }

    # $scalar = $inbox->size( [ $key ] );
    # %pairs  = $inbox->size();

    sub size {
        my ( $self, $key ) = @_;

        if ( defined $key ) {
            exists $self->[0]{$key} ? scalar @{ $self->[0]{$key} } : 0;
        }
        elsif ( wantarray ) {
            local $_;
            map { $_ => scalar @{ $self->[0]{$_} } } keys %{ $self->[0] };
        }
        else {
            my $size = 0;
            foreach my $key ( keys %{ $self->[0] } ) {
                $size += scalar @{ $self->[0]{$key} };
            }
            $size;
        }
    }

    # $inbox->send( $from, $to, $arg1, ... );
    # $inbox->send( $from, \@list, $arg1, ... );

    sub send {
        my ( $self, $from, $to ) = ( shift, shift, shift );
        my $mesg = [ $from, [ @_ ] ];

        if ( ref $to eq 'ARRAY' ) {
            push @{ $self->[0]{$_ } }, $mesg for @{ $to };
        }
        else {
            push @{ $self->[0]{$to} }, $mesg;
        }

        return;
    }

    # $inbox->recv( $from );

    sub recv {
        my ( $self, $from ) = @_;

        return @{ $self->[1] } unless exists $self->[0]{ $from };

        @{ shift @{ $self->[0]{ $from } } // $self->[1] };
    }

    # $inbox->end();

    sub end {
        $_[0]->[1] = [ 'manager', [ 'END' ] ];
        return;
    }

    1;
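A minimal single-process sketch of how the methods behave, with no sharing involved. The package body is a condensed copy of new, size (key form only), send, recv, and end from the module above, included so the snippet runs on its own. The names and message text are just examples.

```perl
use strict;
use warnings;

# Condensed copy of Foo::Inbox so this snippet runs standalone.
{
    package Foo::Inbox;

    sub new { bless [ {}, [] ], shift }

    sub size {
        my ( $self, $key ) = @_;
        exists $self->[0]{$key} ? scalar @{ $self->[0]{$key} } : 0;
    }

    sub send {
        my ( $self, $from, $to ) = ( shift, shift, shift );
        my $mesg = [ $from, [ @_ ] ];
        push @{ $self->[0]{$_} }, $mesg for ( ref $to eq 'ARRAY' ? @{ $to } : $to );
        return;
    }

    sub recv {
        my ( $self, $from ) = @_;
        return @{ $self->[1] } unless exists $self->[0]{ $from };
        @{ shift @{ $self->[0]{ $from } } // $self->[1] };
    }

    sub end { $_[0]->[1] = [ 'manager', [ 'END' ] ]; return }
}

my $inbox = Foo::Inbox->new();

# One message to a list of recipients, one to a single recipient.
$inbox->send( 'Fred',  [ 'Wilma', 'Betty' ], 'Hello' );
$inbox->send( 'Betty', 'Wilma', 'Hi', 'there' );

my $pending = $inbox->size('Wilma');              # two messages waiting

my ( $from1, $data1 ) = $inbox->recv('Wilma');    # oldest message first
my ( $from2, $data2 ) = $inbox->recv('Wilma');
my @empty = $inbox->recv('Wilma');                # empty list once drained

# After end(), an empty or unknown inbox yields the END message instead.
$inbox->end();
my @end = $inbox->recv('Wilma');

print "pending: $pending\n";
print "$from1 said: @{ $data1 }\n";
print "$from2 said: @{ $data2 }\n";
print "drained: ", scalar(@empty), " items\n";
print "after end: $end[0] sent $end[1][0]\n";
```

The empty list returned by recv on a drained inbox is what makes the `if ( my ($from, $data) = $inbox->recv($name) )` test in the demo scripts false.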

A worker may send a data structure or a list (not shown here). When it does, MCE::Shared handles serialization automatically via Sereal 3.015+ if available, otherwise Storable.
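To illustrate what serializing such a message amounts to, here is a sketch using Storable's freeze and thaw on a message shaped like the one Inbox builds, `[ $from, [ @args ] ]`. This only shows the round trip; it is not MCE::Shared's internal code, and the payload is made up.

```perl
use strict;
use warnings;
use Storable qw( freeze thaw );

# A message in the shape Foo::Inbox stores: [ $from, [ @args ] ].
# The payload (a hash ref followed by two strings) is a made-up example.
my $mesg = [ 'Fred', [ { job => 42, tags => [ 'a', 'b' ] }, 'x', 'y' ] ];

# Serialize to a byte string, as needed to cross a process boundary.
my $bytes = freeze($mesg);

# Deserialize on the receiving side; the result is a deep copy.
my $copy = thaw($bytes);

my ( $from, $data ) = @{ $copy };
printf "%s sent job %d with tags %s and %d more args\n",
    $from, $data->[0]{job}, join( ',', @{ $data->[0]{tags} } ), @{ $data } - 1;
```

Because the receiver gets a copy, changes it makes to the structure are not seen by the sender.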

    use strict;
    use warnings;

    use Foo::Inbox;
    use MCE::Hobo;
    use MCE::Shared;

    use List::Util 'shuffle';
    use Time::HiRes 'sleep';

    my $inbox = MCE::Shared->share( Foo::Inbox->new() );
    my @names = shuffle qw/ Barny Betty Fred Wilma /;
    my $index = 0;

    $| = 1;

    sub foo {
        my $name = shift;
        my $count = 0;

        # remove my name from the list
        @names = grep { $_ ne $name } shuffle @names;

        # send greeting to names on the list
        $inbox->send($name, \@names, 'Hello');

        while ( 1 ) {
            if ( my ($from, $data) = $inbox->recv($name) ) {
                # so soon, alrighty then ;-)
                last if $data->[0] eq 'END';

                # display the message received
                printf "%-5s received %s from %s\n", $name, $data->[0], $from;

                # forward the message to another worker
                $inbox->send($name, $names[ ++$index % @names ], $data->[0])
                    if ( $from eq 'manager' );

                next;
            }

            sleep 0.01;
        }
    }

    MCE::Hobo->create(\&foo, $_) for @names;

    # Enter message or type quit to terminate the script.
    while ( my $msg = <STDIN> ) {
        chomp $msg;
        next unless ( length $msg );
        $inbox->end(), last() if ( $msg eq 'quit' );
        $inbox->send('manager', $names[ ++$index % @names ], $msg);
    }

    MCE::Hobo->waitall;

In the terminal, I entered words from the song Hello by Adele: Hello, it's me -- Hello, can you hear me?. I then entered quit to exit the application.

    Betty received Hello from Barny
    Wilma received Hello from Barny
    Fred  received Hello from Barny
    Wilma received Hello from Betty
    Barny received Hello from Betty
    Betty received Hello from Wilma
    Fred  received Hello from Betty
    Betty received Hello from Fred
    Fred  received Hello from Wilma
    Wilma received Hello from Fred
    Barny received Hello from Wilma
    Barny received Hello from Fred
    Hello, it's me
    Betty received Hello, it's me from manager
    Barny received Hello, it's me from Betty
    Hello, can you hear me?
    Wilma received Hello, can you hear me? from manager
    Fred  received Hello, can you hear me? from Wilma
    quit

A subsequent version of Inbox might provide blocking capabilities so as not to poll. Doing so means having to make many pipes or fifos, and I'm not sure about that. Imagine a future server having 400 logical cores. Making all those pipes or fifos seems excessive, imho.

Update: I've benchmarked Inbox to determine whether making pipes or fifos is necessary. Adding the next statement enabled the demo script to run reasonably fast when the inbox is filled with messages or when many workers send simultaneously to the same recipient. It's also possible for two or more workers to read from the same inbox in parallel if needed.
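The effect of the next statement can be sketched in a single process. The queue here is a plain array standing in for one worker's inbox, and the counters `$handled` and `$naps` are hypothetical names added for illustration. The loop sleeps only when a fetch comes back empty, so a backlog is drained without any delay.

```perl
use strict;
use warnings;
use Time::HiRes qw( sleep );

# Stand-in for one worker's inbox: five messages already waiting, then END.
my @queue = (
    ( map { [ 'manager', [ "msg $_" ] ] } 1 .. 5 ),
    [ 'manager', [ 'END' ] ],
);

my ( $handled, $naps ) = ( 0, 0 );

while ( 1 ) {
    # An empty queue yields an empty list, which makes the test false.
    if ( my ( $from, $data ) = @{ shift(@queue) // [] } ) {
        last if $data->[0] eq 'END';
        $handled++;
        next;    # don't delay, fetch the next message right away
    }

    $naps++;     # only reached when the fetch came back empty
    sleep 0.01;
}

print "handled $handled messages, slept $naps times\n";
```

Without the next statement, every message would be followed by a 0.01 second sleep, which would cap a worker at roughly 100 messages per second.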

Regards, Mario

Replies are listed 'Best First'.
Re^4: Child process inter communication
by marioroy (Prior) on Jun 16, 2017 at 08:28 UTC

    Update: Added next statement to not delay unless the inbox is empty.

    The following is a demonstration using Parallel::ForkManager + MCE::Shared + Foo::Inbox.

        use strict;
        use warnings;

        use Foo::Inbox;
        use Parallel::ForkManager;
        use MCE::Shared;

        use List::Util 'shuffle';
        use Time::HiRes 'sleep';

        my $inbox = MCE::Shared->share( Foo::Inbox->new() );
        my @names = shuffle qw/ Barny Betty Fred Wilma /;
        my $index = 0;

        $| = 1;

        sub foo {
            my $name = shift;
            my $count = 0;

            # remove my name from the list
            @names = grep { $_ ne $name } shuffle @names;

            # send greeting to names on the list
            $inbox->send($name, \@names, 'Hello');

            while ( 1 ) {
                if ( my ($from, $data) = $inbox->recv($name) ) {
                    # so soon, alrighty then ;-)
                    last if $data->[0] eq 'END';

                    # display the message received
                    printf "%-5s received %s from %s\n", $name, $data->[0], $from;

                    # forward the message to another worker
                    $inbox->send($name, $names[ ++$index % @names ], $data->[0])
                        if ( $from eq 'manager' );

                    next;
                }

                sleep 0.01;
            }
        }

        my $pm = Parallel::ForkManager->new(scalar @names);

        $pm->set_waitpid_blocking_sleep(0);

        foreach my $name ( @names ) {
            $pm->start($name) and next;

            # Have the shared-manager assign a data channel for IPC.
            # This is done automatically for MCE, MCE::Hobo, and threads.
            MCE::Shared->init();

            # Enter the foo routine.
            foo($name);

            $pm->finish($name);
        }

        # Enter message or type quit to terminate the script.
        while ( my $msg = <STDIN> ) {
            chomp $msg;
            next unless ( length $msg );
            $inbox->end(), last() if ( $msg eq 'quit' );
            $inbox->send('manager', $names[ ++$index % @names ], $msg);
        }

        $pm->wait_all_children;

    MCE::Signal takes care of signal handling behind the scenes if one were to press Ctrl-C (SIGINT). Ditto for a process receiving SIGQUIT or SIGTERM. What is nice about this arrangement is that MCE::Shared and MCE::Signal (loaded by MCE::Shared::Server) complement Parallel::ForkManager quite nicely ;-).

    For SIGPIPE to work reliably, STDOUT needs $| = 1. Then the following is possible on any Unix platform. The script terminates gracefully, including the tempdir removal inside Parallel::ForkManager.

    perl script.pl | head -5

    Regards, Mario

Re^4: Child process inter communication
by marioroy (Prior) on Jun 17, 2017 at 10:12 UTC

    Update: Changed 'quit' to 'END' in the example.

    Before benchmarking, I thought that 10k messages per second would be nice for a pure Perl solution. Well, on the Windows platform it can deplete the inbox at a rate of 20k messages per second. On the Mac and Linux VM, reaching 50k per second is no problem. For that reason, I will introduce MCE::Shared::Inbox with the next MCE::Shared release.

    Disclaimer: Inbox may not be the best solution out there, which is fine. What it will do is run reasonably well on multiple platforms, including Windows, and support both threads and processes. Inbox itself does not construct any pipes / fifos, mutexes, or temp files or directories. Instead it rides on MCE::Shared's parallel data channels. Basically, Inbox neither creates nor consumes additional resources at the OS level.

    That said, I'm not worried if one were to spawn 400 workers in the future on a server having 400 logical cores and do inter-child-or-thread communication using Inbox. MCE::Shared provides a 12-channel barrier to safeguard the OS Kernel. The effect is preserving the fluid-like OS experience while running.

    Running.

        on unix:    perl demo.pl | wc -l
        windows:    perl demo.pl > nul

    The Foo::Inbox package is found here.

        use strict;
        use warnings;

        use Foo::Inbox;
        use MCE::Hobo;
        use MCE::Shared;

        use List::Util qw( shuffle );
        use Time::HiRes qw( sleep time );

        my $inbox = MCE::Shared->share( Foo::Inbox->new() );
        my @names = shuffle qw/ Barny Betty Fred Wilma /;
        my $start = time;

        $| = 1;

        sub foo {
            my $name = shift;
            my $count = 0;

            # remove my name from the list
            @names = grep { $_ ne $name } shuffle @names;

            # send greeting
            $inbox->send($name, \@names, 'Hello');

            while ( 1 ) {
                if ( my ($from, $data) = $inbox->recv($name) ) {
                    last if $data->[0] eq 'END';

                    # display the message received
                    printf "%-5s received %s from %s\n", $name, $data->[0], $from;

                    # send greeting again
                    $inbox->send($name, \@names, 'Hello') if $count < 4167;

                    # eventually stop benchmarking
                    last if ++$count == 12500;

                    # don't delay, fetch next message
                    next;
                }

                # timer delay if empty
                sleep 0.01;
            }
        }

        MCE::Hobo->create(\&foo, $_) for @names;
        MCE::Hobo->waitall;

        printf {*STDERR} "duration: %0.03f seconds\n", time - $start;
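    The constants in the benchmark can be checked with a little arithmetic. The sketch below is my reading of the script, not something it prints: it works out how many messages each inbox receives, how many lines the run prints in total, and what rate a given duration implies. The two durations passed to `rate` are hypothetical values chosen to match the 50k and 20k figures above.

```perl
use strict;
use warnings;

my $workers = 4;               # Barny, Betty, Fred, Wilma
my $peers   = $workers - 1;    # each greeting goes to the other three
my $resends = 4167;            # greeting is re-sent while $count < 4167
my $stop_at = 12500;           # each worker stops after 12500 receives

# One initial send plus 4167 re-sends, each delivered to three peers.
my $sends_per_worker = 1 + $resends;
my $arrivals_per_box = $peers * $sends_per_worker;

# Every worker prints one line per message it receives.
my $total_lines = $workers * $stop_at;

# Messages per second for a measured duration in seconds.
sub rate { my ($duration) = @_; return $total_lines / $duration }

print "arrivals per inbox: $arrivals_per_box\n";
print "total lines:        $total_lines\n";
printf "rate at 1.0 s:      %d per second\n", rate(1.0);
printf "rate at 2.5 s:      %d per second\n", rate(2.5);
```

    Each inbox receives 12504 messages, slightly more than the 12500 each worker reads, so every worker can reach its stopping point, and `wc -l` should report 50000 lines.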

    For long jobs, the timer delay is not likely a problem, certainly not if batching. Later, I may integrate signal handling for notification. But then, signal handling isn't reliable on the Windows platform. So I am giving the hybrid approach a try: run at maximum performance for a busy Inbox and delay otherwise.

    Regards, Mario
