http://qs321.pair.com?node_id=1193067


in reply to Child process inter communication

Update: Added Foo::Inbox4 and removed Foo::Inbox3. I had grown so accustomed to MCE::Shared serializing complex data structures automatically that I didn't realize I had to do that manually for Thread::Queue.

Corion mentioned queues. The following provides two queue implementations based on Foo::Inbox.

Foo::Inbox2 using MCE::Shared->queue

package Foo::Inbox2;

use strict;
use warnings;

our $VERSION = '0.003';

use MCE::Shared;

# $inbox = Foo::Inbox->new();

sub new {
    my ( $class, @names ) = @_;

    my %self = map { $_ => MCE::Shared->queue( fast => 1 ) } @names;
    MCE::Shared->start() unless ( exists $INC{'IO/FDPass.pm'} );

    bless \%self, $class;
}

# $scalar = $inbox->size( [ $key ] );
# %pairs  = $inbox->size();

sub size {
    my ( $self, $key ) = @_;

    if ( defined $key ) {
        exists $self->{$key} ? $self->{$key}->pending() : 0;
    }
    elsif ( wantarray ) {
        local $_;
        map { $_ => $self->{$_}->pending() } keys %{ $self };
    }
    else {
        my $size = 0;
        foreach my $key ( keys %{ $self } ) {
            $size += $self->{$key}->pending();
        }
        $size;
    }
}

# $inbox->send( $from, $to, $arg1, ... );
# $inbox->send( $from, \@list, $arg1, ... );

sub send {
    my ( $self, $from, $to ) = ( shift, shift, shift );
    my $mesg = [ $from, [ @_ ] ];

    if ( ref $to eq 'ARRAY' ) {
        $self->{$_ }->enqueue($mesg) for @{ $to };
    }
    else {
        $self->{$to}->enqueue($mesg);
    }

    return;
}

# $inbox->recv( $from );

sub recv {
    my ( $self, $from ) = @_;

    return () unless exists $self->{$from};

    @{ $self->{$from}->dequeue() // [] };
}

# $inbox->end();

sub end {
    my ( $self ) = @_;

    foreach my $from ( values %{ $self } ) {
        $from->end();
    }

    return;
}

1;
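The size method above returns different things depending on its argument and calling context. The following is a minimal sketch of that pattern only. It assumes plain array references as a hypothetical stand-in for the MCE::Shared queues, so it runs without any CPAN modules.

```perl
use strict;
use warnings;

# Hypothetical stand-in: plain array refs instead of MCE::Shared queues.
my %inbox = (
    Fred  => [ 'a', 'b' ],
    Wilma => [ 'c' ],
);

sub size {
    my ( $key ) = @_;

    if ( defined $key ) {
        # one inbox: its pending count, or 0 for an unknown name
        return exists $inbox{$key} ? scalar @{ $inbox{$key} } : 0;
    }
    elsif ( wantarray ) {
        # list context: name => count pairs
        return map { $_ => scalar @{ $inbox{$_} } } keys %inbox;
    }
    else {
        # scalar context: total across all inboxes
        my $size = 0;
        $size += scalar @{ $inbox{$_} } for keys %inbox;
        return $size;
    }
}

my $fred  = size('Fred');     # count for one inbox
my $none  = size('Nobody');   # unknown name
my %pairs = size();           # list context
my $total = size();           # scalar context

print "Fred: $fred, Nobody: $none, total: $total\n";
print "$_ => $pairs{$_}\n" for sort keys %pairs;
```

Assigning the result to a hash selects the list-context branch, while assigning to a scalar selects the total.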

Foo::Inbox4 using Thread::Queue

package Foo::Inbox4;

use strict;
use warnings;

our $VERSION = '0.003';

use Thread::Queue;

my ( $freeze, $thaw );

BEGIN {
    if ( !exists $INC{'PDL.pm'} ) {
        eval '
            use Sereal::Encoder 3.015 qw( encode_sereal );
            use Sereal::Decoder 3.015 qw( decode_sereal );
        ';
        if ( !$@ ) {
            my $encoder_ver = int( Sereal::Encoder->VERSION() );
            my $decoder_ver = int( Sereal::Decoder->VERSION() );
            # ensure the base version match e.g. 3
            if ( $encoder_ver - $decoder_ver == 0 ) {
                $freeze = sub { encode_sereal( @_, { freeze_callbacks => 1 } ) },
                $thaw   = \&decode_sereal;
            }
        }
    }
    if ( !defined $freeze ) {
        require Storable;
        $freeze = \&Storable::freeze,
        $thaw   = \&Storable::thaw;
    }
}

# $inbox = Foo::Inbox->new();

sub new {
    my ( $class, @names ) = @_;

    my %self = map { $_ => Thread::Queue->new() } @names;

    bless \%self, $class;
}

# $scalar = $inbox->size( [ $key ] );
# %pairs  = $inbox->size();

sub size {
    my ( $self, $key ) = @_;

    if ( defined $key ) {
        exists $self->{$key} ? $self->{$key}->pending() : 0;
    }
    elsif ( wantarray ) {
        local $_;
        map { $_ => $self->{$_}->pending() } keys %{ $self };
    }
    else {
        my $size = 0;
        foreach my $key ( keys %{ $self } ) {
            $size += $self->{$key}->pending();
        }
        $size;
    }
}

# $inbox->send( $from, $to, $arg1, ... );
# $inbox->send( $from, \@list, $arg1, ... );

sub send {
    my ( $self, $from, $to ) = ( shift, shift, shift );
    my $mesg = $freeze->( [ $from, [ @_ ] ] );

    if ( ref $to eq 'ARRAY' ) {
        $self->{$_ }->enqueue($mesg) for @{ $to };
    }
    else {
        $self->{$to}->enqueue($mesg);
    }

    return;
}

# $inbox->recv( $from );

sub recv {
    my ( $self, $from ) = @_;

    return () unless exists $self->{$from};

    my $mesg = $self->{$from}->dequeue();

    $mesg ? @{ $thaw->($mesg) } : ();
}

# $inbox->end();

sub end {
    my ( $self ) = @_;

    foreach my $from ( values %{ $self } ) {
        $from->end();
    }

    return;
}

1;
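The manual serialization step in Foo::Inbox4 can be looked at in isolation. The following is a minimal sketch, assuming the Storable fallback path rather than Sereal; it uses no queue at all and only shows the message being frozen to a string and rebuilt on the other side. The sample payload is made up for illustration.

```perl
use strict;
use warnings;
use Storable qw( freeze thaw );

# Same message shape as in the module: [ $from, [ @args ] ].
# The payload values here are illustrative only.
my $mesg = [ 'Fred', [ 'Hello', { count => 1 } ] ];

# Serialize to a flat string, as send() does before enqueue ...
my $frozen = freeze($mesg);

# ... and rebuild the structure, as recv() does after dequeue.
my ( $from, $data ) = @{ thaw($frozen) };

print "from: $from\n";
print "first arg: $data->[0]\n";
print "count: $data->[1]{count}\n";
```

Because the queue then only carries a plain string, nested references in the arguments survive the trip without the queue having to share them.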

A demo and benchmark will follow in the immediate post(s).

Regards, Mario

Re^2: Child process inter communication
by marioroy (Prior) on Jun 18, 2017 at 23:53 UTC

    For benchmarking 50,000 messages the following two scripts were made, based on the one found here.

    $ diff hobo_test.pl threads_test.pl
    5,6c5,6
    < use MCE::Hobo;
    < use Foo::Inbox2;
    ---
    > use threads;
    > use Foo::Inbox4;
    11c11
    < my $inbox = Foo::Inbox2->new( @names );
    ---
    > my $inbox = Foo::Inbox4->new( @names );
    38,39c38,39
    < MCE::Hobo->create(\&foo, $_) for @names;
    < MCE::Hobo->waitall;
    ---
    > threads->create(\&foo, $_) for @names;
    > $_->join() for threads->list();

    Foo::Inbox2 via MCE::Hobo + MCE::Shared->queue 50k test

    Foo::Inbox4 via threads + Thread::Queue 50k test

    Update: Added results for MCE::Inbox which will ship with MCE::Shared 1.827 on release day. It is Inbox2 with optimizations.

    Results from Mac OS X and CentOS 7.

    # Mac OS X ( Perl v5.18.2 )

    $ perl foo1.pl | wc -l     # Foo::Inbox
    duration: 0.690 seconds
    50000

    $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
    duration: 0.721 seconds
    50000

    $ perl foo2.pl | wc -l     # MCE::Shared->queue
    duration: 0.789 seconds
    50000

    $ perl foo4.pl | wc -l     # Thread::Queue
    duration: 5.735 seconds
    50000

    # CentOS 7 VM ( Perl v5.16.3 )

    $ perl foo1.pl | wc -l     # Foo::Inbox
    duration: 0.834 seconds
    50000

    $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
    duration: 0.726 seconds
    50000

    $ perl foo2.pl | wc -l     # MCE::Shared->queue
    duration: 0.945 seconds
    50000

    $ perl foo4.pl | wc -l     # Thread::Queue
    duration: 3.020 seconds
    50000

    Results from Cygwin and Strawberry Perl.

    # Windows ( Cygwin Perl v5.22.3 )

    $ perl foo1.pl | wc -l     # Foo::Inbox
    duration: 1.825 seconds
    50000

    $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
    duration: 2.059 seconds
    50000

    $ perl foo2.pl | wc -l     # MCE::Shared->queue
    duration: 2.387 seconds
    50000

    $ perl foo4.pl | wc -l     # Thread::Queue
    duration: 24.086 seconds
    50000

    # Windows ( Strawberry Perl v5.22.2.1 )

    $ perl foo1.pl > nul       # Foo::Inbox
    duration: 1.570 seconds

    $ perl inbox.pl > nul      # MCE::Inbox w/ blocking capability
    duration: 1.664 seconds

    $ perl foo2.pl > nul       # MCE::Shared->queue
    duration: 2.120 seconds

    $ perl foo4.pl > nul       # Thread::Queue
    duration: 2.886 seconds

    Results from FreeBSD and Solaris.

    # TrueOS 10.0 ( FreeBSD, Perl 5.16.3 )

    $ perl foo1.pl | wc -l     # Foo::Inbox
    duration: 0.910 seconds
    50000

    $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
    duration: 0.875 seconds
    50000

    $ perl foo2.pl | wc -l     # MCE::Shared->queue
    duration: 1.107 seconds
    50000

    $ perl foo4.pl | wc -l     # Thread::Queue
    duration: 0.797 seconds
    50000

    # Solaris 11.2 ( Perl 5.22.2 )

    $ perl foo1.pl | wc -l     # Foo::Inbox
    duration: 1.319 seconds
    50000

    $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
    duration: 1.344 seconds
    50000

    $ perl foo2.pl | wc -l     # MCE::Shared->queue
    duration: 1.525 seconds
    50000

    $ perl foo4.pl | wc -l     # Thread::Queue
    duration: 1.822 seconds
    50000

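    In this testing, threads + Thread::Queue performs comparatively better on FreeBSD and Solaris than on the other platforms. On FreeBSD it is the fastest of the four; on Solaris it is the slowest, but by a much smaller margin than elsewhere.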

    Foo::Inbox lacks blocking capability, thus has lower latency. However, it may run much slower when the Inbox isn't busy.

    Regards, Mario

      Please note that the results in this post don't count. I've overclocked a system on purpose, mainly to simulate a fast machine of the future. On an 8-core box (16 logical cores in total), I ran the same tests to better understand the impact on the three implementations. The box is overclocked to 4.0 GHz. What interests me is not the raw speed but the impact of running many workers.

      Baseline testing involves 4 workers and runs about 2x faster than on my laptop. No surprise there, since the box runs at 4 GHz and not inside a Linux VM.

      Update: Added results for MCE::Inbox which will ship with MCE::Shared 1.827 on release day. It is Inbox2 with optimizations.

      # 4 GHz, CentOS 7 ( Perl v5.16.3 )

      $ perl foo1.pl | wc -l     # Foo::Inbox
      duration: 0.441 seconds
      50000

      $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
      duration: 0.422 seconds
      50000

      $ perl foo2.pl | wc -l     # MCE::Shared->queue
      duration: 0.645 seconds
      50000

      $ perl foo4.pl | wc -l     # Thread::Queue
      duration: 1.544 seconds
      50000

      Next is 128 workers retrieving 1.6 million messages.

      # 4 GHz, CentOS 7 ( Perl v5.16.3 )

      $ perl foo1.pl | wc -l     # Foo::Inbox
      duration: 12.685 seconds
      1600000

      $ perl inbox.pl | wc -l    # MCE::Inbox w/ blocking capability
      duration: 15.939 seconds
      1600000

      $ perl foo2.pl | wc -l     # MCE::Shared->queue
      duration: 21.533 seconds
      1600000

      $ perl foo4.pl | wc -l     # Thread::Queue
      duration: 90.015 seconds
      1600000

      Unfortunately, threads may not scale linearly, especially when locking is involved. In top, I see the process peak at 306% while running, far from the 1600% hardware limit. MCE::Inbox, on the other hand, scales nicely considering that it blocks as well.

      For the 1.6 million test, I added a loop to generate more names, 128 total.

      my @names = shuffle qw/ Barny Betty Fred Wilma /;

      foreach my $i ( 1 .. 5 ) {
          push @names, map { $_.$i } @names;
          print scalar(@names), "\n";
      }

      __END__
      8
      16
      32
      64
      128

      There was one other change: I replaced 4167 with 98 because there are now 128 workers, not 4.

      ...
      # send greeting again
      $inbox->send($name, \@names, 'Hello') if $count < 98;

      # eventually stop benchmarking
      last if ++$count == 12500;
      ...
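The numbers in the snippet above can be checked with a little arithmetic. This is a minimal sketch: the per-worker limit of 12500 and the greeting limit of 98 come from the code shown, while the assumption that all 128 workers send every one of their 98 greetings to all 128 names is an interpretation of the `\@names` fan-out, not something the full script confirms here.

```perl
use strict;
use warnings;

my $per_worker = 12_500;   # each worker stops after this many messages

# Total lines expected from "wc -l" for the 4-worker and 128-worker runs
my %total;
for my $workers ( 4, 128 ) {
    $total{$workers} = $workers * $per_worker;
    printf "%3d workers x %d = %d messages\n",
        $workers, $per_worker, $total{$workers};
}

# Assumption: all 128 workers send 98 greetings, each to all 128 names,
# so every inbox receives this many greetings.
my $greetings_per_inbox = 98 * 128;
print "greetings per inbox: $greetings_per_inbox\n";
```

The totals match the 50000 and 1600000 line counts in the results, and under the stated assumption the greetings alone are enough for each worker to reach its limit of 12500.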

      Regards, Mario

        MCE::Inbox currently lives on GitHub. Depending on whether threads is loaded, it configures channels using Thread::Queue, or MCE::Shared::Queue otherwise. There are examples demonstrating "Chameneos, a Concurrency Game for Java, Ada, and Others" using Perl over here.
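Choosing a backend based on whether threads is loaded can be done by inspecting %INC, the same idiom the modules in this thread use for PDL.pm and IO/FDPass.pm. The following is a minimal sketch only; queue_backend is a hypothetical helper and is not taken from MCE::Inbox, whose actual check may differ.

```perl
use strict;
use warnings;

# Hypothetical helper, not taken from MCE::Inbox itself.
# %INC has an entry for 'threads.pm' once "use threads" has been loaded.
sub queue_backend {
    return exists $INC{'threads.pm'}
        ? 'Thread::Queue'
        : 'MCE::Shared::Queue';
}

print "backend: ", queue_backend(), "\n";
```

Run as written, without loading threads, the helper reports MCE::Shared::Queue; adding `use threads;` at the top on a threaded perl switches it to Thread::Queue.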

        I will make an MCE::Inbox release on CPAN after MCE 1.834 and MCE::Shared 1.835. These provide a fix on Microsoft Windows so that dequeue no longer has to check each time whether the socket is ready. On Windows, I found a way for nested sessions to keep working without degrading dequeue performance on Perl 5.20 and up. Nested sessions here means workers spawning workers, or workers exiting while other workers dequeue items from a queue.

        Cheers, Mario

Re^2: Child process inter communication
by marioroy (Prior) on Jun 18, 2017 at 23:41 UTC

    The demo scripts are based on the one found here. One uses MCE::Hobo + Inbox2. The other uses threads + Inbox4.

    $ diff hobo_demo.pl threads_demo.pl
    5,6c5,6
    < use MCE::Hobo;
    < use Foo::Inbox2;
    ---
    > use threads;
    > use Foo::Inbox4;
    10c10
    < my $inbox = Foo::Inbox2->new( @names );
    ---
    > my $inbox = Foo::Inbox4->new( @names );
    34c34
    < MCE::Hobo->create(\&foo, $_) for @names;
    ---
    > threads->create(\&foo, $_) for @names;
    44c44
    < MCE::Hobo->waitall;
    ---
    > $_->join() for threads->list();

    Foo::Inbox2 via MCE::Hobo + MCE::Shared->queue example

    Foo::Inbox4 via threads + Thread::Queue example

    Regards, Mario