How do you parallelize STDIN for large file processing?

by forsaken75 (Novice)
on Feb 06, 2009 at 03:08 UTC

forsaken75 has asked for the wisdom of the Perl Monks concerning the following question:

Hi, I am trying to parse a large (4.1 GB) file. In a nutshell, I accept input via STDIN (for illustration purposes, let's say cat file | script.pl), and the Perl script takes each line, does black magic to it, and outputs it to a file. It is taking too long to do this.

After weeks and weeks of trying various ways of speeding things up, I think at this point the fastest way to accomplish what I am doing is to read in STDIN and throw the lines out to multiple workers. I don't know if this would be expressed as being "threaded" or fork'd. So STDIN goes to a boss, lines are given to X number of workers, and as each one comes back, its line is printed to the file and it is given the next line. One condition I have is that the lines have to be output to the file in the order they came in.

I think this is possible, but my perl-fu is very, very weak, and searching for this on Google for days has left me weary. I have found things that are close, and have gotten very close. I think, though, that I will collapse from exhaustion, or have a mental breakdown, if I don't reach out for help :-) Thank you.

P.S. I know there is no code here, and no sample data or anything like that. I don't want to get wrapped around the axle on semantics or stylistic differences in regular expression optimization or anything like that :-) The input data is not overly complex (say, 100 ASCII characters per line) and the output data is not overly complex (again, ~100 ASCII characters); no binary data or anything weird like that. If anyone feels exact specifics are necessary, then I will happily provide anything required.

Replies are listed 'Best First'.
Re: How do you parallelize STDIN for large file processing?
by samtregar (Abbot) on Feb 06, 2009 at 03:22 UTC
    The very easiest way you could do this would be to divide your input file into N pieces and process each piece in parallel by starting your script on that piece. Have each process output to a file of its own and stitch them together at the end. If you start one process per CPU and the black-magic you do is CPU bound you could get something like a linear speedup.

    That of course means you can't take data strictly from STDIN, but really that's a silly way to process 4.1GB of data!

    If you really, really need to read it from a stream and output it in-order then you'll have to have a parent reading the stream, forking kids and then collecting the results in buffers so you can output in-order. Start with Parallel::ForkManager, which will handle giving out the work and then mix in some IO::Pipe and IO::Select for collecting the results. Be sure you divide the work into sizable chunks, forking a new child for each line isn't going to help very much!
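
    For illustration only, here is a rough, untested sketch of that shape using Parallel::ForkManager with per-chunk output files. The worker count of 4, the 1000-line chunk size, the chunk_*.out names and the process_line() routine are all placeholders you would swap for your own setup and black magic:

    use strict;
    use warnings;
    use Parallel::ForkManager;

    my $pm         = Parallel::ForkManager->new(4);   # roughly one child per CPU
    my $chunk_size = 1000;                            # lines per job -- tune this
    my $chunk_no   = 0;
    my @lines;

    while (my $line = <STDIN>) {
        push @lines, $line;
        next if @lines < $chunk_size && !eof(STDIN);

        my @job = @lines;
        @lines  = ();
        my $n   = $chunk_no++;

        $pm->start and next;                          # parent keeps reading
        open my $out, '>', "chunk_$n.out" or die $!;
        print {$out} process_line($_) for @job;
        close $out;
        $pm->finish;                                  # child exits here
    }
    $pm->wait_all_children;
    # afterwards, stitch chunk_0.out, chunk_1.out, ... together in numeric order

    sub process_line { return $_[0] }                 # stand-in for the real black magic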

    Now go give it a try and don't come back until you have some code to post!

    -sam

Re: How do you parallelize STDIN for large file processing?
by ikegami (Patriarch) on Feb 06, 2009 at 03:27 UTC
    One way to handle the line ordering is to have each worker send its output to a different file. Have each line preceded with the line number where it resided in the original file. Then just merge the files:
    sort --field-separator=, --key=1,1 --numeric-sort --merge worker_output_* | cut --delimiter=, --fields=2-

    --merge is key.
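
    For example, each worker could be as simple as this untested sketch, assuming the boss hands it records of the form "lineno,original line" on STDIN and that process_line() stands in for the real work:

    #!/usr/bin/perl
    # One worker: reads "lineno,original line" records on STDIN and writes
    # "lineno,processed line" to its own file, ready for the sort --merge above.
    use strict;
    use warnings;

    my $id = shift || 0;                     # worker number, passed on the command line
    open my $out, '>', "worker_output_$id" or die $!;

    while (my $rec = <STDIN>) {
        my ($lineno, $line) = split /,/, $rec, 2;
        print {$out} $lineno, ',', process_line($line);
    }
    close $out;

    sub process_line { return $_[0] }        # stand-in for the real black magic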

      Would Tie::File be helpful?
      If you knew which line it came from, you could slot it right back in where it started.

      tie @array, 'Tie::File', filename or die ...;

      ### at the end
      my ($Toad, $LineIn) = $MagicWand->HocusPocus;
      $array[$LineIn] = $Toad;
      http://search.cpan.org/~mjd/Tie-File-0.96/lib/Tie/File.pm
        Definitely not. The goal is to speed things up, not set the hard drive on fire.
Re: How do you parallelize STDIN for large file processing?
by BrowserUk (Patriarch) on Feb 06, 2009 at 13:25 UTC

    This is untested on a multicore machine, but should be free from any deadlocking or syncing issues.

    And it will definitely need tuning for best performance, but you could try this on your data and see how you fare?

    Run as: scriptname inputfile >outputfile

    #! perl -sw
    use strict;
    use threads;
    use threads::shared;
    use Thread::Queue;

    sub worker {
        my( $Qin, $outRef ) = @_;
        while( my $in = $Qin->dequeue ) {
            my( $no, $text ) = split ':', $in, 2;

            ########## Manipulate data below ########
            my $out = $text;

            lock $outRef;
            $outRef->[ $no ] = $out;
        }
    }

    my @out :shared;
    my $Qin = new Thread::Queue;
    my @threads = map threads->create( \&worker, $Qin, \@out ), 1 .. 4;

    my $last :shared = 0;

    async {
        while( <> ) {
            sleep 1 while $Qin->pending > 100;
            $Qin->enqueue( "$.:$_" );
            lock $last;
            $last = $.;
        }
    };

    my $next = 1;
    while( threads->list( threads->running ) ) {
        sleep 1 until defined $out[ $next ];
        print $out[ $next ];
        undef $out[ $next++ ];
    }

    while( $next < $last ) {
        sleep 1 until defined $out[ $next ];
        print $out[ $next ];
        undef $out[ $next++ ];
    }

    $_->join for @threads;

    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.
Re: How do you parallelize STDIN for large file processing?
by forsaken75 (Novice) on Feb 06, 2009 at 08:21 UTC
    Thank you for all of your responses.
    So... more detail is clearly needed.
    My input is from tcpdump (reading a capture file). As for the standard set of pcap modules from CPAN for
    reading the file, I won't say they couldn't work, but they would be very hard for me to implement, as
    we don't use standard frame types (all custom protocols), so I would have to write all my own frame types for the Perl modules.

    I have been working on getting Parallel::Fork::BossWorker working, but it spawns never-ending defunct processes, despite setting the number of workers.
    There was a bug posted with a patch addressing that, and with that patch applied the perl script does nothing (the child PIDs
    hang). I am still working on that, but I'm getting side-tracked.

    I am using a slightly modified form of the afterglow tcpdump Perl parsing script. By itself, it was outputting data to the output file
    at around 2 MB/s, so for a 1.5 GB file it was taking around 12 minutes to finish.
    I tried to implement threads, and it was outputting at around 200 KB/s, so it was much slower.
    (By the way, my box is an 8-CPU Harpertown box with 8 GB of RAM, and the files are all on an EMC SAN.)

    I don't blame the threads; being such a novice at this, I'm sure it's the way I did it, so I will post the code.
    Please forgive my fumbling attempts at all of this, following examples and such; it's the best I could figure out.
    There are some commented-out parts that I normally would not leave in when posting code, but I'm
    leaving them in as they show that I also (for the heck of it) tried using semaphore locking on the output file to
    make sure there was no contention on the file by multiple threads (it didn't improve anything).
    I have also tried file locking, which didn't improve anything either.
    Right now, the sub that prints the output is locked so only one thread can access the print at a time, but that hasn't
    had any effect either; removing it wouldn't change the results.

    I have tried select on STDIN, STDOUT, both, and neither, which didn't have any effect either.

    I would say as well that this is entirely CPU bound, as
    the CPU definitely pegs at 100%; using the threads, I do see the kernel spreading the load out across the CPUs.

    Also, I put the pipe in the perl script; running tcpdump (blah) | script vs. open (blah |) doesn't change any timings.
    One other note: I have tried various numbers of workers, with no effect.

    Anyway, this has been long enough. FWIW here's the script. Please be gentle :-)

    --- NOTE: credits to http://afterglow.sourceforge.net for their tcpdump2sql.pl script, which is mostly what you see below
    use strict;
    use Thread::Pool;
    #use Thread::Semaphore;

    open( OUTFILE, ">/crnch_data/foo.csv" ) or die "Cannot open output";
    print OUTFILE "\n";

    #select(STDIN); $| = 1;
    open( STDIN, "/usr/sbin/tcpdump -vttttnner /crnch_data/tcpdump_infile |" );
    select(OUTFILE);
    $| = 1;

    my $res = "";

    my $pool = Thread::Pool->new(
        {
            workers => 10,
            do      => \&do,
            stream  => \&monitor,
        }
    );

    $pool->job($_) while (<STDIN>);
    $pool->shutdown;

    sub do {
        chomp;
        my $input = $_;
        $input = shift;
        if ( $input =~ /(^\d\d\d\d-\d\d-\d\d .*)/ ) {
            if ( $input =~ /(.*) \(tos (\S+), ttl +(\d+), id (\d+), offset (\d+), flags ([\S\+]+)(?:, ([\S\+]+))?, proto +(\S+ \(\d+\)), length (\d+)(?:, .*?)?\) (.*)/ ) {
                $input = "$1 $2 $3 $4 $5 $6$7 $8 $9 $10";
            }
            else {
                if ( $input =~ /(.*) (((?:(\d{1,2}|[a-fA-F]{1,2}){2})(?::|-*)){6}) (\>) (((?:(\d{1,2}|[a-fA-F]{1,2}){2})(?::|-*)){6}), (.*?), length (\d+)\:(.*)/ ) {
                    $input = "$1 $2 x $6 x x x x $10";
                }
                else {
                    $res = "error";
                }
            }
        }
        else {
            $res = "error";
        }

        my @fields      = split( " ", $input );
        my $timestamp   = $fields[0] . " " . $fields[1];
        my $microsecond = $fields[1];
        $timestamp   =~ s/(.*?)\.\d+$/\1/;
        $microsecond =~ s/(.*?)\.(\d+$)/\2/;
        my $sourcemac = $fields[2];
        my $destmac   = $fields[4];
        $destmac =~ s/,//g;
        $fields[4] =~ s/,$//;
        my $len = $fields[9];
        $len =~ s/:$//;
        my $tos     = $fields[11];
        my $ttl     = $fields[12];
        my $id      = $fields[13];
        my $offset  = $fields[14];
        my $ipflags = $fields[15] . " " . $fields[16];
        $ipflags =~ s/\[(.*)\]/\1/g;
        my $sip = $fields[18];
        $sip =~ s/([^\.]+\.[^\.]+\.[^\.]+\.[^\.]+).*/\1/;
        $sip =~ s/:$//;
        my $sport = $fields[18];
        if ( $sport =~ /[^\.]+\.[^\.]+\.[^\.]+\.[^\.]+\.(.*)/ ) {
            $sport = $1;
            $sport =~ s/:$//;
        }
        else {
            $sport = "null";
        }
        my $dip = $fields[20];
        $dip =~ s/([^\.]+\.[^\.]+\.[^\.]+\.[^\.]+).*/\1/;
        $dip =~ s/:$//;
        my $dport = $fields[20];
        if ( $dport =~ /[^\.]+\.[^\.]+\.[^\.]+\.[^\.]+\.(.*)/ ) {
            $dport = $1;
            $dport =~ s/:$//;
        }
        else {
            $dport = "null";
        }
        my $proto = "null";
        my $flags = $fields[21] if ( $fields[21] =~ /[SRPU.]+/ );
        my $proto = "tcp"       if ( $fields[21] =~ /[SRPU.]+/ );

        $_ = "//$timestamp//$microsecond//$sourcemac//$destmac//$sip//$dip//$sport//$dport//$proto//$flags//$len//$ttl//$id//$tos//$ipflags//$offset?";
    }

    sub monitor : locked {
        my $line = $_;
        # my $semaphore = new Thread::Semaphore;
        $line = shift;
        unless ( $line eq "error" ) {
            # $semaphore->down;
            print OUTFILE $line;
            # $semaphore->up;
        }
    }

    close(OUTFILE);
      The problem is that every worker writes to the file.
      As written before, you should bundle the writing in one separate thread so that you need no synchronisation for the file.

        Hi,

        I have a kind of similar need. Can you please post some simple sample code (as I am not a pro in Perl) showing how I can bundle the writing in one separate thread?

      Here's a cut-down version of some code I'm using, based on Parallel::Fork::BossWorker, to speed up some processing. You'll need to monkey with it to get it to do what you want...
      my $Worker_count = 2;

      fork_workers();

      sub autoflush { select((select(shift), $| = 1)[0]) }

      sub fork_workers {
          # worker sends pid to boss when ready for more work
          pipe my $from_workers, my $to_boss;
          autoflush($to_boss);

          my %workers;
          foreach (1 .. $Worker_count) {
              # pipe for boss to send 'work' to worker
              pipe my $from_boss, my $to_worker;
              # pipe for worker to send message back to boss
              pipe my $from_worker2, my $to_boss2;
              autoflush($to_worker);
              autoflush($to_boss2);

              my $pid = fork;
              die "failed to fork: $!" unless defined $pid;

              if ($pid) {
                  $workers{$pid} = [$to_worker, $from_worker2];
                  close $from_boss;
                  close $to_boss2;
              }
              else {
                  close $from_workers;
                  close $from_worker2;
                  close $to_worker;
                  close STDIN;
                  send_msg($to_boss, $$);
                  worker($from_boss, $to_boss, $to_boss2);
                  exit;
              }
          }
          close $to_boss;

          my $INT_handler = $SIG{INT};
          local $SIG{INT} = sub {
              close $from_workers;
              for (keys %workers) {
                  my ($to_worker, $from_worker2) = @{$workers{$_}};
                  close $to_worker;
                  close $from_worker2;
              }
              $INT_handler->('INT');
          };

          my @work = get_work();    # ???

          while (my $pid = receive_msg($from_workers)) {
              my ($to_worker, $from_worker2) = @{$workers{$pid}};
              my $msg = receive_msg($from_worker2);
              if (@work) {
                  my $whatever = something(@work);
                  send_msg($to_worker, $whatever);
                  @work = ();
              }
              else {
                  close $to_worker;
                  close $from_worker2;
                  delete $workers{$pid};
              }
              if (length $msg) {
                  # something
              }
          }
          close $from_workers;
      }

      sub worker {
          my ($from_boss, $to_boss, $to_boss2) = @_;
          while (my $whatever = receive_msg($from_boss)) {
              my $msg = something_else($whatever);
              send_msg($to_boss, $$);
              send_msg($to_boss2, $msg);
          }
      }

      sub receive_msg {
          my $fh = shift;
          local $/ = "\003\000";
          my $msg = <$fh>;
          chomp $msg if defined $msg;
          return $msg;
      }

      sub send_msg {
          my ($fh, $msg) = @_;
          print $fh $msg, "\003\000";
      }
      There is one issue with the above mechanism that I'd be interested in having resolved... is it _safe_ for multiple child processes to write to the same pipe? In the above code the send_msg($to_boss, $$) is used by each child to notify the parent that it has something for it to process. The code used by Parallel::Fork::BossWorker allows the children to write arbitrarily large amounts to this pipe, but I found that this could occasionally result in corrupted data when writing large amounts of data.

      However, by changing to the scheme used in the above code, of only sending the pid (a small amount of data) down the shared pipe, I've yet to see any corruption.

      So... is it safe to write small amounts (less than some internal PerlIO buffer size) to a shared pipe; or is it unsafe, and I've just yet to hit the problem?

      Thanks for any tips! :-) alex.

      All the performance problems with Thread::Pool are inherent in its over-engineered design and convoluted implementation. There's very little you can do from the outside to improve it. Despite its 5-star (member of the club) CPAN rating, it should have been strangled at birth.


      Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
      "Science is about questioning the status quo. Questioning authority".
      In the absence of evidence, opinion is indistinguishable from prejudice.

      Or, to look at it another way, you are passing the input a line at a time to the worker threads -- so two things are happening:

      1. all of the thread communication overhead is being carried by every line -- I think this is the real issue.

      2. you are streaming the output, so Thread::Pool arranges for the output of each job to be done in job order.

        Having looked at the code, the way it does this is quite cunning: when a do routine completes a job before earlier jobs have completed, the results are put to one side, and the thread loops round to do the next job. When a job completes and all earlier have completed, the thread outputs its results and any results put aside earlier which can now be output.

        Note that this means that if one job takes a very long time indeed, all the other threads will continue taking jobs and generating and storing the output. This is a Good Thing, because slower jobs are not blocking faster ones. However, I don't see a mechanism to stop jobs being dispatched if the streamed output backs up. But that could be because I haven't studied the code hard enough.

        So apart from the cost of all this cunning, I don't think this is the issue.

      Without changing the structure of what you have too much, I'd change:

      $pool->job($_) while (<STDIN>);
      to something that reads a bunch of lines (say, 100K bytes' worth), something like:
      while (!eof(STDIN)) {
          my $buffer;
          read STDIN, $buffer, 100*1024 or die "$!";
          if (!eof(STDIN)) {
              my $nl = rindex($buffer, "\n") + 1 or die "No \\n in lump ?";
              if (my $trim = $nl - length($buffer)) {
                  seek STDIN, $trim, 1 or die "Seek failed ??";
                  substr($buffer, $trim) = '';
              };
          };
          $pool->job($buffer);
      };
      (I haven't tested this, so caveat emptor!). Then the processing subroutine needs to break its input into lines, process them into another buffer, and then return that lump of results. Something like:
      sub do {
          my ($lump) = @_;
          my $result = '';
          while ($lump =~ m/(.+)/g) {   # Ignores blank lines
              my $line = $1;
              ... whatever ...
              $result .= ...whatever.. . "\n";
          };
          return $result;
      };
      (also untested). There's some saving to be made by removing my ($lump) = @_ and doing while ($_[0] =~ m/(.+)/g) directly instead.

      This will amortise the job overhead over a reasonable chunk of lines. Obviously you can experiment to see if larger chunks means faster processing.

      BTW, your do subroutine starts:

      sub do { chomp; my $input = $_; $input = shift;
      which doesn't look right to me. The arguments are passed to do() in the usual way, so hacking around $_ appears redundant.
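
      In other words, you can take the argument directly, something like:

      sub do {
          my ($input) = @_;    # Thread::Pool passes the job argument in via @_
          chomp $input;
          # ... rest of the parsing as before ...
      }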

      You are suffering from NOT buffering. This becomes obvious by following the data flow of the pipe in the upstream direction:

      1) Your perl script accepts the next line of input.

      2) Tcpdump refills the pipe buffer and blocks.

      3) Tcpdump requests a new "line" from tcpdump_infile and transforms it.

      As a consequence, piping as a cheap form of using more processors does not work efficiently. It results in small read requests to the disk, of one line each. The delay between the small requests starves tcpdump and your perl script. You should optimise the earliest bottleneck first.

      All disks like large read requests, so the solution is to add large buffers (e.g. 100 MB each) before and after tcpdump. This allows disk reading, tcpdump, and the perl script to run in parallel. I like to use mbuffer for the task. Your input pipe could look like

      mbuffer -i tcpdump_infile -m 100M -P90 -p10 | \
        tcpdump | \
        mbuffer -m 100M -P90 -p10 | \
        perl

      This gives you 100 MB buffers that write out a large chunk when 90% full and start reading again when 10% full.

      I'd check if tcpdump is the first bottleneck. It has to do a lot of work and I doubt it is multithreaded. "time tcpdump -options >/dev/null" gives the minimum run time for the perl script. Look at the load or "top" while doing this.

      Recording smaller tcpdump_infiles or splitting them up at a good boundary allows running multiple tcpdump processes.

      OTOH it might be justified to parse the tcpdump_infile directly from perl.

Re: How do you parallelize STDIN for large file processing?
by GrandFather (Saint) on Feb 06, 2009 at 03:22 UTC

    Define "too long".

    What have you used to profile your current code, and what are the bottlenecks?

    How long does a simple script that copies STDIN to STDOUT take on your system compared to the time you have available for the task?


    Perl's payment curve coincides with its learning curve.
Re: How do you parallelize STDIN for large file processing?
by weismat (Friar) on Feb 06, 2009 at 05:53 UTC
    If you want to go the thread way:
    Create two queues: one for read data and one for write data (Thread::Queue).
    Start x workers as detached threads (I would make the number a parameter), and a result writer, also detached (threads).
    Workers read from the in-queue and send results to the result queue.
    The program terminates either when the number of lines read equals the number of lines written, or when the result queue has been empty for x seconds.
    The main question is the bottleneck, but I have used this approach quite frequently on multicore machines with success.
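
    For what it's worth, here is a bare-bones, untested sketch of that layout. It joins the threads instead of detaching them and uses undef as an end-of-work marker rather than the lines-read/lines-written or timeout check; process_line() and the out.csv filename are placeholders, and note that this simple version writes results in arrival order, not input order:

    use strict;
    use warnings;
    use threads;
    use Thread::Queue;

    my $workers = 4;                            # number of workers -- make it a parameter
    my $in_q    = Thread::Queue->new;           # read data
    my $out_q   = Thread::Queue->new;           # write data

    # x workers: read from the in-queue, push results onto the result queue
    my @pool = map {
        threads->create( sub {
            while ( defined( my $line = $in_q->dequeue ) ) {
                $out_q->enqueue( process_line($line) );
            }
        } );
    } 1 .. $workers;

    # one result writer, so the output file needs no synchronisation at all
    my $writer = threads->create( sub {
        open my $out, '>', 'out.csv' or die $!;
        while ( defined( my $res = $out_q->dequeue ) ) {
            print {$out} $res;                  # note: arrival order, not input order
        }
        close $out;
    } );

    while ( my $line = <STDIN> ) {
        sleep 1 while $in_q->pending > 10_000;  # crude back-pressure on the reader
        $in_q->enqueue($line);
    }
    $in_q->enqueue(undef) for 1 .. $workers;    # tell each worker to stop
    $_->join for @pool;
    $out_q->enqueue(undef);                     # then stop the writer
    $writer->join;

    sub process_line { return $_[0] }           # stand-in for the real work
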
Re: How do you parallelize STDIN for large file processing?
by bjmckenz (Initiate) on Feb 06, 2009 at 06:31 UTC

    Perhaps I'm missing something. If it's a static file (that is, not an input stream) you can start multiple worker processes each with a different byte offset into the file. Since you've established that it's newline-based, you can skip bytes until after the first newline (except for the worker at offset 0) to guarantee that you're processing a line at a time. Then your workers each build a "chapter" of output which you can then cat together later. These workers can be processes or threads. "Simple" seeks get you to the offset.
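
    An untested sketch of one such worker, given the file name, a start and end byte offset, and a chapter number on the command line (process_line() is a stand-in for the real work; seeking to start - 1 before discarding the first partial line just avoids losing a line when a boundary happens to fall exactly on a newline):

    #!/usr/bin/perl
    # One worker: process the lines of $file whose first byte falls in [$start, $end),
    # writing its "chapter" to chapter_$n so the chapters can be concatenated afterwards.
    use strict;
    use warnings;

    my ($file, $start, $end, $n) = @ARGV;

    open my $in,  '<', $file        or die "open $file: $!";
    open my $out, '>', "chapter_$n" or die "open chapter_$n: $!";

    if ($start > 0) {
        seek $in, $start - 1, 0 or die "seek: $!";
        <$in>;    # discard the partial line; the previous worker finishes it
    }

    while (tell($in) < $end) {
        my $line = <$in>;
        last unless defined $line;
        print {$out} process_line($line);
    }
    close $out;

    sub process_line { return $_[0] }    # stand-in for the real work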

    The reason I say this is that it's not clear if accepting pipe-ish data (STDIN) is part of the problem statement or part of the solution (a design approach). If the former...nevermind.
