$ ./test_hashmap.pl
ok 1 - parallel-y threadgrep works
not ok 2 - parallel-y forkgrep works
#   Failed test 'parallel-y forkgrep works'
#   in ./test_hashmap.pl at line 21.
ok 3 - serially executing code works
1..3
# Looks like you failed 1 test of 3.

#### test_hashmap.pl:

#!/usr/bin/perl
use strict;
use warnings;
use Test::More qw( no_plan );
use Data::Dumper;
use Grep;

# a deliberately slow test function, so parallelizing has something to win
my $slow_matches_b = sub {
    sleep 1;
    return unless $_[0];
    return 1 if $_[0] =~ /b/;
};

my $test_strings = [ 'blee', 'blah', 'bloo', 'qoo', 'fwee' ];

my $matches;
$matches = Grep::threadgrep( $slow_matches_b, $test_strings );
ok( @$matches == 3, "parallel-y threadgrep works" );
# should get blee, blah, bloo, but not fwee or qoo

$matches = Grep::forkgrep( $slow_matches_b, $test_strings );
ok( @$matches == 3, "parallel-y forkgrep works" );

$matches = Grep::slowgrep( $slow_matches_b, $test_strings );
ok( @$matches == 3, "serially executing code works" );

#### Grep.pm:

package Grep;
use strict;
use warnings;
use Data::Dumper;
use Map;

# grep can be parallelized by building it on top of a parallel map,
# which may use forks, threads, distributed computation with MapReduce,
# or some such black magic.  In some cases this will be faster, but not
# always; it depends on where your bottleneck is.  Whatever black magic
# is going on in the background, abstracting it out leaves code that is
# clean and easy to read.

sub threadgrep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_parallel_threads(@_) };
    return flexygrep( $test_function, $map_function, $in_array );
}

sub forkgrep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_parallel_forks(@_) };
    return flexygrep( $test_function, $map_function, $in_array );
}

# or you could do it in a non-forked/threaded/distributed/whatever way,
# basing it on the conceptually simpler hashmap_serial (here via its
# disk-backed variant, hashmap_serialized).
sub slowgrep {
    my $test_function = shift;
    my $in_array      = shift;
    my $map_function  = sub { Map::hashmap_serialized(@_) };
    return flexygrep( $test_function, $map_function, $in_array );
}

sub flexygrep {
    my $test_function    = shift;
    my $hashmap_function = shift;
    my $in_array         = shift;

    my $in_hash = Map::hash_from_array($in_array);

    my $result_hash = $hashmap_function->( $test_function, $in_hash );

    my $out_array = [];
    for my $key ( keys %$result_hash ) {
        if ( my $out_true = $result_hash->{$key}->{out} ) {
            push @$out_array, $result_hash->{$key}->{in};
        }
    }
    return $out_array;
}

1;
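Since flexygrep only cares that its map function takes a test function
plus a hash of { key => { in => ... } } and returns the same hash with
"out" values filled in, any backend with that shape can be plugged in.
A minimal sketch (the $inline_map below is hypothetical, not part of
the modules here):

my $inline_map = sub {
    my ( $function, $hash ) = @_;
    $hash->{$_}->{out} = $function->( $hash->{$_}->{in} ) for keys %$hash;
    return $hash;
};

# only 'blee' matches /b/, so $matches ends up as ['blee']
my $matches = Grep::flexygrep( sub { $_[0] =~ /b/ }, $inline_map, [ 'blee', 'qoo' ] );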
#### Map.pm:

package Map;
use strict;
use warnings;

# Black magic for doing stuff in parallel is encapsulated here
# use MapReduce;
use Parallel::ForkManager;
use threads;
# use threads::shared qw(is_shared);
use DBM::Deep;
use Data::Dumper;

sub hash_from_array {
    my $array = shift;
    my $hash;
    for my $index ( 0 .. $#$array ) {
        $hash->{$index}->{in} = $array->[$index];
    }
    return $hash;
}

# input is a function (eg, my $multiply_by_ten = sub { return $_[0] * 10 })
# and a hash like
#   my $input_values = { blee => { in => 1 },
#                        blah => { in => 2 },
#                      };
# output is a hash like
#   { blee => { in => 1, out => 10 },
#     blah => { in => 2, out => 20 },
#   }
sub hashmap_serial {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { !defined( $hash->{$_}->{in} ) } keys %$hash;

    # hash keys are processed in whatever order keys() hands them out
    for my $key ( keys %$hash ) {
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        #print "result for $in is $out\n";
        $hash->{$key}->{out} = $out;
    }
    return $hash;
}

# does the same thing as hashmap_serial, but also saves the hash on the
# hard drive.  ("Serialized" here means a memory value gets written to
# disk, not to be confused with "serial as opposed to parallel".)
sub hashmap_serialized {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { !defined( $hash->{$_}->{in} ) } keys %$hash;

    use File::Path qw(mkpath);
    my $dir = "c:/tmp/map_serialized";
    mkpath($dir) unless -d $dir;
    die "no directory: $dir" unless -d $dir;
    my $file = "$dir/$$.db";

    # note: this stores a snapshot of the inputs; the "out" values
    # computed below go into the in-memory hash, not the DBM::Deep copy
    my $db = DBM::Deep->new($file);
    $db->{result} = $hash;

    for my $key ( keys %$hash ) {
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        $hash->{$key}->{out} = $out;
    }
    #unlink $file;
    #die "couldn't delete file" if -f $file;
    return $hash;
}

# does the same thing as hashmap_serial, but uses forks to compute the
# "out" values in parallel.  Doesn't work: the early "return {}" below
# short-circuits everything, and even without it each child is a
# separate process, so its writes to $hash never reach the parent.
sub hashmap_parallel_forks {
    my $function = shift;
    my $hash     = shift;
    die "bad hash" . Dumper($hash)
        if grep { !defined( $hash->{$_}->{in} ) } keys %$hash;

    return {};    # stub left in, so forkgrep currently finds no matches

    use File::Path qw(mkpath);
    my $dir = "c:/tmp/map_serialized";
    mkpath($dir) unless -d $dir;
    die "no directory: $dir" unless -d $dir;
    my $file = "$dir/$$.db";

    my $db = DBM::Deep->new($file);
    $db->{result} = $hash;

    my $pm = Parallel::ForkManager->new(10);
    for my $key ( keys %$hash ) {
        $pm->start and next;
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        print "in $in, out $out\n";
        $hash->{$key}->{out} = $out;    # happens in the child, then is lost
        $pm->finish;
    }
    $pm->wait_all_children;
    print "hash: " . Dumper($hash);
    #unlink $file;
    #die "couldn't delete file" if -f $file;
    #die "forkgrep result: " . Dumper($hash);
    return $hash;
}

# works
sub hashmap_parallel_threads {
    my $function = shift;
    my $hash     = shift;

    my @threads;
    for my $key ( keys %$hash ) {
        my $in = $hash->{$key}->{in};
        push @threads,
            threads->create( sub { map_element( $key, $function, $in ) } );
    }

    # wait for the threads to return (this implementation is bound by
    # the slowest thread)
    my %results = map { %{ $_->join() } } @threads;
    #print Dumper \%results;
    return {%results};
}

sub map_element {
    my $key      = shift;
    my $function = shift;
    my $in       = shift;
    my $out      = $function->($in);
    return { $key => { in => $in, out => $out } };
}

1;
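For what it's worth, here is a sketch of a fork version that does get
results back to the parent.  It assumes Parallel::ForkManager 0.7.6 or
later, which lets a child pass a reference back to the parent through
finish() and a run_on_finish callback; the name
hashmap_parallel_forks_fixed is made up for illustration:

sub hashmap_parallel_forks_fixed {
    my $function = shift;
    my $hash     = shift;

    my $results = {};
    my $pm      = Parallel::ForkManager->new(10);

    # runs in the parent whenever a child exits; $data is whatever
    # reference that child handed to finish()
    $pm->run_on_finish(
        sub {
            my ( $pid, $exit, $ident, $signal, $core, $data ) = @_;
            %$results = ( %$results, %$data ) if $data;
        }
    );

    for my $key ( keys %$hash ) {
        $pm->start and next;    # parent loops on; the child falls through
        my $in  = $hash->{$key}->{in};
        my $out = $function->($in);
        # the child's copy of $hash dies with it, so ship the one
        # result back to the parent instead of writing it in place
        $pm->finish( 0, { $key => { in => $in, out => $out } } );
    }
    $pm->wait_all_children;

    return $results;
}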