PerlMonks  

DBM as IPC - Any way to make this work?

by bennymack (Pilgrim)
on Jun 13, 2007 at 14:55 UTC ( [id://620991]=perlquestion )

bennymack has asked for the wisdom of the Perl Monks concerning the following question:

Hello All

I'm working on using a DBM as a form of IPC in a very high performance application but I'm having some issues. I finally got around to putting together an end-to-end example that should hopefully help illustrate what I'm dealing with.

The short version is that I've got one writer process and several reader processes. The readers are accessing the DBM at a very fast rate. This causes writes to the DBM to not appear until a child process has been replaced. Essentially, the new process instantiates a fresh DBM handle and then will see all previous updates, but not any new ones for the remainder of its life. Etc, etc...

However, if I use BerkeleyDB with an Env object attached and DB_INIT_MPOOL turned on, the child processes will see updates to their DBM handle right away BUT this introduces random locking in running child processes and random Env instantiation errors in fresh process startups!

Also, I can re-instantiate the DBM handle in the child process for EVERY request but this is clearly inefficient and I'm hoping for a better way to do things.

So, I came up with an example script that demonstrates all of the above situations. If anyone could mark an X on the problem area, or point me to some links or CPAN modules that have a recipe for a solution (or an actual solution), I'd greatly appreciate it!

#################### GDBM_File
package gdbm;
use strict;
use warnings;
use GDBM_File;
use constant CACHE_FILE => '/tmp/gdbm';

sub new {
    my $class = shift;
    my $gdbm = GDBM_File->new(
        CACHE_FILE,
        GDBM_WRCREAT | GDBM_NOLOCK,
        oct( '0640' ),
    ) || die "Could not create GDBM_File - $!";
    return bless { dbm => $gdbm }, $class;
}

sub set {
    my( $self, $key, $val ) = @_;
    return $self->{dbm}->STORE( $key, $val );
}

sub get {
    my( $self, $key ) = @_;
    return $self->{dbm}->FETCH( $key );
}

sub all_keys {
    my( $self ) = @_;
    # warn '$self = ', $self;
    my @keys;
    my $k = $self->{dbm}->FIRSTKEY;
    if( $k ) {
        do {
            # warn '$k = ', $k || 'undef';
            my $v = $self->{dbm}->FETCH( $k );
            # warn sprintf( '%s => %s', $k, Dumper( $v ) );
            push @keys, $k if $k;
        } while( $k = $self->{dbm}->NEXTKEY( $k ) );
    }
    return @keys;
}

sub init {
    unlink CACHE_FILE;
}

#################### BerkeleyDB w/o Env
package bdb;
use strict;
use warnings;
use BerkeleyDB;
use constant CACHE_FILE => '/tmp/bdb';

sub new {
    my $class = shift;
    my $bdb = BerkeleyDB::Btree->new(
        -Filename => CACHE_FILE,
        -Flags    => DB_CREATE
    ) || die "Could not create BerkeleyDB - $!";
    return bless { dbm => $bdb }, $class;
}

sub set {
    my( $self, $key, $val ) = @_;
    $self->{dbm}->db_put( $key, $val );
    $self->{dbm}->db_sync;
    return;
}

sub get {
    my( $self, $key ) = @_;
    $self->{dbm}->db_get( $key, my $val );
    return $val;
}

sub all_keys {
    my( $self ) = @_;
    my( $cursor ) = $self->{dbm}->db_cursor();
    my @keys;
    my( $k, $v ) = ( '', '' );
    while( $cursor->c_get( $k, $v, DB_NEXT ) == 0 ) {
        push @keys, $k;
    }
    return @keys;
}

sub init {
    unlink CACHE_FILE, glob( '/tmp/__db.*' );
}

#################### BerkeleyDB w/Env
package bdbenv;
use strict;
use warnings;
use base 'bdb';

# This is the only one that works as desired...
# But has sporadic issues after running for several hours.
sub new {
    my $class = shift;
    my $env = BerkeleyDB::Env->new(
        -Home  => '/tmp',
        -Flags => __PACKAGE__->DB_CREATE | __PACKAGE__->DB_INIT_MPOOL,
        -Mode  => oct( '0644' ),
    ) || die "Could not create BerkeleyDB::Env - '$BerkeleyDB::Error' - '$!'";
    my $bdb = BerkeleyDB::Btree->new(
        -Filename => __PACKAGE__->CACHE_FILE,
        -Flags    => __PACKAGE__->DB_CREATE,
        -Mode     => oct( '0644' ),
        -Env      => $env,
    ) || die "Could not create BerkeleyDB::Btree - '$BerkeleyDB::Error' - '$!'";
    return bless { dbm => $bdb }, $class;
}

#################### Running code and utility subs.
package main;
use strict;
use warnings;
use POSIX();
use IO::Socket::INET;
use Time::HiRes();
use constant DBM_PACKAGE => 'gdbm';
# use constant DBM_PACKAGE => 'bdb';
# use constant DBM_PACKAGE => 'bdbenv';

sub all_letters { return ( 'a' .. 'z', 'A' .. 'Z' ); }

sub random_sleep {
    return Time::HiRes::sleep( ( .001, .002, .003, .004, .005, )[ int( rand 5 ) ] );
}

sub random_letter { return ( all_letters() )[ int rand 52 ]; }

# Server process dies after 1000 requests and is replaced by the parent.
sub start_server_process {
    my( $socket ) = @_;
    my $pid = fork;
    if( not $pid ) {
        my $db = DBM_PACKAGE->new(); # XXX # Switch these two around and it works for GDBM_File and BerkeleyDB w/o Env
        for( 1 .. 1000 ) {
            my $client = $socket->accept;
            # my $db = DBM_PACKAGE->new(); # XXX # Switch these two around and it works for GDBM_File and BerkeleyDB w/o Env
            my $output = "$$\n";
            for my $k( $db->all_keys ) {
                my $v = $db->get( $k );
                $output .= "$k -> $v\n";
            }
            $client->send( $output );
        }
        # warn $$, ' Server child exiting...';
        exit;
    }
    return $pid;
}

sub start_reader_process {
    my $pid = fork;
    if( not $pid ) {
        $SIG{__WARN__} = sub { syswrite( STDERR, sprintf( '%s %s', $$, join( q[, ], @_ ) ) ); };
        $SIG{__DIE__} = 'IGNORE';
        my $db = DBM_PACKAGE->new();
        sleep 1;
        while( 1 ) {
            my $socket = IO::Socket::INET->new(
                PeerAddr => '127.0.0.1:54321',
            ) || Carp::confess 'No $socket: ', $!;
            $socket->recv( my $message, 4096 );
            random_sleep();
        }
        warn $$, ' Reader child exiting...';
        exit;
    }
    return $pid;
}

sub run {
    my %options = @_;
    if( $options{daemonize} ) {
        POSIX::setsid();
        fork and exit;
    }
    if( not %options or $options{start} ) {
        POSIX::setsid;
        DBM_PACKAGE->init;

        # Listen for UDP packets and add them to the cache.
        if( not fork ) {
            my $dbm = DBM_PACKAGE->new;
            my $socket = IO::Socket::INET->new(
                Proto     => 'udp',
                LocalAddr => '127.0.0.1:12345',
                ReuseAddr => 1,
            ) || Carp::confess 'No $socket: ', $!;
            while( 1 ) {
                $socket->recv( my $message, 4096, );
                my $time = time;
                warn $$, ' adding message ', $message, ' time ', $time;
                $dbm->set( $message, $time );
            }
            warn $$, ' Listener exiting...';
            exit;
        }
        warn $$, ' Started listener...';

        # Start listening socket in parent.
        my $socket = IO::Socket::INET->new(
            Proto     => 'tcp',
            LocalAddr => '127.0.0.1:54321',
            Listen    => 1,
            ReuseAddr => 1,
        ) || Carp::confess 'No $socket: ', $!;

        for( 1 .. 10 ) {
            # Listen for TCP connections and send them the contents of the cache.
            warn $$, ' Parent started server ', start_server_process( $socket );
        }
        warn $$, ' Started servers...';

        if( not $options{noreaders} ) {
            for ( 1 .. 40 ) {
                # Make TCP connections to the above listeners for simulated load.
                warn $$, ' Parent started reader ', start_reader_process();
            }
            warn $$, ' Started readers...';
        }
        else {
            warn $$, ' Skipped readers...';
        }

        $SIG{__DIE__} = sub { kill 'TERM', -$$; };
        while( my $pid = wait ) {
            last if $pid == -1;
            # start_server_process( $socket );
            warn $$, ' Parent reaped server ', $pid, ' Parent started server ', start_server_process( $socket );
        }
        warn $$, ' Parent exiting...';
        exit;
    }
    elsif( $options{readers} ) {
        POSIX::setsid();
        for ( 1 .. 10 ) {
            # Make TCP connections to the above listeners for simulated load.
            warn $$, ' Parent started reader ', start_reader_process();
        }
        1 while wait != -1;
    }
    elsif( my $message = $options{message} ) {
        my $socket = IO::Socket::INET->new(
            Proto     => 'udp',
            PeerAddr  => '127.0.0.1:12345',
            ReuseAddr => 1,
        ) || Carp::confess 'No $socket: ', $!;
        $socket->send( $message );
    }
    elsif( $options{dump} ) {
        my $socket = IO::Socket::INET->new(
            PeerAddr => '127.0.0.1:54321',
        ) || Carp::confess 'No $socket: ', $!;
        $socket->recv( my $message, 4096 );
        print $message, "\n";
    }
}

1;

__END__

The main code can be started like so:

perl -Mtest_dbm_1 -e 'run( start => 1, );'

Then, to add things to the cache and view them through the server processes run:

perl -Mtest_dbm_1 -e 'run( message => random_letter() x 10, );' ; perl -Mtest_dbm_1 -e 'run( dump => 1, );'

A few seconds after starting the server the child processes will start to be recycled. Using the "gdbm" or "bdb" DBM_PACKAGE, if you send a message and dump the contents before the processes recycle, you won't get the message back. But you will see it once they recycle. If you update DBM_PACKAGE to "bdbenv" it will work as expected, but in my experience this will eventually fail mysteriously after an indeterminate amount of time :(

If you start the server with "( start => 1, noreaders => 1, )" which skips the starting of the readers that simulate load, the messages DO show up right away implying that they are causing the problems with the server processes, in case that wasn't obvious ;)

So, is there a recipe for GDBM_File with some combination of new() flags and/or flock usage that will actually make this work? Or should I just resign myself to re-instantiating the DBM handle for every request? Thanks so much!!!
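
For reference, the "flock plus re-instantiate per request" option mentioned above might look roughly like the sketch below. It is only an illustration under stated assumptions: it uses the core SDBM_File module instead of GDBM_File so that it runs without extra libraries, it locks a separate lock file rather than the DBM file itself, and the names with_dbm, $DBM_PATH and $LOCK_PATH are hypothetical and do not come from the script above. The reader opens read-only, so the writer is assumed to have created the file first.

```perl
use strict;
use warnings;
use Fcntl qw(:DEFAULT :flock);
use SDBM_File;
use File::Temp qw(tempdir);

# Hypothetical paths, used only for this sketch.
my $dir       = tempdir( CLEANUP => 1 );
my $DBM_PATH  = "$dir/cache";
my $LOCK_PATH = "$dir/cache.lock";

# Take the lock, open a fresh DBM handle, run the callback,
# then close the handle and release the lock.
sub with_dbm {
    my( $lock_mode, $open_flags, $code ) = @_;
    open( my $lock_fh, '>>', $LOCK_PATH ) or die "Cannot open $LOCK_PATH: $!";
    flock( $lock_fh, $lock_mode )         or die "Cannot lock $LOCK_PATH: $!";
    tie( my %dbm, 'SDBM_File', $DBM_PATH, $open_flags, oct( '0640' ) )
        or die "Cannot tie $DBM_PATH: $!";
    my @result = $code->( \%dbm );
    untie %dbm;        # close the handle before anyone else opens the file
    close $lock_fh;    # closing the lock file releases the flock
    return @result;
}

# Writers take an exclusive lock and may create the file.
sub set {
    my( $key, $val ) = @_;
    with_dbm( LOCK_EX, O_RDWR | O_CREAT, sub { $_[0]{ $key } = $val } );
    return;
}

# Readers take a shared lock and open a fresh read-only handle every time.
sub get {
    my( $key ) = @_;
    return ( with_dbm( LOCK_SH, O_RDONLY, sub { $_[0]{ $key } } ) )[0];
}

# Demo: a child process writes, the parent reads through a fresh handle.
my $pid = fork;
die "fork failed: $!" unless defined $pid;
if( $pid == 0 ) {
    set( 'abc', time );
    exit 0;
}
waitpid( $pid, 0 );
print 'abc -> ', get( 'abc' ), "\n";
```

The cost is one open/close per access, which is exactly the inefficiency described above; the sketch only shows that the locking itself is simple.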

Replies are listed 'Best First'.
Re: DBM as IPC - Any way to make this work?
by perrin (Chancellor) on Jun 13, 2007 at 16:15 UTC

    Most dbms don't flush writes until you close them. That means you must close and reopen GDBM_File every time. The module MLDBM::Sync will do this for you with a variety of dbm implementations.

    The BerkeleyDB approach should be much higher performance. Using database-wide locks instead of block-level locks usually helps with locking issues and doesn't slow it down much. I don't remember the flags to specify this or I'd give them to you. You might also try checking Oracle's support pages.
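
The database-wide locking mentioned above is possibly Berkeley DB's Concurrent Data Store (CDS) mode, which the BerkeleyDB module enables with the DB_INIT_CDB environment flag; that identification is an assumption, not something stated in the reply. A minimal sketch, using a temporary directory and a file name (cache.db) that are hypothetical:

```perl
use strict;
use warnings;
use BerkeleyDB;
use File::Temp qw(tempdir);

# Hypothetical environment directory, used only for this sketch.
my $home = tempdir( CLEANUP => 1 );

# DB_INIT_CDB asks for multiple-reader/single-writer locking per database
# instead of the page-level locking of the full locking subsystem.
my $env = BerkeleyDB::Env->new(
    -Home  => $home,
    -Flags => DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL,
) or die "Could not create BerkeleyDB::Env - '$BerkeleyDB::Error' - '$!'";

my $db = BerkeleyDB::Btree->new(
    -Filename => 'cache.db',    # resolved relative to the environment home
    -Flags    => DB_CREATE,
    -Env      => $env,
) or die "Could not create BerkeleyDB::Btree - '$BerkeleyDB::Error' - '$!'";

# In CDS mode each call takes the database lock it needs on its own.
$db->db_put( 'abc', time ) == 0 or die "db_put failed: $BerkeleyDB::Error";
$db->db_get( 'abc', my $val ) == 0 or die "db_get failed: $BerkeleyDB::Error";
print "abc -> $val\n";
```

Whether this avoids the sporadic failures seen with the DB_INIT_MPOOL-only environment in the question is untested here.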

Re: DBM as IPC - Any way to make this work?
by Sartak (Hermit) on Jun 13, 2007 at 23:55 UTC
    Take a look at IPC::PubSub; one of the back-ends it can use is DBM::Deep. Reading your summary of what you need to do, it should all Just Work.
Re: DBM as IPC - Any way to make this work?
by BrowserUk (Patriarch) on Jun 14, 2007 at 18:59 UTC

    Here's a threaded solution. It seems to handle pretty much everything I've thought to throw at it with near instant response times.

    I've segregated the code into server and client scripts as it allows me to see the cpu load and memory consumption of the server separate from the clients that would normally be on the other machines. It also makes it easier to develop.

    Despite their relatively small combined size, between them they do everything your code above does (I think).

    Server:

    #! perl -slw
    use strict;
    use threads;
    use threads::shared;
    use IO::Socket::INET;

    my %db :shared;

    async {
        my $tid = threads->self->tid;
        warn "$tid: In-bound server running\n";
        my $receiver = IO::Socket::INET->new(
            Proto     => 'udp',
            LocalAddr => '127.0.0.1:12345',
            ReuseAddr => 1,
        ) or warn 'No $socket: ', $!;
        while( 1 ) {
            $receiver->recv( my $message, 4096, );
            my $time = time;
            warn $tid, ' adding message ', $message, ' time ', $time;
            {
                lock %db;
                $db{ $time } = $message;
            }
        }
        warn "$tid: In-bound server stopping\n";
    }->detach;

    my $sender = IO::Socket::INET->new(
        Proto     => 'tcp',
        LocalAddr => '127.0.0.1:54321',
        Listen    => 50,
        ReuseAddr => 1,
    ) or die 'No $socket: ', $!;

    my $running :shared = 0;

    while( 1 ) {
        warn 'TOL';
        async {
            warn"before lock";
            { lock $running; ++$running ; }
            warn"after lock";
            my $tid = threads->self->tid;
            warn "$tid: Out-bound server running\n";
            for( 1 .. 1000 ) {
                my $client = $sender->accept;
                warn "$tid: Out: Connection on $client\n";
                lock %db;
                $client->send(
                    "$tid\n" . join "\n", map{ "$_ -> $db{ $_ }" } keys %db
                );
            }
            warn "$tid: Out-bound server ending\n";
            { lock $running; --$running; }
        }->detach;
        warn "running: $running";
        sleep 1 while $running > 19;
        warn 'looping';
    }

    Client:

    #! perl -slw
    use strict;
    use threads;
    use threads::shared;
    use IO::Socket::INET;

    async {
        while( 1 ) {
            my $socket = IO::Socket::INET->new(
                PeerAddr => '127.0.0.1:54321',
            ) or warn 'No $socket: ', $! and sleep 1 and next;
            $socket->recv( my $message, 4096 );
            Win32::Sleep( ( 100, 200, 300, 400, 500 )[ rand 3 ] );
        }
        sleep 1;
    }->detach for 1 .. 40;

    while( <STDIN> ) {
        chomp;
        if( m[^msg: \s* (.*) \s* $ ]x ) {
            my $socket = IO::Socket::INET->new(
                Proto     => 'udp',
                PeerAddr  => '127.0.0.1:12345',
                ReuseAddr => 1,
            ) or warn 'No socket: ', $! and next;
            $socket->send( $1 || 'A standard message' );
        }
        elsif( m[^dump] ) {
            my $socket = IO::Socket::INET->new(
                PeerAddr => '127.0.0.1:54321',
            ) or warn 'No socket: ', $! and next;
            $socket->recv( my $message, 4096 );
            print $message;
        }
        elsif( m[^listeners: \s* ( \d* ) ]x ) {
            async {
                while( 1 ) {
                    my $socket = IO::Socket::INET->new(
                        Proto    => 'udp',
                        PeerAddr => '127.0.0.1:54321',
                    ) or warn 'No socket: ', $! and next;
                    $socket->recv( my $message, 4096 );
                    Win32::Sleep( ( 100, 200, 300, 400, 500 )[ rand 3 ] );
                }
            }->detach for 1 .. $1 || 10;
        }
        else {
            print "Unknown command '$_'\n";
            print <<'HELP';
    msg: message text
    listeners: nn
    dump
    HELP
        }
    }

    Examine what is said, not who speaks -- Silence betokens consent -- Love the truth but pardon error.
    "Science is about questioning the status quo. Questioning authority".
    In the absence of evidence, opinion is indistinguishable from prejudice.

Node Type: perlquestion [id://620991]
Approved by Corion
Front-paged by Corion