Beefy Boxes and Bandwidth Generously Provided by pair Networks
Perl Monk, Perl Meditation
 
PerlMonks  

Re: Fork Pool?

by jmanning2k (Pilgrim)
on Oct 08, 2003 at 14:47 UTC ( [id://297613]=note: print w/replies, xml ) Need Help??


in reply to Fork Pool?

I've mentioned this before, but I like the forking code in SpamAssassin. It's licensed under both the GPL and the Perl Artistic license, so it's pretty flexible.

See the function &start_children in this module. It uses IO::Socket to communicate with each child process. $opt_j is a flag for max threads.

The reap_children function is also useful.

Just write a function to "&do_something" for each child (replacing the $self->run_message line), then modify the parent code to supply your data to each child process.

You can start off a pool of 50 or 100 child processes, then they'll run until all your jobs are complete.

Update:I compared the current SA code to the code I typically use now (originally derived from the SA code). My version is less tied to their model of doing things, and might be more flexible/easier to understand. Besides, a concrete example makes what I was trying to say more clear.

#array of simple data types. One word hash keys, etc. my @inputdata = qw(key1 key2 key3 key4); my $opt_j = 10; #10 threads # Worker function sub do_processing { my $indata = shift; ## You have to pass a simple value, but here is ## where you get the full data set. # my $href = getData($indata); # Do your processing return "SUCCESS $indata"; } ## Main code ## Begin fork pool print "Using $opt_j threads!\n"; my $io = IO::Socket->new(); my $select = IO::Select->new(); my @child; my @parent; my @pid; # create children + for (my $i = 0; $i < $opt_j; $i++) { ($child[$i],$parent[$i]) = $io->socketpair(AF_UNIX,SOCK_STREAM,PF_ +UNSPEC) or die "socketpair failed: $!"; if ($pid[$i] = fork) { close $parent[$i]; $select->add($child[$i]); next; } elsif (defined $pid[$i]) { # Child routine #print "Child $i started\n"; my $result; my $line; close $child[$i]; print { $parent[$i] } "START\n"; while ($line = readline $parent[$i]) { chomp $line; if ($line eq "exit") { print { $parent[$i] } "END\n"; exit; } if ($line =~ /^DATA (\w+)/) { my $data = $1; #print "Child $i processing $data\n"; $result = &do_processing($data); print { $parent[$i] } "$result\nRESULT $data\n"; } } exit; ## End child } else { die "cannot fork: $!"; } } # feed childen + my $done = 0; while (@inputdata || $done < $opt_j) { foreach my $socket ($select->can_read()) { my $result; my $line; while ($line = readline $socket) { if ($line eq "END\n") { $done++; last; } if ($line =~ /^RESULT (\w+)/ || $line eq "START\n") { #print "Got $line"; # Got result, if there are more, then run more. print { $socket } (@inputdata ? "DATA " . (shift @inputdata) + : "exit") . "\n"; if ($result) { chomp $result; # need to chop the \n before RESULT # process result - should split from function. if(defined($1)) { #print "$\n"; print "$1: $result\n"; $result = ""; } } last; } $result .= $line; } } } # reap children + for (my $i = 0; $i < $opt_j; $i++) { waitpid($pid[$i], 0); }
~Jon

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: note [id://297613]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others browsing the Monastery: (4)
As of 2024-04-24 18:54 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found