package communication_server;

# Threaded TCP message server.
#
# An object owns two shared queues:
#   * txQueue - messages queued by the application (enqueue) that the server
#               thread parses as XML and writes to every connected client;
#   * rxQueue - messages received from clients (plus an "add" marker each time
#               a client connects) that the application reads with
#               dequeue / dequeue_nb.
# new() starts a detached thread running _server(); end() asks it to stop.

use Carp;
use threads;
use threads::shared;
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
use Symbol qw(qualify_to_ref);

use strict;
use warnings;
use warnings::register;

$| = 1;    # disable STDOUT buffering

# Carp errors from threads::shared calls should complain about caller
our @CARP_NOT = ("threads::shared");

# Constructor.
#   communication_server->new($port)
# $port must be an integer in 1..65535. Croaks if it is missing or invalid.
# Builds the shared state, starts the detached server thread and returns the
# blessed, shared object.
sub new {
    my $class = shift;
    croak "Port number required" unless (@_);

    my $requested = shift;
    if (   !defined($requested)
        || !looks_like_number($requested)
        || (int($requested) != $requested)
        || ($requested < 1)
        || ($requested > 65535))
    {
        $requested = 'undef' if (!defined($requested));
        croak("Invalid 'port' argument ($requested) to 'new' method");
    }

    my $port :shared = $requested;
    my @txQueue :shared;
    my @rxQueue :shared;
    my $txlock :shared;
    my $rxlock :shared;
    my %self :shared = (
        'port'    => \$port,
        'txQueue' => \@txQueue,
        'rxQueue' => \@rxQueue,
        'txlock'  => \$txlock,
        'rxlock'  => \$rxlock,
    );
    bless(\%self, $class);

    threads->create(\&_server, \%self)->detach();
    return \%self;
}

#QUEUES----------------------------------------------------------------------------------------------

# Remove up to $count items from the front of the shared array @$queue.
# With $count == 1 the single item (or undef when empty) is returned;
# otherwise a list of at most $count items is returned.
# The caller must already hold the lock guarding the queue.
# A shift loop is used instead of splice() because threads::shared documents
# splice as unsupported on shared arrays.
sub _shift_items {
    my ($queue, $count) = @_;

    # Return single item
    return shift(@{$queue}) if ($count == 1);

    # Return multiple items
    my @items;
    for (1 .. $count) {
        # Running dry should only happen after a timeout or after end().
        last if (!@{$queue});
        push(@items, shift(@{$queue}));
    }
    return @items;
}

# Queue one or more items for transmission to the connected clients.
# Items are shared_clone()d onto the TX queue. Does nothing after end().
sub enqueue {
    my $self = shift;
    unless ($$self{'ENDED'}) {
        lock(${$self->{'txlock'}});
        push(@{$self->{'txQueue'}}, map { shared_clone($_) } @_)
            and cond_signal(${$self->{'txlock'}});
    }
}

# Blocking read of received items.
#   $obj->dequeue()        - one item
#   $obj->dequeue($count)  - list of $count items
# Waits until enough items are available or end() has been called, in which
# case fewer items (or undef) may be returned.
sub dequeue {
    my $self = shift;
    lock(${$self->{'rxlock'}});

    my $count = @_ ? $self->_validate_count(shift) : 1;

    cond_wait(${$self->{'rxlock'}})
        while ((@{$self->{'rxQueue'}} < $count) && !$$self{'ENDED'});

    # Pass the wake-up on if items will remain or everyone must be released.
    cond_signal(${$self->{'rxlock'}})
        if ((@{$self->{'rxQueue'}} > $count) || $$self{'ENDED'});

    return _shift_items($self->{'rxQueue'}, $count);
}

# Non-blocking read of received items: returns whatever is available right
# now, up to $count (default 1). Returns undef / an empty list when empty.
sub dequeue_nb {
    my $self = shift;
    lock(${$self->{'rxlock'}});

    my $count = @_ ? $self->_validate_count(shift) : 1;

    return _shift_items($self->{'rxQueue'}, $count);
}

# Mark the object as ended: no more data is accepted, blocked readers are
# released and the server thread leaves its loop on its next iteration.
sub end {
    my $self = shift;
    lock(${$self->{'txlock'}});
    lock(${$self->{'rxlock'}});

    # No more data is coming
    $$self{'ENDED'} = 1;

    # Try to release at least one blocked thread (each released thread
    # signals again because ENDED is set).
    cond_signal(${$self->{'txlock'}});
    cond_signal(${$self->{'rxlock'}});
}

# Blocking read from the TX queue; same calling convention as dequeue().
sub _txdequeue {
    my $self = shift;
    lock(${$self->{'txlock'}});

    my $count = @_ ? $self->_validate_count(shift) : 1;

    cond_wait(${$self->{'txlock'}})
        while ((@{$self->{'txQueue'}} < $count) && !$$self{'ENDED'});

    cond_signal(${$self->{'txlock'}})
        if ((@{$self->{'txQueue'}} > $count) || $$self{'ENDED'});

    return _shift_items($self->{'txQueue'}, $count);
}

# Timed read from the TX queue.
#   $obj->_txdequeue_timed($timeout, $count)
# $timeout (seconds) may be relative or an absolute epoch time; it defaults
# to -1, i.e. already expired, so no waiting is done. $count defaults to 1.
# Returns the items available once enough have arrived, the timeout expired
# or end() was called.
sub _txdequeue_timed {
    my $self = shift;
    lock(${$self->{'txlock'}});

    # Timeout may be relative or absolute
    my $timeout = @_ ? $self->_validate_timeout(shift) : -1;

    # Convert to an absolute time for use with cond_timedwait():
    # anything below roughly one year in seconds is taken as relative.
    if ($timeout < 32000000) {
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items, or until timeout
    while ((@{$self->{'txQueue'}} < $count) && !$$self{'ENDED'}) {
        last if (!cond_timedwait(${$self->{'txlock'}}, $timeout));
    }

    cond_signal(${$self->{'txlock'}})
        if ((@{$self->{'txQueue'}} > $count) || $$self{'ENDED'});

    return _shift_items($self->{'txQueue'}, $count);
}

# Append received items to the RX queue and wake one reader.
# Does nothing after end().
sub _rxenqueue {
    my $self = shift;
    unless ($$self{'ENDED'}) {
        lock(${$self->{'rxlock'}});
        push(@{$self->{'rxQueue'}}, map { shared_clone($_) } @_)
            and cond_signal(${$self->{'rxlock'}});
    }
}

# Check value of the requested timeout: must be defined and numeric.
# Croaks (naming the calling method) otherwise; returns the timeout.
sub _validate_timeout {
    my $self    = shift;
    my $timeout = shift;

    if (!defined($timeout) || !looks_like_number($timeout)) {
        my ($method) = (caller(1))[3];
        $method = '' if (!defined($method));
        $method =~ s/\A.*:://s;    # strip the package qualifier
        $timeout = 'undef' if (!defined($timeout));
        croak("Invalid 'timeout' argument ($timeout) to '$method' method");
    }

    return $timeout;
}

# Check value of the requested count: must be an integer >= 1.
# Croaks (naming the calling method) otherwise; returns the count.
sub _validate_count {
    my $self  = shift;
    my $count = shift;

    if (   !defined($count)
        || !looks_like_number($count)
        || (int($count) != $count)
        || ($count < 1))
    {
        my ($method) = (caller(1))[3];
        $method = '' if (!defined($method));
        $method =~ s/\A.*:://s;    # strip the package qualifier
        $count = 'undef' if (!defined($count));
        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
    }

    return $count;
}
#QUEUES----------------------------------------------------------------------------------------------

#COMMUNICATION------------------------------------------------------------------------------------------

# Accept a pending connection on $listener, register it with the IO::Select
# set $sockets, create its receive buffer in %$buffer_for (keyed by the
# handle's refaddr) and push an "add" marker onto the RX queue.
# A failed accept() is reported and otherwise ignored.
sub _accept_client {
    my ($self, $listener, $sockets, $buffer_for) = @_;

    my $ns = $listener->accept();
    if (!$ns) {
        printf "ERROR (%d)(%s)(%d)(%s)", $!, $!, $^E, $^E;
        return;
    }

    my $peerhost = $ns->peerhost();
    my $peerport = $ns->peerport();
    print "Accepting client: "
        . (defined($peerhost) ? $peerhost : '') . ":"
        . (defined($peerport) ? $peerport : '') . "\n";

    $buffer_for->{refaddr($ns)} = "";

    #send initial data
    $self->_rxenqueue("add");
    $sockets->add($ns);
    return;
}

# Drain what is currently readable from client handle $rh without blocking.
# Data is accumulated line by line in the client's buffer; as soon as the
# buffer ends with </message> it is pushed onto the RX queue and reading
# stops (one message at a time per handle). An incomplete line is kept in the
# buffer for the next call. On EOF or a read error the client is removed from
# $sockets, its buffer is discarded and the handle is closed.
sub _read_client {
    my ($self, $rh, $sockets, $buffer_for) = @_;

    my $key          = refaddr($rh);
    my $was_blocking = $rh->blocking(0);    # switch to non-blocking reads
    my $line         = "";                  # line char buffer
    my $closed       = 0;

    $buffer_for->{$key} = "" if (!defined($buffer_for->{$key}));

    READ:
    while (1) {
        my $ret = sysread($rh, my $nextbyte, 1);

        if (!defined($ret)) {
            # Nothing more to read right now. The text match is kept for
            # platforms whose Errno does not map the Winsock error.
            my $would_block = $!{EAGAIN} || $!{EWOULDBLOCK}
                || ($! =~ m/a non-blocking socket operation could not be completed immediately/i);
            last READ if ($would_block);
            next READ if ($!{EINTR});

            print "SYSREAD RET: []\n";
            print "SOCKET STATUS: $!\n";
            $closed = 1;
            last READ;
        }

        if ($ret == 0) {
            #CLOSING SOCKET
            print "SYSREAD RET: [$ret]\n";
            $closed = 1;
            last READ;
        }

        $line .= $nextbyte;
        next READ if ($nextbyte ne "\n");

        # A complete line has arrived.
        $buffer_for->{$key} .= $line;
        $line = "";

        if ($buffer_for->{$key} =~ m/^.+<\/message>$/s) {
            # A full message has arrived.
            $self->_rxenqueue($buffer_for->{$key});
            $buffer_for->{$key} = "";
            last READ;    #one message at a time per handle
        }
    }

    if ($closed) {
        # Remote side closed the connection or the read failed: drop the host.
        delete($buffer_for->{$key});
        $sockets->remove($rh);
        close($rh);
        return;
    }

    $buffer_for->{$key} .= $line;    # keep any partial line for next time
    $rh->blocking($was_blocking);    #restore blocking state
    return;
}

# Parse the XML string $data with $xs and send it to every client in
# $sockets (skipping $listener), filling in the from/to ip fields with the
# local and peer address of each connection. Each document is also echoed to
# STDOUT. Data that XMLin cannot parse is reported with warn and skipped.
sub _broadcast {
    my ($self, $xs, $listener, $sockets, $data) = @_;

    my $ref = eval { $xs->XMLin($data) };
    if (!defined($ref)) {
        my $err = $@;
        warn "Discarding unparsable outbound message: $err";
        return;
    }

    foreach my $wh ($sockets->handles()) {
        next if ($listener eq $wh);

        $ref->{message}->[0]->{from}->[0]->{ip} = $wh->sockhost();
        $ref->{message}->[0]->{to}->[0]->{ip}   = $wh->peerhost();

        my $xml = $xs->XMLout($ref);
        print $xml;
        print {$wh} $xml;
    }
    return;
}

# Body of the server thread. Listens on the object's port on 0.0.0.0 and,
# until end() is called, alternates between
#   RX: accepting clients / reading their data (select with 0.5 s timeout)
#   TX: sending one queued message to all clients (waits up to 1 s for one).
# Croaks if the listening socket cannot be created.
sub _server {
    local *__ANON__ = '_server';
    my $self = shift;

    # Loaded here so that only the server thread pays for these modules.
    require XML::Simple;
    XML::Simple->import();
    require IO::Socket::INET;
    IO::Socket::INET->import();
    require IO::Select;
    IO::Select->import();

    my $xs = XML::Simple->new(keeproot => 1, forcearray => 1, KeyAttr => []);

    print "SERVER PORT " . ${$self->{'port'}} . "\n";
    my $socket = IO::Socket::INET->new(
        LocalHost => '0.0.0.0',
        LocalPort => ${$self->{'port'}},
        Proto     => 'tcp',
        Listen    => 5,
        Reuse     => 1,
    );
    croak "cannot create socket $!\n" unless $socket;
    print "server waiting for client connection on port " . ${$self->{'port'}} . "\n";

    my $sockets = IO::Select->new($socket);
    my %buffer;    # per-client receive buffer, keyed by refaddr of the handle

    while (!$$self{'ENDED'}) {
        #RX
        my ($read) = IO::Select->select($sockets, undef, undef, 0.5);
        foreach my $rh (@{ $read || [] }) {
            if ($rh == $socket) {
                $self->_accept_client($socket, $sockets, \%buffer);
            }
            else {
                $self->_read_client($rh, $sockets, \%buffer);
            }
        }

        #TX
        my $data = $self->_txdequeue_timed(1);
        if (defined($data)) {
            $self->_broadcast($xs, $socket, $sockets, $data);
        }
    }

    print "_server EXIT\n";
}
#COMMUNICATION------------------------------------------------------------------------------------------

1;