forsaken75 has asked for the wisdom of the Perl Monks concerning the following question:
Hi,
I am trying to parse a large (4.1 GB) file.
In a nutshell, I accept input via STDIN (for illustration purposes, let's say: cat file | script.pl), and the Perl script takes each line, does black magic to it, and writes it to a file.
It is taking too long. After weeks and weeks of trying various ways of speeding things up, I think at this point the fastest way to accomplish what I am doing is to read STDIN and hand the lines out to multiple workers; I don't know whether that would be described as "threaded" or fork'd. So: STDIN goes to a boss, lines are given to X number of workers, and as each worker comes back, its result is printed to the file and it is given the next line. One condition I have is that the lines must be written to the output file in the order they came in.
I think this is possible, but my perl-fu is very, very weak, and searching for this on Google for days has left me weary. I have found things that are close, and have gotten very close myself. I think, though, that I will collapse from exhaustion, or have a mental breakdown, if I don't reach out for help :-)
Thank you.
p.s. I know there is no code here, and no sample data or anything like that. I don't want to get wrapped around the axle on semantics, stylistic differences, or regular-expression optimization or anything like that :-)
The input data is not overly complex (say, 100 ASCII characters per line) and the output data is not overly complex either (again, ~100 ASCII characters); no binary data or anything weird like that. If anyone feels exact specifics are necessary, then I will happily provide anything required.
Re: How do you parallelize STDIN for large file processing?
by samtregar (Abbot) on Feb 06, 2009 at 03:22 UTC
|
The very easiest way you could do this would be to divide your input file into N pieces and process each piece in parallel by starting your script on that piece. Have each process output to a file of its own and stitch them together at the end. If you start one process per CPU and the black-magic you do is CPU bound you could get something like a linear speedup.
That of course means you can't take data strictly from STDIN, but really that's a silly way to process 4.1GB of data!
If you really, really need to read it from a stream and output it in-order then you'll have to have a parent reading the stream, forking kids and then collecting the results in buffers so you can output in-order. Start with Parallel::ForkManager, which will handle giving out the work and then mix in some IO::Pipe and IO::Select for collecting the results. Be sure you divide the work into sizable chunks, forking a new child for each line isn't going to help very much!
Now go give it a try and don't come back until you have some code to post!
-sam
| [reply] |
Re: How do you parallelize STDIN for large file processing?
by ikegami (Patriarch) on Feb 06, 2009 at 03:27 UTC
|
One way to handle the line ordering is to have each worker send its output to a different file. Have each line preceded with the line number where it resided in the original file. Then just merge the files:
sort --field-separator=, \
     --key=1,1 \
     --numeric-sort \
     --merge \
     worker_output_* \
| cut --delimiter=, \
      --fields=2-
--merge is key: it tells sort that each worker's file is already sorted, so the files are merged rather than re-sorted.
| [reply] [d/l] [select] |
|
Would Tie::File be helpful?
If you knew which line each result came from, you could slot it right back in where it started.
tie my @array, 'Tie::File', $filename or die "Cannot tie: $!";
### at the end
my ( $Toad, $LineIn ) = $MagicWand->HocusPocus;
$array[ $LineIn ] = $Toad;
http://search.cpan.org/~mjd/Tie-File-0.96/lib/Tie/File.pm
| [reply] |
|
Definitely not. The goal is to speed things up, not set the hard drive on fire.
| [reply] |
Re: How do you parallelize STDIN for large file processing?
by BrowserUk (Patriarch) on Feb 06, 2009 at 13:25 UTC
|
This is untested on a multicore machine, but should be free from any deadlocking or syncing issues.
And it will definitely need tuning for best performance, but you could try this on your data and see how you fare?
Run as: scriptname inputfile >outputfile
#! perl -sw
use strict;
use threads;
use threads::shared;
use Thread::Queue;

sub worker {
    my( $Qin, $outRef ) = @_;
    while( defined( my $in = $Qin->dequeue ) ) {
        my( $no, $text ) = split ':', $in, 2;
        ########## Manipulate data below ########
        my $out = $text;
        lock $outRef;
        $outRef->[ $no ] = $out;
    }
}

my @out :shared;
my $Qin = Thread::Queue->new;
my @threads = map threads->create( \&worker, $Qin, \@out ), 1 .. 4;

my $last :shared = 0;
my $reader = async {
    while( <> ) {
        sleep 1 while $Qin->pending > 100;  ## crude backpressure
        $Qin->enqueue( "$.:$_" );
        lock $last;
        $last = $.;
    }
    $Qin->enqueue( undef ) for @threads;    ## one sentinel per worker
};

## stream the results out in input order while the workers run
my $next = 1;
while( threads->list( threads->running ) ) {
    if( defined $out[ $next ] ) {
        print $out[ $next ];
        undef $out[ $next++ ];
    }
    else {
        sleep 1;
    }
}

## the workers are done; flush whatever remains
while( $next <= $last ) {
    print $out[ $next ];
    undef $out[ $next++ ];
}

$_->join for @threads;
$reader->join;
Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
"Science is about questioning the status quo. Questioning authority".
In the absence of evidence, opinion is indistinguishable from prejudice.
| [reply] [d/l] [select] |
Re: How do you parallelize STDIN for large file processing?
by forsaken75 (Novice) on Feb 06, 2009 at 08:21 UTC
|
Thank you for all of your responses.
So... more detail is clearly needed.
My input is from tcpdump (reading a capture file).
The standard set of pcap modules from CPAN for reading the file, I won't say couldn't work, but they would be very hard for me to implement: we don't use standard frame types (all custom protocols), so I would have to write all of my own frame types for the Perl modules.
I have been working on getting Parallel::Fork::BossWorker working, but it spawns never-ending defunct processes despite setting the number of workers. There was a bug posted with a patch addressing that, and with that patch applied the Perl script does nothing (the child PIDs hang). I am working on that, but I'm getting side-tracked.
I am using a slightly modified form of the afterglow tcpdump perl parsing script. By itself, it was outputting data to the output file at around 2 MB/s, so for a 1.5 GB file, it was taking around 12 minutes to finish.
I tried to implement threads, and it was outputting at around 200 KB/s, so it was much slower (by the way, my box is an 8-CPU Harpertown box with 8 GB of RAM, and the files are all on an EMC SAN).
I don't blame the threads; being such a novice at this, I'm sure it's the way I did it, so I will post the code. Please forgive my fumbling attempts at all of this; following examples and such, it's the best I could figure out. There are some commented-out parts that I normally would not leave in when posting code, but I'm leaving them in because they show that I also tried (for the heck of it) semaphore locking on the output file, to make sure there was no contention on the file by multiple threads (it didn't improve anything). I have also tried file locking, which didn't improve anything either. Right now the sub that prints the output is locked so only one thread can access the print at a time, but that hasn't had any effect either; removing it wouldn't change the results.
I have tried select on STDIN, STDOUT, both, and neither, which didn't have any effect either.
I would say as well that this is entirely CPU bound, as the CPU definitely pegs at 100%; using the threads, I do see the kernel spreading the load out across the CPUs.
Also, I put the pipe in the Perl script; running tcpdump (blah) | script vs. open (blah |) doesn't change any timings. One other note: I have tried various numbers of workers, with no effect.
Anyway, this has been long enough. FWIW here's the script.
Please be gentle :-)
--- NOTE: credit to http://afterglow.sourceforge.net for their tcpdump2sql.pl script, which is mostly what you see below.
use strict;
use Thread::Pool;
#use Thread::Semaphore;
open( OUTFILE, ">/crnch_data/foo.csv" ) or die "Cannot open output";
print OUTFILE "\n";
#select(STDIN); $| = 1;
open( STDIN, "/usr/sbin/tcpdump -vttttnner /crnch_data/tcpdump_infile |" ) or die "Cannot start tcpdump: $!";
select(OUTFILE); $| = 1;
my $res = "";
my $pool = Thread::Pool->new(
{
workers => 10,
do => \&do,
stream => \&monitor,
}
);
$pool->job($_) while (<STDIN>);
$pool->shutdown;
sub do {
chomp;
my $input = $_;
$input = shift;
if ( $input =~ /(^\d\d\d\d-\d\d-\d\d .*)/ ) {
if ( $input =~
/(.*) \(tos (\S+), ttl +(\d+), id (\d+), offset (\d+), flags ([\S\+]+)(?:, ([\S\+]+))?, proto +(\S+ \(\d+\)), length (\d+)(?:, .*?)?\) (.*)/
)
{
$input = "$1 $2 $3 $4 $5 $6$7 $8 $9 $10";
}
else {
if ( $input =~
/(.*) (((?:(\d{1,2}|[a-fA-F]{1,2}){2})(?::|-*)){6}) (\>) (((?:(\d{1,2}|[a-fA-F]{1,2}){2})(?::|-*)){6}), (.*?), length (\d+)\:(.*)/
)
{
$input = "$1 $2 x $6 x x x x $10";
}
else {
return "error";
}
}
}
else {
return "error";
}
my @fields = split( " ", $input );
my $timestamp = $fields[0] . " " . $fields[1];
my $microsecond = $fields[1];
$timestamp =~ s/(.*?)\.\d+$/$1/;
$microsecond =~ s/(.*?)\.(\d+)$/$2/;
my $sourcemac = $fields[2];
my $destmac = $fields[4];
$destmac =~ s/,//g;
$fields[4] =~ s/,$//;
my $len = $fields[9];
$len =~ s/:$//;
my $tos = $fields[11];
my $ttl = $fields[12];
my $id = $fields[13];
my $offset = $fields[14];
my $ipflags = $fields[15] . " " . $fields[16];
$ipflags =~ s/\[(.*)\]/$1/g;
my $sip = $fields[18];
$sip =~ s/([^\.]+\.[^\.]+\.[^\.]+\.[^\.]+).*/$1/;
$sip =~ s/:$//;
my $sport = $fields[18];
if ( $sport =~ /[^\.]+\.[^\.]+\.[^\.]+\.[^\.]+\.(.*)/ ) {
$sport = $1;
$sport =~ s/:$//;
}
else { $sport = "null"; }
my $dip = $fields[20];
$dip =~ s/([^\.]+\.[^\.]+\.[^\.]+\.[^\.]+).*/$1/;
$dip =~ s/:$//;
my $dport = $fields[20];
if ( $dport =~ /[^\.]+\.[^\.]+\.[^\.]+\.[^\.]+\.(.*)/ ) {
$dport = $1;
$dport =~ s/:$//;
}
else { $dport = "null"; }
my $proto = "null";
my $flags = "null";
$flags = $fields[21] if ( $fields[21] =~ /[SRPU.]+/ );
$proto = "tcp" if ( $fields[21] =~ /[SRPU.]+/ );
return "//$timestamp//$microsecond//$sourcemac//$destmac//$sip//$dip//$sport//$dport//$proto//$flags//$len//$ttl//$id//$tos//$ipflags//$offset?";
}
sub monitor : locked {
# my $semaphore = new Thread::Semaphore;
my $line = shift;
unless ( $line eq "error" ) {
# $semaphore->down;
print OUTFILE $line;
# $semaphore->up;
}
}
close(OUTFILE);
| [reply] [d/l] |
|
The problem is that every worker writes to the file.
As written before, you should bundle the writing into one separate thread so that you need no synchronisation for the file.
| [reply] |
|
Here's a cut-down version of some code I'm using, based on Parallel::Fork::BossWorker, to speed up some processing.
You'll need to monkey with it to get it to do what you want...
my $Worker_count = 2;
fork_workers();
sub autoflush { select((select(shift), $| = 1)[0]) }
sub fork_workers {
# worker sends pid to boss when ready for more work
pipe my $from_workers, my $to_boss;
autoflush($to_boss);
my %workers;
foreach (1 .. $Worker_count) {
# pipe for boss to send 'work' to worker
pipe my $from_boss, my $to_worker;
# pipe for worker to send message back to boss
pipe my $from_worker2, my $to_boss2;
autoflush($to_worker);
autoflush($to_boss2);
my $pid = fork;
die "failed to fork: $!" unless defined $pid;
if ($pid) {
$workers{$pid} = [$to_worker, $from_worker2];
close $from_boss;
close $to_boss2;
} else {
close $from_workers;
close $from_worker2;
close $to_worker;
close STDIN;
send_msg($to_boss, $$);
worker($from_boss, $to_boss, $to_boss2);
exit;
}
}
close $to_boss;
my $INT_handler = $SIG{INT};
local $SIG{INT} = sub {
close $from_workers;
for (keys %workers) {
my ($to_worker, $from_worker2) = @{$workers{$_}};
close $to_worker;
close $from_worker2;
}
$INT_handler->('INT');
};
my @work = get_work(); # ???
my %busy;              # pids we have handed work to
while (my $pid = receive_msg($from_workers)) {
my ($to_worker, $from_worker2) = @{$workers{$pid}};
if (delete $busy{$pid}) {
# only collect a result if this worker was given work;
# each child's very first message is just a "ready" notice
my $msg = receive_msg($from_worker2);
if (length $msg) {
# something
}
}
if (@work) {
my $whatever = something(@work);
send_msg($to_worker, $whatever);
$busy{$pid} = 1;
@work = ();
} else {
close $to_worker;
close $from_worker2;
delete $workers{$pid};
}
}
close $from_workers;
}
sub worker {
my ($from_boss, $to_boss, $to_boss2) = @_;
while (my $whatever = receive_msg($from_boss)) {
my $msg = something_else($whatever);
send_msg($to_boss, $$);
send_msg($to_boss2, $msg);
}
}
sub receive_msg {
my $fh = shift;
local $/ = "\003\000";
my $msg = <$fh>;
chomp $msg if defined $msg;
return $msg;
}
sub send_msg {
my ($fh, $msg) = @_;
print $fh $msg, "\003\000";
}
There is one issue with the above mechanism that I'd be interested in having resolved: is it _safe_ for multiple child processes to write to the same pipe? In the above code, send_msg($to_boss, $$) is used by each child to notify the parent that it has something for it to process. The code used by Parallel::Fork::BossWorker allows the children to write arbitrarily large amounts to this pipe, and I found that this could occasionally result in corrupted data when writing large amounts of data.
However, by changing to the scheme used in the above code, of only sending the pid (a small amount of data) down the shared pipe, I've yet to see any corruption.
So... is it safe to write small amounts (less than some internal PerlIO buffer size) to a shared pipe? Or is it unsafe, and I've just yet to hit the problem?
Thanks for any tips! :-)
alex.
| [reply] [d/l] [select] |
|
Or, to look at it another way, you are passing the input a line at a time to the worker threads -- so two things are happening:
all of the thread communication overhead is being carried by every line -- I think this is the real issue.
you are streaming the output, so Thread::Pool arranges for the output of each job to be done in job order.
Having looked at the code, the way it does this is quite cunning: when a do routine completes a job before earlier jobs have completed, the results are put to one side, and the thread loops round to do the next job. When a job completes and all earlier have completed, the thread outputs its results and any results put aside earlier which can now be output.
Note that this means that if one job takes a very long time indeed, all the other threads will continue taking jobs and generating and storing the output. This is a Good Thing, because slower jobs are not blocking faster ones. However, I don't see a mechanism to stop jobs being dispatched if the streamed output backs up. But that could be because I haven't studied the code hard enough.
So apart from the cost of all this cunning, I don't think this is the issue.
Without changing the structure of what you have too much, I'd change:
$pool->job($_) while (<STDIN>);
to something that reads a bunch of lines (say 100K bytes' worth), something like:
while (!eof(STDIN)) {
    my $buffer ;
    read STDIN, $buffer, 100*1024 or die "$!" ;
    if (!eof(STDIN)) {
        my $nl = rindex($buffer, "\n") + 1 or die "No \\n in lump ?" ;
        if (my $trim = $nl - length($buffer)) {
            seek STDIN, $trim, 1 or die "Seek failed ??" ;
            substr($buffer, $trim) = '' ;
        } ;
    } ;
    $pool->job($buffer) ;
} ;
Note that the seek only works if STDIN is a plain file; when reading from a pipe you would have to carry the partial last line over into the next buffer instead.
(I haven't tested this, so caveat emptor!). Then the processing subroutine needs to break its input into lines, process into another buffer, and return that lump of results. Something like:
sub do {
    my ($lump) = @_ ;
    my $result = '' ;
    while ($lump =~ m/(.+)/g) { # Ignores blank lines
        my $line = $1 ;
        ... whatever ...
        $result .= ...whatever.. ."\n" ;
    } ;
    return $result ;
} ;
(also untested). There's some saving to be made by removing my ($lump) = @_ and doing while ($_[0] =~ m/(.+)/g) directly instead.
This will amortise the job overhead over a reasonable chunk of lines. Obviously you can experiment to see if larger chunks means faster processing.
BTW, your do subroutine starts:
sub do {
    chomp;
    my $input = $_;
    $input = shift;
which doesn't look right to me. The arguments are passed to do() in the usual way, so hacking around $_ appears redundant.
| [reply] [d/l] [select] |
|
You are suffering from NOT buffering. This becomes obvious by following the data flow of the pipe in the upstream direction:
1) Your perl script accepts the next line of input.
2) Tcpdump refills the pipe buffer and blocks.
3) Tcpdump requests a new "line" from tcpdump_infile and transforms it.
As a consequence, piping as a cheap form of using more processors does not work efficiently here: it results in small read requests to the disk, one line each. The delay between the small requests starves tcpdump and your perl script. You should optimise the earliest bottleneck first.
All disks like large read requests, so the solution is to add large buffers (e. g. 100 MB each) before and after tcpdump. This allows disk reading, tcpdump and the perl script to run in parallel. I like to use mbuffer for the task. Your input pipe could look like
mbuffer -i tcpdump_infile -m 100M -P90 -p10 | \
tcpdump |\
mbuffer -m 100M -P90 -p10 | \
perl
This gives you 100 MB buffers that write out a large chunk when 90% full and start reading again when 10% full.
| [reply] [d/l] |
|
| [reply] |
Re: How do you parallelize STDIN for large file processing?
by GrandFather (Saint) on Feb 06, 2009 at 03:22 UTC
|
Define "too long".
What have you used to profile your current code and what are the bottle necks?
How long does a simple script that copies STDIN to STDOUT take on your system compared to the time you have available for the task?
Perl's payment curve coincides with its learning curve.
| [reply] |
Re: How do you parallelize STDIN for large file processing?
by weismat (Friar) on Feb 06, 2009 at 05:53 UTC
|
If you want to go the thread way:
Create two queues, one for read data and one for write data (Thread::Queue).
Start x workers as detached (I would make this a parameter) and a result writer, also detached (threads).
Workers read from the in queue and send results to the result queue.
The program terminates either when lines read equals lines written, or when the result queue has been empty for x seconds.
The main question is the bottleneck, but I have used this approach quite frequently on multicore machines with success.
| [reply] |
Re: How do you parallelize STDIN for large file processing?
by bjmckenz (Initiate) on Feb 06, 2009 at 06:31 UTC
|
Perhaps I'm missing something. If it's a static file (that is, not an input stream) you can start multiple worker processes each with a different byte offset into the file. Since you've established that it's newline-based, you can skip bytes until after the first newline (except for the worker at offset 0) to guarantee that you're processing a line at a time. Then your workers each build a "chapter" of output which you can then cat together later. These workers can be processes or threads. "Simple" seeks get you to the offset.
The reason I say this is that it's not clear if accepting pipe-ish data (STDIN) is part of the problem statement or part of the solution (a design approach). If the former...nevermind.
| [reply] |