Hi perlygapes,
Here are a few observations.
- You don't ever want to share a DBI handle across processes or threads.
- You don't need to. Databases support multiple connections; you just don't want too many open at the same time, and you want to avoid the overhead of opening the connection repeatedly.
- You are using Parallel::ForkManager, so calling the child processes 'threads' is misleading: they are forked processes, not threads.
- Your use of Parallel::ForkManager (a great tool I used and loved for years) is akin to using a slide rule when a programmable calculator is on your desk. It's better than the abacus up on the shelf, but ...
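To illustrate the first two points, here is a minimal sketch of a per-process handle cache, using only core Perl. The names `handle_for_this_process` and `$fake_connect` are made up for this illustration; with DBI the factory would presumably be something like `sub { DBI->connect($dsn, $user, $pass, { RaiseError => 1 }) }`. A stand-in "connection" is used so the sketch runs without a database.

```perl
use strict;
use warnings;

$| = 1;    # flush output right away so the forked child does not repeat it

# Cache of handles keyed by process ID, so a forked child never reuses
# a handle that was opened by its parent.
my %handle_for_pid;

# $connect is a hypothetical factory coderef that opens a new connection.
sub handle_for_this_process {
    my ($connect) = @_;
    return $handle_for_pid{$$} //= $connect->();
}

# Stand-in for a real connection: it only records which process opened it.
my $opened       = 0;
my $fake_connect = sub { $opened++; return { owner_pid => $$ } };

my $h1 = handle_for_this_process($fake_connect);
my $h2 = handle_for_this_process($fake_connect);    # served from the cache
print "connections opened in parent: $opened\n";

my $pid = fork() // die "fork failed: $!";
if ($pid == 0) {
    # The child inherits a copy of the cache, but its PID differs, so it
    # opens its own handle instead of using the parent's.
    my $child_handle = handle_for_this_process($fake_connect);
    exit( $child_handle->{owner_pid} == $$ ? 0 : 1 );
}
waitpid($pid, 0);
print "child used its own handle: ", ( $? == 0 ? "yes" : "no" ), "\n";
```

In a script like the one further down, wrapping the connect call this way would let each worker connect once rather than once per chunk.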
Using MCE for parallelization has two huge advantages (as well as all the others, haha):
- It spawns a pool of workers that perform repeated jobs from the job list.
- It uses chunking so a worker handles a bunch of jobs each time its turn comes up to grab a new chunk.
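Both points can be seen without a database. The following is a rough sketch, assuming MCE is installed from CPAN; the explicit `chunk_size => 10` and the name `%chunk_info` are choices made for this illustration only (by default MCE picks the chunk size itself).

```perl
use strict;
use warnings;
use MCE::Loop;

# Four workers are spawned once and reused; each call to the block
# receives a chunk of up to 10 items rather than a single item.
MCE::Loop->init( max_workers => 4, chunk_size => 10 );

my %chunk_info = mce_loop {
    my ($mce, $chunk_ref, $chunk_id) = @_;

    # Send "worker id:number of items" back to the parent, keyed by chunk ID.
    MCE->gather( $chunk_id, join ':', $mce->wid, scalar @{$chunk_ref} );
} 1 .. 100;

MCE::Loop->finish;

for my $chunk_id ( sort { $a <=> $b } keys %chunk_info ) {
    my ($wid, $count) = split /:/, $chunk_info{$chunk_id};
    print "chunk $chunk_id: $count items, handled by worker $wid\n";
}
```

Which worker picks up which chunk varies from run to run, but 100 items at 10 per chunk should always give 10 chunks spread over the 4 workers.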
I would write your code something like the following working example.
Schema:
    create table mytable(
        field1 integer,
        field2 varchar(24),
        field3 varchar(24),
        field4 varchar(24),
        field5 varchar(24)
    );
    use strict; use warnings;

    use Data::GUID;
    use DBI;
    use DBD::Pg;
    use SQL::Abstract;
    use Tie::Cycle;
    use MCE::Loop max_workers => 4;

    my $sqla = SQL::Abstract->new;
    my @cols = map {"field$_"} 1..5;

    # Generate the SQL once; the values are placeholders bound at execute time.
    my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
    my $sel_sql = $sqla->select('mytable', 'count(*)', { field1 => '' });

    #---------------------------------------------------------------------#

    # This block runs once per chunk, inside a worker process.
    mce_loop {
        my ($mce, $chunk, $chunk_id) = @_;

        # Each call opens its own connection; no handle is shared across processes.
        my $dbh = get_dbh();
        my $ins_sth = $dbh->prepare_cached($ins_sql);
        my $sel_sth = $dbh->prepare_cached($sel_sql);

        for my $record( @{$chunk} ) {
            $ins_sth->execute( @{$record} );
            $sel_sth->execute( 42 );
            my ($count) = $sel_sth->fetchrow_array;

            my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s',
                $mce->wid, $chunk_id, $record->[0], $count;
            MCE->say($msg);
        }
    } @{ get_sample_data() };

    #---------------------------------------------------------------------#

    sub get_dbh {
        my $dsn = 'DBI:Pg:dbname=test_db';

        my $dbh = DBI->connect($dsn, $ENV{USER}, undef, {
            AutoCommit => 1, RaiseError => 1, PrintError => 1
        }) or die "Connection failed!\n" . $DBI::errstr;

        return $dbh;
    }

    # 1000 records; field1 cycles through 40..49, the rest are random GUIDs.
    sub get_sample_data {
        tie my $value1, 'Tie::Cycle', [ 40 .. 49 ];
        return [ map {
            [ $value1, map { Data::GUID->new->as_base64 } 0..3]
        } 1..1000 ];
    }

    __END__
Partial output:
    $ perl mce-pg.pl
    wid 4; chnk 3; ins 40; cnt 0
    wid 1; chnk 1; ins 40; cnt 0
    wid 3; chnk 2; ins 40; cnt 0
    wid 2; chnk 4; ins 40; cnt 0
    wid 4; chnk 3; ins 41; cnt 0
    wid 1; chnk 1; ins 41; cnt 0
    wid 3; chnk 2; ins 41; cnt 0
    wid 2; chnk 4; ins 41; cnt 1
    wid 4; chnk 3; ins 42; cnt 1
    wid 1; chnk 1; ins 42; cnt 2
    ...
    wid 2; chnk 97; ins 48; cnt 100
    wid 4; chnk 99; ins 48; cnt 100
    wid 1; chnk 100; ins 48; cnt 100
    wid 3; chnk 98; ins 49; cnt 100
    wid 2; chnk 97; ins 49; cnt 100
    wid 4; chnk 99; ins 49; cnt 100
    wid 1; chnk 100; ins 49; cnt 100
Notes:
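- MCE sets the chunk size automatically (in this case apparently to 10 records per chunk, since the 1000 records end up in the 100 chunks seen in the output).
- Once all inserts are done, the DB count query reports 100 rows with field1 value 42, as expected.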
- As seen in the 8th and 9th lines of output:
    wid 2; chnk 4; ins 41; cnt 1
    wid 4; chnk 3; ins 42; cnt 1
  By the time worker 2 has inserted its record with value 41 and runs the select query, worker 4 has already inserted a record with value 42, but has not yet run its own select query or written its output message (which comes right after).
- I always use SQL::Abstract to generate SQL statements as I trust it more than I trust me.
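For anyone who has not used SQL::Abstract, here is a small sketch of what it hands back, assuming the module is installed from CPAN. In list context each call returns the statement followed by the bind values; the exact whitespace of the generated SQL may differ between versions, so no output is shown. The two-column insert is just for illustration.

```perl
use strict;
use warnings;
use SQL::Abstract;

my $sqla = SQL::Abstract->new;

# List context: the statement with placeholders, then the values to bind.
my ($ins_stmt, @ins_bind) = $sqla->insert(
    'mytable', { field1 => 42, field2 => 'abc' }
);
my ($sel_stmt, @sel_bind) = $sqla->select(
    'mytable', 'count(*)', { field1 => 42 }
);

print "$ins_stmt\n  bind: @ins_bind\n";
print "$sel_stmt\n  bind: @sel_bind\n";
```

The script above calls the same methods in scalar context, which yields only the statement; the values are then supplied to execute().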
Hope this helps!
The way forward always starts with a minimal test.