http://qs321.pair.com?node_id=11115368

perlygapes has asked for the wisdom of the Perl Monks concerning the following question:

Oh Great and Wholly Wons,

How may I make disciples of multiple child threads such that they will dutifully carry out my DB access statements in parallel harmony on the same tablet?

Example:

#!/usr/bin/perl
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;

# connect to Postgres DB
my $DSN    = 'DBI:Pg:dbname=db_test';
my $userid = "postgres";
my $sesame = "opensesame";
my $dbh = DBI->connect($DSN, $userid, $sesame,
    { AutoCommit => 1, RaiseError => 1, PrintError => 1 })
    or die "Connection failed!\n" . $DBI::errstr;

my %columns; # hash for persistent column array id mapping
$columns{field1} = 0;
$columns{field2} = 1;
$columns{field3} = 2;
$columns{field4} = 3;

my @columns; # deterministic - $values[$columns{$column}];
$columns[0] = 'field1';
$columns[1] = 'field2';
$columns[2] = 'field3';
$columns[3] = 'field4';

my $placeholders = join(", ", map { '?' } @columns);

# Build the SQL SELECT statement
my $sql_select = qq(SELECT count(*) FROM mytable WHERE field1 = ?;);

# PREPARE the SELECT statement 'template' handle
my $sth_select = $dbh->prepare_cached($sql_select);

# Build the SQL INSERT statement
my $sql_insert = "INSERT INTO mytable ("
    . join(", ", @columns) # column names
    . ") VALUES ($placeholders)";

# PREPARE the INSERT statement 'template' handle
my $sth_insert = $dbh->prepare_cached($sql_insert);

# create array for child threads/processes
my @children;
$children[0] = 'child1';
$children[1] = 'child2';
$children[2] = 'child3';
$children[3] = 'child4';

main();

# close template statement handles and disconnect from db
$sth_select->finish();
$sth_insert->finish();
$dbh->disconnect;
exit;

#--------<subs>----------
sub main {
    # create Parallel::ForkManager object for MAX_PROCESSES
    my $parallel = Parallel::ForkManager->new(4);

    $parallel->run_on_start(sub{
        my ($pid,$ident) = @_;
        print "Starting $ident under process id $pid\n";
    });

    $parallel->run_on_finish(sub{
        my ($pid, $exit_code, $ident, $exit_signal, $core_dump,
            $data_structure_reference) = @_;
    });

    print "Running " . scalar(@children) . " threads.\n";
    my $thread_count = 0;

    NEWPROCESS:
    for my $child (@children) {
        $thread_count++;
        print "Thread $thread_count running for $child\n";

        # fork parallel threads - per child
        $parallel->start($child) and next NEWPROCESS;

        if ($child =~ m/child1/i) {
            my $sth_select_child1 = $sth_select; # can I do this?
            my $sth_insert_child1 = $sth_insert; # or is it sinful?
            # ... get and prepare data for db
            # ... check db for existing rows with $sth_select_child1
            # ... if not exist then insert with $sth_insert_child1
            $sth_select_child1->finish();
            $sth_insert_child1->finish();
        }
        elsif ($child =~ m/child2/i) {
            my $sth_select_child2 = $sth_select;
            my $sth_insert_child2 = $sth_insert;
            # ... get and prepare data for db
            # ... check db for existing rows with $sth_select_child2
            # ... if not exist then insert with $sth_insert_child2
            $sth_select_child2->finish();
            $sth_insert_child2->finish();
        }
        elsif ($child =~ m/child3/i) {
            my $sth_select_child3 = $sth_select;
            my $sth_insert_child3 = $sth_insert;
            # ... get and prepare data for db
            # ... check db for existing rows with $sth_select_child3
            # ... if not exist then insert with $sth_insert_child3
            $sth_select_child3->finish();
            $sth_insert_child3->finish();
        }
        elsif ($child =~ m/child4/i) {
            my $sth_select_child4 = $sth_select;
            my $sth_insert_child4 = $sth_insert;
            # ... get and prepare data for db
            # ... check db for existing rows with $sth_select_child4
            # ... if not exist then insert with $sth_insert_child4
            $sth_select_child4->finish();
            $sth_insert_child4->finish();
        }

        print "\$parallel->finish on child $child\n";
        $parallel->finish(0);
    }

    print "\$parallel->wait_all_children() is waiting...\n";
    $parallel->wait_all_children();
} # main

I suppose the key question is this:
how do I create parallel handles for simultaneous read/write access to the same db table safely?

Homage to all ye who read and understand.


Replies are listed 'Best First'.
Re: Create parallel database handles... (MCE::Loop)
by 1nickt (Abbot) on Apr 11, 2020 at 15:59 UTC

    Hi perlygapes,

    Here are a few observations.

    • You don't ever want to share a DBI handle across processes or threads.

    • You don't need to. Databases support multiple connections; you just don't want too many open at the same time, and you want to avoid the overhead of opening the connection repeatedly.

    • You are using Parallel::ForkManager, so calling the child processes 'threads' seems to be misleading.

    • Your use of Parallel::ForkManager (a great tool I used and loved for years) is akin to using a slide rule when a programmable calculator is on your desk. It's better than the abacus up on the shelf, but ...
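A minimal sketch of the first two points, assuming Parallel::ForkManager is kept: each child opens its own connection after the fork and closes it before exiting, so no handle ever crosses a process boundary. The helper name run_jobs and its two callbacks are hypothetical, not part of any library; the DSN and credentials in the usage comment are the ones from the question.

```perl
use strict;
use warnings;
use Parallel::ForkManager;

# Run one job per child process. $connect is called *inside* each child,
# after the fork, so every child gets a private database connection.
# Returns a hash ref mapping each job name to its child's exit code.
sub run_jobs {
    my ($max_procs, $jobs, $connect, $work) = @_;
    my %exit_code;

    my $pm = Parallel::ForkManager->new($max_procs);
    $pm->run_on_finish(sub {
        my ($pid, $code, $ident) = @_;
        $exit_code{$ident} = $code;
    });

    for my $job (@$jobs) {
        $pm->start($job) and next;       # parent: go on to the next job

        # child: connect, work, disconnect; trap errors so we can report them
        my $ok = eval {
            my $dbh = $connect->();
            $work->($dbh, $job);
            $dbh->disconnect;
            1;
        };
        warn "$job failed: $@" unless $ok;
        $pm->finish($ok ? 0 : 1);        # child exits here
    }

    $pm->wait_all_children;
    return \%exit_code;
}

# Usage (assumed DSN and credentials, taken from the question):
# require DBI;
# my $codes = run_jobs(4, [qw(child1 child2 child3 child4)],
#     sub { DBI->connect('DBI:Pg:dbname=db_test', 'postgres', 'opensesame',
#                        { AutoCommit => 1, RaiseError => 1 }) },
#     sub { my ($dbh, $job) = @_; ... });
```

Passing the connect step in as a callback keeps the sketch independent of the database driver; the parent never holds a handle of its own, so there is nothing for the children to inherit by accident.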

    Using MCE for parallelization has two huge advantages (as well as all the others, haha):

    • It spawns a pool of workers that perform repeated jobs from the job list.

    • It uses chunking so a worker handles a bunch of jobs each time its turn comes up to grab a new chunk.

    I would write your code something like the following working example.

    Schema:

    create table mytable( field1 integer, field2 varchar(24), field3 varchar(24), field4 varchar(24), field5 varchar(24) );

    use strict; use warnings;
    use Data::GUID;
    use DBD::Pg;
    use SQL::Abstract;
    use Tie::Cycle;
    use MCE::Loop max_workers => 4;

    my $sqla = SQL::Abstract->new;
    my @cols = map {"field$_"} 1..5;
    my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
    my $sel_sql = $sqla->select('mytable', 'count(*)', { field1 => '' });

    #---------------------------------------------------------------------#

    mce_loop {
        my ($mce, $chunk, $chunk_id) = @_;
        my $dbh = get_dbh();
        my $ins_sth = $dbh->prepare_cached($ins_sql);
        my $sel_sth = $dbh->prepare_cached($sel_sql);

        for my $record( @{$chunk} ) {
            $ins_sth->execute( @{$record} );
            $sel_sth->execute( 42 );
            my ($count) = $sel_sth->fetchrow_array;

            my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s',
                $mce->wid, $chunk_id, $record->[0], $count;
            MCE->say($msg);
        }
    } @{ get_sample_data() };

    #---------------------------------------------------------------------#

    sub get_dbh {
        my $dsn = 'DBI:Pg:dbname=test_db';
        my $dbh = DBI->connect($dsn, $ENV{USER}, undef,
            { AutoCommit => 1, RaiseError => 1, PrintError => 1 })
            or die "Connection failed!\n" . $DBI::errstr;
        return $dbh;
    }

    sub get_sample_data {
        tie my $value1, 'Tie::Cycle', [ 40 .. 49 ];
        return [ map { [ $value1, map { Data::GUID->new->as_base64 } 0..3] } 1..1000 ];
    }

    __END__

    Partial output:

    $ perl mce-pg.pl
    wid 4; chnk 3; ins 40; cnt 0
    wid 1; chnk 1; ins 40; cnt 0
    wid 3; chnk 2; ins 40; cnt 0
    wid 2; chnk 4; ins 40; cnt 0
    wid 4; chnk 3; ins 41; cnt 0
    wid 1; chnk 1; ins 41; cnt 0
    wid 3; chnk 2; ins 41; cnt 0
    wid 2; chnk 4; ins 41; cnt 1
    wid 4; chnk 3; ins 42; cnt 1
    wid 1; chnk 1; ins 42; cnt 2
    ...
    wid 2; chnk 97; ins 48; cnt 100
    wid 4; chnk 99; ins 48; cnt 100
    wid 1; chnk 100; ins 48; cnt 100
    wid 3; chnk 98; ins 49; cnt 100
    wid 2; chnk 97; ins 49; cnt 100
    wid 4; chnk 99; ins 49; cnt 100
    wid 1; chnk 100; ins 49; cnt 100

    Notes:

    • MCE sets the chunk size automatically (here the 1000 records were split into 100 chunks of 10, as the chunk IDs in the output show)

    • The count query for field1 = 42 ends up at 100, as expected: the 1000 sample records cycle through the ten values 40 to 49.

    • As seen in the 8th and 9th lines of output:
      wid 2; chnk 4; ins 41; cnt 1
      wid 4; chnk 3; ins 42; cnt 1
      ... by the time worker 2 has inserted its record with value 41 and run its select query, worker 4 has already inserted a record with value 42 but has not yet run its own select query or written its output message (which comes right after).

    • I always use SQL::Abstract to generate SQL statements as I trust it more than I trust me.

    Hope this helps!


    The way forward always starts with a minimal test.

      Hi 1nickt,

      Nice example! MCE has helpful user_begin and user_end options. They are a great place for workers to connect and clean up, respectively. Basically, each worker connects to the DB once and prepares its statements inside user_begin. The user_end block is for calling finish on every statement handle prior to calling disconnect on the DBI handle.

      Update: Added the MCE interval option to stagger workers connecting to the DB.

      Schema:

      create table mytable( field1 integer, field2 varchar(24), field3 varchar(24), field4 varchar(24), field5 varchar(24) );

      Perl:

      use strict; use warnings;
      use Data::GUID;
      use DBD::Pg;
      use SQL::Abstract;
      use Tie::Cycle;
      use MCE::Loop;

      my $sqla = SQL::Abstract->new;
      my @cols = map {"field$_"} 1..5;

      # https://www.cattlegrid.info/2006/06/13/write-no-more-sql-abstract-it.html
      my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
      my $sel_sql = $sqla->select('mytable', 'count(*)', { field2 => '' });
      my $upd_sql = $sqla->update('mytable', { field2 => '' }, { field2 => '' });

      #--------------------------------------------------------------------#

      my $dsn = 'DBI:Pg:dbname=test_db;host=localhost;port=5432';
      my ($dbh, $ins_sth, $sel_sth, $upd_sth);

      MCE::Loop->init(
          max_workers => 4,
          interval    => 0.125,   # delay period for MCE->yield
          user_begin  => sub {
              MCE->yield;         # stagger workers connecting to the DB
              $dbh = DBI->connect($dsn, $ENV{USER}, undef,
                  { AutoCommit => 1, RaiseError => 1, PrintError => 1 })
                  or die "Connection failed!\n" . $DBI::errstr;
              $ins_sth = $dbh->prepare_cached($ins_sql);
              $sel_sth = $dbh->prepare_cached($sel_sql);
              $upd_sth = $dbh->prepare_cached($upd_sql);
          },
          user_end => sub {
              # finish every statement handle before disconnecting
              $_->finish for $sel_sth, $ins_sth, $upd_sth;
              $dbh->disconnect;
          },
      );

      mce_loop {
          my ($mce, $chunk, $chunk_id) = @_;

          for my $record( @{$chunk} ) {
              $ins_sth->execute( @{$record} );

              my $field2_old  = $record->[1];
              my $field2_new1 = Data::GUID->new->as_base64;
              my $field2_new2 = Data::GUID->new->as_base64;

              # update using a prepared statement
              $upd_sth->execute( $field2_new1, $field2_old );

              # update using the dbh handle
              my ($query, @bind) = $sqla->update(
                  'mytable',
                  { field2 => $field2_new2 },
                  { field2 => $field2_new1 },
              );
              $dbh->do($query, undef, @bind);

              # select records
              $sel_sth->execute( $field2_new2 );
              my ($count) = $sel_sth->fetchrow_array;

              # count is 1 due to selecting field2 = $field2_new2
              my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s',
                  $mce->wid, $chunk_id, $record->[0], $count;
              MCE->say($msg);
          }
      } get_sample_data();

      # ^^ do not pass @{ get_sample_data() } to mce_loop
      # it will not work if @{ [ has 1 element ] }
      # pass the array ref instead, MCE accepts it

      MCE::Loop->finish;

      #--------------------------------------------------------------------#

      sub get_sample_data {
          tie my $value1, 'Tie::Cycle', [ 40 .. 49 ];
          return [ map { [ $value1, map { Data::GUID->new->as_base64 } 0..3] } 1..1000 ];
      }

      See also this post for a version using a shared DBI handle.

      Regards, Mario

      Thank you 1nickt for raising my attention to the difference between DB connections and DB handles. That is a very good point.

      I readily admit that:

    • I don't understand the exact nature of how Perl uses a DBH and how Postgres sees it
    • using a separate DB connection instead for each child feels intuitively right
    • my understanding and use of the terms 'thread' and 'process' are not fully mature and somewhat imprecise
    • I have never used SQL::Abstract but upon having a cursory glance after your example it looks like a good option that may save me some code
    • I've also never used MCE - this is not as easy to understand and I'll have to look into it

        Hi again perlygapes,

        The MCE::Loop code just abstracts away all your Parallel::ForkManager logic and improves it, just as Parallel::ForkManager abstracts away and improves some of the tedious manual work of using fork() directly. See how the logic is encapsulated in a sub just like in your code, only with less concurrency boilerplate.

        "using a separate DB connection instead for each child feels intuitively right"

        I agree, the code I shared keeps a connection open for each child, which itself stays alive and handles multiple jobs from the job list as managed by MCE.

        Here's a simpler example I've shared recently showing how to parallelize existing code for making a series of HTTP requests. How would you do the same using P::FM?

        single process

        use strict; use warnings; use 5.010;
        use Data::Dumper;
        use HTTP::Tiny;
        use Time::HiRes 'gettimeofday', 'tv_interval';

        my $ua = HTTP::Tiny->new( timeout => 10 );

        my @urls = qw<
            gap.com amazon.com ebay.com lego.com wunderground.com
            imdb.com underarmour.com disney.com espn.com dailymail.com
        >;

        my %report;

        foreach( @urls ) {
            my $start = [gettimeofday];
            $ua->get('https://' . $_);
            $report{$_} = tv_interval($start, [gettimeofday]);
        }

        say Dumper \%report;

        six processes
        (workers stay alive, looping through the list, writing to a shared hash)
        (one added line, two slightly changed lines)

        use strict; use warnings; use 5.010;
        use Data::Dumper;
        use HTTP::Tiny;
        use Time::HiRes 'gettimeofday', 'tv_interval';
        use MCE;
        use MCE::Shared;

        my $ua = HTTP::Tiny->new( timeout => 10 );

        my @urls = qw<
            gap.com amazon.com ebay.com lego.com wunderground.com
            imdb.com underarmour.com disney.com espn.com dailymail.com
        >;

        my $report = MCE::Shared->hash;

        MCE->new( max_workers => 6 )->foreach( \@urls, sub {
            my $start = [gettimeofday];
            $ua->get('https://' . $_);
            $report->set( $_, tv_interval($start, [gettimeofday]) );
        });

        say Dumper $report->export;

        Update: fixed error in first demo code, ++choroba

        Hope this helps!




      While it's nice to know that mce is good at parallel code, I'm pretty sure you meant to link to MCE

        tvm


Re: Create parallel database handles or SQL statements for multi-threaded/process access to Postgres DB using DBI, DBD::Pg and Parallel::ForkManager
by erix (Parson) on Apr 12, 2020 at 07:33 UTC

    Neither Postgres nor a DBI database handle does threads (as 1nickt already implied).

    I don't think doing this kind of parallel access is going to help all that much to get more out of the DB. (But it might be useful for testing, I suppose.)

    And it seems to me you're forgetting the hard part: UPDATE. Do you not need that?

      Thanks erix.

      I do not need to update, actually.
      All I need to do is check if an identical record already exists and INSERT if not, else move onto the next 'candidate record' to insert.
      I will have multiple CPUs all generating candidates for insertion into the table but the records must be unique.
      I know Postgres will tell me if I try to create a duplicate record on a field I have set to UNIQUE, but I am not sure exactly how to check for that in Perl and make sure the script continues rather than dies.
      Incorporating the 'dbh-per-thread/process' idea,
      the pseudo-code would go something like:

      foreach $CPU (@CPUs) {
          create dbh for this CPU;
          prepare_cached SELECT;
          prepare_cached INSERT;
          foreach combination {
              # criterion have affinity with CPU core
              # so criterion is always unique to CPU
              generate row-data;
              # check if row already exists;   # what the SELECT sth is for
              if not then {
                  insert new row;              # what the INSERT sth is for
              }
          }
      }
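One way to let Postgres enforce uniqueness while the script keeps running, sketched under the assumption that the handle was created with RaiseError => 1 and AutoCommit => 1 as in the original script. insert_or_skip is a hypothetical helper name; 23505 is the SQLSTATE code Postgres reports for unique_violation, read here through DBI's state method.

```perl
use strict;
use warnings;

# Try to insert one row through a prepared INSERT statement handle.
# Returns 1 if the row went in, 0 if the database rejected it as a
# duplicate. Any other database error is rethrown unchanged.
# Assumes RaiseError => 1, so a failed execute() dies.
sub insert_or_skip {
    my ($sth, @values) = @_;

    return 1 if eval { $sth->execute(@values); 1 };

    my $err = $@;
    return 0 if ($sth->state // '') eq '23505';   # unique_violation
    die $err;
}

# Usage (hypothetical, inside a worker that owns $sth_insert):
# for my $row (@candidates) {
#     insert_or_skip($sth_insert, @$row) or next;   # duplicate: move on
# }
```

This suits AutoCommit mode. Inside an explicit transaction, Postgres aborts the whole transaction after a failed statement, so the pattern would need a savepoint around each insert.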

        Hi again, maybe what you need is a no-op "upsert", implemented in Postgres with INSERT ON CONFLICT?

        Something like

        INSERT INTO myTable (foo, bar) VALUES ('baz', 'qux') ON CONFLICT (foo) DO NOTHING;
        ... where foo is your column that has a unique key constraint.
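A sketch of how that statement might be driven from DBI, so that no separate SELECT and no exception handling is needed. The helper names insert_ignore_sql and inserted are hypothetical. It relies on DBI's execute returning the number of affected rows, with "0E0" (zero but true) when nothing was inserted, and on a Postgres version that supports ON CONFLICT (9.5 or later).

```perl
use strict;
use warnings;

# Build an INSERT that silently skips rows colliding on $unique_col.
sub insert_ignore_sql {
    my ($table, $unique_col, @cols) = @_;
    my $placeholders = join ', ', ('?') x @cols;
    return sprintf 'INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO NOTHING',
        $table, join(', ', @cols), $placeholders, $unique_col;
}

# Execute the prepared statement and report whether a row was added.
# A numeric comparison tells "0E0" (no row inserted) apart from 1.
sub inserted {
    my ($sth, @values) = @_;
    return $sth->execute(@values) > 0 ? 1 : 0;
}

# Usage (hypothetical, inside a worker that owns $dbh):
# my $sth = $dbh->prepare_cached(
#     insert_ignore_sql('mytable', 'field1', qw(field1 field2 field3 field4)));
# print "skipped duplicate\n" unless inserted($sth, @row);
```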

        Hope this helps!

