I've mentioned this before, but I like the forking code in SpamAssassin. It's licensed under both the GPL and the Perl Artistic license, so it's pretty flexible.
See the function &start_children in this module. It uses IO::Socket to communicate with each child process. $opt_j is a flag for max threads.
The reap_children function is also useful.
Just write a function to "&do_something" for each child (replacing the $self->run_message line), then modify the parent code to supply your data to each child process.
You can start off a pool of 50 or 100 child processes, then they'll run until all your jobs are complete.
Update: I compared the current SA code to the code I typically use now (originally derived from the SA code). My version is less tied to their model of doing things, and might be more flexible/easier to understand. Besides, a concrete example makes what I was trying to say clearer.
use strict;
use warnings;

# Work items handed to the workers, one per request.  Each item must be a
# single "word" (\w+ characters only), e.g. a hash key, because it travels
# to the worker as one token on a text line.
my @inputdata = qw(key1 key2 key3 key4);

# Size of the worker pool.  The workers are forked child processes, not
# threads.
my $opt_j = 10;
# Worker function: runs inside a child process, once per work item.
# Takes the simple key that was sent over the socket and returns the text
# that is reported back to the parent as that item's result.
sub do_processing {
    my ($key) = @_;

    ## Only a simple value can be passed in, so this is the place to
    ## look up the full data set that belongs to it.
    # my $href = getData($key);

    # Do your processing
    return sprintf 'SUCCESS %s', $key;
}
## Main code
## Begin fork pool
# IO::Socket provides socketpair() and exports the Socket constants used
# below (AF_UNIX, SOCK_STREAM, PF_UNSPEC); IO::Select lets the parent wait
# on every worker socket at once.
use IO::Socket;
use IO::Select;

print "Using $opt_j threads!\n";
# Flush explicitly before forking: perlfunc notes that the automatic
# flush-before-fork is not available on every platform, and an unflushed
# buffer would be written again by each child when it exits.
STDOUT->flush();

my $select = IO::Select->new();
my @child;     # $child[$i]:  the parent's end of the pair shared with worker $i
my @parent;    # $parent[$i]: worker $i's end of that pair
my @pid;       # $pid[$i]:    process id of worker $i

# create children
#
# Line protocol spoken over each socket pair:
#   worker -> parent   "START"                    ready for the first item
#   parent -> worker   "DATA <key>"               process this item
#   worker -> parent   <result>, "RESULT <key>"   result text, then marker
#   parent -> worker   "exit"                     no more work
#   worker -> parent   "END"                      acknowledged, exiting
for my $i (0 .. $opt_j - 1) {
    ($child[$i], $parent[$i])
        = IO::Socket->socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC)
        or die "socketpair failed: $!";

    $pid[$i] = fork;
    die "cannot fork: $!" unless defined $pid[$i];

    if ($pid[$i]) {
        # Parent: keep only our own end and watch it for replies.
        close $parent[$i];
        $select->add($child[$i]);
        next;
    }

    # Child routine
    #print "Child $i started\n";

    # Close every parent-side end this process inherited (its own and those
    # of the workers forked before it).  Otherwise a sibling's socket never
    # reaches EOF when the parent goes away, because this process would
    # still hold a copy of the other end.
    close $_ for @child[0 .. $i];

    my $to_parent = $parent[$i];
    print {$to_parent} "START\n";

    while (my $line = readline $to_parent) {
        chomp $line;
        if ($line eq "exit") {
            print {$to_parent} "END\n";
            exit;
        }
        # Only \w+ keys are recognised; any other line is ignored.
        if ($line =~ /^DATA (\w+)/) {
            my $data = $1;
            #print "Child $i processing $data\n";
            my $result = do_processing($data);
            print {$to_parent} "$result\nRESULT $data\n";
        }
    }

    # EOF from the parent without an "exit" command: nothing left to do.
    exit;
    ## End child
}
# feed children
#
# Every time a worker reports in ("START" or "RESULT <key>") it is handed
# the next work item, or "exit" once the queue is empty.  A worker counts
# as done when it answers "END" or when its socket reaches EOF; either way
# its socket is dropped from the select set so can_read() does not keep
# returning it.
my $done = 0;
while ($done < $opt_j) {
    foreach my $socket ($select->can_read()) {
        my $result   = '';
        my $finished = 1;    # cleared only when the worker is handed a reply

        LINE:
        while (my $line = readline $socket) {
            last LINE if $line eq "END\n";

            # Capture the key right away instead of relying on $1 later.
            my ($key) = $line =~ /^RESULT (\w+)/;
            if (defined $key || $line eq "START\n") {
                #print "Got $line";
                # Got result, if there are more, then run more.
                my $command = @inputdata ? 'DATA ' . shift(@inputdata) : 'exit';
                print {$socket} "$command\n";

                if (defined $key && length $result) {
                    chomp $result;    # drop the "\n" that preceded RESULT
                    # process result - should split from function.
                    print "$key: $result\n";
                }

                $finished = 0;
                last LINE;
            }

            # Anything else is part of the (possibly multi-line) result.
            $result .= $line;
        }

        next unless $finished;

        # "END" was received, or readline hit EOF because the worker went
        # away without finishing the protocol.
        warn "worker closed its socket with an unfinished result\n"
            if length $result;
        $select->remove($socket);
        close $socket;
        $done++;
    }
}

# Items can only be left over if workers disappeared before the queue was
# drained.
warn scalar(@inputdata) . " work item(s) were never processed\n"
    if @inputdata;
# reap children
# Block until each forked worker has exited and been collected, so none is
# left behind as a zombie.
for my $worker_pid (@pid[0 .. $opt_j - 1]) {
    waitpid($worker_pid, 0);
}
~Jon