Beefy Boxes and Bandwidth Generously Provided by pair Networks
Syntactic Confectionery Delight
 
PerlMonks  

Re^2: Can't get multiple Parallel::ForkManager threads connecting to DBD::Pg database

by perlygapes (Sexton)
on Jul 22, 2021 at 13:56 UTC ( [id://11135294]=note: print w/replies, xml ) Need Help??


in reply to Re: Can't get multiple Parallel::ForkManager threads connecting to DBD::Pg database
in thread Can't get multiple Parallel::ForkManager threads connecting to DBD::Pg database

I got it working using the following code.

In short, this code creates multiple threads (maybe processes, not sure) using `Parallel::ForkManager`, and each thread gets it's own database handle and associated statement handles.
It abstracts `dbh` and `statement` handle creation to a sub, and also `db write` and `db handle close` functions.
use 5.24.0; use strict; use warnings; use Parallel::ForkManager; use SQL::Abstract; use DBI ':sql_types'; use DBD::Pg qw/:pg_types/; #my @codes = ("A"); # testing single thread my @codes = ("A","B","F","M","S"); # testing multi-thread my %varset; ################################################################ # get db connection info ################################################################ my $dsn = 'DBI:Pg:dbname=mt4_test'; my $userid = $ENV{DBI_USER}; my $sesame = $ENV{DBI_PASS}; my %dbh; # hash for storing dbh handles ################################################################ # prepare array and hash for matching db columns ################################################################ my %columns; # hash for persistent mapping of column-values my @columns; # deterministic - $values[$columns{$column}]; set_columns(); # define column<->value mapping for db table my $placeholders = join(", ", map { '?' } @columns); ################################################################ # Build the SELECT SQL statement ################################################################ my $sql_select_statement = qq(SELECT count(*) FROM mytable WHERE field1 = ?;); ################################################################ # Build the INSERT SQL statement ################################################################ my $sql_insert_statement = "INSERT INTO mytable (" . join(", ", @columns) # column names . ") VALUES ($placeholders)"; ################################################################ # hash for storing SQL statement handles for threads ################################################################ my %sth_select_code; my %sth_insert_code; ################################################################ # create Parallel::ForkManager object for @codes ################################################################ my $optimization = Parallel::ForkManager->new(scalar @codes); $optimization->run_on_start(sub{ my ($pid,$ident) = @_; print "Starting $ident under process id $pid\n"; }); $optimization->run_on_finish(sub{ my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_; }); my $thread_count = 0; OPTIMIZATION: for my $code (@codes) { $thread_count++; print "Thread $thread_count running for $code\n"; # fork optimization threads - per code if (scalar @codes > 1) { $optimization->start($code) and next OPTIMIZATION; } else { $optimization->start($code); } launch_sub($code); print "\$optimization->finish on child $code\n"; $optimization->finish(0); } print "\$optimization->wait_all_children() is waiting...\n"; $optimization->wait_all_children(); ################################################################ # THE END ################################################################ exit; ################################################################ ################################################################ sub launch_sub { ################################################################ my $code = shift; if ($code =~ m/A/i) { sub_a("A"); } elsif ($code =~ m/B/i) { sub_b("B"); } elsif ($code =~ m/F/i) { sub_f("F"); } elsif ($code =~ m/M/i) { sub_m("M"); } elsif ($code =~ m/S/i) { sub_s("S"); } } ################################################################ sub sub_a { ################################################################ my $code = shift; # Code my @values; my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_sele +ct_code{$code},\$sth_insert_code{$code}); # generate values specific to A $varset{field2} = 'a_f1'; # for illustrative purposes $varset{field3} = 'a_f2'; $varset{field4} = 'a_f3'; $varset{field5} = 'a_f4'; foreach my $key (keys %varset) { my $column = $key; # the column name as key my $value = $varset{$key}; # the column value (field variable +) $values[$columns{$column}] = $value; # add to list of column v +alues } $values[0] = $code; write_to_db($code, @values); disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_ +code{$code}); } ################################################################ sub sub_b { ################################################################ my $code = shift; # Code my @values; my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_sele +ct_code{$code},\$sth_insert_code{$code}); # generate values specific to B $varset{field2} = 'b_f1'; # for illustrative purposes $varset{field3} = 'b_f2'; $varset{field4} = 'b_f3'; $varset{field5} = 'b_f4'; foreach my $key (keys %varset) { my $column = $key; # the column name as key my $value = $varset{$key}; # the column value (field variable +) $values[$columns{$column}] = $value; # add to list of column v +alues } $values[0] = $code; write_to_db($code, @values); disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_ +code{$code}); } ################################################################ sub sub_f { ################################################################ my $code = shift; # Code my @values; my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_sele +ct_code{$code},\$sth_insert_code{$code}); # generate values specific to F $varset{field2} = 'f_f1'; # for illustrative purposes $varset{field3} = 'f_f2'; $varset{field4} = 'f_f3'; $varset{field5} = 'f_f4'; foreach my $key (keys %varset) { my $column = $key; # the column name as key my $value = $varset{$key}; # the column value (field variable +) $values[$columns{$column}] = $value; # add to list of column v +alues } $values[0] = $code; write_to_db($code, @values); disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_ +code{$code}); } ################################################################ sub sub_m { ################################################################ my $code = shift; # Code my @values; my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_sele +ct_code{$code},\$sth_insert_code{$code}); # generate values specific to M $varset{field2} = 'm_f1'; # for illustrative purposes $varset{field3} = 'm_f2'; $varset{field4} = 'm_f3'; $varset{field5} = 'm_f4'; foreach my $key (keys %varset) { my $column = $key; # the column name as key my $value = $varset{$key}; # the column value (field variable +) $values[$columns{$column}] = $value; # add to list of column v +alues } $values[0] = $code; write_to_db($code, @values); disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_ +code{$code}); } ################################################################ sub sub_s { ################################################################ my $code = shift; # Code my @values; my ($dbh, $sel_sth, $ins_sth) = create_dbh(\$dbh{$code},\$sth_sele +ct_code{$code},\$sth_insert_code{$code}); # generate values specific to S $varset{field2} = 's_f1'; # for illustrative purposes $varset{field3} = 's_f2'; $varset{field4} = 's_f3'; $varset{field5} = 's_f4'; foreach my $key (keys %varset) { my $column = $key; # the column name as key my $value = $varset{$key}; # the column value (field variable +) $values[$columns{$column}] = $value; # add to list of column v +alues } $values[0] = $code; write_to_db($code, @values); disconnect_dbh(\$dbh{$code},\$sth_select_code{$code},\$sth_insert_ +code{$code}); } ################################################################ sub create_dbh { ################################################################ my $dbh_ref = shift; my $sel_ref = shift; my $ins_ref = shift; ${$dbh_ref} = DBI->connect($dsn, $userid, $sesame, { AutoCommit => 1, RaiseError => 1, PrintError => 1 }) or die "Connection failed!\n" . $DBI::errstr; # did it work? are we there yet? my $me = ${$dbh_ref}->{Driver}{Name}; my $sversion = ${$dbh_ref}->{pg_server_version}; print "DBI is version $DBI::VERSION, " . "I am $me, " . "version of DBD::Pg is $DBD::Pg::VERSION, " . "server is $sversion\n"; print "Name: ${$dbh_ref}->{Name}\n"; # prepare the SELECT statement handle ${$sel_ref} = ${$dbh_ref}->prepare_cached($sql_select_statement); # prepare the INSERT statement handle ${$ins_ref} = ${$dbh_ref}->prepare_cached($sql_insert_statement); } ################################################################ sub write_to_db { ################################################################ my ($code, @values) = @_; my $rv_code = $sth_select_code{$code}->execute($code); say "SQL SELECT for $code: rv_code = $rv_code"; if($rv_code < 0) { print $DBI::errstr; } my @row = $sth_select_code{$code}->fetchrow_array(); # if the SELECT found no existing records for this strategy, then +INSERT it unless ($row[0] > 0) { # INSERT settings into 'mytable' $sth_insert_code{$code}->execute(@values); say "SQL INSERT for $code"; } } ################################################################ sub disconnect_dbh { ################################################################ my $dbh_ref = shift; my $sel_ref = shift; my $ins_ref = shift; ${$sel_ref}->finish(); ${$ins_ref}->finish(); ${$dbh_ref}->disconnect; say "disconnected dbh_ref: $dbh_ref"; } ################################################################ sub set_columns { ################################################################ $columns{field1} = 0; $columns{field2} = 1; $columns{field3} = 2; $columns{field4} = 3; $columns{field5} = 4; $columns[0] = 'field1'; $columns[1] = 'field2'; $columns[2] = 'field3'; $columns[3] = 'field4'; $columns[4] = 'field5'; }

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: note [id://11135294]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others romping around the Monastery: (8)
As of 2024-03-28 12:01 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found