Beefy Boxes and Bandwidth Generously Provided by pair Networks
Don't ask to ask, just ask
 
PerlMonks  

Re^2: Create parallel database handles... (MCE::Loop)

by marioroy (Vicar)
on Apr 12, 2020 at 21:36 UTC ( #11115404=note: print w/replies, xml ) Need Help??


in reply to Re: Create parallel database handles... (MCE::Loop)
in thread Create parallel database handles or SQL statements for multi-threaded/process access to Postgres DB using DBI, DBD::Pg and Parallel::ForkManager

Hi 1nickt,

Nice example! MCE has helpful user_begin and user_end options. That's a great place for workers to connect and clean up respectively. Basically, one connects to the DB one time including preparing any statement inside user_begin. The user_end block is for calling finish on every statement handle prior to calling disconnect on the DBI handle.

Update: Added MCE interval option, stagger workers connecting to the DB.

Schema:

create table mytable( field1 integer, field2 varchar(24), field3 varchar(24), field4 varchar(24), field5 varchar(24) );

Perl:

use strict; use warnings; use Data::GUID; use DBD::Pg; use SQL::Abstract; use Tie::Cycle; use MCE::Loop; my $sqla = SQL::Abstract->new; my @cols = map {"field$_"} 1..5; # https://www.cattlegrid.info/2006/06/13/write-no-more-sql-abstract-it +.html my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols }); my $sel_sql = $sqla->select('mytable', 'count(*)', { field2 => '' }); my $upd_sql = $sqla->update('mytable', { field2 => '' }, { field2 => ' +' }); #--------------------------------------------------------------------# my $dsn = 'DBI:Pg:dbname=test_db;host=localhost;port=5432'; my ($dbh, $ins_sth, $sel_sth, $upd_sth); MCE::Loop->init( max_workers => 4, interval => 0.125, # delay period for MCE->yield user_begin => sub { MCE->yield; # stagger workers connecting to the DB $dbh = DBI->connect($dsn, $ENV{USER}, undef, { AutoCommit => 1, RaiseError => 1, PrintError => 1 }) or die "Connection failed!\n" . $DBI::errstr; $ins_sth = $dbh->prepare_cached($ins_sql); $sel_sth = $dbh->prepare_cached($sel_sql); $upd_sth = $dbh->prepare_cached($upd_sql); }, user_end => sub { $sel_sth->finish, $ins_sth->finish; $dbh->disconnect; }, ); mce_loop { my ($mce, $chunk, $chunk_id) = @_; for my $record( @{$chunk} ) { $ins_sth->execute( @{$record} ); my $field2_old = $record->[1]; my $field2_new1 = Data::GUID->new->as_base64; my $field2_new2 = Data::GUID->new->as_base64; # update using a prepared statement $upd_sth->execute( $field2_new1, $field2_old ); # update using the dbh handle my ($query, @bind) = $sqla->update( 'mytable', { field2 => $field2_new2 }, { field2 => $field2_new1 }, ); $dbh->do($query, undef, @bind); # select records $sel_sth->execute( $field2_new2 ); my ($count) = $sel_sth->fetchrow_array; # count is 1 due to selecting field2 = $field2_new2 my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s', $mce->wid, $chunk_id, $record->[0], $count; MCE->say($msg); } } get_sample_data(); # ^^ do not pass @{ get_sample_data() } to mce_loop # it will not work if @{ [ has 1 element ] } # pass the array ref instead, MCE accepts it MCE::Loop->finish; #--------------------------------------------------------------------# sub get_sample_data { tie my $value1, 'Tie::Cycle', [ 40 .. 49 ]; return [ map { [ $value1, map { Data::GUID->new->as_base64 } 0..3] } 1..1000 ]; }

See also this post for a version using a shared DBI handle.

Regards, Mario

Log In?
Username:
Password:

What's my password?
Create A New User
Node Status?
node history
Node Type: note [id://11115404]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others taking refuge in the Monastery: (5)
As of 2020-08-14 09:04 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?
    Which rocket would you take to Mars?










    Results (75 votes). Check out past polls.

    Notices?