Return all the data from child to parent with Parallel::Forkmanager

by Microcebus (Beadle)
on Aug 18, 2017 at 12:14 UTC

Microcebus has asked for the wisdom of the Perl Monks concerning the following question:

Dear wise monks,
sorry if this is a stupid question, but I'm a biologist rather than a coder... I want to analyze some biological data, but before I can do that I need to read and analyze 4 large files. To save time I would like to read the 4 files in parallel using Parallel::ForkManager. My problem is that I have no idea (even after a Google search) how to return the data back to the parent. Each subroutine generates a lot of different data structures, such as hashes and arrays, that I need later on in the parent process. Below is the code I am currently trying.
use Parallel::ForkManager;

$threads = 4;

if ($threads == 1) {
    read_genome();
    read_mapfile();
    read_GTF();
    read_RM();
}
else {
    @eval_code = ("read_genome();", "read_mapfile();", "read_GTF();", "read_RM();");
    my $pm = new Parallel::ForkManager($threads);
    foreach $eval_code (@eval_code) {
        $pm->start and next;
        eval $eval_code;
        $pm->finish;
    }
    $pm->wait_all_children;
}

sub read_genome {
    # do something
}

sub read_mapfile {
    # do something
}

sub read_GTF {
    # do something
}

sub read_RM {
    # do something
}

# use data generated in the subroutines

Replies are listed 'Best First'.
Re: Return all the data from child to parent with Parallel::Forkmanager
by Corion (Patriarch) on Aug 18, 2017 at 12:28 UTC

    fork (and in turn, Parallel::ForkManager) doesn't return data to the parent process and has no provisions to do so.

    The easiest approach would be to have each child write out its results to a file, and then read each file in turn in the parent. The run_on_finish callback lets you conveniently know when to collect more data in the parent:

    use JSON::XS 'encode_json', 'decode_json';

    $pm->run_on_finish(sub {
        my ($pid, $exit_code, $filename) = @_;
        print "** $filename was just written\n";
        open my $fh, '<', $filename
            or die "Couldn't read child results from '$filename': $!";
        binmode $fh;
        local $/;
        my $results = <$fh>;

        # Convert results back to the data structure:
        $results = decode_json( $results );

        # now, merge those results with the other results you already have
    });

    ...

    for my $task (@tasks) {
        my $filename = "some filename based on $task";
        $pm->start( $filename ) and next;

        ... # do the work, filling %results

        open my $fh, '>', $filename
            or die "Couldn't write output file '$filename': $!";
        binmode $fh;
        print $fh encode_json( \%results );
        $pm->finish;
    }
      fork (and in turn, Parallel::ForkManager) doesn't return data to the parent process and has no provisions to do so.

      Nearly right. fork doesn't, but recent versions of Parallel::ForkManager (starting at 0.7.6, released 2010-Aug-15) do allow returning a single data structure, which must be a reference to a string, a hash, or an array. See "RETRIEVING DATASTRUCTURES from child processes" in the documentation of Parallel::ForkManager. What happens there is quite similar to your JSON approach: Parallel::ForkManager serialises the data in the child process using Storable, writes it to disk, and reads it back in the parent, all of it nearly transparent to the user.
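
      A minimal sketch of that mechanism (assuming Parallel::ForkManager 0.7.6 or later; the task names and the %combined hash are only illustrative):

      use strict;
      use warnings;
      use Parallel::ForkManager;   # needs 0.7.6+ for finish(0, \%data)

      my $pm = Parallel::ForkManager->new(4);
      my %combined;                # collects each child's returned data

      $pm->run_on_finish(sub {
          # the 6th argument is the reference the child passed to finish()
          my ($pid, $exit_code, $ident, $signal, $core, $data) = @_;
          $combined{$ident} = $data if defined $data;
      });

      for my $ident (qw( genome mapfile GTF RM )) {
          $pm->start($ident) and next;           # parent continues the loop
          my %results = ( source => $ident );    # child's real work goes here
          $pm->finish(0, \%results);             # serialised back via Storable
      }
      $pm->wait_all_children;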

      Alexander

      --
      Today I will gladly share my knowledge and experience, for there are no sweeter words than "I told you so". ;-)
Re: Return all the data from child to parent with Parallel::Forkmanager
by QM (Parson) on Aug 18, 2017 at 13:18 UTC
    While your question of child processes sending data to parents is perfectly reasonable, I would challenge the assumption that reading 4 large files in parallel will be any faster than reading them sequentially, unless they are on different disks in the same system.

    Perl's IO is well-optimized, and I've generally found that the disk throughput is the bottleneck.

    On another tack, if there are 4 different file types, and a different script is needed for each, then you may find some benefit in reading and reducing each file, writing the result to an interim file which the final script can read. But the interim files should probably be 100 to 1000 times smaller than the originals; otherwise you will find network bandwidth the limiting factor. There are several ways to serialize data so that it can be restored to native Perl data structures.

    Perl Maven has a nice list of serializers to get you started.
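
    For example, the core module Storable handles the round trip; the file name and data below are just stand-ins:

    use strict;
    use warnings;
    use Storable qw( store retrieve );

    # reducing pass: condense one large input, then save the structure
    my %reduced = ( chr1 => { reads => 12345 } );   # stand-in data
    store \%reduced, 'genome.reduced.sto';

    # final script: restore the native Perl structure from the interim file
    my $data = retrieve 'genome.reduced.sto';
    print $data->{chr1}{reads}, "\n";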

    Finally, DBM::Deep will store a Perl data structure on disk and let you use it as if it were in memory, though access is much slower. I used this for a data structure that was too large to fit in virtual memory, and it worked very well.
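
    A minimal sketch of that interface (the file name and keys are made up):

    use strict;
    use warnings;
    use DBM::Deep;

    # creates (or reopens) the backing file on disk
    my $db = DBM::Deep->new('analysis.db');

    $db->{genome}{chr1} = 'ACGTACGT';   # nested writes go straight to disk
    print $db->{genome}{chr1}, "\n";    # reads come back transparently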

    -QM
    --
    Quantum Mechanics: The dreams stuff is made of

Re: Return all the data from child to parent with Parallel::Forkmanager
by tybalt89 (Monsignor) on Aug 18, 2017 at 17:40 UTC

    Here's a way that avoids (for no particularly good reason) writing disk files, by using Perl's forked open. It also avoids eval (for maybe slightly better reasons).

    #!/usr/bin/perl
    # http://perlmonks.org/?node_id=1197603
    use strict;
    use warnings;

    use Data::Dumper;
    use Storable qw( freeze thaw );
    use IO::Select;

    my $sel = IO::Select->new;
    my %returndata;

    # this way you don't need to do an eval
    my @subs = (\&read_genome, \&read_mapfile, \&read_GTF, \&read_RM);

    for my $sub (@subs) {                  # start all forks
        if (open my $fh, '-|') {
            $sel->add($fh);
            $returndata{$fh} = '';
        }
        else {                             # child
            print freeze $sub->();
            exit;
        }
    }

    while ( $sel->count ) {                # get return data
        for my $fh ( $sel->can_read ) {
            if ( 0 >= sysread $fh, $returndata{$fh}, 16 * 1024,
                 length $returndata{$fh} )
            {
                my $answer = thaw delete $returndata{$fh};
                $sel->remove($fh);
                print Dumper $answer;  # or whatever you want to do with it
            }
        }
    }

    sub read_genome {
        # do something
        select undef, undef, undef, .1 + rand 1;  # simulate processing time
        return { from => 'read_genome', results => { 1..4 } };
    }

    sub read_mapfile {
        # do something
        select undef, undef, undef, .1 + rand 1;  # simulate processing time
        return { from => 'read_mapfile', results => { 5..8 } };
    }

    sub read_GTF {
        # do something
        select undef, undef, undef, .1 + rand 1;  # simulate processing time
        return { from => 'read_GTF', results => { 1..10 } };
    }

    sub read_RM {
        # do something
        select undef, undef, undef, .1 + rand 1;  # simulate processing time
        return { from => 'read_RM', results => { 2..5 } };
    }

    Prints (in a different order each time it's run)

    $VAR1 = {
              'results' => {
                             '2' => 3,
                             '4' => 5
                           },
              'from' => 'read_RM'
            };
    $VAR1 = {
              'results' => {
                             '5' => 6,
                             '1' => 2,
                             '9' => 10,
                             '7' => 8,
                             '3' => 4
                           },
              'from' => 'read_GTF'
            };
    $VAR1 = {
              'from' => 'read_genome',
              'results' => {
                             '3' => 4,
                             '1' => 2
                           }
            };
    $VAR1 = {
              'from' => 'read_mapfile',
              'results' => {
                             '7' => 8,
                             '5' => 6
                           }
            };

      Hi tybalt89,

      Using your solution, I tried adding exception support in the event the worker died.

      #!/usr/bin/perl
      use strict;
      use warnings;
      use feature 'say';

      use Storable qw( freeze thaw );
      use IO::Select;

      my $threads = 4;
      my $sel = IO::Select->new;
      my %seldata;

      my @name = ('read_genome', 'read_mapfile', 'read_GTF', 'read_RM');
      my @task = (\&read_genome, \&read_mapfile, \&read_GTF, \&read_RM);
      my %ret;

      if ($threads == 1) {
          for my $id (0 .. $#task) {
              $ret{$id} = {};
              $ret{$id}{result} = eval { $task[$id]->() };
              $ret{$id}{error}  = $@;
          }
      }
      else {
          # start all forks
          for my $id (0 .. $#task) {
              if (open my $fh, '-|') {
                  $sel->add($fh);                  # parent
                  $seldata{$fh} = '';
              }
              else {
                  my $data = {};                   # child
                  $data->{id}     = $id;
                  $data->{result} = eval { $task[$id]->() };
                  $data->{error}  = $@;
                  print freeze $data;
                  exit;
              }
          }

          # acquire data
          while ( $sel->count ) {
              for my $fh ( $sel->can_read ) {
                  if ( 0 >= sysread $fh, $seldata{$fh}, 16 * 1024,
                       length $seldata{$fh} )
                  {
                      my $answer = thaw delete $seldata{$fh};
                      $sel->remove($fh);
                      $ret{ $answer->{id} } = {
                          result => delete $answer->{result},
                          error  => delete $answer->{error}
                      };
                  }
              }
          }
      }

      sub read_genome {
          # do something
          return { 'aa' => 'bb' };
      }

      sub read_mapfile {
          # do something
          return { 'cc' => 'dd' };
      }

      sub read_GTF {
          # do something
          die 'exception';
          return { 'ee' => 'ff' };   # not reached
      }

      sub read_RM {
          # do something
          return { 'gg' => 'hh' };
      }

      # use data generated in the subroutines

      use Data::Dumper;

      for my $id (0 .. $#task) {
          say "## ", $name[$id];
          if (length $ret{$id}{error}) {
              say "ERROR: ", $ret{$id}{error};
              next;
          }
          say Dumper($ret{$id}{result});
      }

      Output:

      ## read_genome
      $VAR1 = {
                'aa' => 'bb'
              };

      ## read_mapfile
      $VAR1 = {
                'cc' => 'dd'
              };

      ## read_GTF
      ERROR: exception at j0.pl line 71.

      ## read_RM
      $VAR1 = {
                'gg' => 'hh'
              };

      Regards, Mario

Re: Return all the data from child to parent with Parallel::Forkmanager
by Anonymous Monk on Aug 18, 2017 at 13:48 UTC
    @eval_code = ("read_genome();", "read_mapfile();", "read_GTF();", "read_RM();");
    foreach $eval_code (@eval_code) {
        eval $eval_code;
    }
    Yikes! Since other monks have answered your actual question, I'm just going to point out that there's a better way to do this:
    my @subs = (\&read_genome, \&read_mapfile, \&read_GTF, \&read_RM);
    foreach my $sub (@subs) {
        &$sub();
    }

      Also, why not just:

      my $ok = eval {
          read_genome();
          read_mapfile();
          read_GTF();
          read_RM();
          1;
      };
      if ( ! $ok ) {
          warn "Got error while reading: $@";
      }
        ... because the whole purpose of this exercise was to try to read the files in parallel?
Re: Return all the data from child to parent with Parallel::Forkmanager
by marioroy (Prior) on Aug 19, 2017 at 04:01 UTC

    Hi Microcebus,

    You were close. Notice the arguments given to $pm->start and $pm->finish. start accepts an optional identifier for the process, and that same identifier is passed as the 3rd argument to the run_on_finish callback. In the demonstration that follows, I purposely have read_GTF die with an exception. The output is the same whether $threads equals 1 or 4.

    Like afoken mentioned here, Parallel::ForkManager must be 0.7.6 or later for the demonstration to run.

    use strict;
    use warnings;
    use feature 'say';

    use Parallel::ForkManager;

    my $threads = 4;

    my @name = ('read_genome', 'read_mapfile', 'read_GTF', 'read_RM');
    my @task = (\&read_genome, \&read_mapfile, \&read_GTF, \&read_RM);
    my (@ret, @err);

    if ($threads == 1) {
        for my $id (0 .. $#task) {
            $ret[$id] = eval { $task[$id]->() };
            $err[$id] = $@;
        }
    }
    else {
        my $pm = new Parallel::ForkManager($threads);
        $pm->set_waitpid_blocking_sleep(0);

        $pm->run_on_finish( sub {
            my ($pid, $exit, $id, $signal, $core, $data) = @_;
            $ret[$id] = delete $data->{ret};
            $err[$id] = delete $data->{err};
        });

        for my $id (0 .. $#task) {
            $pm->start($id) and next;
            my $res = eval { $task[$id]->() };
            $pm->finish(0, { ret => $res, err => $@ });
        }

        $pm->wait_all_children;
    }

    sub read_genome {
        # do something
        return { 'aa' => 'bb' };
    }

    sub read_mapfile {
        # do something
        return { 'cc' => 'dd' };
    }

    sub read_GTF {
        # do something
        die 'exception';
        return { 'ee' => 'ff' };   # not reached
    }

    sub read_RM {
        # do something
        return { 'gg' => 'hh' };
    }

    # use data generated in the subroutines

    use Data::Dumper;

    for my $id (0 .. $#task) {
        say "## ", $name[$id];
        if (length $err[$id]) {
            say "ERROR: ", $err[$id];
            next;
        }
        say Dumper($ret[$id]);
    }

    Output:

    ## read_genome
    $VAR1 = {
              'aa' => 'bb'
            };

    ## read_mapfile
    $VAR1 = {
              'cc' => 'dd'
            };

    ## read_GTF
    ERROR: exception at j1.pl line 54.

    ## read_RM
    $VAR1 = {
              'gg' => 'hh'
            };

    Regards, Mario

      Hi again,

      The following provides an MCE::Loop demonstration. This time I am using a hash for storing the returned data. Workers pass data using socket handles inside MCE, so no temp files are made for sending data to the parent process.

      use strict;
      use warnings;
      use feature 'say';

      use MCE::Loop;

      my $threads = 4;

      my @name = ('read_genome', 'read_mapfile', 'read_GTF', 'read_RM');
      my @task = (\&read_genome, \&read_mapfile, \&read_GTF, \&read_RM);
      my %ret;

      if ($threads == 1) {
          for my $id (0 .. $#task) {
              $ret{"$id"}     = eval { $task[$id]->() };
              $ret{"$id:err"} = $@;
          }
      }
      else {
          MCE::Loop->init(
              max_workers => $threads,
              posix_exit  => 1,
              chunk_size  => 1
          );

          %ret = mce_loop {
              my $id  = $_;
              my $res = eval { $task[$id]->() };
              MCE->gather( $id => $res, "$id:err" => $@ );
          } [ 0 .. $#task ];

          MCE::Loop->finish;
      }

      sub read_genome {
          # do something
          return { 'aa' => 'bb' };
      }

      sub read_mapfile {
          # do something
          return { 'cc' => 'dd' };
      }

      sub read_GTF {
          # do something
          die 'exception';
          return { 'ee' => 'ff' };   # not reached
      }

      sub read_RM {
          # do something
          return { 'gg' => 'hh' };
      }

      # use data generated in the subroutines

      use Data::Dumper;

      for my $id (0 .. $#task) {
          say "## ", $name[$id];
          if (length $ret{"$id:err"}) {
              say "ERROR: ", $ret{"$id:err"};
              next;
          }
          say Dumper($ret{$id});
      }

      Output:

      ## read_genome
      $VAR1 = {
                'aa' => 'bb'
              };

      ## read_mapfile
      $VAR1 = {
                'cc' => 'dd'
              };

      ## read_GTF
      ERROR: exception at j2.pl line 45, <__ANONIO__> line 2.

      ## read_RM
      $VAR1 = {
                'gg' => 'hh'
              };

      Regards, Mario
