use strict; use warnings; use Data::GUID; use DBD::Pg; use SQL::Abstract; use Tie::Cycle; use MCE::Loop max_workers => 4; use MCE::Shared; my $sqla = SQL::Abstract->new; my @cols = map {"field$_"} 1..5; # https://www.cattlegrid.info/2006/06/13/write-no-more-sql-abstract-it.html my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols }); my $sel_sql = $sqla->select('mytable', 'count(*)', { field2 => '' }); my $upd_sql = $sqla->update('mytable', { field2 => '' }, { field2 => '' }); #--------------------------------------------------------------------# package My::DBI { use DBI; sub new { my ( $class, $dsn, $user, $password, $params ) = @_; my $self = {}; # MCE::Shared will emit the error and exit if fail to connect $self->{DBH} = DBI->connect($dsn, $user, $password, $params); $self->{STMT} = {}; bless $self, $class; } sub prepare_cached { my ( $self, $key, $sql ) = @_; $self->{STMT}{$key} = $self->{DBH}->prepare_cached($sql); 1; } sub do { my $self = shift; $self->{DBH}->do(@_); } sub execute { my ( $self, $key ) = ( shift, shift ); if ( my $stmt = $self->{STMT}{$key} ) { $stmt->execute(@_); } } sub fetchrow_array { my ( $self, $key ) = ( shift, shift ); if ( my $stmt = $self->{STMT}{$key} ) { $stmt->execute(@_); $stmt->fetchrow_array; } } sub finish { my ( $self, $key ) = @_; if ( $key ) { $self->{STMT}{$key}->finish if $self->{STMT}{$key}; } else { $self->{STMT}{$_}->finish for keys %{ $self->{STMT} }; } return 1; } sub disconnect { my ( $self ) = @_; $self->finish; $self->{DBH}->disconnect; 1; } }; #--------------------------------------------------------------------# my $dsn = 'DBI:Pg:dbname=test_db;host=localhost;port=5432'; my $sdb = MCE::Shared->share( { module => 'My::DBI' }, $dsn, $ENV{USER}, undef, { AutoCommit => 1, RaiseError => 1, PrintError => 1 }, ); $sdb->prepare_cached('ins_sth', $ins_sql); $sdb->prepare_cached('sel_sth', $sel_sql); $sdb->prepare_cached('upd_sth', $upd_sql); mce_loop { my ($mce, $chunk, $chunk_id) = @_; for my $record( @{$chunk} ) { $sdb->execute('ins_sth', @{$record}); my $field2_old = $record->[1]; my $field2_new1 = Data::GUID->new->as_base64; my $field2_new2 = Data::GUID->new->as_base64; # update using a prepared statement $sdb->execute('upd_sth', $field2_new1, $field2_old); # update using the dbh handle inside the shared class my ($query, @bind) = $sqla->update( 'mytable', { field2 => $field2_new2 }, { field2 => $field2_new1 }, ); $sdb->do($query, undef, @bind); # pass any arguments for execute inside the shared class my ($count) = $sdb->fetchrow_array('sel_sth', $field2_new2); # count is 1 due to selecting field2 = $field2_new2 my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s', $mce->wid, $chunk_id, $record->[0], $count; MCE->say($msg); } } get_sample_data(); # ^^ do not pass @{ get_sample_data() } to mce_loop # it will not work if @{ [ has 1 element ] } # pass the array ref instead, MCE accepts it MCE::Loop->finish; $sdb->disconnect; #--------------------------------------------------------------------# sub get_sample_data { tie my $value1, 'Tie::Cycle', [ 40 .. 49 ]; return [ map { [ $value1, map { Data::GUID->new->as_base64 } 0..3] } 1..1000 ]; }