perlygapes has asked for the wisdom of the Perl Monks concerning the following question:
Oh Great and Wholly Wons,
How may I make disciples of multiple child threads such that they will dutifully carry out my DB access statements in parallel harmony on the same tablet?
Example:
#!/usr/bin/perl
use 5.24.0;
use strict;
use warnings;
use Parallel::ForkManager;
use DBI ':sql_types';
use DBD::Pg qw/:pg_types/;
# connect to Postgres DB
my $DSN = 'DBI:Pg:dbname=db_test';
my $userid = "postgres";
my $sesame = "opensesame";
my $dbh = DBI->connect($DSN, $userid, $sesame, {
AutoCommit => 1,
RaiseError => 1,
PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
my %columns; # hash for persistent column array id mapping
$columns{field1} = 0;
$columns{field2} = 1;
$columns{field3} = 2;
$columns{field4} = 3;
my @columns; # deterministic - $values[$columns{$column}];
$columns[0] = 'field1';
$columns[1] = 'field2';
$columns[2] = 'field3';
$columns[3] = 'field4';
my $placeholders = join(", ", map { '?' } @columns);
# Build the SQL SELECT statement
my $sql_select =
qq(SELECT count(*) FROM mytable WHERE field1 = ?;);
# PREPARE the SELECT statement 'template' handle
my $sth_select = $dbh->prepare_cached($sql_select);
# Build the SQL INSERT statement
my $sql_insert =
"INSERT INTO mytable ("
. join(", ", @columns) # column names
. ") VALUES ($placeholders)";
# PREPARE the INSERT statement 'template' handle
my $sth_insert = $dbh->prepare_cached($sql_insert);
# create array for child threads/processes
my @children;
$children[0] = 'child1';
$children[1] = 'child2';
$children[2] = 'child3';
$children[3] = 'child4';
main();
# close template statement handles and disconnect from db
$sth_select->finish();
$sth_insert->finish();
$dbh->disconnect;
exit;
#--------<subs>----------
sub main {
# create Parallel::ForkManager object for MAX_PROCESSES
my $parallel = Parallel::ForkManager->new(4);
$parallel->run_on_start(sub{
my ($pid,$ident) = @_;
print "Starting $ident under process id $pid\n";
});
$parallel->run_on_finish(sub{
my ($pid,
$exit_code,
$ident,
$exit_signal,
$core_dump,
$data_structure_reference) = @_;
});
print "Running " . scalar(@children) . " threads.\n";
my $thread_count = 0;
NEWPROCESS:
for my $child (@children) {
$thread_count++;
print "Thread $thread_count running for $child\n";
# fork parallel threads - per child
$parallel->start($child) and next NEWPROCESS;
if ($child =~ m/child1/i) {
my $sth_select_child1 = $sth_select; # can I do this?
my $sth_insert_child1 = $sth_insert; # or is it sinful?
# ... get and prepare data for db
# ... check db for existing rows with $sth_select_child1
# ... if not exist then insert with $sth_insert_child1
$sth_select_child1->finish();
$sth_insert_child1->finish();
} elsif ($child =~ m/child2/i) {
my $sth_select_child2 = $sth_select;
my $sth_insert_child2 = $sth_insert;
# ... get and prepare data for db
# ... check db for existing rows with $sth_select_child2
# ... if not exist then insert with $sth_insert_child2
$sth_select_child2->finish();
$sth_insert_child2->finish();
} elsif ($child =~ m/child3/i) {
my $sth_select_child3 = $sth_select;
my $sth_insert_child3 = $sth_insert;
# ... get and prepare data for db
# ... check db for existing rows with $sth_select_child3
# ... if not exist then insert with $sth_insert_child3
$sth_select_child3->finish();
$sth_insert_child3->finish();
} elsif ($child =~ m/child4/i) {
my $sth_select_child4 = $sth_select;
my $sth_insert_child4 = $sth_insert;
# ... get and prepare data for db
# ... check db for existing rows with $sth_select_child4
# ... if not exist then insert with $sth_insert_child4
$sth_select_child4->finish();
$sth_insert_child4->finish();
}
print "\$parallel->finish on child $child\n";
$parallel->finish(0);
}
print "\$parallel->wait_all_children() is waiting...\n";
$parallel->wait_all_children();
} # main
I suppose the key question is this:
how do I create parallel handles for simultaneous read/write access to the same db table safely?
Homage to all ye who read and understand.
Re: Create parallel database handles... (MCE::Loop)
by 1nickt (Canon) on Apr 11, 2020 at 15:59 UTC
Hi perlygapes,
Here are a few observations.
- You don't ever want to share a DBI handle across processes or threads.
- You don't need to. Databases support multiple connections; you just don't want too many open at the same time, and you want to avoid the overhead of opening the connection repeatedly.
- You are using Parallel::ForkManager, so calling the child processes 'threads' is misleading.
- Your use of Parallel::ForkManager (a great tool I used and loved for years) is akin to using a slide rule when a programmable calculator is on your desk. It's better than the abacus up on the shelf, but ...
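For completeness, the connect-per-child idea can be sketched in your own Parallel::ForkManager framework (a minimal sketch, reusing the DSN and credentials from your post; the `42` passed to the SELECT is a placeholder value): each child opens its own handle after the fork and closes it before finishing.

```perl
#!/usr/bin/perl
use strict;
use warnings;
use DBI;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(4);

CHILD:
for my $child (qw/child1 child2 child3 child4/) {
    # parent gets the child's PID (true) and moves on; child gets 0
    $pm->start($child) and next CHILD;

    # Each child opens its OWN connection after the fork -- a $dbh
    # created in the parent must never be reused across processes.
    my $dbh = DBI->connect('DBI:Pg:dbname=db_test', 'postgres', 'opensesame', {
        AutoCommit => 1,
        RaiseError => 1,
        PrintError => 1,
    }) or die "Connection failed!\n" . $DBI::errstr;

    my $sth_select = $dbh->prepare_cached(
        'SELECT count(*) FROM mytable WHERE field1 = ?');
    $sth_select->execute(42);    # 42 is a placeholder value
    my ($count) = $sth_select->fetchrow_array;
    print "$child counted $count rows\n";

    $sth_select->finish;
    $dbh->disconnect;
    $pm->finish(0);
}
$pm->wait_all_children;
```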
Using MCE for parallelization has two huge advantages (as well as all the others, haha):
- It spawns a pool of workers that perform repeated jobs from the job list.
- It uses chunking so a worker handles a bunch of jobs each time its turn comes up to grab a new chunk.
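The chunking idea itself is simple; here is a plain-Perl sketch with a hypothetical chunk size of 3 (MCE does this bookkeeping for you, with a tunable chunk_size):

```perl
use strict;
use warnings;

# Illustrative only: split a job list into chunks of 3 so a worker
# grabs several jobs each time its turn comes up, not one at a time.
my @jobs       = (1 .. 10);
my $chunk_size = 3;

my @chunks;
push @chunks, [ splice @jobs, 0, $chunk_size ] while @jobs;

for my $i (0 .. $#chunks) {
    print "chunk $i: @{ $chunks[$i] }\n";
}
# prints:
# chunk 0: 1 2 3
# chunk 1: 4 5 6
# chunk 2: 7 8 9
# chunk 3: 10
```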
I would write your code something like the following working example.
Schema:
create table mytable(
field1 integer,
field2 varchar(24),
field3 varchar(24),
field4 varchar(24),
field5 varchar(24)
);
use strict; use warnings;
use Data::GUID;
use DBD::Pg;
use SQL::Abstract;
use Tie::Cycle;
use MCE::Loop max_workers => 4;
my $sqla = SQL::Abstract->new;
my @cols = map {"field$_"} 1..5;
my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
my $sel_sql = $sqla->select('mytable', 'count(*)', { field1 => '' });
#--------------------------------------------------------------------#
mce_loop {
my ($mce, $chunk, $chunk_id) = @_;
my $dbh = get_dbh();
my $ins_sth = $dbh->prepare_cached($ins_sql);
my $sel_sth = $dbh->prepare_cached($sel_sql);
for my $record( @{$chunk} ) {
$ins_sth->execute( @{$record} );
$sel_sth->execute( 42 );
my ($count) = $sel_sth->fetchrow_array;
my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s',
$mce->wid, $chunk_id, $record->[0], $count;
MCE->say($msg);
}
} @{ get_sample_data() };
#--------------------------------------------------------------------#
sub get_dbh {
my $dsn = 'DBI:Pg:dbname=test_db';
my $dbh = DBI->connect($dsn, $ENV{USER}, undef, {
AutoCommit => 1,
RaiseError => 1,
PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
return $dbh;
}
sub get_sample_data {
tie my $value1, 'Tie::Cycle', [ 40 .. 49 ];
return [ map {
[ $value1, map { Data::GUID->new->as_base64 } 0..3]
} 1..1000 ];
}
__END__
Partial output:
$ perl mce-pg.pl
wid 4; chnk 3; ins 40; cnt 0
wid 1; chnk 1; ins 40; cnt 0
wid 3; chnk 2; ins 40; cnt 0
wid 2; chnk 4; ins 40; cnt 0
wid 4; chnk 3; ins 41; cnt 0
wid 1; chnk 1; ins 41; cnt 0
wid 3; chnk 2; ins 41; cnt 0
wid 2; chnk 4; ins 41; cnt 1
wid 4; chnk 3; ins 42; cnt 1
wid 1; chnk 1; ins 42; cnt 2
...
wid 2; chnk 97; ins 48; cnt 100
wid 4; chnk 99; ins 48; cnt 100
wid 1; chnk 100; ins 48; cnt 100
wid 3; chnk 98; ins 49; cnt 100
wid 2; chnk 97; ins 49; cnt 100
wid 4; chnk 99; ins 49; cnt 100
wid 1; chnk 100; ins 49; cnt 100
Hope this helps!
The way forward always starts with a minimal test.
|
Hi 1nickt,
Nice example! MCE has helpful user_begin and user_end options, which are the natural places for workers to connect and to clean up, respectively. Basically, each worker connects to the DB once, and prepares any statements, inside user_begin. The user_end block is the place to call finish on every statement handle before calling disconnect on the DBI handle.
Update: added the MCE interval option to stagger workers connecting to the DB.
Schema:
create table mytable(
field1 integer,
field2 varchar(24),
field3 varchar(24),
field4 varchar(24),
field5 varchar(24)
);
Perl:
use strict;
use warnings;
use Data::GUID;
use DBD::Pg;
use SQL::Abstract;
use Tie::Cycle;
use MCE::Loop;
my $sqla = SQL::Abstract->new;
my @cols = map {"field$_"} 1..5;
# https://www.cattlegrid.info/2006/06/13/write-no-more-sql-abstract-it.html
my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
my $sel_sql = $sqla->select('mytable', 'count(*)', { field2 => '' });
my $upd_sql = $sqla->update('mytable', { field2 => '' }, { field2 => '' });
#--------------------------------------------------------------------#
my $dsn = 'DBI:Pg:dbname=test_db;host=localhost;port=5432';
my ($dbh, $ins_sth, $sel_sth, $upd_sth);
MCE::Loop->init(
max_workers => 4,
interval => 0.125, # delay period for MCE->yield
user_begin => sub {
MCE->yield; # stagger workers connecting to the DB
$dbh = DBI->connect($dsn, $ENV{USER}, undef, {
AutoCommit => 1,
RaiseError => 1,
PrintError => 1
}) or die "Connection failed!\n" . $DBI::errstr;
$ins_sth = $dbh->prepare_cached($ins_sql);
$sel_sth = $dbh->prepare_cached($sel_sql);
$upd_sth = $dbh->prepare_cached($upd_sql);
},
user_end => sub {
$sel_sth->finish; $ins_sth->finish; $upd_sth->finish;
$dbh->disconnect;
},
);
mce_loop {
my ($mce, $chunk, $chunk_id) = @_;
for my $record( @{$chunk} ) {
$ins_sth->execute( @{$record} );
my $field2_old = $record->[1];
my $field2_new1 = Data::GUID->new->as_base64;
my $field2_new2 = Data::GUID->new->as_base64;
# update using a prepared statement
$upd_sth->execute( $field2_new1, $field2_old );
# update using the dbh handle
my ($query, @bind) = $sqla->update(
'mytable',
{ field2 => $field2_new2 },
{ field2 => $field2_new1 },
);
$dbh->do($query, undef, @bind);
# select records
$sel_sth->execute( $field2_new2 );
my ($count) = $sel_sth->fetchrow_array;
# count is 1 due to selecting field2 = $field2_new2
my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s',
$mce->wid, $chunk_id, $record->[0], $count;
MCE->say($msg);
}
} get_sample_data();
# ^^ do not pass @{ get_sample_data() } to mce_loop
# it will not work if @{ [ has 1 element ] }
# pass the array ref instead, MCE accepts it
MCE::Loop->finish;
#--------------------------------------------------------------------#
sub get_sample_data {
tie my $value1, 'Tie::Cycle', [ 40 .. 49 ];
return [ map {
[ $value1, map { Data::GUID->new->as_base64 } 0..3]
} 1..1000 ];
}
See also this post for a version using a shared DBI handle.
Regards, Mario
|
Thank you 1nickt for raising my attention to the difference between DB connections and DB handles. That is a very good point.
I readily admit that:
- I don't understand the exact nature of how Perl uses a DBH and how Postgres sees it
- using a separate DB connection instead for each child feels intuitively right
- my understanding and use of the terms 'thread' and 'process' are not fully mature and somewhat imprecise
- I have never used SQL::Abstract, but after a cursory glance at your example it looks like a good option that may save me some code
- I've also never used MCE - it is not as easy to understand, so I'll have to look into it
|
Hi again perlygapes,
The MCE::Loop code abstracts away all your Parallel::ForkManager logic and improves it, just as Parallel::ForkManager abstracts away and improves the tedious manual work of using fork() directly. Note how the logic is encapsulated in a sub just like in your code, only with less concurrency boilerplate.
"using a separate DB connection instead for each child feels intuitively right"
I agree; the code I shared keeps a connection open in each child, and each child stays alive and handles multiple jobs from the job list as managed by MCE.
Here's a simpler example I've shared recently showing how to parallelize existing code for making a series of HTTP requests. How would you do the same using P::FM?
single process
use strict; use warnings; use 5.010;
use Data::Dumper; use HTTP::Tiny;
use Time::HiRes 'gettimeofday', 'tv_interval';
my $ua = HTTP::Tiny->new( timeout => 10 );
my @urls = qw< gap.com amazon.com ebay.com lego.com wunderground.com
imdb.com underarmour.com disney.com espn.com dailymail.com >;
my %report;
foreach ( @urls ) {
    my $start = [gettimeofday];
    $ua->get('https://' . $_);
    $report{$_} = tv_interval($start, [gettimeofday]);
}
say Dumper \%report;
six processes
(workers stay alive, looping through the list, writing to a shared hash)
(one added line, two slightly changed lines)
use strict; use warnings; use 5.010;
use Data::Dumper; use HTTP::Tiny;
use Time::HiRes 'gettimeofday', 'tv_interval';
use MCE; use MCE::Shared;
my $ua = HTTP::Tiny->new( timeout => 10 );
my @urls = qw< gap.com amazon.com ebay.com lego.com wunderground.com
imdb.com underarmour.com disney.com espn.com dailymail.com >;
my $report = MCE::Shared->hash;
MCE->new( max_workers => 6 )->foreach( \@urls, sub {
my $start = [gettimeofday];
$ua->get('https://' . $_);
$report->set( $_, tv_interval($start, [gettimeofday]) );
});
say Dumper $report->export;
Update: fixed error in first demo code, ++choroba
Hope this helps!
The way forward always starts with a minimal test.
Re: Create parallel database handles or SQL statements for multi-threaded/process access to Postgres DB using DBI, DBD::Pg and Parallel::ForkManager
by erix (Prior) on Apr 12, 2020 at 07:33 UTC
Neither Postgres nor a DBI database handle does threads (as 1nickt already implied).
I don't think doing this kind of parallel access is going to help all that much to get more out of the DB. (But it might be useful for testing, I suppose.)
And it seems to me you're forgetting the hard part: UPDATE. Do you not need that?
Thanks erix.
I do not need to update, actually.
All I need to do is check if an identical record already exists and INSERT if not, else move onto the next 'candidate record' to insert.
I will have multiple CPUs all generating candidates for insertion into the table but the records must be unique.
I know Postgres will tell me if I try to create a duplicate record on a field I have set to UNIQUE, but I am not sure exactly how to check for that in Perl and make sure the script continues rather than dies.
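To keep the script alive past a duplicate-key error, one common pattern (sketched here as a hypothetical helper, not tested against your schema) is to wrap the execute in eval and inspect the SQLSTATE; Postgres reports a unique-constraint violation as 23505:

```perl
use strict;
use warnings;

# Hypothetical helper, assuming $dbh was connected with RaiseError => 1
# (as in your code) and $ins_sth is your prepared INSERT handle.
# Returns 1 if the row went in, 0 if an identical row already existed,
# and dies on any other database error.
sub insert_unless_duplicate {
    my ($dbh, $ins_sth, @row) = @_;
    my $ok = eval { $ins_sth->execute(@row); 1 };
    return 1 if $ok;
    return 0 if $dbh->state eq '23505';    # Postgres unique_violation
    die $@;                                # anything else is still fatal
}
```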
Incorporating the 'dbh-per-thread/process' idea,
the pseudo-code would go something like:
foreach $CPU (@CPUs) {
create dbh for this CPU;
prepare_cached SELECT;
prepare_cached INSERT;
foreach combination { # criteria have affinity with a CPU core,
# so each criterion is always unique to one CPU
generate row-data;
check if row already exists; # what the SELECT sth is for
if not then {
insert new row; # what the INSERT sth is for
}
}
}
INSERT INTO myTable (foo, bar)
VALUES ('baz', 'qux')
ON CONFLICT (foo)
DO NOTHING;
... where foo is your column that has a unique key constraint.
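From DBI this can be prepared once and reused; after the execute, $sth->rows reports whether anything was actually written. A sketch reusing the mytable/field1 names from this thread ($dbh and the candidate values are assumed):

```perl
# Assumes an open $dbh, and that field1 carries the UNIQUE constraint.
my $sth = $dbh->prepare_cached(
    'INSERT INTO mytable (field1, field2) VALUES (?, ?)
     ON CONFLICT (field1) DO NOTHING'
);

my ($f1, $f2) = (42, 'sample');    # hypothetical candidate row
$sth->execute($f1, $f2);

if ($sth->rows) {
    # a new row was inserted
} else {
    # an identical field1 already existed; nothing was written
}
```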
Hope this helps!
The way forward always starts with a minimal test.