This is a followup to

Using functional programming to reduce the pain of parallel-execution programming (with threads, forks, or name your poison)

In that post, I attempted to create numerous variations of a function builder I'm calling hashmap_parallel.

This function builder should, in theory, let me implement many of the utilities in List::MoreUtils in a transparently parallel way. E.g., instead of using flexygrep to build a parallel grepping function on top of a parallel mapping function and an element test, as shown below, I would use flexy_any, flexy_all, flexy_pairwise, flexy_mesh, or what have you. These could all be built on top of diverse custom hashmap_parallel implementations, which you could plug in at will depending on your resources and the particular bottleneck of the computation you are faced with.
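For instance, a flexy_any builder might look something like this (an untested sketch of my own, mirroring the flexygrep code further down; the name flexy_any is hypothetical, nothing below depends on it):

# Hypothetical sketch, following the same pattern as flexygrep below:
# build a parallel "any" out of a pluggable hashmap function.
sub flexy_any {
    my $test_function    = shift;
    my $hashmap_function = shift;  # e.g. sub { Map::hashmap_parallel_threads(@_) }
    my $in_array         = shift;

    my $in_hash     = Map::hash_from_array($in_array);
    my $result_hash = $hashmap_function->($test_function, $in_hash);

    # true if any element passed the test
    return !! grep { $_->{out} } values %$result_hash;
}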

So, if you have multiple processors, you can take advantage of that without cluttering up your code. And if you happen to have a cluster of 1000 commodity boxes, perhaps you too will one day be able to sort a 10^10 element array in under two minutes, with a simple call to distributed_sort($cmp_function, $my_big_list).

Since posting the above meditation, I have been able to implement hashmap_parallel in two ways:

1) threads (with help from LanceDeeply)
2) forks, along with multiple DBM::Deep data stores (using Parallel::ForkManager on tilly's advice, and DBM::Deep because... well, I couldn't think of any other way to do it that would work for any type of data)

In addition, I have a simple "serial hashmap" function that I can plug into my "flexygrep" grep builder, as a sanity check.

At present, the threading solution is faster. I attribute this to the many hard-drive reads and writes required by the multiple-DBM::Deep solution.

I was also hoping to implement a solution using a single DBM::Deep db and locking, which should at least reduce the read time. However, I was unable to get it working. My test matches a collection of letters against /[abc]/, where the test sleeps for one second before matching (the delay gives an artificial "reason" to want to take advantage of parallelism). There should be 9 matches in each case.

perl test_hashmap.pl
test strings: a b c a b c a b c
ok 1 - threads, matches: 9 -- a a b c c b b a c -- should be 9
ok 2 - threads, time elapsed: 2, wanted max seconds: 2
ok 3 - fork with many dbm::deep dbs, matches: 9 -- a a b c c b b a c -- should be 9
ok 4 - fork with many dbm::deep dbs, time elapsed: 1, wanted max seconds: 2
Use of uninitialized value in join or string at test_hashmap.pl line 48.
not ok 5 - fork with 1 dbm::deep db, matches: 7 -- a b b b c c -- should be 9
#   Failed test 'fork with 1 dbm::deep db, matches: 7 -- a b b b c c -- should be 9'
#   in test_hashmap.pl at line 48.
ok 6 - fork with 1 dbm::deep db, time elapsed: 1, wanted max seconds: 2
test strings: a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
ok 7 - threads, matches: 9 -- a b a c c b c b a -- should be 9
ok 8 - threads, time elapsed: 2, wanted max seconds: 2
ok 9 - fork with many dbm::deep dbs, matches: 9 -- a b a c c b c b a -- should be 9
not ok 10 - fork with many dbm::deep dbs, time elapsed: 9, wanted max seconds: 2
#   Failed test 'fork with many dbm::deep dbs, time elapsed: 9, wanted max seconds: 2'
#   in test_hashmap.pl at line 49.
not ok 11 - fork with 1 dbm::deep db, matches: 8 -- b c a c b c a b -- should be 9
#   Failed test 'fork with 1 dbm::deep db, matches: 8 -- b c a c b c a b -- should be 9'
#   in test_hashmap.pl at line 48.
not ok 12 - fork with 1 dbm::deep db, time elapsed: 9, wanted max seconds: 2
#   Failed test 'fork with 1 dbm::deep db, time elapsed: 9, wanted max seconds: 2'
#   in test_hashmap.pl at line 49.
1..12
# Looks like you failed 4 tests of 12.

In addition to the test failures shown above, the single-DBM::Deep-file case seems to lock up about 25% of the time, and the test never completes.
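One variation I still mean to try is shown below: have each child re-open the db file after the fork, instead of reusing the handle created in the parent. My unverified guess is that the forked children sharing the parent's open filehandle is what corrupts the db, and that with a fresh handle per child, locking => 1 would serialize the writes. This is an untested sketch; the sub name and the autoflush setting are my own guesses.

# Untested sketch: same as hashmap_parallel_forks_onedb below, except
# that each child opens its own DBM::Deep handle on the shared file,
# rather than inheriting the parent's filehandle across the fork.
use DBM::Deep;
use Parallel::ForkManager;
use File::Temp qw(tempfile);

sub hashmap_parallel_forks_onedb_reopen {
    my $function = shift;
    my $hash     = shift;

    my ($fh, $file) = tempfile();
    my $pm = Parallel::ForkManager->new(10);
    for my $key ( keys %$hash ) {
        $pm->start and next;
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        # fresh handle per child; locking => 1 should serialize the writes
        my $db = DBM::Deep->new( file => $file, locking => 1, autoflush => 1 );
        $db->{result}->{$key} = { in => $in, out => $out };
        $pm->finish;
    }
    $pm->wait_all_children;

    my $db = DBM::Deep->new( file => $file, locking => 1 );
    return $db->{result};
}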

I would be very interested in advice on how to do hashmap_parallel with a single DBM::Deep store. And if there are suggestions for other strategies I could try, or changes to my function builders that would speed things up, I'll be much obliged.

Thanks.

test_hashmap.pl:

#!/usr/bin/perl
use strict;
use warnings;
use Test::More qw( no_plan );
use Data::Dumper;
use Grep; # several different versions of grep, built up functionally.

my $slow_matches_b = sub {
    sleep 1;
    return unless $_[0];
    return 1 if $_[0] =~ /[abc]/;
};

my $justafew_letters = [ ('a'..'c') x 3 ];
my $lotsa_letters    = [ ('a'..'z') x 3 ];

for ( $justafew_letters, $lotsa_letters ) { testem($_) }

sub testem {
    my $test_strings = shift or die "no test strings";
    print "test strings: @$test_strings\n";

    my $parallel_tests = [
        { testname => 'threads',
          function => sub { Grep::threadgrep( $_[0], $_[1] ) },
        },
        { testname => 'fork with many dbm::deep dbs',
          function => sub { Grep::fork_manydbs_grep( $_[0], $_[1] ) },
        },
        { testname => 'fork with 1 dbm::deep db',
          function => sub { Grep::fork_onedb_grep( $_[0], $_[1] ) },
        },
        #{ testname => 'simple serial map',
        #  function => sub { Grep::slowgrep( $_[0], $_[1] ) },
        #},
    ];

    my $max_seconds_parallel = 2; # parallel execution should speed things up
    for (@$parallel_tests) {
        my $timestarted = time;
        my $test_name   = $_->{testname};
        my $matches     = $_->{function}->($slow_matches_b, $test_strings);
        my $timeelapsed = time - $timestarted;
        my $num_matches = @$matches;
        ok( $num_matches == 9,
            "$test_name, matches: $num_matches -- @$matches -- should be 9" );
        ok( $timeelapsed <= $max_seconds_parallel,
            "$test_name, time elapsed: $timeelapsed, wanted max seconds: $max_seconds_parallel" );
    }
}

Grep.pm:

package Grep;
use strict;
use warnings;
use Data::Dumper;
use Map;

# grep can be parallelized by building it on top of map_parallel,
# which uses forks, threads, distributed computations with MapReduce,
# or some such black magic.
# In some cases this may be faster, but not always;
# it depends where your bottleneck is.
# Whatever black magic is going on in the background,
# by abstracting it out, the code we get is clean and easy to read.

sub threadgrep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_parallel_threads(@_) };
    return flexygrep($test_function, $map_function, $in_array);
}

sub fork_onedb_grep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_parallel_forks_onedb(@_) };
    return flexygrep($test_function, $map_function, $in_array);
}

sub fork_manydbs_grep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_parallel_forks_manydbs(@_) };
    return flexygrep($test_function, $map_function, $in_array);
}

# Or you could do it in a non-forked/threaded/distributed/whatever
# way, by basing it on the conceptually simpler function map_serial.
sub slowgrep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_serialized(@_) };
    return flexygrep($test_function, $map_function, $in_array);
}

sub flexygrep {
    my $test_function    = shift;
    my $hashmap_function = shift;
    my $in_array         = shift;

    my $in_hash     = Map::hash_from_array($in_array);
    my $result_hash = $hashmap_function->($test_function, $in_hash);

    my $out_array = [];
    for my $key (keys %$result_hash) {
        if ( my $out_true = $result_hash->{$key}->{out} ) {
            push @$out_array, $result_hash->{$key}->{in};
        }
    }
    return $out_array;
}

1;

Map.pm:

package Map;
use strict;
use warnings;

# Black magic for doing stuff in parallel is encapsulated here
# use MapReduce; -- not yet, but it's on the list.
use Data::Dumper;
use DBM::Deep;
use File::Path qw(mkpath);
use File::Temp qw(tempdir tempfile);
use Parallel::ForkManager;
use threads;

sub hash_from_array {
    my $array = shift;
    my $hash;
    for my $index (0..$#$array) {
        $hash->{$index}->{in} = $array->[$index];
    }
    return $hash;
}

# Input is a function (e.g., my $multiply_by_ten = sub { return $_[0] * 10 }) and
# a hash like
#   my $input_values = { blee => { in => 1 },
#                        blah => { in => 2 } };
# Output is a hash like
#   { blee => { in => 1, out => 10 },
#     blah => { in => 2, out => 20 } }
sub hashmap_serial {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { ! defined($hash->{$_}->{in}) } (keys %$hash);

    # hash keys are processed in whatever order
    for my $key ( keys %$hash ) {
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        #print "result for $in is $out\n";
        $hash->{$key}->{out} = $out;
    }
    return $hash;
}

# Does the same thing as hashmap_serial,
# but saves the value on the hard drive.
# ("Serialized" in this context means a memory value gets put on the
# hard disk, not to be confused with "serial as opposed to parallel".)
sub hashmap_serialized {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { ! defined($hash->{$_}->{in}) } (keys %$hash);

    my $dir = "c:/tmp/map_serialized";
    mkpath($dir) unless -d $dir;
    die "no directory: $dir" unless -d $dir;
    my $file = "$dir/$$.db";

    my $db = DBM::Deep->new( $file );
    $db->{result} = $hash;

    for my $key ( keys %$hash ) {
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        $hash->{$key}->{out} = $out;
    }
    #unlink $file;
    #die "couldn't delete file" if -f $file;
    return $hash;
}

# Uses many DBM::Deep stores, along with forks.
# Works, but slow.
sub hashmap_parallel_forks_manydbs {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { ! defined($hash->{$_}->{in}) } (keys %$hash);

    my $dir = tempdir();

    my $pm = Parallel::ForkManager->new(10);
    for my $key ( keys %$hash ) {
        $pm->start and next;
        my $in   = $hash->{$key}->{in};
        my $out  = $function->($in);
        my $file = "$dir/$key";
        #print "file: $file\n";
        my $db = DBM::Deep->new( $file );
        $db->{result} = $out;
        #print "in $in, out $out\n";
        $pm->finish;
    }
    $pm->wait_all_children;

    for my $key ( keys %$hash ) {
        my $file = "$dir/$key";
        my $db   = DBM::Deep->new( $file );
        defined( my $out = $db->{result} )
            or die "no result in $file for key $key";
        $hash->{$key}->{out} = $out;
    }
    #print "hash: " . Dumper($hash);
    return $hash;
}

# Tries to use one locked DBM::Deep store, along with forks.
# Doesn't work.
sub hashmap_parallel_forks_onedb {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { ! defined($hash->{$_}->{in}) } (keys %$hash);

    my ($fh, $file) = tempfile();
    my $db = DBM::Deep->new( file => $file, locking => 1 );

    my $pm = Parallel::ForkManager->new(10);
    for my $key ( keys %$hash ) {
        $pm->start and next;
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        #print "in $in, out $out\n";
        #$db->lock();
        $db->{result}->{$key}->{in}  = $in;
        $db->{result}->{$key}->{out} = $out;
        #$db->unlock();
        $pm->finish;
    }
    $pm->wait_all_children;

    my $result = $db->{result};
    #print "result: " . Dumper($result);
    return $result;
    #return $hash;
}

# Works.
sub hashmap_parallel_threads {
    my $function = shift;
    my $hash     = shift;

    my @threads;
    for ( keys %$hash ) {
        my $in = $hash->{$_}->{in};
        my $t  = threads->create( sub { map_element($_, $function, $in) } );
        push @threads, $t;
    }
    # wait for threads to return (this implementation is bound by the slowest thread)
    my %results = map { %{ $_->join() } } @threads;
    #print Dumper \%results;
    return {%results};
}

sub map_element {
    my $key      = shift;
    my $function = shift;
    my $in       = shift;
    my $out      = $function->($in);
    return { $key => { in => $in, out => $out } };
}

1;

In reply to Using DBM::Deep and Parallel::ForkManager for a generalized parallel hashmap function builder (followup to "reducing pain of parallelization with FP") by tphyahoo
