Using the OP's example, I modified it to do locking via MCE::Mutex. Afterwards, toyed with a couple MCE demonstrations. All demonstrations output orderly.
OP's example, locking via MCE::Mutex
#!/opt/perl/bin/perl
use strict;
use threads;
use MCE::Mutex;
use MCE::Shared;
open my $fh, '|-', 'gzip > test.txt.gz';
foreach (1..10000) {
print {$fh} sprintf("%05d%s\n", $_, ('abc123' x 10));
}
close $fh;
mce_open $fh,'-|','gzip -cd test.txt.gz'
or die "Failed to uncompress: $!\n";
$| = 1;
my @threads = ();
my $mutex = MCE::Mutex->new();
foreach (1..3) {
push @threads, threads->create(\&test);
}
$_->join() foreach @threads;
close $fh;
print "\n";
sub test {
my $tid = threads->tid();
my $line;
while(1) {
threads->yield();
$mutex->lock();
$line = <$fh> or last;
print "Thread $tid ".$line;
$mutex->unlock();
}
$mutex->unlock;
}
MCE, no chunking
#!/opt/perl/bin/perl
use strict;
use threads;
use MCE;
open my $fh, '|-', 'gzip > test.txt.gz';
foreach (1..10000) {
print {$fh} sprintf("%05d%s\n", $_, ('abc123' x 10));
}
close $fh;
open $fh, '-|', 'gzip -cd test.txt.gz'
or die "Failed to uncompress: $!\n";
$| = 1;
# MCE spawns threads when threads is present
MCE->new(
chunk_size => 1, max_workers => 3,
input_data => $fh,
init_relay => 1,
user_func => sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
my $tid = threads->tid();
MCE::relay sub {
print "Thread $tid ".$chunk_ref->[0];
};
}
)->run;
close $fh;
print "\n";
MCE, chunking enabled
#!/opt/perl/bin/perl
use strict;
use threads;
use MCE;
open my $fh, '|-', 'gzip > test.txt.gz';
foreach (1..10000) {
print {$fh} sprintf("%05d%s\n", $_, ('abc123' x 10));
}
close $fh;
open $fh, '-|', 'gzip -cd test.txt.gz'
or die "Failed to uncompress: $!\n";
$| = 1;
# MCE spawns threads when threads is present
MCE->new(
chunk_size => 500, max_workers => 3,
input_data => $fh,
init_relay => 1,
user_func => sub {
my ($mce, $chunk_ref, $chunk_id) = @_;
my $tid = threads->tid();
my $buf = '';
foreach my $line ( @{ $chunk_ref } ) {
$buf .= "Thread $tid ".$line;
}
MCE::relay sub {
print $buf;
};
}
)->run;
close $fh;
print "\n";
Results taken from a Linux CentOS 7.3 VM
My laptop is a late 2013 Macbook Pro, 2.6 Ghz i7 quad CPU. The Linux virtual machine is configured with 4 cores.
* 3 workers
time perl script_sem.pl | wc -l # 0.543s OP's example
time perl script_mutex.pl | wc -l # 0.663s Ditto, MCE::Mutex
time perl script_mce.pl | wc -l # 0.225s MCE, no chunking
time perl script_chunk.pl | wc -l # 0.077s MCE, chunking enabled
* 10 workers
time perl script_sem.pl | wc -l # 1.072s OP's example
time perl script_mutex.pl | wc -l # 0.726s Ditto, MCE::Mutex
time perl script_mce.pl | wc -l # 0.255s MCE, no chunking
time perl script_chunk.pl | wc -l # 0.112s MCE, chunking enabled
* 20 workers
time perl script_sem.pl | wc -l # 1.849s OP's example
time perl script_mutex.pl | wc -l # 0.803s Ditto, MCE::Mutex
time perl script_mce.pl | wc -l # 0.339s MCE, no chunking
time perl script_chunk.pl | wc -l # 0.179s MCE, chunking enabled
Regards, Mario |