# script to load-test/profile a PSGI application use strict; use warnings; use 5.010; #---------------------------------------------------------------------# # Some configs you might want to change my ($log_file, $db_file, $consumers); BEGIN { $log_file = '/tmp/test.log'; $db_file = '/tmp/test.sqlite'; $consumers = 8; } #---------------------------------------------------------------------# # Load libraries needed by test framework use Data::GUID; use DBD::SQLite; use HTTP::Request::Common; use JSON::MaybeXS; use Log::Any '$log'; use Log::Any::Adapter File => $log_file; use MCE::Flow; use MCE::Queue; use Plack::Test; use Tie::Cycle; use Time::HiRes 'usleep'; #---------------------------------------------------------------------# # Define the web application package MyApp { use Dancer2; use Dancer2::Plugin::Database; unlink $db_file; #-----------------------------------------------------------------# # Config set charset => 'UTF-8'; set serializer => 'JSON'; set engines => { logger => { LogAny => { category => 'Test', logger => ['File', $log_file], }, }, }; set logger => 'LogAny'; set log => 'debug'; set plugins => { Database => { driver => 'SQLite', database => $db_file, dbi_params => { RaiseError => 1, AutoCommit => 1 }, }, }; #-----------------------------------------------------------------# # DB schema database->do(q{ CREATE TABLE account( account_id INTEGER PRIMARY KEY, user_id VARCHAR(36) NOT NULL, caller_id VARCHAR(36) NOT NULL, remote_id VARCHAR(36) DEFAULT NULL, status TEXT CHECK( status IN ('pending','active','deleted') ) NOT NULL DEFAULT 'pending' ); }); #-----------------------------------------------------------------# # Route handlers ## post '/' => sub { my $user_id = body_parameters->get('user_id'); my $caller_id = body_parameters->get('caller_id'); my $wid = body_parameters->get('wid'); my $remote_id = Data::GUID->new->as_string; debug " app: $wid: handling POST with caller_id $caller_id"; debug " app: $wid: generated remote_id $remote_id"; database->quick_insert('account', { user_id => $user_id, caller_id => $caller_id, remote_id => $remote_id, status => 'pending', }); return {remote_id => $remote_id, status => 'pending'}; }; ## get '/:remote_id/:wid' => sub { my $remote_id = route_parameters->get('remote_id'); my $wid = route_parameters->get('wid'); my $record = database->quick_select('account', { remote_id => $remote_id, }); debug " app: $wid: handling GET with remote_id $remote_id"; # simulate a needed external call or some other delay before # account activation; return status pending on first attempt if ( $record->{status} eq 'pending' ) { debug " app: $wid: found account (remote_id $remote_id)" . "has status pending; updating"; database->quick_update('account', {remote_id => $remote_id }, { status => 'active' } ); } return {remote_id => $remote_id, status => $record->{status}}; }; ## put '/:remote_id/:wid' => sub { my $remote_id = route_parameters->get('remote_id'); my $wid = route_parameters->get('wid'); debug " app: $wid: handling PUT with remote_id $remote_id"; database->quick_update('account', { remote_id => $remote_id }, { status => 'deleted' }); }; }; # End web application definition #---------------------------------------------------------------------# # Test code my $app = MyApp->to_app; test_psgi $app, sub { my $cb = shift; my $q = MCE::Queue->new( await => 1, fast => 0 ); tie my $user_id, 'Tie::Cycle', [ map { Data::GUID->new->as_string } 1..10 ]; #-----------------------------------------------------------------# # Caller workflow mce_flow { task_name => [ 'producer', 'consumer' ], max_workers => [ 1, $consumers ], }, sub { ## producer for my $idx (1 .. 1_000) { my $wid = MCE->wid; $log->debug("test: $wid: creating account #$idx"); usleep(10_000); # simulate staggered requests my $caller_id = Data::GUID->new->as_string; my $content = encode_json({ wid => $wid, user_id => $user_id, caller_id => $caller_id, }); $log->debug("test: $wid: enqueueing post job " . "for account #$idx"); $q->enqueue([ post => { content => $content } ]); } $q->enqueue('done'); # trigger to signal end of work }, sub { ## consumers my $done = 0; while (1) { my $job = $q->dequeue_nb; my $wid = MCE->wid; # Handle end of work if ( ! defined $job ) { if ( $done ) { $log->debug("test: $wid: found no job in queue"); $q->enqueue('done'); last; } usleep(30_000); next; } if ( $job eq 'done' ) { $done = 1; usleep(30_000); next; } # do work my ($method, $args) = @{ $job }; if ( $method eq 'post' ) { $log->debug("test: $wid: dequeued POST job"); my $header = [ 'Content-Type' => 'application/json; charset=UTF-8' ]; my $body = $args->{content}; my $resp = $cb->(POST '/', $header, Content => $body); my $resp_content = decode_json($resp->decoded_content); my $remote_id = $resp_content->{remote_id}; $log->debug("test: $wid: created account " . "(remote_id $remote_id; enqueueing GET call"); usleep(15_000); # simulate staggered requests $q->enqueue([ get => { wid => $wid, remote_id => $remote_id, }]); } elsif ( $method eq 'get' ) { my $remote_id = $args->{remote_id}; $log->debug("test: $wid: dequeued GET job for " . "remote_id $remote_id"); my $resp = $cb->(GET "/$remote_id/$wid"); my $status = decode_json( $resp->decoded_content )->{status}; if ( $status eq 'pending' ) { $log->debug("test: $wid: re-enqueueing GET job " . "for remote id $remote_id"); usleep(15_000); # simulate staggered requests $q->enqueue([ get => { wid => $wid, remote_id => $remote_id, }]); } else { $log->debug("test: $wid: enqueueing PUT job for" . " remote_id $remote_id"); usleep(15_000); # simulate staggered requests $q->enqueue([ put => { wid => $wid, remote_id => $remote_id, }]); } } elsif ( $method eq 'put' ) { my $remote_id = $args->{remote_id}; $log->debug("test: $wid: dequeued PUT job for " . "remote_id $remote_id"); my $resp = $cb->( PUT "/$remote_id/$wid" ); } else { $log->error('test: queue job HTTP method unknown!'); } } }; MCE::Flow->finish; }; __END__