BEGIN { use_ok 'Q' };

####

my $q = new_ok Q => [5];

####

package Thing;

# Load exactly one platform back end at compile time.  The `if` pragma takes
# the form `use if CONDITION, MODULE => ARGS`, so each back end needs its own
# statement.
use if $^O eq 'MSWin32', 'Thing::Win32';
use if $^O ne 'MSWin32', 'Thing::nix';

# Constructor: delegates to the constructor of the platform-specific class.
sub new {
    return $^O eq 'MSWin32' ? Thing::Win32->new() : Thing::nix->new();
}

####

can_ok $q => 'nq';
can_ok $q => 'dq';
can_ok $q => 'n';

####

not ok 1 - async::Q->can('pq')
#   Failed test 'async::Q->can('pq')'
#   at -e line 1.
#     async::Q->can('pq') failed

####

Can't locate object method "pq" via package "async::Q" at -e line 1.

####

#! perl -slw
use strict;

# async::Q - a bounded, blocking FIFO queue shared between threads.
#
# The object is a blessed, shared array used as a ring buffer.  The last two
# elements hold the bookkeeping (addressed with negative indices through the
# constants below); every element before them is a data slot:
#
#   [ slot 0 .. slot capacity-1, NEXT_WRITE, N ]
#
# The array itself is both the lock and the condition variable.
package async::Q;
use async::Util;    # NOTE(review): project-local; presumably supplies twarn() - confirm
use threads;
use threads::shared;

use constant {
    NEXT_WRITE => -2,    # index of the slot the next nq() will fill
    N          => -1,    # number of items currently queued
};

# new( $class, $Qsize )
#   Creates a queue able to hold $Qsize items (default 3).
#   Dies if $Qsize is less than 1, because such a queue could never accept
#   an item and every nq() would block forever.
#   Returns the blessed, shared array reference.
sub new {
    # twarn "new: @_\n";
    my( $class, $Qsize ) = @_;
    $Qsize //= 3;
    $Qsize = int $Qsize;
    if( $Qsize < 1 ) {
        require Carp;
        Carp::croak( "async::Q->new: queue size must be at least 1" );
    }

    my @Q :shared;
    # $Qsize data slots plus the two bookkeeping slots at the end.
    $#Q = $Qsize + 1;
    @Q[ NEXT_WRITE, N ] = ( 0, 0 );
    return bless \@Q, $class;
}

# nq( $self, @items )
#   Appends each item in order, blocking while the queue is full.
#   The lock is held for the whole call and released only inside cond_wait.
sub nq {
    # twarn "nq: @_\n";
    my $self = shift;
    lock @$self;
    for( @_ ) {
        my $capacity = @$self - 2;
        cond_wait @$self until $self->[ N ] < $capacity;
        $self->[ $self->[ NEXT_WRITE ]++ ] = $_;
        ++$self->[ N ];
        $self->[ NEXT_WRITE ] %= $capacity;
        # Producers and consumers wait on the same variable, so waking a
        # single waiter could pick another blocked producer and strand the
        # consumers.  Wake everyone; each re-checks its own predicate.
        cond_broadcast @$self;
    }
}

# dq( $self )
#   Removes and returns the oldest item, blocking while the queue is empty.
sub dq {
    # twarn "dq: @_\n";
    my $self = shift;
    lock @$self;
    cond_wait @$self until $self->[ N ] > 0;

    # The oldest item sits N slots behind the write position (modulo the
    # capacity); the post-decrement consumes it from the count.
    my $p = $self->[ NEXT_WRITE ] - $self->[ N ]--;
    $p += @$self - 2 if $p < 0;
    my $out = $self->[ $p ];

    # Broadcast for the same reason as in nq(): a single signal might reach
    # another consumer instead of a blocked producer.
    cond_broadcast @$self;
    return $out;
}

# n( $self )
#   Returns the number of items currently queued.
sub n {
    # twarn "n: @_\n";
    my $self = shift;
    lock @$self;
    return $self->[ N ];
}

# _state( $self )
#   Debug helper: returns every element (data slots, then NEXT_WRITE and N)
#   joined with '|'.  Warnings are disabled because unused slots are undef.
sub _state {
    # twarn "_state: @_\n";
    no warnings;
    my $self = shift;
    lock @$self;
    return join '|', @{ $self };
}

# Modulino: when loaded by another file stop here; when run as a script fall
# through to the self-test below.
return 1 if caller;

package main;
use strict;
use warnings;
use threads ( stack_size => 4096 );
use threads::shared;
use async::Util;
use Time::HiRes qw[ time sleep ];

# Settable from the command line through perl's -s switch (-SIZE= -N= -T=).
our $SIZE //= 10;     # size passed to each queue
our $N    //= 1e5;    # number of items pushed through the pipeline
our $T    //= 4;      # worker threads, rounded up to an even number below
++$T; $T &= ~1;

# Pipeline: main -> Q1_n -> T/2 workers -> Qn_n -> T/2 workers -> Qn_1 -> checker
my $Q1_n = async::Q->new( $SIZE );
my $Qn_n = async::Q->new( $SIZE );
my $Qn_1 = async::Q->new( $SIZE );

# Each worker forwards items until it dequeues an undef terminator.
my @t1 = map async( sub{ $Qn_n->nq( $_ ) while defined( $_ = $Q1_n->dq ); } ), 1 .. $T/2;
my @t2 = map async( sub{ $Qn_1->nq( $_ ) while defined( $_ = $Qn_n->dq ); } ), 1 .. $T/2;

# One bit per item value, used to detect duplicated or lost items.
my $bits :shared = chr(0);
$bits x= $N/ 8 + 1;

my $t = async{
    while( defined( $_ = $Qn_1->dq ) ) {
        die "value duplicated" if vec( $bits, $_, 1 );
        vec( $bits, $_, 1 ) = 1;
    }
};

my $start = time;

$Q1_n->nq( $_ ) for 1 .. $N;

# Shut the stages down in order, one undef terminator per consumer.
$Q1_n->nq( (undef) x ($T/2) );
$_->join for @t1;

$Qn_n->nq( (undef) x ($T/2) );
$_->join for @t2;

$Qn_1->nq( undef );
$t->join;

my $stop = time;

# '%32b*' yields the number of set bits: every item must have arrived once.
my $set_bits = unpack '%32b*', $bits;
die "NOK: $set_bits : \n" . $Q1_n->_state, $/, $Qn_n->_state, $/, $Qn_1->_state
    unless $set_bits == $N;

printf "$N items by $T threads via three Qs size $SIZE in %.6f seconds\n", $stop - $start;

__END__
C:\test>perl async\Q.pm -N=1e4 -T=2 -SIZE=40
1e4 items by 2 threads via three Qs size 40 in 5.768000 seconds

C:\test>perl async\Q.pm -N=1e4 -T=20 -SIZE=40
1e4 items by 20 threads via three Qs size 40 in 7.550000 seconds

C:\test>perl async\Q.pm -N=1e4 -T=200 -SIZE=400
1e4 items by 200 threads via three Qs size 400 in 8.310000 seconds