my $num_workers = 5; use SexCoder; use MCE::Hobo; use MCE::Shared; my $input_q = MCE::Shared->queue( fast => 1 ); my $output_q = MCE::Shared->queue( fast => 1 ); mce_open my $OUT, ">>", \*STDOUT or die "open error: $!"; mce_open my $ERR, ">>", \*STDERR or die "open error: $!"; MCE::Hobo->create({ posix_exit => 1 }, 'input_task', $_) for 1 .. $num_workers; # open the files, etc... - not shown here ######### MAIN INPUT LOOP while (my $row = readdata($fhin, \$dbfrownum, \$inbuffer, @sortedfields)) { $input_q->enqueue($row); } ######### END OF INPUT LOOP # exit worker threads (flag end of input queue) $input_q->end(); ######### MAIN OUTPUT LOOP while(my $result = $output_q->dequeue()) { if (exists $result->{finished}) { MCE::Hobo->waitone; $output_q->end unless MCE::Hobo->pending; next; } output_row($result); # THIS OUTPUTS A ROW } ######### END OF MAIN OUTPUT LOOP exit; sub input_task { # THIS CALLS SOME JAVA CODE TO CREATE THE CONNECTION TO DATA SERVICES $sexcoder=SexCoder->new("$ds_server", 4000); while (my $row = $input_q->dequeue()) { my $outrow = mapdata($row); my @result = ($reject, $row, $outrow); $output_q->enqueue(\@result); } # THIS DESTROYS THE BLESSED OBJECT, CLOSING THE CONNECTION $sexcoder = undef; $output_q->enqueue({ finished => $$ }); }