
Re^2: How do you parallelize STDIN for large file processing?

by gone2015 (Deacon)
on Feb 06, 2009 at 15:55 UTC ( [id://741929] )

in reply to Re: How do you parallelize STDIN for large file processing?
in thread How do you parallelize STDIN for large file processing?

Or, to look at it another way, you are passing the input a line at a time to the worker threads -- so two things are happening:

  1. all of the thread communication overhead is being carried by every line -- I think this is the real issue.

  2. you are streaming the output, so Thread::Pool arranges for the output of each job to be done in job order.

    Having looked at the code, the way it does this is quite cunning: when a do routine completes a job before earlier jobs have completed, the results are put to one side, and the thread loops round to do the next job. When a job completes and all earlier jobs have completed, the thread outputs its own results, followed by any results put aside earlier which can now be output.

    Note that this means that if one job takes a very long time indeed, all the other threads will continue taking jobs and generating and storing the output. This is a Good Thing, because slower jobs are not blocking faster ones. However, I don't see a mechanism to stop jobs being dispatched if the streamed output backs up. But that could be because I haven't studied the code hard enough.

    So apart from the cost of all this cunning, I don't think this is the issue.
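To make the "put to one side" idea concrete, here is a minimal single-threaded sketch. It is not Thread::Pool's actual code; make_sequencer and its variables are names made up for illustration, and job numbers are assumed to start at 1.

```perl
use strict;
use warnings;

# Hypothetical helper, NOT taken from Thread::Pool: returns a closure that
# accepts (job number, result) in any order and returns the results that
# can now be output in job order.
sub make_sequencer {
    my $next = 1;    # number of the next job whose output is due
    my %pending;     # results that arrived early, keyed by job number
    return sub {
        my ($jobno, $result) = @_;
        $pending{$jobno} = $result;
        my @ready;
        # Release every consecutive result, starting with the one that is due.
        while (exists $pending{$next}) {
            push @ready, delete $pending{$next};
            $next++;
        }
        return @ready;
    };
}

my $emit = make_sequencer();
print $emit->(2, "second\n");    # nothing yet: job 1 is still outstanding
print $emit->(3, "third\n");     # still nothing
print $emit->(1, "first\n");     # releases jobs 1, 2 and 3 in order
```

Note that %pending keeps growing for as long as the job that is due has not finished, which is the backing-up concern mentioned above.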

Without changing the structure of what you have too much, I'd change:

    $pool->job($_) while (<STDIN>);

to something that reads a bunch of lines (say about 100K bytes' worth), something like:

    while (!eof(STDIN)) {
      my $buffer ;
      read STDIN, $buffer, 100*1024 or die "$!" ;
      if (!eof(STDIN)) {
        # Cut the lump back to its last newline and rewind STDIN to match
        my $nl = rindex($buffer, "\n") + 1 or die "No \\n in lump ?" ;
        if (my $trim = $nl - length($buffer)) {
          seek STDIN, $trim, 1 or die "Seek failed ??" ;
          substr($buffer, $trim) = '' ;
        } ;
      } ;
      $pool->job($buffer) ;
    } ;

(I haven't tested this, so caveat emptor !). Then the processing subroutine needs to break its input into lines and process into another buffer, and then return that lump of results. Something like:

    sub do {
      my ($lump) = @_ ;
      my $result = '' ;
      while ($lump =~ m/(.+)/g) {    # Ignores blank lines
        my $line = $1 ;
        ... whatever ...
        $result .= ...whatever.. ."\n" ;
      } ;
      return $result ;
    } ;

(also untested). There's some saving to be made by removing my ($lump) = @_ and doing while ($_[0] =~ m/(.+)/g) directly instead.

This will amortise the job overhead over a reasonable chunk of lines. Obviously you can experiment to see if larger chunks mean faster processing.
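One caveat with the seek approach: seek fails if STDIN is a pipe rather than a file. A variation that avoids seek is to carry the partial last line over into the next lump. The sketch below is one way to do that; it swaps the seek for a carry-over buffer, and next_lump is a made-up name, not part of Thread::Pool. It reads from an in-memory filehandle with a tiny lump size so it can be run on its own.

```perl
use strict;
use warnings;

# Hypothetical helper: returns the next lump of whole lines from $fh, or
# undef at end of input. $$carry holds a partial trailing line between calls.
sub next_lump {
    my ($fh, $carry, $size) = @_;
    my $buffer = $$carry;
    $$carry = '';
    while (1) {
        # Append up to $size more bytes to whatever was carried over.
        my $got = read($fh, $buffer, $size, length($buffer));
        die "Read failed: $!" unless defined $got;
        if ($got == 0) {    # end of input: hand back what is left, if anything
            return length($buffer) ? $buffer : undef;
        }
        my $nl = rindex($buffer, "\n") + 1;    # 0 if there is no newline yet
        next unless $nl;                       # line longer than lump: read more
        $$carry = substr($buffer, $nl);        # keep the partial line for later
        substr($buffer, $nl) = '';             # lump now ends on a newline
        return $buffer;
    }
}

# Demonstration on an in-memory file with a deliberately small lump size.
my $data = "alpha\nbeta\ngamma\ndelta";
open my $fh, '<', \$data or die "Cannot open in-memory file: $!";
my $carry = '';
my @lumps;
while (defined(my $lump = next_lump($fh, \$carry, 8))) {
    push @lumps, $lump;    # in the real script this would be $pool->job($lump)
}
print "[$_]\n" for @lumps;
```

In the real script the call would be next_lump(\*STDIN, \$carry, 100*1024). The cost is an extra copy of the carried-over tail for each lump, which should be small next to the per-job overhead.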

BTW, your do subroutine starts:

    sub do {
      chomp;
      my $input = $_;
      $input = shift;

which doesn't look right to me. The arguments are passed to do() in the usual way, so hacking around $_ appears redundant.
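For comparison, a minimal sketch of a worker that takes its input from @_ only. The name process_line and the uc() call are placeholders for illustration, not your actual processing.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the worker routine.
sub process_line {
    my ($input) = @_;    # the job's argument arrives in @_, not in $_
    chomp $input;        # chomp the copy explicitly
    return uc($input) . "\n";    # placeholder for the real work
}

print process_line("hello\n");
```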