http://qs321.pair.com?node_id=11115379


in reply to Create parallel database handles or SQL statements for multi-threaded/process access to Postgres DB using DBI, DBD::Pg and Parallel::ForkManager

Hi perlygapes,

Here are a few observations.

Using MCE for parallelization has two huge advantages here (as well as all the others, haha): workers stay alive for the whole run, so each one can hold its own database connection and reuse its prepared statements, and the input list is automatically chunked and distributed to the workers.

I would write your code something like the following working example.

Schema:

create table mytable( field1 integer, field2 varchar(24), field3 varchar(24), field4 varchar(24), field5 varchar(24) );

use strict;
use warnings;

use Data::GUID;
use DBI;
use DBD::Pg;
use SQL::Abstract;
use Tie::Cycle;

use MCE::Loop max_workers => 4;

my $sqla = SQL::Abstract->new;
my @cols = map { "field$_" } 1..5;

my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
my $sel_sql = $sqla->select('mytable', 'count(*)', { field1 => '' });

#---------------------------------------------------------------------#

mce_loop {
    my ($mce, $chunk, $chunk_id) = @_;

    my $dbh     = get_dbh();
    my $ins_sth = $dbh->prepare_cached($ins_sql);
    my $sel_sth = $dbh->prepare_cached($sel_sql);

    for my $record ( @{$chunk} ) {
        $ins_sth->execute( @{$record} );

        $sel_sth->execute( 42 );
        my ($count) = $sel_sth->fetchrow_array;

        my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s',
            $mce->wid, $chunk_id, $record->[0], $count;
        MCE->say($msg);
    }
} @{ get_sample_data() };

#---------------------------------------------------------------------#

sub get_dbh {
    my $dsn = 'DBI:Pg:dbname=test_db';
    my $dbh = DBI->connect( $dsn, $ENV{USER}, undef,
        { AutoCommit => 1, RaiseError => 1, PrintError => 1 }
    ) or die "Connection failed!\n" . $DBI::errstr;
    return $dbh;
}

sub get_sample_data {
    tie my $value1, 'Tie::Cycle', [ 40 .. 49 ];
    return [
        map { [ $value1, map { Data::GUID->new->as_base64 } 0..3 ] } 1..1000
    ];
}

__END__

Partial output:

$ perl mce-pg.pl
wid 4; chnk 3; ins 40; cnt 0
wid 1; chnk 1; ins 40; cnt 0
wid 3; chnk 2; ins 40; cnt 0
wid 2; chnk 4; ins 40; cnt 0
wid 4; chnk 3; ins 41; cnt 0
wid 1; chnk 1; ins 41; cnt 0
wid 3; chnk 2; ins 41; cnt 0
wid 2; chnk 4; ins 41; cnt 1
wid 4; chnk 3; ins 42; cnt 1
wid 1; chnk 1; ins 42; cnt 2
...
wid 2; chnk 97; ins 48; cnt 100
wid 4; chnk 99; ins 48; cnt 100
wid 1; chnk 100; ins 48; cnt 100
wid 3; chnk 98; ins 49; cnt 100
wid 2; chnk 97; ins 49; cnt 100
wid 4; chnk 99; ins 49; cnt 100
wid 1; chnk 100; ins 49; cnt 100


Hope this helps!


The way forward always starts with a minimal test.

Re^2: Create parallel database handles... (MCE::Loop)
by marioroy (Prior) on Apr 12, 2020 at 21:36 UTC

    Hi 1nickt,

    Nice example! MCE has helpful user_begin and user_end options, which are a great place for workers to connect and to clean up, respectively. Basically, each worker connects to the DB one time, and prepares any statements, inside user_begin. The user_end block is the place to call finish on every statement handle before calling disconnect on the DBI handle.

    Update: Added the MCE interval option to stagger workers connecting to the DB.

    Schema:

    create table mytable( field1 integer, field2 varchar(24), field3 varchar(24), field4 varchar(24), field5 varchar(24) );

    Perl:

    use strict;
    use warnings;

    use Data::GUID;
    use DBI;
    use DBD::Pg;
    use SQL::Abstract;
    use Tie::Cycle;
    use MCE::Loop;

    my $sqla = SQL::Abstract->new;
    my @cols = map { "field$_" } 1..5;

    # https://www.cattlegrid.info/2006/06/13/write-no-more-sql-abstract-it.html
    my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
    my $sel_sql = $sqla->select('mytable', 'count(*)', { field2 => '' });
    my $upd_sql = $sqla->update('mytable', { field2 => '' }, { field2 => '' });

    #--------------------------------------------------------------------#

    my $dsn = 'DBI:Pg:dbname=test_db;host=localhost;port=5432';
    my ($dbh, $ins_sth, $sel_sth, $upd_sth);

    MCE::Loop->init(
        max_workers => 4,
        interval    => 0.125,   # delay period for MCE->yield

        user_begin => sub {
            MCE->yield;         # stagger workers connecting to the DB

            $dbh = DBI->connect( $dsn, $ENV{USER}, undef,
                { AutoCommit => 1, RaiseError => 1, PrintError => 1 }
            ) or die "Connection failed!\n" . $DBI::errstr;

            $ins_sth = $dbh->prepare_cached($ins_sql);
            $sel_sth = $dbh->prepare_cached($sel_sql);
            $upd_sth = $dbh->prepare_cached($upd_sql);
        },

        user_end => sub {
            $_->finish for $ins_sth, $sel_sth, $upd_sth;
            $dbh->disconnect;
        },
    );

    mce_loop {
        my ($mce, $chunk, $chunk_id) = @_;

        for my $record ( @{$chunk} ) {
            $ins_sth->execute( @{$record} );

            my $field2_old  = $record->[1];
            my $field2_new1 = Data::GUID->new->as_base64;
            my $field2_new2 = Data::GUID->new->as_base64;

            # update using a prepared statement
            $upd_sth->execute( $field2_new1, $field2_old );

            # update using the dbh handle
            my ($query, @bind) = $sqla->update(
                'mytable',
                { field2 => $field2_new2 },
                { field2 => $field2_new1 },
            );
            $dbh->do($query, undef, @bind);

            # select records; count is 1 due to selecting field2 = $field2_new2
            $sel_sth->execute( $field2_new2 );
            my ($count) = $sel_sth->fetchrow_array;

            my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s',
                $mce->wid, $chunk_id, $record->[0], $count;
            MCE->say($msg);
        }
    } get_sample_data();

    # ^^ do not pass @{ get_sample_data() } to mce_loop;
    #    it will not work if the dereferenced array has only one element.
    #    Pass the array ref instead; MCE accepts it.

    MCE::Loop->finish;

    #--------------------------------------------------------------------#

    sub get_sample_data {
        tie my $value1, 'Tie::Cycle', [ 40 .. 49 ];
        return [
            map { [ $value1, map { Data::GUID->new->as_base64 } 0..3 ] } 1..1000
        ];
    }

    See also this post for a version using a shared DBI handle.

    Regards, Mario

Re^2: Create parallel database handles... (MCE::Loop)
by perlygapes (Sexton) on Apr 14, 2020 at 00:27 UTC
    Thank you 1nickt for drawing my attention to the difference between DB connections and DB handles. That is a very good point.

    I readily admit that:

  • I don't understand the exact nature of how Perl uses a DBH and how Postgres sees it
  • using a separate DB connection for each child feels intuitively right
  • my understanding and use of the terms 'thread' and 'process' are not fully mature and somewhat imprecise
  • I have never used SQL::Abstract, but after a cursory glance prompted by your example it looks like a good option that may save me some code (see the short sketch after this list)
  • I've also never used MCE; it is not as easy to understand and I'll have to look into it
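
    For reference, here is a minimal SQL::Abstract sketch (my own illustration with made-up values, not code from the thread): it turns a Perl data structure into an SQL string plus bind values, which is what the examples above feed to prepare_cached.

    use strict;
    use warnings;
    use SQL::Abstract;

    my $sqla = SQL::Abstract->new;

    # INSERT: returns the statement and the bind values in list context
    my ($ins_sql, @ins_bind) = $sqla->insert('mytable', { field1 => 42, field2 => 'abc' });
    # $ins_sql  is something like: INSERT INTO mytable ( field1, field2 ) VALUES ( ?, ? )
    # @ins_bind is: (42, 'abc')

    # SELECT: the WHERE clause is likewise built from a hash ref
    my ($sel_sql, @sel_bind) = $sqla->select('mytable', 'count(*)', { field1 => 42 });
    # $sel_sql is something like: SELECT count(*) FROM mytable WHERE ( field1 = ? )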

      Hi again perlygapes,

      The MCE::Loop code simply abstracts away all your Parallel::ForkManager logic and improves it, just as Parallel::ForkManager abstracts away and improves some of the tedious manual work of using fork() directly. See how the logic is encapsulated in a sub just like in your code, only with less concurrency boilerplate.
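
      To make the comparison concrete, here is a minimal side-by-side sketch (mine, not from the original post; the per-job work is a placeholder):

      use strict;
      use warnings;
      use Parallel::ForkManager;
      use MCE::Loop max_workers => 4;

      # Parallel::ForkManager: one fork per job, managed by hand
      my $pm = Parallel::ForkManager->new(4);
      for my $job (1 .. 100) {
          $pm->start and next;   # fork; the parent moves on to the next job
          # ... do the work for $job in the short-lived child ...
          $pm->finish;           # the child exits
      }
      $pm->wait_all_children;

      # MCE::Loop: the same shape, but the four workers persist and the
      # job list is chunked and handed to them for you
      mce_loop {
          my ($mce, $chunk, $chunk_id) = @_;
          for my $job ( @{$chunk} ) {
              # ... do the work for $job in a long-lived worker ...
          }
      } 1 .. 100;

      MCE::Loop->finish;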

      "using a separate DB connection instead for each child feels intuitively right"

      I agree: the code I shared keeps a connection open for each child, and each child stays alive and handles multiple jobs from the job list as managed by MCE.

      Here's a simpler example I've shared recently showing how to parallelize existing code for making a series of HTTP requests. How would you do the same using P::FM?

      single process

      use strict;
      use warnings;
      use 5.010;

      use Data::Dumper;
      use HTTP::Tiny;
      use Time::HiRes 'gettimeofday', 'tv_interval';

      my $ua = HTTP::Tiny->new( timeout => 10 );

      my @urls = qw< gap.com amazon.com ebay.com lego.com wunderground.com
                     imdb.com underarmour.com disney.com espn.com dailymail.com >;

      my %report;
      foreach ( @urls ) {
          my $start = [gettimeofday];
          $ua->get('https://' . $_);
          $report{$_} = tv_interval($start, [gettimeofday]);
      }
      say Dumper \%report;

      six processes
      (workers stay alive, looping through the list, writing to a shared hash)
      (one added line and a few slightly changed lines)

      use strict;
      use warnings;
      use 5.010;

      use Data::Dumper;
      use HTTP::Tiny;
      use Time::HiRes 'gettimeofday', 'tv_interval';
      use MCE;
      use MCE::Shared;

      my $ua = HTTP::Tiny->new( timeout => 10 );

      my @urls = qw< gap.com amazon.com ebay.com lego.com wunderground.com
                     imdb.com underarmour.com disney.com espn.com dailymail.com >;

      my $report = MCE::Shared->hash;

      MCE->new( max_workers => 6 )->foreach( \@urls, sub {
          my $start = [gettimeofday];
          $ua->get('https://' . $_);
          $report->set( $_, tv_interval($start, [gettimeofday]) );
      });

      say Dumper $report->export;

      Update: fixed error in first demo code, ++choroba

      Hope this helps!



      The way forward always starts with a minimal test.
        Something I just realised I neglected to mention in my example: I need to apply CPU affinity in the script. That is, I need to be able to specify that 'worker 1' MUST use CPU0, 'worker 2' MUST use CPU1, etc.

        This is because I need another parallel code block in which each worker launches an external single-threaded executable that accesses another DB and writes results to a third DB, and those executables MUST NOT write to the same table at the same time. The affinity is, in essence, to avoid access conflicts/violations.

        How can this be done in MCE?

        Thanks again.
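
        One way to approach this, sketched under the assumption of a Linux host with the util-linux taskset command (MCE itself has no CPU-pinning option that I know of): each worker pins itself inside user_begin using its MCE worker id, and any external executable it then launches inherits that affinity.

        use strict;
        use warnings;
        use MCE::Loop;

        MCE::Loop->init(
            max_workers => 4,
            user_begin  => sub {
                my ($mce) = @_;
                # pin this worker: worker 1 -> CPU0, worker 2 -> CPU1, ...
                # assumes Linux and the util-linux taskset command
                my $cpu = $mce->wid - 1;
                system('taskset', '-cp', $cpu, $$) == 0
                    or warn 'could not set affinity for worker ', $mce->wid, "\n";
            },
        );

        mce_loop {
            my ($mce, $chunk, $chunk_id) = @_;
            for my $job ( @{$chunk} ) {
                # an external program launched here stays on this worker's CPU,
                # e.g. system('/path/to/executable', $job);
            }
        } 1 .. 100;

        MCE::Loop->finish;

        Note, though, that pinning workers to CPUs does not by itself serialize access to a shared table; two pinned processes can still write to the same table at the same time, so locking or partitioning the work is still needed for that.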
Re^2: Create parallel database handles... (MCE::Loop)
by Anonymous Monk on Apr 12, 2020 at 04:01 UTC

    While it's nice to know that mce is good at parallel code, I'm pretty sure you meant to link to MCE

      tvm


      The way forward always starts with a minimal test.