% time ./re.sh
real 0m5,201s
user 0m43,394s
sys 0m1,302s
####
#!/usr/bin/env perl
# https://www.perlmonks.org/?node_id=11147200
use strict;
use warnings;
use MCE;
die "usage: $0 infile1.txt [ infile2.txt ... ]\n" unless @ARGV;

# Output file handle used by the worker callbacks: opened in user_begin,
# written to by process_chunk(), and closed in user_end.
my $OUT_FH;

# Spawn the worker pool once; the same pool is then reused for every
# $mce->process() call.
my $mce = MCE->new(
    max_workers => MCE::Util::get_ncpu(),
    chunk_size  => '64K',
    init_relay  => 0,    # specifying init_relay loads MCE::Relay
    use_slurpio => 1,    # enable slurpio; process_chunk() gets a scalar ref

    # Worker begin routine, run per each file to be processed.
    # The output file name arrives through user_args.
    user_begin => sub {
        my ($outfile) = @{ MCE->user_args() };

        # Append mode: the output file is truncated elsewhere before
        # processing starts. Fail loudly instead of later printing to a
        # handle that was never opened.
        open $OUT_FH, '>>', $outfile
            or die "ERROR: cannot open '$outfile' for appending: $!\n";
    },

    # Worker end routine, run per each file to be processed.
    user_end => sub {
        if (defined $OUT_FH) {
            # Buffered write errors surface at close time; report them.
            close $OUT_FH
                or warn "WARN: closing output file failed: $!\n";
        }
    },

    # Worker chunk routine; only the chunk reference is needed here.
    user_func => sub {
        my ($mce, $chunk_ref, $chunk_id) = @_;
        process_chunk($chunk_ref);
    },
)->spawn;
##
##
# First, truncate the output file. The workers open it in append mode,
# so it has to start out empty. The bare block scopes $fh so the handle
# is closed as soon as the block ends.
{ open my $fh, '>', "out-sed.dat" or die "$!\n"; }

# Process the input file, passing the output file name to the workers.
# (The terminating semicolon was missing here, which was a syntax error.)
$mce->process("in.txt", { user_args => [ "out-sed.dat" ] });
$mce->shutdown;
##
##
# Process file(s).
#
# Each "NAME.txt" argument is written to "NAME.dat". Anything that cannot
# be processed is skipped with a warning and makes the exit status 1.
my $status = 0;

# Test definedness, not truth: a file argument of "0" or "" must not end
# the loop early and silently drop the remaining arguments.
FILE:
while (defined(my $infile = shift @ARGV)) {
    if (-d $infile) {
        warn "WARN: '$infile': Is a directory, skipped\n";
        $status = 1;
        next FILE;
    }
    if (! -f $infile) {
        warn "WARN: '$infile': No such file, skipped\n";
        $status = 1;
        next FILE;
    }

    # Derive the output name; refuse to run when it would be the input.
    my $outfile = $infile;
    $outfile =~ s/\.txt$/.dat/;
    if ($outfile eq $infile) {
        warn "WARN: '$outfile': matches input name, skipped\n";
        $status = 1;
        next FILE;
    }

    # Truncate the output file; the workers append to it afterwards.
    open my $fh, '>', $outfile or do {
        warn "WARN: '$outfile': $!, skipped\n";
        $status = 1;
        next FILE;
    };
    close $fh;

    # Process the file; pass argument(s) to workers.
    $mce->process($infile, { user_args => [ $outfile ] });
}

$mce->shutdown; # reap workers
exit $status;
##
##
# Worker function (line-by-line variant).
#
# Takes a reference to a scalar holding one chunk of input text, applies
# the substitutions to each line, and appends the result to the shared
# output handle $OUT_FH.
sub process_chunk {
    my ($chunk_ref) = @_;
    my $output = '';

    # Read the chunk through an in-memory file handle, one line at a time.
    open my $fh, '<', $chunk_ref
        or die "ERROR: cannot open chunk as in-memory file: $!\n";

    # A named lexical is used instead of the global $_.
    while (my $line = <$fh>) {
        $line =~ s/[[:punct:]]//g;        # strip punctuation
        $line =~ s/[0-9]//g;              # strip digits
        $line =~ s/w(?:as|ere)/be/gi;     # "was"/"were" -> "be"

        # Further substitutions go here. (This used to be a bare `...`,
        # which Perl runs as the yada-yada statement and which dies with
        # "Unimplemented" on the first line read.)

        # append to output var
        $output .= $line;
    }
    close $fh;

    # Output orderly and serially.
    MCE->relay_lock;
    print {$OUT_FH} $output;
    $OUT_FH->flush;
    MCE->relay_unlock;
}
##
##
# Worker function (whole-chunk variant).
#
# Takes a reference to a scalar holding one chunk of input text, applies
# the substitutions to the entire chunk in place, and appends the result
# to the shared output handle $OUT_FH.
sub process_chunk {
    my ($chunk_ref) = @_;

    $$chunk_ref =~ s/[[:punct:]]//g;        # strip punctuation
    $$chunk_ref =~ s/[0-9]//g;              # strip digits
    $$chunk_ref =~ s/w(?:as|ere)/be/gi;     # "was"/"were" -> "be"

    # Further substitutions go here. (This used to be a bare `...`, which
    # Perl runs as the yada-yada statement and which dies with
    # "Unimplemented" before anything is written.)

    # Output orderly and serially.
    MCE->relay_lock;
    print {$OUT_FH} $$chunk_ref;
    $OUT_FH->flush;
    MCE->relay_unlock;
}