http://qs321.pair.com?node_id=11115410

Dearest Monks,

Have you ever wanted to share a DBI handle?

Let's imagine an environment that consists of 200 compute blades. Each blade has 10 CPU cores, 20 logical cores total with hyperthreading/SMT enabled. With one connection per logical core, that might be a lot of DB connections: 4,000 (200 x 20), and IMHO not graceful at all. Better yet, imagine an environment with 400 compute blades. These are the newer generation, having 32 cores each (64 logical cores with SMT enabled). Certainly 25,600 connections (400 x 64 logical cores) may be too much for the DB to handle.

Fortunately, there is a way. One DB connection per blade, no matter the number of CPU cores, is possible with Perl and MCE::Shared. The code that follows is based on my reply to 1nickt's elegant MCE demonstration.

First attempt

Creating a shared DBI handle is not a problem. Unfortunately, it does not work with statement (STMT) objects. They fail because the STMT object takes a look and says, wait a minute, this is not a DBI object. Ah...

my $dbh = MCE::Shared->share({ module => 'DBI', new => 'connect' }, $dsn, $user, $password, $params );

Second attempt

Another way is to write a wrapper class with the things you need. Think of MCE::Shared as a proxy server. It does nothing more than pass the method name you want to call, along with its arguments, over to the shared-manager process, and likewise return the data on the way back.
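To make the proxy idea concrete, here is a toy sketch in plain Perl. It is not how MCE::Shared is implemented, and everything runs in one process. The Counter, ToyManager and ToyProxy classes are hypothetical names made up for illustration. The point is only that the proxy holds no data and forwards a method name plus arguments, while the real object stays with the "manager".

```perl
use strict;
use warnings;

# The real object. With MCE::Shared it would live in the shared-manager process.
package Counter {
    sub new { my ($class, $start) = @_; return bless { n => $start // 0 }, $class }
    sub add { my ($self, $by) = @_; return $self->{n} += $by }
    sub get { return $_[0]{n} }
}

# Hypothetical stand-in for the manager: looks up the method by name and calls it.
package ToyManager {
    sub new { my ($class, $obj) = @_; return bless { obj => $obj }, $class }
    sub request {
        my ($self, $method, @args) = @_;
        my $code = $self->{obj}->can($method)
            or die "no such method: $method\n";
        return $self->{obj}->$code(@args);
    }
}

# Hypothetical proxy: holds no data, only forwards (method name, arguments).
package ToyProxy {
    our $AUTOLOAD;
    sub new { my ($class, $manager) = @_; return bless { mgr => $manager }, $class }
    sub AUTOLOAD {
        my ($self, @args) = @_;
        (my $method = $AUTOLOAD) =~ s/.*:://;   # strip the package name
        return if $method eq 'DESTROY';         # nothing to forward on cleanup
        return $self->{mgr}->request($method, @args);
    }
}

my $proxy = ToyProxy->new( ToyManager->new( Counter->new(10) ) );
$proxy->add(5);
print $proxy->get, "\n";   # prints 15
```

A statement handle created on the manager side never reaches the caller in this picture, which is why the wrapper class keeps its statements by key and exposes only method calls.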

Update: Added the missing 'do' method to the shared class. The example now also updates a record.

Schema:

create table mytable(
    field1 integer,
    field2 varchar(24),
    field3 varchar(24),
    field4 varchar(24),
    field5 varchar(24)
);

Perl:

use strict;
use warnings;

use Data::GUID;
use DBD::Pg;
use SQL::Abstract;
use Tie::Cycle;

use MCE::Loop max_workers => 4;
use MCE::Shared;

my $sqla = SQL::Abstract->new;
my @cols = map {"field$_"} 1..5;

# https://www.cattlegrid.info/2006/06/13/write-no-more-sql-abstract-it.html

my $ins_sql = $sqla->insert('mytable', { map { $_ => '' } @cols });
my $sel_sql = $sqla->select('mytable', 'count(*)', { field2 => '' });
my $upd_sql = $sqla->update('mytable', { field2 => '' }, { field2 => '' });

#--------------------------------------------------------------------#

package My::DBI {

    use DBI;

    sub new {
        my ( $class, $dsn, $user, $password, $params ) = @_;
        my $self = {};

        # MCE::Shared will emit the error and exit if fail to connect
        $self->{DBH}  = DBI->connect($dsn, $user, $password, $params);
        $self->{STMT} = {};

        bless $self, $class;
    }

    sub prepare_cached {
        my ( $self, $key, $sql ) = @_;
        $self->{STMT}{$key} = $self->{DBH}->prepare_cached($sql);
        1;
    }

    sub do {
        my $self = shift;
        $self->{DBH}->do(@_);
    }

    sub execute {
        my ( $self, $key ) = ( shift, shift );
        if ( my $stmt = $self->{STMT}{$key} ) {
            $stmt->execute(@_);
        }
    }

    sub fetchrow_array {
        my ( $self, $key ) = ( shift, shift );
        if ( my $stmt = $self->{STMT}{$key} ) {
            $stmt->execute(@_);
            $stmt->fetchrow_array;
        }
    }

    sub finish {
        my ( $self, $key ) = @_;
        if ( $key ) {
            $self->{STMT}{$key}->finish if $self->{STMT}{$key};
        }
        else {
            $self->{STMT}{$_}->finish for keys %{ $self->{STMT} };
        }
        return 1;
    }

    sub disconnect {
        my ( $self ) = @_;
        $self->finish;
        $self->{DBH}->disconnect;
        1;
    }
};

#--------------------------------------------------------------------#

my $dsn = 'DBI:Pg:dbname=test_db;host=localhost;port=5432';

my $sdb = MCE::Shared->share(
    { module => 'My::DBI' },
    $dsn, $ENV{USER}, undef,
    { AutoCommit => 1, RaiseError => 1, PrintError => 1 },
);

$sdb->prepare_cached('ins_sth', $ins_sql);
$sdb->prepare_cached('sel_sth', $sel_sql);
$sdb->prepare_cached('upd_sth', $upd_sql);

mce_loop {
    my ($mce, $chunk, $chunk_id) = @_;

    for my $record ( @{$chunk} ) {
        $sdb->execute('ins_sth', @{$record});

        my $field2_old  = $record->[1];
        my $field2_new1 = Data::GUID->new->as_base64;
        my $field2_new2 = Data::GUID->new->as_base64;

        # update using a prepared statement
        $sdb->execute('upd_sth', $field2_new1, $field2_old);

        # update using the dbh handle inside the shared class
        my ($query, @bind) = $sqla->update(
            'mytable',
            { field2 => $field2_new2 },
            { field2 => $field2_new1 },
        );
        $sdb->do($query, undef, @bind);

        # pass any arguments for execute inside the shared class
        my ($count) = $sdb->fetchrow_array('sel_sth', $field2_new2);

        # count is 1 due to selecting field2 = $field2_new2
        my $msg = sprintf 'wid %s; chnk %s; ins %s; cnt %s',
            $mce->wid, $chunk_id, $record->[0], $count;

        MCE->say($msg);
    }

} get_sample_data();

# ^^ do not pass @{ get_sample_data() } to mce_loop
# it will not work if @{ [ has 1 element ] }
# pass the array ref instead, MCE accepts it

MCE::Loop->finish;
$sdb->disconnect;

#--------------------------------------------------------------------#

sub get_sample_data {
    tie my $value1, 'Tie::Cycle', [ 40 .. 49 ];
    return [
        map {
            [ $value1, map { Data::GUID->new->as_base64 } 0..3 ]
        } 1..1000
    ];
}

Add to the My::DBI class any DBI/STMT methods that your application uses. The code is straightforward, I hope. A fetchrow_array is typically preceded by an execute, so the method in the shared class handles both execute and fetchrow_array in a single call. This is important. Likewise, be sure to pass the execute arguments when calling fetchrow_array in the application.
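As a rough illustration of adding one more method in the same style, here is a sketch of a fetchall_arrayref that executes first and then fetches. It has not been tried against a real database or through MCE::Shared. The Fake::STH class, the My::Wrapper name and the set_stmt hook are hypothetical, there only so the sketch runs without a DB. In the real My::DBI class only the fetchall_arrayref sub would be added.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a DBI statement handle, so the sketch runs
# without a database. It only mimics execute and fetchall_arrayref.
package Fake::STH {
    sub new {
        my ($class, $rows) = @_;
        return bless { rows => $rows, hits => [] }, $class;
    }
    sub execute {
        my ($self, $want) = @_;
        # pretend "where field2 = ?" by filtering on the second column
        $self->{hits} = [ grep { $_->[1] eq $want } @{ $self->{rows} } ];
        return 1;
    }
    sub fetchall_arrayref { return $_[0]{hits} }
}

# Trimmed copy of the wrapper idea: statements are kept by key inside the class.
package My::Wrapper {
    sub new {
        my ($class) = @_;
        return bless { STMT => {} }, $class;
    }

    # Hypothetical hook for this sketch: register a ready-made statement handle.
    sub set_stmt {
        my ($self, $key, $sth) = @_;
        $self->{STMT}{$key} = $sth;
        return 1;
    }

    # New method following the fetchrow_array pattern: execute, then fetch.
    sub fetchall_arrayref {
        my ( $self, $key ) = ( shift, shift );
        if ( my $stmt = $self->{STMT}{$key} ) {
            $stmt->execute(@_);
            return $stmt->fetchall_arrayref;
        }
        return;
    }
}

my $db = My::Wrapper->new;
$db->set_stmt( 'sel_sth', Fake::STH->new( [ [ 40, 'a' ], [ 41, 'b' ], [ 42, 'a' ] ] ) );

# the execute arguments go along with the fetch call
my $rows = $db->fetchall_arrayref( 'sel_sth', 'a' );
print scalar( @{$rows} ), " rows\n";   # prints "2 rows"
```

Doing execute and fetch in one call keeps both steps inside a single request to the shared class, so another worker cannot run a different execute on the same statement in between.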

Well, the wrapper class works very well. The number of CPU cores keeps increasing every couple of years, meaning that new problems emerge and so do possibilities.

Kind regards, Mario

Re: Shared DBI handle supporting threads and processes
by erix (Prior) on Apr 13, 2020 at 05:36 UTC

    my $ins_sql = ... my $sel_sql = ...

    Imho, SELECT and INSERT are not all that interesting. UPDATE is more of a challenge.

      Greetings erix. I updated the example by adding the missing 'do' method to the shared class. It now updates a record as well.

        Thanks for the example. I ran it, and the DB was populated, but the code ended in the error:
        Bizarre copy of ARRAY in list assignment at C:/Users/USERNAME/eclipse-workspace/.metadata/.plugins/org.epic.debug/perl5db.pl line 6548, <__ANONIO__> line 4.
        END failed--call queue aborted, <__ANONIO__> line 4.
         at C:/Users/USERNAME/eclipse-workspace/PROJECT/mce_example_3.pl line 0, <__ANONIO__> line 4.
        So I opened perl5db.pl, and line 6548 reads:
        ...
        sub _dump_trace_calc_saved_single_arg {
            my ($nothard, $arg) = @_;    # <-- LINE 6548
            my $type;
            if ( not defined $arg ) {    # undefined parameter
                return "undef";
            }
        ...
        Any ideas?