This topic presented an opportunity for me to brush up on MCE, particularly the input iterator. Notice the parallel routine to process files, the input iterator run by the manager process, and MCE::Loop to launch parallel into action. The $chunk_id is used to compute $i, so the output matches the input data order. Moreover, MCE->last is called to leave the loop entirely (skipping any further input data) on an open-file error.
Update: I had meant for the input iterator to obtain data after workers have spawned. Now, workers consume little memory. Hence the reason for making this demonstration.
#!/usr/bin/perl
# Parallel word-normalization benchmark driven by MCE::Loop.
# Reads many small "data-*" files, rewrites words to base forms, and
# merges the per-worker temp files into a single output file.
use strict;
use feature qw{ say };
use warnings;
use Env;    # NOTE(review): Env appears unused in this script — confirm before removing
use utf8;
use Time::HiRes qw(gettimeofday tv_interval usleep);
use open ':std', ':encoding(UTF-8)';    # UTF-8 for STDIN/STDOUT/STDERR and default layers
use MCE::Loop;
my $benchmark = 1; # print timings for loops
my $TMP='./tmp';   # NOTE(review): never referenced again — confirm before removing
my $IN;            # NOTE(review): workers use their own lexical $IN; this one is unused
my $OUT;           # reused at the end for the merged output file
my $wordfile="data.dat";    # final merged output
truncate $wordfile, 0;      # start with an empty output file
#$|=1;
# substitute whole words
# Irregular form => base form; applied via $re1, which anchors the
# alternation with \b...\b so only complete words are rewritten.
my %whole = qw{
going go
getting get
goes go
knew know
trying try
tried try
told tell
coming come
saying say
men man
women woman
took take
lying lie
dying die
};
# substitute on prefix
# Prefix => replacement; applied via $re2 (\bPREFIX\w*), so any word
# *starting* with the key is replaced wholesale (e.g. "needed" -> "need").
my %prefix = qw{
need need
talk talk
tak take
used use
using use
};
# substitute on substring
# Substring => replacement; applied via $re3 (\b\w*?SUB\w*), so any word
# *containing* the key anywhere is replaced wholesale.
my %substring = qw{
mean mean
work work
read read
allow allow
gave give
bought buy
want want
hear hear
came come
destr destroy
paid pay
selve self
cities city
fight fight
creat create
makin make
includ include
};
# Build the three substitution regexes once, up front.  Keys are sorted in
# reverse so that a longer alternative is tried before any key that is its
# prefix (ascending string order places "use" before "using"; reversing it
# puts "using" first).
my $re1 = qr{\b(@{[ join '|', reverse sort keys %whole ]})\b}i;
my $re2 = qr{\b(@{[ join '|', reverse sort keys %prefix ]})\w*}i;
my $re3 = qr{\b\w*?(@{[ join '|', reverse sort keys %substring ]})\w*}i;
# ($wordfile is already truncated near the top of the script, so the
# second truncate that used to sit here was redundant and is removed.)
my $maxforks  = 64;
my $chunksize = 128; # not too big, so not impact small jobs e.g. 3,457 files
# Pre-create temp sub-directories so workers never race on mkdir.
my $subdir      = 0;
my $subdircount = 255;
my $tempdir     = "temp";
mkdir "$tempdir";
mkdir "$tempdir/$subdir" while ($subdir++ <= $subdircount);
my $t0 = [gettimeofday];    # wall-clock start for the benchmark line
my $elapsed;
# Worker routine: normalize one chunk of input files.
#
# Arguments (supplied by MCE::Loop):
#   $mce       - the MCE instance (unused directly; MCE-> calls suffice)
#   $chunk_ref - array ref of up to chunk_size input file names
#   $chunk_id  - 1-based chunk ordinal, used to pick a temp sub-directory
#                and to compute an output index preserving input order
#
# On an open failure the worker warns (now with the failing path and $!)
# and calls MCE->last to abandon the loop, including further input data.
sub process_files {
    my ($mce, $chunk_ref, $chunk_id) = @_;
    # Chunk ID starts at 1; so -1 to have subdir start at 1
    my $subdir = (($chunk_id - 1) % $subdircount) + 1;
    # compute i, matching data order
    my $i = ($chunk_id - 1) * MCE->chunk_size;
    while (my $infile = shift @{ $chunk_ref }) {
        my $outfile = "$tempdir/$subdir/text-$i";
        # Fixed: the warnings used to print the literal words "infile" /
        # "outfile" instead of the actual path, and omitted $!.
        open my $IN, '<', $infile
            or warn("open error: $infile: $!"), MCE->last;
        open my $OUT, '>', $outfile
            or warn("open error: $outfile: $!"), MCE->last;
        while (<$IN>) {
            tr/-!"#%&()*',.\/:;?@\[\\\]”_“{’}><^)(|/ /; # no punct "
            s/^/ /;
            s/\n/ \n/;
            s/[[:digit:]]{1,12}//g;
            # NOTE(review): unanchored — also rewrites "was"/"were" inside
            # longer words; kept as-is to match the compared scripts.
            s/w(as|ere)/be/gi;
            s{$re2}{ $prefix{lc $1} }g; # prefix
            s{$re3}{ $substring{lc $1} }g; # part
            s{$re1}{ $whole{lc $1} }g; # whole
            print $OUT "$_";
        }
        close $OUT;
        close $IN;
        $i++;   # next file in this chunk takes the next ordinal slot
    }
    return;
}
# Return a closure the MCE manager process calls for input.  The file
# list is gathered lazily, on the first call — i.e. *after* workers have
# already spawned — so the workers never inherit the big list and stay
# small.  Each call hands back up to $chunksize file names; an empty
# return signals end of input.
sub input_iterator {
    my @pending;        # file names not yet dispatched
    my $primed = 0;     # becomes true once the glob has run
    return sub {
        unless ($primed) {
            $primed  = 1;
            @pending = glob("data-* ??/data-*");
            my $filecount = scalar @pending;
            say "Parsing $filecount files"; # okay, zero files
            say "maxforks: $maxforks";
            say "chunksize: $chunksize";
        }
        return unless @pending;
        return splice @pending, 0, $chunksize;
    };
}
# Configure the worker pool, then run: the manager pulls chunks from the
# iterator while workers execute process_files on each chunk.
MCE::Loop->init(
    chunk_size => $chunksize,
    max_workers => $maxforks,
    posix_exit => 1,
    use_threads => 0, # use emulated fork on Windows
);
MCE::Loop->run(\&process_files, input_iterator());
MCE::Loop->finish;
# Merge all per-chunk temp files, in glob order, into the final output
# via the magic <> handle (local @ARGV feeds it the file list).
local @ARGV = glob("$tempdir/*/*");
die "No files were processed" unless @ARGV; # zero files above
open $OUT, '>', $wordfile or die "Error opening $wordfile";
print {$OUT} $_ while <>;
close $OUT;
unlink glob "$tempdir/*/*";    # clean up the temp files
$elapsed = tv_interval($t0);
print "regex: $elapsed\n" if $benchmark;
Sample run from a 32-core, 64-thread box:
I captured the "real" via the UNIX time command. We're dealing with big data. The UNIX time command is helpful to capture the overall time. This includes loading modules, running, and reaping workers. Not to forget, global cleanup in Perl.
The total time includes post-processing, reading temp-files, writing to data.dat, and unlinking.
# threads: init data before spawning
$ rm -fr data.dat temp
$ time ../choroba1.pl
Parsing 345701 files
maxthreads: 64
regex: 44.197007
real 0m44.648s
user 20m 0.207s
sys 0m15.573s
# threads: spawning before init data
$ rm -fr data.dat temp
$ time ../choroba2.pl
Parsing 345701 files
maxthreads: 64
regex: 41.807656
real 0m41.858s
user 19m56.085s
sys 0m15.109s
# fork: nice parallel demo by kikuchiyo
$ rm -fr data.dat temp
$ time ../kikuchiyo.pl
Parsing 345701 files
maxforks: 64
regex: 32.620493
real 0m35.793s
user 12m36.278s
sys 0m12.721s
# fork: mce child, spawning before init data
$ time ../mario2.pl
Parsing 345701 files
maxforks: 64
regex: 32.759474
real 0m32.805s
user 12m 2.313s
sys 0m14.000s
# fork: mce loop, input iterator (delay init)
$ rm -fr data.dat temp
$ time ../mario_mceiter.pl
Parsing 345701 files
maxforks: 64
chunksize: 128
regex: 31.463816
real 0m31.510s
user 12m26.168s
sys 0m10.089s