PerlMonks  

Re: Create parallel database handles... (MCE::Loop)

by 1nickt (Canon)
on Apr 11, 2020 at 15:59 UTC


in reply to Create parallel database handles or SQL statements for multi-threaded/process access to Postgres DB using DBI, DBD::Pg and Parallel::ForkManager

Hi perlygapes,

Here are a few observations.

  • You don't ever want to share a DBI handle across processes or threads.

  • You don't need to. Databases support multiple connections; you just don't want too many open at the same time, and you want to avoid the overhead of opening the connection repeatedly.

  • You are using Parallel::ForkManager, so calling the child processes 'threads' is misleading.

  • Your use of Parallel::ForkManager (a great tool I used and loved for years) is akin to using a slide rule when a programmable calculator is on your desk. It's better than the abacus up on the shelf, but ...

Using MCE for parallelization has two huge advantages (as well as all the others, haha):

  • It spawns a pool of workers that perform repeated jobs from the job list.

  • It uses chunking so a worker handles a bunch of jobs each time its turn comes up to grab a new chunk.
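The chunking idea can be sketched in plain core Perl. This is just the concept, not MCE's implementation; MCE handles it internally and picks the chunk size for you:

```perl
use strict;
use warnings;

# Sketch of the chunking idea only -- not MCE's actual implementation.
# Instead of handing a worker one job per turn, the job list is cut
# into chunks, so a worker grabs (say) 100 jobs at a time and the
# per-dispatch overhead is paid once per chunk, not once per job.
my @jobs       = 1 .. 1000;
my $chunk_size = 100;

my @chunks;
while ( my @slice = splice @jobs, 0, $chunk_size ) {
    push @chunks, [ @slice ];
}

printf "%d chunks of up to %d jobs each\n", scalar(@chunks), $chunk_size;
```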

I would write your code something like the following working example.

Schema:

create table mytable (
    field1 integer,
    field2 varchar(24),
    field3 varchar(24),
    field4 varchar(24),
    field5 varchar(24)
);

use strict;
use warnings;

use Data::GUID;
use DBD::Pg;
use SQL::Abstract;
use Tie::Cycle;
use MCE::Loop max_workers => 4;

my $sqla = SQL::Abstract->new;
my @cols = map {"field$_"} 1..5;

my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
my $sel_sql = $sqla->select('mytable', 'count(*)', { field1 => '' });

#---------------------------------------------------------------------#

mce_loop {
    my ($mce, $chunk, $chunk_id) = @_;

    my $dbh     = get_dbh();
    my $ins_sth = $dbh->prepare_cached($ins_sql);
    my $sel_sth = $dbh->prepare_cached($sel_sql);

    for my $record ( @{$chunk} ) {
        $ins_sth->execute( @{$record} );
        $sel_sth->execute( 42 );
        my ($count) = $sel_sth->fetchrow_array;
        my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s',
            $mce->wid, $chunk_id, $record->[0], $count;
        MCE->say($msg);
    }
} @{ get_sample_data() };

#---------------------------------------------------------------------#

sub get_dbh {
    my $dsn = 'DBI:Pg:dbname=test_db';
    my $dbh = DBI->connect($dsn, $ENV{USER}, undef,
        { AutoCommit => 1, RaiseError => 1, PrintError => 1 })
        or die "Connection failed!\n" . $DBI::errstr;
    return $dbh;
}

sub get_sample_data {
    tie my $value1, 'Tie::Cycle', [ 40 .. 49 ];
    return [
        map { [ $value1, map { Data::GUID->new->as_base64 } 0..3 ] } 1..1000
    ];
}

__END__

Partial output:

$ perl mce-pg.pl
wid 4; chnk 3; ins 40; cnt 0
wid 1; chnk 1; ins 40; cnt 0
wid 3; chnk 2; ins 40; cnt 0
wid 2; chnk 4; ins 40; cnt 0
wid 4; chnk 3; ins 41; cnt 0
wid 1; chnk 1; ins 41; cnt 0
wid 3; chnk 2; ins 41; cnt 0
wid 2; chnk 4; ins 41; cnt 1
wid 4; chnk 3; ins 42; cnt 1
wid 1; chnk 1; ins 42; cnt 2
...
wid 2; chnk 97; ins 48; cnt 100
wid 4; chnk 99; ins 48; cnt 100
wid 1; chnk 100; ins 48; cnt 100
wid 3; chnk 98; ins 49; cnt 100
wid 2; chnk 97; ins 49; cnt 100
wid 4; chnk 99; ins 49; cnt 100
wid 1; chnk 100; ins 49; cnt 100

Notes:

  • MCE sets the chunk size automatically (in this case, to 100).

  • We get 100 rows from the DB count query with field1 value 42, as expected.

  • As seen in the 8th and 9th lines of output:
    wid 2; chnk 4; ins 41; cnt 1
    wid 4; chnk 3; ins 42; cnt 1
    ... after worker 2 inserts its record with value 41, by the time it makes its select query, worker 4 has already inserted a record with value 42 but has not yet made its own select query or written its output line (which comes right after).

  • I always use SQL::Abstract to generate SQL statements as I trust it more than I trust me.
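If SQL::Abstract is new to you: it builds the SQL string with '?' placeholders from Perl data structures, and in list context also returns the matching bind values for execute(). A minimal sketch, reusing the example table above:

```perl
use strict;
use warnings;
use SQL::Abstract;   # CPAN module, not in core

my $sqla = SQL::Abstract->new;

# Scalar context returns just the SQL with '?' placeholders
# (column order follows the hash keys, so it may vary):
my $ins_sql = $sqla->insert('mytable', { field1 => '', field2 => '' });
print "$ins_sql\n";   # INSERT INTO mytable ( ... ) VALUES ( ?, ? )

# List context also returns the bind values:
my ($sel_sql, @bind) = $sqla->select('mytable', 'count(*)', { field1 => 42 });
print "$sel_sql\n";   # SELECT count(*) FROM mytable WHERE ( field1 = ? )
print "@bind\n";      # 42
```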

Hope this helps!


The way forward always starts with a minimal test.

Re^2: Create parallel database handles... (MCE::Loop)
by marioroy (Prior) on Apr 12, 2020 at 21:36 UTC

    Hi 1nickt,

    Nice example! MCE has helpful user_begin and user_end options, which are great places for workers to connect and clean up, respectively. Basically, one connects to the DB a single time, and prepares any statements, inside user_begin. The user_end block is where to call finish on every statement handle before calling disconnect on the DBI handle.

    Update: Added the MCE interval option to stagger workers connecting to the DB.

    Schema:

    create table mytable (
        field1 integer,
        field2 varchar(24),
        field3 varchar(24),
        field4 varchar(24),
        field5 varchar(24)
    );

    Perl:

    use strict;
    use warnings;

    use Data::GUID;
    use DBD::Pg;
    use SQL::Abstract;
    use Tie::Cycle;
    use MCE::Loop;

    my $sqla = SQL::Abstract->new;
    my @cols = map {"field$_"} 1..5;

    # https://www.cattlegrid.info/2006/06/13/write-no-more-sql-abstract-it.html
    my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
    my $sel_sql = $sqla->select('mytable', 'count(*)', { field2 => '' });
    my $upd_sql = $sqla->update('mytable', { field2 => '' }, { field2 => '' });

    #--------------------------------------------------------------------#

    my $dsn = 'DBI:Pg:dbname=test_db;host=localhost;port=5432';
    my ($dbh, $ins_sth, $sel_sth, $upd_sth);

    MCE::Loop->init(
        max_workers => 4,
        interval    => 0.125,   # delay period for MCE->yield
        user_begin  => sub {
            MCE->yield;         # stagger workers connecting to the DB
            $dbh = DBI->connect($dsn, $ENV{USER}, undef,
                { AutoCommit => 1, RaiseError => 1, PrintError => 1 })
                or die "Connection failed!\n" . $DBI::errstr;
            $ins_sth = $dbh->prepare_cached($ins_sql);
            $sel_sth = $dbh->prepare_cached($sel_sql);
            $upd_sth = $dbh->prepare_cached($upd_sql);
        },
        user_end => sub {
            $ins_sth->finish;
            $sel_sth->finish;
            $upd_sth->finish;
            $dbh->disconnect;
        },
    );

    mce_loop {
        my ($mce, $chunk, $chunk_id) = @_;

        for my $record ( @{$chunk} ) {
            $ins_sth->execute( @{$record} );

            my $field2_old  = $record->[1];
            my $field2_new1 = Data::GUID->new->as_base64;
            my $field2_new2 = Data::GUID->new->as_base64;

            # update using a prepared statement
            $upd_sth->execute( $field2_new1, $field2_old );

            # update using the dbh handle
            my ($query, @bind) = $sqla->update(
                'mytable',
                { field2 => $field2_new2 },
                { field2 => $field2_new1 },
            );
            $dbh->do($query, undef, @bind);

            # select records
            $sel_sth->execute( $field2_new2 );
            my ($count) = $sel_sth->fetchrow_array;

            # count is 1 due to selecting field2 = $field2_new2
            my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s',
                $mce->wid, $chunk_id, $record->[0], $count;
            MCE->say($msg);
        }
    } get_sample_data();

    # ^^ do not pass @{ get_sample_data() } to mce_loop;
    #    it will not work if the dereferenced array has 1 element.
    #    Pass the array ref instead; MCE accepts it.

    MCE::Loop->finish;

    #--------------------------------------------------------------------#

    sub get_sample_data {
        tie my $value1, 'Tie::Cycle', [ 40 .. 49 ];
        return [
            map { [ $value1, map { Data::GUID->new->as_base64 } 0..3 ] } 1..1000
        ];
    }

    See also this post for a version using a shared DBI handle.

    Regards, Mario

Re^2: Create parallel database handles... (MCE::Loop)
by perlygapes (Sexton) on Apr 14, 2020 at 00:27 UTC
    Thank you 1nickt for drawing my attention to the difference between DB connections and DB handles. That is a very good point.

    I readily admit that:

  • I don't understand the exact nature of how Perl uses a DBH and how Postgres sees it
  • using a separate DB connection for each child feels intuitively right
  • my understanding and use of the terms 'thread' and 'process' are not fully mature and somewhat imprecise
  • I have never used SQL::Abstract, but after a cursory glance following your example it looks like a good option that may save me some code
  • I've also never used MCE - it's not as easy to understand, so I'll have to look into it

      Hi again perlygapes,

      The MCE::Loop code just abstracts away all your Parallel::ForkManager logic and improves it, just as Parallel::ForkManager abstracts away and improves some of the tedious manual work of using fork() directly. See how the logic is encapsulated in a sub just like in your code, only with less concurrency boilerplate.
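      For a sense of what gets abstracted away, here is roughly the bookkeeping that using fork() directly requires. This is a minimal core-Perl sketch with no real work in the children; P::FM wraps this pattern for you, and MCE goes further with the persistent worker pool and chunking:

```perl
use strict;
use warnings;

my @jobs        = 1 .. 8;
my $max_workers = 4;
my @pids;
my $reaped = 0;

for my $job (@jobs) {
    # throttle: when at the limit, block until one child exits
    if (@pids >= $max_workers) {
        my $done = waitpid(-1, 0);
        @pids = grep { $_ != $done } @pids;
        $reaped++;
    }
    my $pid = fork;
    die "fork failed: $!" unless defined $pid;
    if ($pid == 0) {
        # child: do the work for $job here, then exit
        exit 0;
    }
    push @pids, $pid;    # parent: remember the child
}

# reap the stragglers
for my $pid (@pids) {
    waitpid($pid, 0);
    $reaped++;
}
print "reaped $reaped children\n";
```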

      "using a separate DB connection instead for each child feels intuitively right"

      I agree; the code I shared opens a connection for each child, and each child stays alive and handles multiple jobs from the job list as managed by MCE.

      Here's a simpler example I've shared recently showing how to parallelize existing code for making a series of HTTP requests. How would you do the same using P::FM?

      single process

      use strict;
      use warnings;
      use 5.010;

      use Data::Dumper;
      use HTTP::Tiny;
      use Time::HiRes 'gettimeofday', 'tv_interval';

      my $ua = HTTP::Tiny->new( timeout => 10 );

      my @urls = qw<
          gap.com amazon.com ebay.com lego.com wunderground.com
          imdb.com underarmour.com disney.com espn.com dailymail.com
      >;

      my %report;
      foreach ( @urls ) {
          my $start = [gettimeofday];
          $ua->get('https://' . $_);
          $report{$_} = tv_interval($start, [gettimeofday]);
      }
      say Dumper \%report;

      six processes
      (workers stay alive, looping through the list, writing to a shared hash)
      (one added line, two slightly changed lines)

      use strict;
      use warnings;
      use 5.010;

      use Data::Dumper;
      use HTTP::Tiny;
      use Time::HiRes 'gettimeofday', 'tv_interval';
      use MCE;
      use MCE::Shared;

      my $ua = HTTP::Tiny->new( timeout => 10 );

      my @urls = qw<
          gap.com amazon.com ebay.com lego.com wunderground.com
          imdb.com underarmour.com disney.com espn.com dailymail.com
      >;

      my $report = MCE::Shared->hash;

      MCE->new( max_workers => 6 )->foreach( \@urls, sub {
          my $start = [gettimeofday];
          $ua->get('https://' . $_);
          $report->set( $_, tv_interval($start, [gettimeofday]) );
      });
      say Dumper $report->export;

      Update: fixed error in first demo code, ++choroba

      Hope this helps!



        Something I just realised I neglected to mention in my example: I need to apply CPU affinity in the script. That is, I need to be able to specify that 'worker 1' MUST use CPU0, 'worker 2' MUST use CPU1, etc.

        This is because I need another parallel code block in which each worker launches an external single-threaded executable that accesses another DB and writes results to a third DB, but these workers MUST NOT access and write to the same table at the same time. The affinity is, in essence, to avoid access conflicts/violations.

        How can this be done in MCE?

        Thanks again.
Re^2: Create parallel database handles... (MCE::Loop)
by Anonymous Monk on Apr 12, 2020 at 04:01 UTC

    While it's nice to know that mce is good at parallel code, I'm pretty sure you meant to link to MCE

      tvm


