I'm working on an server app that needs to be able to receive and store realtime updates and make the updates available for all child processes.
I chose BerkeleyDB initially because it's fast and supposed to be reliable. I'm just wondering what a better way to accomplish this might be.
My current setup filters ALL set and remove operations through one process due to the fact that when more than one process does them, deadlocks occur rather often. This setup hasn't deadlocked yet.
The problem with this setup is that, while it works, it was a major pain to get right what with all the different processes and cache handles floating around. So, should I look at something else or not fix it if it isn't broken?
#!/usr/bin/perl
# test_bdb_1.pl
use strict;
use warnings;
use Carp;
use POSIX;
use Socket;
use BerkeleyDB;
use Getopt::Long;
use IO::Socket::INET;
my %opts;
GetOptions(
\%opts,
'start_listener',
'start_server',
'view_message',
'send_message=s',
);
sub cleanup {
warn 'Caught SIG', $_[0];
kill 'TERM', -$$;
}
sub goaway {
warn 'Caught SIG', $_[0];
kill 'HUP', -$$;
}
my $cache_dir = '/tmp';
my $cache_file = 'test_cache';
sub get_cache {
my $env = BerkeleyDB::Env->new(
-Home => $cache_dir,
-Flags => DB_CREATE| DB_INIT_CDB | DB_INIT_MPOOL,
);
my $cache = BerkeleyDB::Btree->new(
-Filename => $cache_file,
-Flags => DB_CREATE,
-Env => $env,
);
return $cache;
}
if( $opts{start_listener} ) {
fork and exit;
POSIX::setsid;
if( not fork ) {
unlink( map { "$cache_dir/$_" } ( $cache_file, map { sprintf(
+'__db.%0.3d', $_ ) } ( 1 .. 5 ) ) );
my $cache = get_cache;
my $socket = IO::Socket::INET->new(
Proto => 'udp',
LocalAddr => '127.0.0.1:12345',
ReuseAddr => 1,
) || Carp::confess 'No $socket: ', $!;
while( 1 ) {
$socket->recv( my $message, 4096, );
my $time = time;
warn $$, ' adding message ', $message, ' time ', $time;
my $cds_lock = $cache->cds_lock;
$cache->db_put( $message, $time );
$cache->db_sync;
}
warn 'start_listener child exiting...';
exit;
}
local $SIG{TERM} = \&goaway;
local $SIG{INT} = \&cleanup;
local $SIG{__DIE__} = \&cleanup;
1 while wait != -1;
warn 'start_listener parent exiting...';
exit;
}
elsif( $opts{start_server} ) {
fork and exit;
POSIX::setsid;
if( not fork ) {
my $socket = IO::Socket::INET->new(
Proto => 'tcp',
LocalAddr => '127.0.0.1:54321',
Listen => 1,
ReuseAddr => 1,
) || Carp::confess 'No $socket: ', $!;
my $cache = get_cache;
while( 1 ) {
my $client = $socket->accept;
my $output = "$$\n";
my $cursor = $cache->db_cursor();
my( $k, $v ) = ( 0, 0 );
while( $cursor->c_get( $k, $v, DB_NEXT ) == 0 ) {
$output .= "$k -> $v\n";
}
$client->send( $output );
}
warn 'start_server child exiting...';
exit;
}
local $SIG{TERM} = \&goaway;
local $SIG{INT} = \&cleanup;
local $SIG{__DIE__} = \&cleanup;
1 while wait != -1;
warn 'start_server parent exiting...';
exit;
}
elsif( my $message = $opts{send_message} ) {
my $socket = IO::Socket::INET->new(
Proto => 'udp',
PeerAddr => '127.0.0.1:12345',
ReuseAddr => 1,
) || Carp::confess 'No $socket: ', $!;
$socket->send( $message || 'default message' );
}
elsif( $opts{view_message} ) {
my $socket = IO::Socket::INET->new(
PeerAddr => '127.0.0.1:54321',
) || Carp::confess 'No $socket: ', $!;
$socket->recv( my $message, 4096 );
print $message, "\n";
}
And then to run it:
$ perl test_bdb_1.pl -start_listener
$ perl test_bdb_1.pl -start_server
$ perl test_bdb_1.pl -send_message hello
13854 adding message hello time 1179504307 at test_bdb_1.pl line 67.
$ perl test_bdb_1.pl -view_message
13857
hello -> 1179504307
Which works fine. But keep in mind that for my purposes I need to have this be rock-solid under extremely high numbers of processes reading from the cache.
Thanks for any advice you may have!