PerlMonks  

Wait for individual sub processes [SOLVED]

by crackerjack.tej (Novice)
on Apr 25, 2015 at 07:01 UTC

crackerjack.tej has asked for the wisdom of the Perl Monks concerning the following question:

Dear monks,

I am essentially writing a Perl script that splits a large input file for a text processing tool, so that I can process the files faster. I am working on a CentOS 6-based cluster, where each CPU has 16 cores. My idea is to split the input file into 16 parts, run 16 instances of the text processing tool, and once all of them are done, parse the output and merge it into a single file. In addition, the script will continue to process the next input file in the same way. I have achieved that using fork(), wait() and exec() as follows (omitting code that is not relevant):

use strict;
use warnings;
use POSIX ":sys_wait_h";

# Split input files into parts and store the filenames into array @parts
...

my %children;

foreach my $part (@parts) {
    my $pid = fork();
    die "Cannot fork for $part\n" unless defined $pid;
    if ($pid == 0) {
        exec("sh text_tool $part > $part.out") or die "Cannot exec $part\n";
    }
    print STDERR "Started processing $part with $pid at ".localtime."\n";
    $children{$pid} = $part;
}

while (%children) {
    my $pid = wait();
    die "$!\n" if $pid < 1;
    my $part = delete($children{$pid});
    print STDERR "Finished processing $part at ".localtime."\n";
}

While I got what I wanted, there is a small problem. Due to the nature of the text processing tool, some parts get completed much before others, in no specific order. The difference is in hours, which means that many cores of the CPU are idle for a long time, just waiting for few parts to finish.

This is where I need help. I want to keep checking which part (or corresponding process) has exited successfully, so that I can start the processing of the same part of the next input file. I need your wisdom on how I can achieve this. I tried searching a lot on various forums, but did not understand correctly how this can be done.

Thanks.

------UPDATE---------

Using a hash, I can now find out which process is exiting when. But I fail to understand how to use this code in an if block, so that I can start the next process. Can someone help me with that? I have updated the code accordingly.
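
A minimal sketch of one way to wire that up (the @next_parts array and the start_part() helper are hypothetical names, not part of the code above): when wait() returns a pid, the %children hash identifies which part just finished, and an if block can immediately fork a replacement for the next queued part.

sub start_part {
    my ($part) = @_;
    my $pid = fork();
    die "Cannot fork for $part\n" unless defined $pid;
    if ($pid == 0) {
        exec("sh text_tool $part > $part.out") or die "Cannot exec $part\n";
    }
    print STDERR "Started processing $part with $pid at ".localtime."\n";
    return $pid;
}

my %children;
$children{ start_part($_) } = $_ for @parts;    # 16 parts of the current file

while (%children) {
    my $pid = wait();
    die "$!\n" if $pid < 1;
    my $part = delete $children{$pid};
    print STDERR "Finished processing $part at ".localtime."\n";

    if (@next_parts) {                          # more parts queued from the next file?
        my $next = shift @next_parts;
        $children{ start_part($next) } = $next;
    }
}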

----------------UPDATE 2--------------

I guess it's working now. Using Parallel::ForkManager, and a hash of arrays that stores the pids of each input file, I am able to track the sub processes of each file separately. By maintaining a count of number of subprocesses exited, I can call the sub for output parsing as soon as the count reaches 16 for an input file. I will come back if I run into any other problem.
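
For reference, a rough sketch of that shape using Parallel::ForkManager's run_on_finish callback (the @input_files list and the split_into_parts() / parse_and_merge_output() subs are hypothetical placeholders, not the actual script):

use strict;
use warnings;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(16);
my %done;                                # input file => number of finished parts

$pm->run_on_finish(sub {
    my ($pid, $exit_code, $ident) = @_;  # $ident carries the input file name
    if (++$done{$ident} == 16) {
        parse_and_merge_output($ident);  # all 16 parts of this file are done
    }
});

for my $file (@input_files) {
    for my $part (split_into_parts($file, 16)) {
        $pm->start($file) and next;      # parent: returns the child pid, moves on
        system("sh text_tool $part > $part.out") == 0
            or die "text_tool failed on $part\n";
        $pm->finish;                     # child exits here
    }
}
$pm->wait_all_children;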

Thanks a lot for all the help :)

P.S. Is there any flag that I have to set to mark this thread as answered/solved?

Replies are listed 'Best First'.
Re: Wait for individual sub processes
by BrowserUk (Patriarch) on Apr 25, 2015 at 10:11 UTC
    My idea is to split the input file into 16 parts, and run 16 instances of the text processing tool, and once all of them are done, I parse the output and merge it into a single file.
    there is a small problem. Due to the nature of the text processing tool, some parts get completed much before others, in no specific order. The difference is in hours, which means that many cores of the CPU are idle for a long time, just waiting for few parts to finish. I want to keep checking which part (or corresponding process) has exited successfully, so that I can start the processing of the same part of the next input file.

    The problem with that is Sod's Law guarantees that it will always be the first, middle and last chunks of the files that take the longest, so you'll still end up with 13 cpus standing idle while those 3 run on for hours trying to catch up. (Or some other twist of the numbers.)

    My suggestion to you is to split your files into 256 chunks (or more, depending on their size) and feed whichever processor finishes first the next chunk. This will have the effect of distributing the processing far more evenly amongst the cores -- some cores may process many more than their 16 chunks, whilst others many fewer -- with the overall effect of minimising the total time for the whole file.

    Within reason -- i.e. subject to the startup/teardown/merge costs -- the more (and smaller) the chunks you divide the processing into, the more evenly distributed the processing will be.
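
    A sketch of that dispatch pattern with plain fork/wait (illustrative only; @chunks is assumed to already hold the pre-split chunk filenames): keep 16 children running, and every time one exits, hand its core the next chunk from the queue.

    my @queue = @chunks;             # e.g. 256+ small chunk files
    my %running;

    my $spawn = sub {
        my $chunk = shift @queue;
        return unless defined $chunk;
        my $pid = fork();
        die "Cannot fork for $chunk\n" unless defined $pid;
        if ($pid == 0) {
            exec("sh text_tool $chunk > $chunk.out") or die "Cannot exec $chunk\n";
        }
        $running{$pid} = $chunk;
    };

    $spawn->() for 1 .. 16;          # saturate all 16 cores

    while (%running) {
        my $pid = wait();
        delete $running{$pid};
        $spawn->();                  # give the freed core the next chunk
    }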

    Ideally, if this was a shared-memory cluster, I'd use a shared memory queue feeding 16 persistent threads processing one minimal processing unit (ie. 1 line or multi-line record) at a time; and have another thread gathering and reassembling the output as it is produced and writing it back to a single output file.

    That eliminates both process startup & teardown time; and ensures the absolute best possible fairness of the workload distribution across the processors. Arranging the merge-back of the output on-the-fly takes some thought, but is doable. I'll go into detail if this approach is feasible for you and interests you.


    With the rise and rise of 'Social' network sites: 'Computers are making people easier to use everyday'
    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority". I'm with torvalds on this
    In the absence of evidence, opinion is indistinguishable from prejudice. Agile (and TDD) debunked
      Sod's Law guarantees that it will always be the first, middle and last chunks of the files that take the longest, so you'll still end up with 13 cpus standing idle while those 3 run on for hours trying to catch up.

      As far as I could see, the speed of the chunks is totally random. Sometimes, it is only one part (let's say part 3 of 16) that keeps running whereas the remaining 15 CPUs are idle. The reason I don't want to split the files further is that I have to deal with enough files already, and I would rather avoid the confusion. Also, the start of each process loads a huge file into memory (about 10GB), which takes some time in itself, and I would like to minimize that time as well. So I am not sure if I can follow this path.

        Sometimes, it is only one part (let's say part 3 of 16) that keeps running whereas the remaining 15 CPUs are idle

        My numbers were only by way of example.

        Let's say your chunk 3 takes an hour whereas the other 15 chunks take 5 minutes each. Your overall processing time is 1 hour, with 15*55 minutes = 13.75 hours of wasted, idle processor time by the time you complete.

        Now let's say that you split each of the 16 chunks into 16 bits, for a total of 256 bits; and assume that the cost of processing those smaller bits is 1/16th of the time of the larger chunks.

        You now have 240 bits that take 0.3125 minutes each; and 16 bits that take 3.75 minutes each.

        1. The 16 bits of chunk 1 are processed in parallel in 0.3125 minutes.
        2. The 16 bits of chunk 2 are processed in parallel in 0.3125 minutes.
        3. The 16 bits of chunk 3 are processed in parallel in 3.75 minutes.
        4. The 16 bits of chunks 4 through 16 are processed in parallel in 0.3125 minutes each.

        Total processing time is 15*0.3125 + 1*3.75 = 8.4375 minutes; with 0 wasted cpu. You've saved 85% of your runtime, and fully utilised the cluster.

        It won't split as nicely and evenly as that; but the smaller the bits you process, the more evenly the processing will be divided between the processors, no matter how the slow bits are (randomly) distributed through the file.

        The reason I don't want to split the files further is because I have to deal with enough files already, and I would rather avoid the confusion.

        More seriously, the simple way to avoid the too-many-files syndrome is: don't create many files. Instead of having each process write a separate output file, have them all write to a single output file.

        I know, I know. You're thinking that the output file will get all mixed up and that it involves a mess of locking to prevent corruption; but it doesn't have to be that way!

        Instead of writing variable length output lines (records) sequentially, you write fixed length records using random access. (Bear with me!)

        Let's say you normally write a variable length output line, of between 30 and 120 characters, for each input record. Instead, you round that up (and pad with spaces) to the largest possible output record (say 128 bytes), and then seek to position input_record_number * 128 and write this fixed-length, space-padded record.

        Because each input record is only ever read by one process, each output position will only ever be written by one process, thus you don't need locking. And as each output record has a known (calculated) fixed position in the file, it doesn't matter what order, or which processor writes them.

        Once the input file is processed and the output file completely written, you run a simple, line oriented filter program on the output file that reads each line, trims off any trailing space-padding and writes the result to a new output file. You then delete the padded one. You end up with your required variable length record output file all in the correct order.

        This final processing is far more efficient than merging many small output files together, and the whole too-many-small-files problem simply disappears.
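
        A sketch of both halves of that scheme (the 128-byte record length, the file names, and the $recno/$line variables are assumptions for illustration, not taken from the thread). The padded output file is assumed to exist before the workers start; each worker seeks to its record's slot and writes a space-padded line, and a final single pass strips the padding:

        # Writer side, inside each worker's per-record loop.
        use Fcntl qw(:seek);

        my $RECLEN = 128;                                  # largest output line + newline
        open my $out, '+<', 'combined.out' or die "combined.out: $!";

        my $padded = sprintf "%-*s\n", $RECLEN - 1, $line; # space-pad to 127 chars
        seek $out, $recno * $RECLEN, SEEK_SET or die "seek: $!";
        print {$out} $padded;                              # fixed slot, so no locking needed

        # Final pass: trim the padding to recover ordinary variable-length lines.
        open my $in,  '<', 'combined.out'   or die $!;
        open my $fin, '>', 'combined.final' or die $!;
        while (my $rec = <$in>) {
            $rec =~ s/ +$//;                               # strip trailing space padding
            print {$fin} $rec;
        }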

        Your call.



      OK. To test whether dividing the input further will help in my case, I ran the tool (to clarify: this tool is not mine, it is not written in Perl, it is compiled, and it is not open source) with varying sizes of input (1, 10, 100, 1000 and 10000 lines).

      Ignoring the startup time (as I said, it first loads a big file into memory, but I assumed we can take care of this somehow), the tool itself provides some stats on the time it spends processing the file. Here is the time it took for each input:

      1.txt: 3.06 seconds
      10.txt: 16.11 seconds
      100.txt: 54.12 seconds
      1000.txt: 7 min 14 seconds (434 seconds)
      10000.txt: 69 min 44 seconds (4184 seconds)

      So, I guess, splitting the files is only advantageous if I can process them simultaneously. A very interesting suggestion, nevertheless. Thank you :)

        You need to adapt things a little. See Re^3: Wait for individual sub processes.


Re: Wait for individual sub processes
by pme (Monsignor) on Apr 25, 2015 at 07:25 UTC
    Hi crackerjack.tej,

    Welcome to the monastery!

    You can try Parallel::ForkManager. You can cap the number of forked-off children at 16:

    my $pm = Parallel::ForkManager->new(16);
    This way a new child process will be forked whenever a running child finishes.
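
    A minimal sketch of that pattern (assuming the chunk filenames are already in @parts):

    use Parallel::ForkManager;

    my $pm = Parallel::ForkManager->new(16);   # at most 16 children at a time

    for my $part (@parts) {
        $pm->start and next;                   # parent blocks here once 16 are running
        system("sh text_tool $part > $part.out");
        $pm->finish;                           # child exits; a slot frees up
    }
    $pm->wait_all_children;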

      Thanks for the quick response, pme. I had looked at Parallel::ForkManager before. What I did not understand was how to wait for each sub-process individually, as I need to submit the corresponding chunk of my next input file. I will look into it again to see if I understand it better now.

      Thank you

Re: Wait for individual sub processes
by marioroy (Prior) on Apr 25, 2015 at 12:30 UTC

    The following is an example using MCE to process a file. Splitting the file into parts before running is not necessary. Chunking is integrated into MCE allowing for maximum CPU utilization from start to end.

    Update 1: Added merge_to_iter to merge the output into one file while preserving order.

    Update 2: $slurp_ref is a scalar reference, thus print $fh $$slurp_ref;

    Update 3: Changed chunk_size from 'auto' to 200. A chunk_size value of 8192 or less is treated as a number of records (i.e. lines). A value greater than 8192 is treated as a number of bytes, with MCE reading until the end of the record. MCE quietly caps the value at 64M. The OP provided timings with 100 rows taking ~1 minute.

    Update 4: Changed max_workers from 'auto' to 16. The 'auto' value will never go higher than 8; thus, one must set it explicitly to run on all available cores, e.g. max_workers => MCE::Util::get_ncpu().

    use MCE::Flow;

    die "Not enough arguments given\n" if @ARGV < 1;

    my $file = shift;
    my $odir = "/path/to/output_dir";

    sub merge_to_iter {
        my ($ofile) = @_;
        my %tmp;
        my $order_id = 1;

        open my $ofh, '>', $ofile or die "Cannot open $ofile: $!\n";
        select $ofh; $| = 1;   # flush immediately

        return sub {
            my ($chunk_id, $opart) = @_;
            $tmp{$chunk_id} = $opart;

            while (1) {
                last unless exists $tmp{ $order_id };
                $opart = delete $tmp{ $order_id++ };

                # slurp (append $ifh) to $ofh
                open my $ifh, '<', $opart;
                local $/;
                print $ofh scalar <$ifh>;
                close $ifh;

                unlink $opart;
            }
        };
    }

    mce_flow_f {
        gather      => merge_to_iter("$odir/$file.out"),
        max_workers => 16,
        chunk_size  => 200,
        use_slurpio => 1,
    },
    sub {
        my ($mce, $slurp_ref, $chunk_id) = @_;
        my $part = "$odir/$file.$chunk_id";

        open my $fh, '>', $part or die "Cannot open $part: $!\n";
        print $fh $$slurp_ref;
        close $fh;

        # run the external tool on this chunk and wait for it (system, not exec,
        # so the worker survives to gather the result)
        system("sh text_tool $part > $part.out") == 0
            or die "Cannot process $part\n";

        print {*STDERR} "Finished processing $part at ".localtime."\n";

        $mce->gather($chunk_id, "$part.out");
        unlink $part;
    }, $file;
      Splitting the file into parts before running is not necessary. Chunking is integrated into MCE

      But does it solve the OP's problem of uneven processor use due to disparate processing requirements of records?

      If so, how?

      Does it reassemble the output in the correct order without allowing the slow processing of some records to block processing or output from other subsequent records?

      If so, how?



        Yes and yes. I added merge_to_iter to the MCE example.

        MCE follows a bank-teller queuing model when processing input, so a slow chunk will not delay or block subsequent chunks. Each chunk comes with a $chunk_id value, which is used to preserve output order: out-of-order items passed to gather are held temporarily until the in-order items arrive.

        MCE processes immediately. Thus, the pre-processing step to split the file into parts is not necessary.

        I applied a correction to the example; $slurp_ref is a scalar reference, thus print $fh $$slurp_ref;

        The merge_to_iter iterator itself is executed by the manager process while the job runs.

Re: Wait for individual sub processes
by dmitri (Priest) on Apr 25, 2015 at 11:21 UTC
    Have a look at MCE -- Many-Core Engine for Perl providing parallel processing capabilities. There is a bit of a learning curve, but once you're past that, the abstractions MCE provides make it easy to parallelize tasks and modify the code quickly.
Re: Wait for individual sub processes
by stevieb (Canon) on Apr 25, 2015 at 07:56 UTC

    You Sir, fit right in with the Monks way ^ up there. You've done your homework not only on your issue, but apparently on the site itself.

    Welcome.

    -stevieb

      Thanks a lot :)

Re: Wait for individual sub processes
by choroba (Cardinal) on Apr 25, 2015 at 19:35 UTC
    Is there any flag that I have to set that this thread is answered/solved?
    Some monks add [SOLVED] to the title.
    لսႽ† ᥲᥒ⚪⟊Ⴙᘓᖇ Ꮅᘓᖇ⎱ Ⴙᥲ𝇋ƙᘓᖇ
Re: Wait for individual sub processes [SOLVED]
by RonW (Parson) on Apr 27, 2015 at 17:13 UTC

    I do see this is solved, but I am curious. In terms of overall processing, processing 16 whole files in parallel would have less overhead than splitting each file and processing the chunks. So, why add the complexity of splitting the input files and concatenating the outputs?

      I suspect that the OP doesn't have 16 whole files to process. Just one fricking huge file that takes hours when done sequentially.


