BEGIN { use_ok 'Q' };
####
my $q = new_ok Q => [5];
##
##
package Thing;
use if $^O eq 'MSWin32' ? 'Thing::Win32' : 'Thing::nix';
sub new { $^O eq 'MSWin32' ) ? &Thing::Win32->new() : &Thing::nix->new(); }
##
##
can_ok $q => 'nq';
can_ok $q => 'dq';
can_ok $q => 'n';
##
##
not ok 1 - async::Q->can('pq')
# Failed test 'async::Q->can('pq')'
# at -e line 1.
# async::Q->can('pq') failed
##
##
Can't locate object method "pq" via package "async::Q" at -e line 1.
##
##
#! perl -slw
use strict;
package async::Q;
use async::Util;
use threads;
use threads::shared;
use constant {
NEXT_WRITE => -2,
N => -1,
};
sub new { # twarn "new: @_\n";
my( $class, $Qsize ) = @_;
$Qsize //= 3;
my @Q :shared;
$#Q = $Qsize;
@Q[ NEXT_WRITE, N ] = ( 0, 0 );
return bless \@Q, $class;
}
sub nq { # twarn "nq: @_\n";
my $self = shift;
lock @$self;
for( @_ ) {
cond_wait @$self until $self->[ N ] < ( @$self-2 );
$self->[ $self->[ NEXT_WRITE ]++ ] = $_;
++$self->[ N ];
$self->[ NEXT_WRITE ] %= ( @$self - 2 );
cond_signal @$self;
}
}
sub dq { # twarn "dq: @_\n";
my $self = shift;
lock @$self;
cond_wait @$self until $self->[ N ] > 0;
my $p = $self->[ NEXT_WRITE ] - $self->[ N ]--;
$p += @$self -2 if $p < 0;
my $out = $self->[ $p ];
cond_signal @$self;
return $out;
}
sub n { # twarn "n: @_\n";
my $self = shift;
lock @$self;
return $self->[ N ];
}
sub _state { # twarn "_state: @_\n";
no warnings;
my $self = shift;
lock @$self;
return join '|', @{ $self };
}
return 1 if caller;
package main;
use strict;
use warnings;
use threads ( stack_size => 4096 );
use threads::shared;
use async::Util;
use Time::HiRes qw[ time sleep ];
our $SIZE //= 10;
our $N //= 1e5;
our $T //= 4; ++$T; $T &= ~1;
my $Q1_n = new async::Q( $SIZE );
my $Qn_n = new async::Q( $SIZE );
my $Qn_1 = new async::Q( $SIZE );
my @t1 = map async( sub{ $Qn_n->nq( $_ ) while defined( $_ = $Q1_n->dq ); } ), 1 .. $T/2;
my @t2 = map async( sub{ $Qn_1->nq( $_ ) while defined( $_ = $Qn_n->dq ); } ), 1 .. $T/2;
my $bits :shared = chr(0); $bits x= $N/ 8 + 1;
my $t = async{
while( defined( $_ = $Qn_1->dq ) ) {
die "value duplicated" if vec( $bits, $_, 1 );
vec( $bits, $_, 1 ) = 1;
}
};
my $start = time;
$Q1_n->nq( $_ ) for 1 .. $N;
$Q1_n->nq( (undef) x ($T/2) ); $_->join for @t1;
$Qn_n->nq( (undef) x ($T/2) ); $_->join for @t2;
$Qn_1->nq( undef ); $_->join for $t;
my $stop = time;
my $b = unpack '%32b*', $bits;
die "NOK: $b : \n" . $Q1_n->_state, $/, $Qn_n->_state, $/, $Qn_1->_state unless $b == $N;
printf "$N items by $T threads via three Qs size $SIZE in %.6f seconds\n",
$stop - $start;
__END__
C:\test>perl async\Q.pm -N=1e4 -T=2 -SIZE=40
1e4 items by 2 threads via three Qs size 40 in 5.768000 seconds
C:\test>perl async\Q.pm -N=1e4 -T=20 -SIZE=40
1e4 items by 20 threads via three Qs size 40 in 7.550000 seconds
C:\test>perl async\Q.pm -N=1e4 -T=200 -SIZE=400
1e4 items by 200 threads via three Qs size 400 in 8.310000 seconds