http://qs321.pair.com?node_id=369584

BazB has asked for the wisdom of the Perl Monks concerning the following question:

Salutations, fellow monks.

I have some code that reads argument lists from a CSV configuration file and forks a number of child processes. Each child calls a subroutine reference with the arguments from one line of the configuration file.
If there are more lines in the configuration file than the maximum number of allowed concurrent child processes, the code should wait for at least one of the current children to complete, then read another line of arguments from the configuration file and start a new child.
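For comparison, here is a minimal sketch of the throttling pattern described above. It is not the code from this post: it assumes a hypothetical `work` subroutine and a hard-coded list of lines standing in for foo.csv. It also makes two deliberate substitutions: it holds all lines in memory before forking, and it blocks in `wait` rather than polling `waitpid` with `WNOHANG`. Each child calls `exit` as soon as its work is done, so it never returns into the parent's loop.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical limit and input (assumptions for this sketch); in the
# post these come from MAX_KIDS and foo.csv.
my $max_kids = 2;
my @lines    = ("a,b", "c,d", "e,f", "g,h", "i,j");

# Hypothetical worker: runs in the child; its return value becomes
# the child's exit code.
sub work {
    my @args = @_;
    print "$$ got: @args\n";
    return 0;
}

my %running;      # PID => 1 for each live child
my @failed;       # PIDs that exited with a non-zero code
my $started = 0;  # number of children forked

# Block until any one child exits, then record its status.
sub reap_one {
    my $pid = wait();
    if ($pid == -1) {      # no children left to wait for
        %running = ();
        return;
    }
    delete $running{$pid};
    push @failed, $pid if $? >> 8;
}

for my $line (@lines) {
    next if $line =~ /^\s*$/;
    my @args = split /,/, $line;

    # At the limit: wait for a slot to free up before forking again.
    reap_one() while scalar(keys %running) >= $max_kids;

    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    if ($pid == 0) {
        # Child: do the work and leave immediately.
        exit work(@args);
    }
    $running{$pid} = 1;
    $started++;
}

# Drain the children that are still running.
reap_one() while scalar(keys %running);

print "started $started, failed: @failed\n";
```

Blocking in `wait` removes the sleep-and-poll loop. The trade-off is that the parent cannot do anything else while it waits.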

Unfortunately, this isn't exactly what happens.

The code starts the initial children and waits on them finishing. As one child process finishes, another is started as expected.
It all goes wrong when the last line of the configuration file is read. Rather than leaving the while loop and simply waiting for all the children to finish, the code seems to start again from the beginning of the configuration file and kick off children all over again!

I've got a mix of while(){...} and do {...} until() loops, but I still can't see how that would make any difference.

Can anyone spot the problem? The following code is rather on the long side, but should demonstrate the problem:

#!/usr/bin/perl

use strict;
use warnings;
use POSIX ':sys_wait_h';
use constant MAX_KIDS => 5;

my %config;
$config{'sleep_time'} = 2;

sub fork_proc {
    # Fork a new process and execute subroutine within child.
    my $command = shift;
    my @args    = @_;

    die "Subroutine reference not specified" unless $command;

  FORK: {
        my $pid = fork();
        if ($pid) {
            # Parent - successful fork.
            return $pid;
        }
        elsif (defined $pid) {
            # Child
            my $status = &$command(@args);
        }
        elsif ($! =~ /No more process/) {
            # Temporary error forking. Wait and retry.
            sleep 1;
            redo FORK;
        }
        else {
            # Bad error
            die "Fork failed: $!";
        }
    }
}

sub wait_for_pids {
    my %pids = map { $_ => 1 } grep { ! /^\s*$/ } @_;
    my %failed;

    while (keys %pids) {
        foreach my $pid ( keys %pids ) {
            my $reaped = waitpid($pid, WNOHANG);
            if ($reaped > 0) {
                my $rc = $?;
                delete $pids{$reaped};
                my $exit = $rc >> 8;
                if ( $exit) {
                    warn "Nonzero exit code returned by pid $reaped";
                    $failed{$reaped}++;
                }
            }
            sleep $config{'sleep_time'}; # Don't waste processor time spinning.
        }
    }

    if ( keys %failed ) {
        die "Non-zero exit code returned by the following PIDs: ",
            join(", ", keys %failed), ". Stopping";
    }
}

sub parallel_update {
    # This subroutine takes the following arguments:
    # code:     a subroutine reference
    # config:   a configuration filename (the file must contain a comma
    #           separated list of arguments to the subroutine passed in)
    # max_kids: maximum number of child processes to run concurrently.
    # A number of processes will be forked, up to the maximum specified
    # in the max_kids argument, and each process will execute the
    # subroutine with the arguments given in the line read from the
    # configuration file.
    # As processes finish, they are reaped and a new process started
    # until all lines in the configuration file have been processed.

    my %arg = @_;
    die "Subroutine reference not specified"             unless $arg{'code'};
    die "Configuration filename not specified"           unless $arg{'config'};
    die "Maximum number of child processes not specified" unless $arg{'max_kids'};

    my %pids; # track PIDs of child processes.

    open CONF, "<", $arg{'config'}
        or die "Unable to open configuration file: $!";

    while (<CONF>) {
        last unless defined $_;
        next if /^$/;
        chomp;
        my @cfg = split /,/, $_; # arguments from config file.

        if (keys %pids < $arg{'max_kids'}) {
            # start new process, store new PID.
            my $pid = fork_proc( $arg{'code'}, @cfg);
            $pids{$pid}++;
        }
        else {
            # wait on at least one currently running child exiting. check
            # exit code and delete PID from list of currently running child
            # processes.
            my $reaped;
            do {
                foreach my $pid ( keys %pids ) {
                    $reaped = waitpid($pid, WNOHANG);
                    if ($reaped > 0) {
                        my $rc = $?;
                        delete $pids{$reaped};
                        my $exit = $rc >> 8;
                        print "$reaped returned $exit.\n";
                        die "Nonzero exit code returned by pid $reaped. Stopped" if $exit;
                    }
                }
                sleep $config{'sleep_time'}; # Don't waste processor time spinning.
            } until ($reaped > 0);

            # start new process, store new PID.
            my $pid = fork_proc( $arg{'code'}, @cfg);
            $pids{$pid}++;
        }
    }

    close CONF or die "Problems closing configuration file: $!";

    # Clean up the last few child processes as they finish.
    wait_for_pids( keys %pids );
}

# ------------------------------------------------------------------------------

my $code = sub {
    print "$$ started. Got arguments: ", "@_", "\n";
    sleep 2;
    print "Foobar!\n";
    exit 0;
};

parallel_update(
    max_kids => MAX_KIDS,
    config   => "foo.csv",
    code     => $code,
);

The CSV file used as test input ("foo.csv") is as follows:

a,b
c,d
e,f
g,h
i,j
k,l
m,n
o,p
q,r
s,t
u,v
w,y
x,z
a1,b1
c1,d1
e1,f1
g1,h1
i1,j1
k1,l1
m1,n1
o1,p1
q1,r1
s1,t1
u1,v1
w1,y1
x1,z1
a2,b2
c2,d2
e2,f2
g2,h2
i2,j2
k2,l2
m2,n2
o2,p2
q2,r2
s2,t2
u2,v2
w2,y2
x2,z2

Update: BTW, I'm running v5.6.1 built for sun4-solaris.


If the information in this post is inaccurate, or just plain wrong, don't just downvote - please post explaining what's wrong.
That way everyone learns.