Beefy Boxes and Bandwidth Generously Provided by pair Networks
No such thing as a small change

Queuing DBI transactions

by Akoya (Scribe)
on Oct 04, 2007 at 17:16 UTC ( [id://642709] : perlquestion . print w/replies, xml ) Need Help??

Akoya has asked for the wisdom of the Perl Monks concerning the following question:

I have any interesting problem. I need to add a degree of fault tolerance to a program that monitors device activity, and updates database tables when there are certain changes. Currently, when needed, I update records as follows:
$sql = qq{ UPDATE wms_rf_server_port SET available = ?, last_updated_by = ?, last_updated_date = ? WHERE port_id IN ( $port_ids ) }; eval { $rows = $dbh->do($sql,undef,$available,$0,_timestamp()); $dbh->commit(); }; if ($@) { _DB_ErrorHandler($@); $dbh->rollback(); }

At this point, _DB_ErrorHandler() only logs the errors. What I would like to do is, in the event that the error is due to loss of connection to the database, to queue up this transaction, so that it can be processed when the connection is reestablished. The example SQL statement is representative of the type of statements that would need to be queued, however, there are actually 6 different statements, with differing tables, parameters, etc.

Does anyone know of a module that does what I need, or comes close? As always, any suggestions are welcome.


Replies are listed 'Best First'.
Re: Queuing DBI transactions
by jfroebe (Parson) on Oct 04, 2007 at 17:57 UTC

    Not too difficult. Throw the queries into a queue prior to sending them to the DBMS. If the query was successful, pop the query from the queue. There are many different types of queues but a simple @query_queue sounds like it will be sufficient (pseudo code):

    my @query_queue; my $query = qq{ UPDATE wms_rf_server_port SET available = ?, last_updated_by = ?, last_updated_date = ? WHERE port_id IN ( $port_ids ) }; push @query_queue, $query .... if the query is successful { pop @query_queue; .... else, reconnect and try sending the query back to the DBMS

    Jason L. Froebe

    Blog, Tech Blog

      You need to save the args too

      my @query_queue; sub do_query { my ($sth, @args) = @_; push @query_queue, [ $sth, @args ]; flush_queries(); } sub flush_queries { while (@query_queue) { my ($sth, @args) = @{ $query_queue[0] }; ...[ attempt query ]... if (...[ successful ]...) { shift @query_queue; } else { return; } } return 1; } do_query(...); do_query(...); do_query(...); flush_queries() or die;

      Depending on the reconnect code, it might be more advantageous to pass the dbh and the stmt instead of the sth, but you get the idea.

      Your suggestion accounts for the query string, but not the parameters that must accompany it. Perhaps, I could push a hash reference onto the queue, with the hash containing 1) the SQL statement, and 2) an array of parameters.

        I would tack on another level to the hash which would contain the connection the query was supposed to go out on - else you will have to handle multiple queues if you have multiple connections.

        Jason L. Froebe

        Blog, Tech Blog

      That sounds more like a query stack, what with the push vs pop. Perhaps push and shift would be better. (With unshift to allow an operation to "jump the queue")

        Partially :) In this simple example, the queue would be a queue with the exception of removing the good query run from the end of the queue. It's a hybrid queue/stack depending on your point of view ;-)

        Jason L. Froebe

        Blog, Tech Blog

Re: Queuing DBI transactions
by InfiniteLoop (Hermit) on Oct 04, 2007 at 20:40 UTC
    You could fork a process to do the update, close the process if it succeeds. If it catches an error, retry after a short interval.
Re: Queuing DBI transactions
by talexb (Chancellor) on Oct 05, 2007 at 14:43 UTC

    I'm surprised no one's suggested this yet -- once the change has been submitted to the database without error, try to get the new or updated data back out. If you manage that OK, you can consider that query or command successful, and shift it off the stack.

    Alex / talexb / Toronto

    "Groklaw is the open-source mentality applied to legal research" ~ Linus Torvalds

      I expect that if the query has been submitted without error, all went fine.
      Checking again should be not only a waste, but also risky: if someone is fast enough (think like a race condition) the data could have been changed again unless highly isolated (TX_SERIALIZABLE).
      Also, it's not always possible to know what is the correct data: update counters set ct=ct+1


        Very good points. Well, it was just a suggestion.

        Alex / talexb / Toronto

        "Groklaw is the open-source mentality applied to legal research" ~ Linus Torvalds

Re: Queuing DBI transactions
by Akoya (Scribe) on Nov 12, 2007 at 19:48 UTC
    The final solution was to develop a module, QueryQueue. The source is below (minus the POD, which of course is there in the final product). Feel free to comment, as always. Example uses...
    my $queue = QueryQueue->instance('scott','tiger','server'); ... my $sql = qq{ SELECT available FROM wms_rf_server_port WHERE server_id = ? AND device_name = ? }; my $queue = QueryQueue->instance(); my $results = $queue->submit_job($sql, $server_id, $device); ... my $sql = qq{ UPDATE wms_rf_server_port SET available = ?, last_updated_by = ?, last_updated_date = ? WHERE port_id IN ( $port_ids ) }; my @args = ( $available, $0, _timestamp(), ); my $queue = QueryQueue->instance(); $queue->submit_job($sql, @args); SOURCE: package QueryQueue; use 5.008005; use strict; use warnings; require Exporter; use base 'Class::Singleton'; use Log::Log4perl; use Lingua::EN::Numbers::Ordinate; use DBI; our @ISA = qw(Exporter Class::Singleton); # Items to export into callers namespace by default. Note: do not expo +rt # names by default without a very good reason. Use EXPORT_OK instead. # Do not simply export all your public functions/methods/constants. our @EXPORT_OK = qw( instance submit_job ); our $VERSION = '0.01'; our @queue; our $dsn; our $dbuser; our $dbpass; =head2 _new_instance Creates a new QueryQueue singleton object - only for first call of ins +tance() =cut sub _new_instance { my $class = shift; ($dsn, $dbuser, $dbpass) = @_; my $self = bless { }, $class; my $logger = Log::Log4perl->get_logger('QueryQueue'); if( $dsn && $dbuser && $dbpass ) { $logger->debug("Created a new QueryQueue singleton object"); return $self; } $logger->error("Missing required parameters"); return undef; } =head2 _dequeue Processes the queue =cut sub _dequeue { my $class = shift; my $logger = Log::Log4perl->get_logger('QueryQueue'); my ($dbh, $sth); my $results_ref = undef; while (@queue) { # while I have things in the queue $queue[0][0]++; # increment the number of tries my ($tries, $sql, @args) = @{ $queue[0] }; eval { $logger->debug('Connecting (cached) to database ', $dsn, ' + as ', $dbuser); $dbh = DBI->connect_cached($dsn, $dbuser, $dbpass, { AutoCommit => 0, RaiseError => 1, PrintError => 0 +, }, ); $dbh->{LongReadLen} = 1024 * 1024 * 32; # read up to 32MB $dbh->{LongTruncOk} = 0; # throw exception instead of trun +cating $logger->debug('Preparing statement.'); $sth = $dbh->prepare_cached($sql); $logger->debug(ordinate($tries), ' try to execute ', _sqls +tr($sql, @args)); $sth->execute(@args); if( $sth->{NUM_OF_FIELDS} ) { $results_ref = $sth->fetchall_arrayref(); } $dbh->commit; }; if ($@) { $logger->error('Error executing statement: ', $@); if(! $dbh->{Active}) { $logger->warn('Not connected to database ', $dsn); sleep 5; } else { $dbh->rollback; $logger->warn('*** Discarding the statement'); shift @queue; return undef; } } else { $logger->debug('Successully executed statement - ', $sth-> +rows, ' rows affected.'); shift @queue; } if( $#queue >= 0 ) { $logger->debug('Still have ', $#queue + 1, ' items in the +queue'); } else { $logger->debug('The queue is empty.'); } } return $results_ref; } =head2 submit_job Submit a job to the queue =cut sub submit_job { my $class = shift; my ($sql, @args) = @_; my $logger = Log::Log4perl->get_logger('QueryQueue'); $logger->debug('Enqueuing ', _sqlstr($sql, @args)); push @queue, [ 0, $sql, @args ]; _dequeue(); } =head2 _sqlstr Formats a sql string with its args for logging =cut sub _sqlstr { my ($sql, @args) = @_; my $argstr = join ', ', @args; return "\"$sql ($argstr)\""; } 1;
A reply falls below the community's threshold of quality. You may see it by logging in.