use strict; use warnings; use threads; use threads::shared; use Thread::Queue; use YAML (); use Time::HiRes qw[ time ]; #======================================================== # Skeleton class - pretend this is your data #======================================================== { package MyData; sub new { my $sClass= shift; bless({@_}, $sClass); } sub getProperty { my ($self, $sProperty) = @_; return $self->{$sProperty}; } sub setProperty { my ($self, $sProperty, $sValue) = @_; return $self->{$sProperty} = $sValue; } } #======================================================== # Server definition #======================================================== # Note: this data will be copied to each thread # However, this does NOT mean that $oData object will # be copied. As the output statements show when this # script is run, only the server thread assigns a value # to $oData. All other threads have the value undef. # This is because the server thread creates $oData _after_ # it is launched and has its own separate copy of $oData. my @aClients; my $oRequest = Thread::Queue->new(); my $oAnswer = Thread::Queue->new(); my $oData; # This variable is shared so that the server can see changes # made to this value my $bServerAlive :shared; #--------------------------------------------------------- sub addClient { my $crRun = shift; my $aArgs = \@_; my $t = async { $crRun->(@$aArgs); removeClient(threads->self); }; push @aClients, $t; } #--------------------------------------------------------- sub removeClient { my $tid = $_[0]->tid(); for my $i (0..$#aClients) { my $t = $aClients[$i]; next unless $t->tid() == $tid; $t->detach(); splice @aClients, $i, 1; return 1; } return 0; } #--------------------------------------------------------- sub startServer { $bServerAlive=1; threads->create(\&serveData)->detach(); } #--------------------------------------------------------- sub shutdownServer { $_->join() foreach @aClients; $bServerAlive=0; } #--------------------------------------------------------- sub serveData { my $sMethod = shift; my $tid = 'server'; # crate the object if it isn't created already if (!defined($oData)) { print STDERR "$tid: Creating data server data\n"; $oData = MyData->new(A => 1, B => 2, C => 3); } # quit when we've reached the maximum number of request POLL_STATUS: while ($bServerAlive) { # Note: Perl does not have an unlock command. Instead there is # an implicit unlock when we leave this block due to return # or die # grab method and parameters my ($sMethod, @aArgs); { lock($oRequest); if (!$oRequest->pending()) { #print STDERR "$tid: No requests.... yielding\n"; threads->yield(); next POLL_STATUS; } # print STDERR "$tid: retrieving request\n"; $sMethod = $oRequest->dequeue(); while ($oRequest->pending()) { push @aArgs, $oRequest->dequeue(); } } # print STDERR "$tid: Excuting call $oData->$sMethod(" # , join(',', map {$_?$_:'undef'} @aArgs), ")\n"; # call method and convert result into a string my $crMethod = $oData->can($sMethod); if (!defined($crMethod)) { warn "No such method: $sMethod"; next; } my $xResult = YAML::Dump($crMethod->($oData, @aArgs)); # print STDERR "$tid: returning result\n"; lock($oAnswer); $oAnswer->enqueue($xResult); cond_signal($oAnswer); } } #======================================================== # Client definition #======================================================== my $bRequestInProgress : shared; sub makeServerRequest { # my ($sMethod, $arg1, $arg2, ...) = @_; # Note: Perl does not have an unlock command. Instead there is # an implicit unlock when we leave this block due to return # or die # locking $bRequestInProgress ensures that only one request # may be made at a time. This thread passes control to the # server thread my $tid = threads->tid; lock($bRequestInProgress); # print STDERR "$tid: Asking a question\n"; { lock($oRequest); $oRequest->enqueue($_) foreach @_; } # print STDERR "$tid: Waiting for an answer\n"; lock($oAnswer); cond_wait($oAnswer); # when we get an answer convert froms string form to Perl # data structure return YAML::Load($oAnswer->dequeue()); } sub demoClient { my ($sName) = @_; my $tid = threads->tid(); print STDERR "$tid: Hello... I'm $sName\n"; printf STDERR "$tid: I'm a %s(data=%s)\n" , (defined($oData)? ('server', $oData) : ('client', 'undef')); makeServerRequest('setProperty', 'name', $sName); print STDERR "\n"; foreach my $sPropName qw(name A B C) { my $v = makeServerRequest('getProperty', $sPropName); printf STDERR "$tid: $sPropName=$v\n\n"; } } our $N //= 1e3; our $T //= 10; my $running :shared = 0; sub demo2 { my $tid = threads->tid; { lock $running; ++$running } printf STDERR "$tid: started: %f\n", time; for ( 1 .. 10 * $N ) { my $prop = int rand $N; my $curVal = makeServerRequest( 'getProperty', $prop ); makeServerRequest( 'setProperty', $prop, $curVal + 1 ) } { lock $running; --$running } printf STDERR "$tid: started: %f\n", time; } #--------------------------------------------------------- # Demo #--------------------------------------------------------- startServer(); makeServerRequest( 'setProperty', $_, 0 ) for 1 .. $N; addClient( \&demo2 ) for 1 .. $T; sleep 1; sleep 1 while $running; printf "Cheek memory usage";; shutdownServer(); #### #! perl -slw use strict; use threads; use threads::shared; use Time::HiRes qw[ time ]; our $N //= 1e3; our $T //= 10; my %data :shared; $data{ $_ } = 0 for 1 .. $N; my $running :shared = 0; my @t = map{ async { my $tid = threads->tid; { lock $running; ++$running } printf STDERR "$tid: started: %d\n", time; for ( 1 .. 10 * $N ) { my $prop = int rand $N; ++$data{ $prop }; } { lock $running; --$running } printf STDERR "$tid: started: %d\n", time; }; } 1 .. $T; sleep 1; sleep 1 while $running; printf "Cheek memory usage";; $_->join for @t;