Updated: Fully Perl-like behavior for the 1st demonstration.
Greetings Veltro,
Your post presents an interesting use case, for sure. Let me try and hoping that it all works, eventually :)
Q & A
Problem 1: The join is located at a problematic position due to the nature of how hashes work. Sometimes the code takes 6 seconds to execute and sometimes 2 + 6 seconds.
Hashes are not ordered in Perl. The ref key-value went first. In your execute routine, $hobo->join is called. The remedy is to remove the $hobo->join statement out of the routine. It is not needed inside the execute routine when max_workers is given to MCE::Hobo->init.
Problem 2: L2_counter1 => 3 is not incremented.
The tie statement does not deeply share key-values during construction (bug, fix planned for v1.837). The way to shared nested hash/array structures is explicitly via the STORE method.
Problem 3: Why do I need to use a share? The data is incremented independently, so I would think there should be no problems regards synchronicity. However if I don't use the share, nothing gets incremented at all...
The short answer is that workers have unique copies for non-shared variables. Thus, sharing is necessary. MCE::Shared spawns a separate process (a thread on the Windows platform) where the shared data resides. Workers including the main process communicate to the shared-manager process using sockets.
Demo 1: via Perl-like behavior
Shown with mutex in the event multiple workers update the same key. The reason is because ++ involves two trips to the shared-manager process { FETCH and STORE }.
use strict ;
use warnings ;
use MCE::Hobo ;
use MCE::Shared ;
use Data::Dumper ;
sub task1 {
print "Starting task 1 for $_[0]\n" ;
sleep 2 ;
print "Finished task 1 for $_[0]\n" ;
}
sub task2 {
print "Starting task 2 for $_[0]\n" ;
sleep 4 ;
print "Finished task 2 for $_[0]\n" ;
}
sub task3 {
print "Starting task 3 for $_[0]\n" ;
sleep 6 ;
print "Finished task 3 for $_[0]\n" ;
}
MCE::Hobo->init(
max_workers => 2,
# hobo_timeout => 10,
# posix_exit => 1,
) ;
my $mutex = MCE::Mutex->new ;
# Construct the shared hash first before assigning key-value pairs.
tie my %test, 'MCE::Shared' ;
# Internally, STORE deeply-shares array/hash references automatically.
%test = (
L1_counter1 => 1,
# L1_counter2 => 2,
# L1_counter3 => 3,
nested1 => {
L2_counter1 => 3,
# L2_counter2 => 2,
# L2_counter3 => 1,
},
) ;
sub executeTasks {
my $in = $_[0] ;
foreach( sort keys %{ $in } ) {
my $ref = ref( my $val = $in->{ $_ } ) ;
if ( $ref && $val->blessed eq 'MCE::Shared::Hash' ) {
executeTasks( $val ) ;
} else {
if ( $val == 1 ) {
mce_async {
task1( $_ ) ;
$mutex->enter(sub {
++$in->{ $_ } ;
}) ;
} ;
} elsif ( $val == 2 ) {
mce_async {
task2( $_ ) ;
$mutex->enter(sub {
++$in->{ $_ } ;
}) ;
} ;
} elsif ( $val == 3 ) {
mce_async {
task3( $_ ) ;
$mutex->enter(sub {
++$in->{ $_ } ;
}) ;
} ;
} ;
} ;
} ;
} ;
# Dump shared hash.
print Dumper( tied( %test )->export( { unbless => 1 } ) ) ;
# Begin processing.
executeTasks( \%test ) ;
# Reap any remaining Hobo workers. MCE::Hobo reaps workers
# automatically to not exceed max_workers when given.
MCE::Hobo->waitall ;
# Dump shared hash.
print "\n", Dumper( tied( %test )->export( { unbless => 1 } ) ) ;
Demo 2: using the OO interface
This eliminates having a mutex at the application level. Btw, the OO interface does not involve TIE for lesser overhead.
use strict ;
use warnings ;
use MCE::Hobo ;
use MCE::Shared ;
use Data::Dumper ;
sub task1 {
print "Starting task 1 for $_[0]\n" ;
sleep 2 ;
print "Finished task 1 for $_[0]\n" ;
}
sub task2 {
print "Starting task 2 for $_[0]\n" ;
sleep 4 ;
print "Finished task 2 for $_[0]\n" ;
}
sub task3 {
print "Starting task 3 for $_[0]\n" ;
sleep 6 ;
print "Finished task 3 for $_[0]\n" ;
}
MCE::Hobo->init(
max_workers => 2,
# hobo_timeout => 10,
# posix_exit => 1,
) ;
my $test = MCE::Shared->hash ;
# Must call STORE (not set) for deeply sharing to work.
$test->STORE( L1_counter1 => 1 ) ;
$test->STORE( nested1 => { 'L2_counter1' => 3 } ) ;
sub executeTasks {
my $in = $_[0] ;
foreach( sort $in->keys ) {
my $ref = ref( my $val = $in->get( $_ ) ) ;
if ( $ref && $val->blessed eq 'MCE::Shared::Hash' ) {
executeTasks( $val ) ;
} else {
if ( $val == 1 ) {
mce_async {
task1( $_ ) ;
$in->incr( $_ ) ;
} ;
} elsif ( $val == 2 ) {
mce_async {
task2( $_ ) ;
$in->incr( $_ ) ;
} ;
} elsif ( $val == 3 ) {
mce_async {
task3( $_ ) ;
$in->incr( $_ ) ;
} ;
} ;
} ;
} ;
} ;
# Dump shared hash.
print Dumper( $test->export( { unbless => 1 } ) ) ;
# Begin processing.
executeTasks( $test ) ;
# Reap any remaining Hobo workers. MCE::Hobo reaps workers
# automatically to not exceed max_workers when given.
MCE::Hobo->waitall ;
# Dump shared hash.
print "\n", Dumper( $test->export( { unbless => 1 } ) ) ;
Output
$VAR1 = {
'L1_counter1' => 1,
'nested1' => {
'L2_counter1' => 3
}
};
Starting task 1 for L1_counter1
Starting task 3 for L2_counter1
Finished task 1 for L1_counter1
Finished task 3 for L2_counter1
$VAR1 = {
'nested1' => {
'L2_counter1' => 4
},
'L1_counter1' => 2
};
Regards, Mario