package communication_server;
use Carp;
use threads;
use threads::shared;
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
use Symbol qw(qualify_to_ref);
use strict;
use warnings::register;
$| = 1; # Turn on autoflush for the currently selected output handle so the module's status prints are not held back in a buffer
# Carp errors from threads::shared calls should complain about caller:
# listing threads::shared in @CARP_NOT makes croak()/carp() skip its frames
# when choosing which caller to blame.
our @CARP_NOT = ("threads::shared");
# Construct a server object and start its background network thread.
#
#   my $server = communication_server->new($port);
#
# Parameters:
#   $port - TCP port to listen on; must be an integer in the range 1..65535.
#
# Returns a blessed reference to a shared hash holding the port, the
# transmit and receive queues, the scalars used as their locks, and the
# 'ENDED' flag.  A detached thread running _server() is created before the
# object is returned.
#
# Croaks when no port is supplied or when the port is not an integer within
# the valid range.
sub new {
    my $class = shift;
    croak "Port number required" unless(@_);

    # Validate the port here rather than via _validate_count(): that helper
    # words its error as an invalid 'count' argument and has no upper bound.
    my $requested = shift;
    if (! defined($requested) ||
        ! looks_like_number($requested) ||
        (int($requested) != $requested) ||
        ($requested < 1) ||
        ($requested > 65535))
    {
        $requested = 'undef' if (! defined($requested));
        croak("Invalid 'port' argument ($requested) to 'new' method");
    }

    my $port :shared = $requested;
    my @txQueue :shared;
    my @rxQueue :shared;
    my $txlock :shared;
    my $rxlock :shared;
    my %self :shared = (
        'port'    => \$port,
        'txQueue' => \@txQueue,
        'rxQueue' => \@rxQueue,
        'txlock'  => \$txlock,
        'rxlock'  => \$rxlock,
        'ENDED'   => 0,      # set to 1 by end(); explicit initial state
    );
    bless(\%self, $class);

    # A code reference pins the entry point to this package's _server(),
    # independent of how a bare sub name would be resolved in the new thread.
    threads->create(\&_server, \%self)->detach();
    return \%self;
}
#QUEUES----------------------------------------------------------------------------------------------
# Queue one or more items for transmission.
#
#   $server->enqueue(@items);
#
# Each item is copied with shared_clone() and appended to the transmit
# queue while the transmit lock is held; one waiter on that lock is then
# signalled.  Nothing is queued once the 'ENDED' flag is set.
sub enqueue {
    my ($self, @payload) = @_;
    unless ($self->{'ENDED'}) {
        my $tx_guard = $self->{'txlock'};
        lock($$tx_guard);
        my @shared_copies = map { shared_clone($_) } @payload;
        # push() yields the new queue length, so the signal is only sent
        # when the queue is non-empty afterwards.
        push(@{ $self->{'txQueue'} }, @shared_copies)
            and cond_signal($$tx_guard);
    }
}
# Remove and return items from the receive queue, blocking until enough
# are available.
#
#   my $item  = $server->dequeue();
#   my @items = $server->dequeue($count);
#
# Parameters:
#   $count - optional number of items wanted (checked by _validate_count);
#            defaults to 1.
#
# Waits on the receive lock until the queue holds at least $count items or
# the 'ENDED' flag is set.  With a count of 1 the single item is returned
# (undef if the queue is empty); otherwise a list of up to $count items.
sub dequeue {
    my $self = shift;
    my $rx_guard = $self->{'rxlock'};
    lock($$rx_guard);
    my $count = @_ ? $self->_validate_count(shift) : 1;
    my $pending = $self->{'rxQueue'};

    # Sleep until there is enough data or the queue has been ended.
    until ((@$pending >= $count) || $self->{'ENDED'}) {
        cond_wait($$rx_guard);
    }

    # Pass the wake-up on if another waiter could also make progress.
    if ((@$pending > $count) || $self->{'ENDED'}) {
        cond_signal($$rx_guard);
    }

    # Single-item form
    if ($count == 1) {
        return shift(@$pending);
    }

    # Multi-item form: may come up short only when the queue was ended.
    my @items;
    while ((@items < $count) && @$pending) {
        push(@items, shift(@$pending));
    }
    return @items;
}
# Non-blocking variant of dequeue(): take whatever is on the receive queue
# right now without waiting.
#
#   my $item  = $server->dequeue_nb();
#   my @items = $server->dequeue_nb($count);
#
# Parameters:
#   $count - optional number of items wanted (checked by _validate_count);
#            defaults to 1.
#
# With a count of 1 the next item is returned, or undef when the queue is
# empty.  Otherwise a list of at most $count items is returned.
sub dequeue_nb {
    my $self = shift;
    lock(${ $self->{'rxlock'} });
    my $count = @_ ? $self->_validate_count(shift) : 1;
    my $pending = $self->{'rxQueue'};

    # Single-item form
    if ($count == 1) {
        return shift(@$pending);
    }

    # Multi-item form: stop early as soon as the queue runs dry.
    my @items;
    while ((@items < $count) && @$pending) {
        push(@items, shift(@$pending));
    }
    return @items;
}
# Mark the server as finished.
#
#   $server->end();
#
# Takes the transmit lock and then the receive lock, sets the 'ENDED' flag,
# and signals one waiter on each lock so that a blocked dequeue can notice
# the flag.
sub end {
    my $self = shift;
    my ($tx_guard, $rx_guard) = @{$self}{'txlock', 'rxlock'};
    lock($$tx_guard);
    lock($$rx_guard);

    # No more data is coming
    $self->{'ENDED'} = 1;

    # Try to release at least one blocked thread on each queue
    cond_signal($$tx_guard);
    cond_signal($$rx_guard);
}
# Internal: remove and return items from the transmit queue, blocking
# until enough are available.
#
#   my $item  = $self->_txdequeue();
#   my @items = $self->_txdequeue($count);
#
# Parameters:
#   $count - optional number of items wanted (checked by _validate_count);
#            defaults to 1.
#
# Waits on the transmit lock until the queue holds at least $count items or
# the 'ENDED' flag is set.  With a count of 1 the single item is returned
# (undef if the queue is empty); otherwise a list of up to $count items.
sub _txdequeue {
    my $self = shift;
    lock(${$self->{'txlock'}});
    my $count = @_ ? $self->_validate_count(shift) : 1;
    my $queue = $self->{'txQueue'};

    cond_wait(${$self->{'txlock'}}) while ((@$queue < $count) && ! $$self{'ENDED'});
    # Pass the wake-up on if another waiter could also make progress.
    cond_signal(${$self->{'txlock'}}) if ((@$queue > $count) || $$self{'ENDED'});

    # Return single item
    return shift(@$queue) if ($count == 1);

    # Return multiple items
    my @items;
    for (1..$count) {
        # Running dry here can only happen when end() cut the wait short.
        last if (! @$queue);
        push(@items, shift(@$queue));
    }
    # Without this explicit return the collected items were discarded.
    return @items;
}
# Internal: like _txdequeue(), but give up waiting once a timeout passes.
#
#   my $item  = $self->_txdequeue_timed($timeout);
#   my @items = $self->_txdequeue_timed($timeout, $count);
#
# Parameters:
#   $timeout - optional number (checked by _validate_timeout).  Values
#              below 32000000 are treated as seconds from now; larger
#              values are used as an absolute epoch time.  Defaults to -1,
#              i.e. a deadline that has already passed.
#   $count   - optional number of items wanted (checked by
#              _validate_count); defaults to 1.
#
# With a count of 1 the next item is returned, or undef if none arrived in
# time.  Otherwise a list of up to $count items is returned.
sub _txdequeue_timed {
    my $self = shift;
    lock(${$self->{'txlock'}});

    # Timeout may be relative or absolute
    my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
    # Convert to an absolute time for use with cond_timedwait()
    if ($timeout < 32000000) {    # Anything under roughly a year is relative
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;
    my $queue = $self->{'txQueue'};

    # Wait for requisite number of items, or until timeout
    while ((@$queue < $count) && ! $$self{'ENDED'}) {
        last if (! cond_timedwait(${$self->{'txlock'}}, $timeout));
    }
    # Pass the wake-up on if another waiter could also make progress.
    cond_signal(${$self->{'txlock'}}) if ((@$queue > $count) || $$self{'ENDED'});

    # Return single item
    return shift(@$queue) if ($count == 1);

    # Return multiple items
    my @items;
    for (1..$count) {
        # The queue can be short here after a timeout or after end().
        last if (! @$queue);
        push(@items, shift(@$queue));
    }
    # Without this explicit return the collected items were discarded.
    return @items;
}
# Internal: append received items to the receive queue.
#
#   $self->_rxenqueue(@items);
#
# Each item is copied with shared_clone() and appended while the receive
# lock is held; one waiter on that lock is then signalled.  Nothing is
# queued once the 'ENDED' flag is set.
sub _rxenqueue {
    my ($self, @payload) = @_;
    unless ($self->{'ENDED'}) {
        my $rx_guard = $self->{'rxlock'};
        lock($$rx_guard);
        my @shared_copies = map { shared_clone($_) } @payload;
        # push() yields the new queue length, so the signal is only sent
        # when the queue is non-empty afterwards.
        push(@{ $self->{'rxQueue'} }, @shared_copies)
            and cond_signal($$rx_guard);
    }
}
# Check value of the requested timeout
#
#   my $timeout = $self->_validate_timeout($value);
#
# Returns $value unchanged when it is defined and looks like a number.
# Otherwise croaks, naming the calling method with the object's class
# prefix stripped.
sub _validate_timeout {
    my ($self, $timeout) = @_;

    # Fast path: a usable number is handed straight back.
    return $timeout if (defined($timeout) && looks_like_number($timeout));

    # Build the error text around the method that passed the bad value.
    my ($method) = (caller(1))[3];
    my $class_name = ref($self);
    $method =~ s/$class_name\:://;
    $timeout = 'undef' unless (defined($timeout));
    croak("Invalid 'timeout' argument ($timeout) to '$method' method");
}
# Check value of the requested count
#   my $count = $self->_validate_count($value);
#
# Returns $value unchanged when it is a defined, numeric, whole number of
# at least 1.  Otherwise croaks, naming the calling method with the
# object's class prefix stripped.
sub _validate_count {
    my ($self, $count) = @_;

    my $acceptable = defined($count)
        && looks_like_number($count)
        && (int($count) == $count)
        && ($count >= 1);
    return $count if ($acceptable);

    # Build the error text around the method that passed the bad value.
    require Carp;
    my ($method) = (caller(1))[3];
    my $class_name = ref($self);
    $method =~ s/$class_name\:://;
    $count = 'undef' unless (defined($count));
    Carp::croak("Invalid 'count' argument ($count) to '$method' method");
}
#QUEUES----------------------------------------------------------------------------------------------
#COMMUNICATION------------------------------------------------------------------------------------------
# Body of the detached thread started by new(): accept TCP clients, collect
# their input into messages for the receive queue, and broadcast items
# taken from the transmit queue to every connected client.
#
# Parameters:
#   $self - the shared server object created by new().
#
# Loop, repeated until the 'ENDED' flag is set:
#   RX - wait up to 0.5s for readable sockets.  A readable listening socket
#        means a new client: it is accepted, "add" is put on the receive
#        queue, and the client joins the select set.  A readable client is
#        read one byte at a time in non-blocking mode; text accumulates in
#        a per-client buffer and is moved to the receive queue once a
#        completed line leaves the buffer ending in "</message>".
#   TX - wait up to 1s for one item on the transmit queue, parse it as XML,
#        fill in the from/to ip fields per client, and print it to every
#        client socket (and to STDOUT).
#
# Croaks if the listening socket cannot be created.
# NOTE(review): XMLin() is presumed to die on malformed input, which would
# end this thread - confirm whether callers can enqueue untrusted text.
sub _server {
    local *__ANON__ = '_server';
    my $self = shift;

    # Loaded at run time, inside the server thread.
    require XML::Simple;
    XML::Simple->import();
    require IO::Socket::INET;
    IO::Socket::INET->import();
    require IO::Select;
    IO::Select->import();

    my $xs = XML::Simple->new(keeproot=>1, forcearray=>1, KeyAttr=>[]);
    print "SERVER PORT ".${$self->{'port'}}."\n";
    my $socket = IO::Socket::INET->new(LocalHost => '0.0.0.0', LocalPort => ${$self->{'port'}}, Proto => 'tcp', Listen => 5, Reuse => 1);
    croak "cannot create socket $!\n" unless $socket;
    print "server waiting for client connection on port ".${$self->{'port'}}."\n";

    my $sockets = IO::Select->new($socket);
    my %buffer;    # per-client text received so far, keyed by "host:port"

    while (! $$self{'ENDED'}) {
        #RX
        # select() returns an empty list on timeout, leaving $read undef.
        my ($read) = IO::Select->select($sockets, undef, undef, 0.5);
        foreach my $rh (@{ $read || [] }) {
            if ($rh == $socket) {
                my $ns = $rh->accept();
                unless ($ns) {
                    # Report the failure and move on; there is no client
                    # object to register.
                    printf "ERROR (%d)(%s)(%d)(%s)", $!,$!,$^E,$^E;
                    next;
                }
                my $peer = $ns->peerhost().":".$ns->peerport();
                print "Accepting client: ".$peer."\n";
                $buffer{$peer} = "";
                #send initial data
                $self->_rxenqueue("add");
                $sockets->add($ns);
            } else {
                # Work out the buffer key once, before any read can fail.
                my $peer = $rh->peerhost().":".$rh->peerport();
                my $handle = qualify_to_ref($rh, caller( ));
                my $bits = '';
                vec($bits, fileno($handle), 1) = 1;
                my $line = ""; #line char buffer
                while (select($bits,undef,undef,0.0)) { #While data in the buffer
                    my $was_blocking = $handle->blocking(0); #switch to non-blocking reads
                    my $reading = 1;    # 1 = keep reading, 0 = stop for now, -1 = drop client
                    while ($reading == 1) {
                        my $ret = sysread($handle, my $nextbyte, 1);
                        # Classify a failure straight away, before the
                        # diagnostic prints get a chance to disturb $!.
                        # %! gives a portable errno test; the message match
                        # is kept as a fallback for platforms that report
                        # the condition only through that text.
                        my $would_block = ! defined($ret)
                            && ($!{EAGAIN}
                                || $!{EWOULDBLOCK}
                                || $! =~ m/a non-blocking socket operation could not be completed immediately/i);
                        print "SYSREAD RET: [$ret]\n" unless ($ret);
                        print "SOCKET STATUS: $!\n" if ($!);
                        if (defined($ret)) {
                            if ($ret == 0) {
                                #CLOSING SOCKET
                                $reading = -1;
                            } else {
                                $line .= $nextbyte;
                                $reading = 0 if $nextbyte eq "\n";
                            }
                        } elsif ($would_block) {
                            # Nothing more to read right now: stop instead
                            # of spinning until the rest of the line shows up.
                            $reading = 0;
                        } else {
                            $reading = -1;
                        }
                    }
                    if ($reading == -1) { #end of stream or hard read error: remove host
                        delete($buffer{$peer});
                        $sockets->remove($rh);
                        close($rh);
                        last;
                    }
                    $handle->blocking($was_blocking); #restore blocking state

                    # Keep whatever arrived, complete line or not, so a line
                    # split across reads is reassembled in the buffer.
                    my $line_complete = ($line =~ /\n\z/);
                    $buffer{$peer} .= $line;
                    $line = "";

                    # Only a completed line can finish a message.
                    if ($line_complete && $buffer{$peer} =~ m/^.+<\/message>$/s) { #if I have a full message
                        $self->_rxenqueue($buffer{$peer});
                        $buffer{$peer} = "";
                        last; #one message at a time per handle
                    }
                }
            }
        }
        #TX
        if (defined(my $data = $self->_txdequeue_timed(1))) {
            my $ref = $xs->XMLin($data);
            foreach my $wh ($sockets->handles()) {
                next if ($socket eq $wh);    # never write to the listening socket
                $ref->{message}->[0]->{from}->[0]->{ip} = $wh->sockhost();
                $ref->{message}->[0]->{to}->[0]->{ip} = $wh->peerhost();
                my $xml = $xs->XMLout($ref);
                print $xml;
                print $wh $xml;
            }
        }
    }
    print "_server EXIT\n";
}
#COMMUNICATION------------------------------------------------------------------------------------------
1;