I created these two libraries to handle multiple clients connecting to multiple servers. The design is that a client sends data to a specific server, while a server sends updates to all of its clients. In my case the client is the Tk UI for our testing program, which runs on the server and manages test systems. This allows not only remote control of the server but also keeps all interested people up to date. The data is passed as XML because it gives nice control structures, and the IPs of the sender and receiver can be added from the socket info.
Perhaps someone can enlighten me: in my original design I used two threads, one for RX and the other for TX, each blocking until action needed to be taken. To accomplish this I had to deconstruct the IO::Select lib so that the INET socket could be shared between the two threads; however, I was never able to successfully store and share the socket. Doing so would further reduce CPU usage by allowing the TX queue and RX socket to block until there was data. I would appreciate any insight.
If this can be accomplished without threads...
Update: When the socket is cleanly closed the client does not close the local socket. FIXED
TESTED: 2 servers, 4 clients
USAGE:
use threads;
use Thread::Queue;
use XML::Simple;
use communication_client;

# Parser configured like the one the library threads create internally, so the
# ->{message}->[0]->{...}->[0] access paths used below line up.
my $xs = XML::Simple->new(keeproot => 1, forcearray => 1, KeyAttr => []);

my $com = communication_client->new(40000); #DST port

# Enqueueing a bare dotted-quad address (e.g. "192.168.0.10") asks the client
# thread to open a connection to that server; XML messages are only written to
# servers that are already connected.
$com->enqueue("IPADDR");

# Consumer thread: handles every message the client thread receives.
my $thr = threads->create(sub {
    while (my $data = $com->dequeue()) {
        print "Received:\n$data\n";
        my $input = $xs->XMLin($data);
        my $ip = $input->{message}->[0]->{from}->[0]->{ip};
        #WORK
        if ($input->{message}->[0]->{anyfieldsyouwant}->[0] eq "whatever") {
            #WORK
        }
    }
});

# The <to ip="..."/> element selects which connected server receives the message.
$com->enqueue("<message><to ip=\"IPADDR\"/><data>INFO</data><anyfieldsyouwant>whatever</anyfieldsyouwant></message>");
SERVER:
# Server half of the client/server messaging pair: a shared-memory object with
# TX/RX queues plus a detached thread (_server) that owns the sockets.
package communication_server;
use Carp;
use threads;
use threads::shared;
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
use Symbol qw(qualify_to_ref);
use strict;
# NOTE(review): warnings::register only registers a warnings category for this
# package; it does not enable warnings here - confirm whether 'use warnings'
# was intended.
use warnings::register;
$| = 1; # autoflush the currently selected output handle (STDOUT by default)
# Carp errors from threads::shared calls should complain about caller
our @CARP_NOT = ("threads::shared");
# Constructor: communication_server->new($port)
#
# Builds the shared object (port, TX/RX queues and one lock variable per
# queue), starts the detached '_server' thread with that object, and returns
# the blessed reference. Croaks when no port is given or when the port fails
# the positive-integer check done by _validate_count.
sub new {
    my ($class, @args) = @_;
    croak "Port number required" unless (@args);

    # The count validator doubles as the port check; it is called as a plain
    # function here, so its invocant is undef.
    my $port :shared = _validate_count(undef, $args[0]);

    my @txQueue :shared;
    my @rxQueue :shared;
    my $txlock :shared;
    my $rxlock :shared;

    my %object :shared = (
        'port'    => \$port,
        'txQueue' => \@txQueue,
        'rxQueue' => \@rxQueue,
        'txlock'  => \$txlock,
        'rxlock'  => \$rxlock,
    );
    bless(\%object, $class);

    # The worker thread receives the same shared object.
    my $worker = threads->create('_server', \%object);
    $worker->detach();

    return \%object;
}
#QUEUES----------------------------------------------------------------------------------------------
# $obj->enqueue(@items)
#
# Appends shared clones of @items to the TX queue and signals the TX lock
# variable when the queue is non-empty afterwards. Does nothing once the
# object has been marked ENDED.
sub enqueue {
    my ($self, @items) = @_;
    unless ($self->{'ENDED'}) {
        lock(${ $self->{'txlock'} });
        my @shared_items = map { shared_clone($_) } @items;
        # push returns the new queue length, so the signal is skipped only
        # when the queue is still empty.
        push(@{ $self->{'txQueue'} }, @shared_items)
            and cond_signal(${ $self->{'txlock'} });
    }
}
# $obj->dequeue([$count])
#
# Blocking read from the RX queue. Waits on the RX lock variable until at
# least $count items (default 1) are queued or the object is ENDED.
# Returns a single item when $count is 1, otherwise a list of up to $count
# items (fewer only when the queue runs dry, i.e. after end()).
sub dequeue {
    my $self = shift;
    lock(${ $self->{'rxlock'} });

    my $wanted = 1;
    $wanted = $self->_validate_count(shift) if (@_);

    my $queue = $self->{'rxQueue'};

    # Sleep until enough items are available or the queue has been ended.
    until ((@{$queue} >= $wanted) || $self->{'ENDED'}) {
        cond_wait(${ $self->{'rxlock'} });
    }

    # Pass the wake-up on when items will remain or when we are shutting down.
    if ((@{$queue} > $wanted) || $self->{'ENDED'}) {
        cond_signal(${ $self->{'rxlock'} });
    }

    # Return single item
    return shift(@{$queue}) if ($wanted == 1);

    # Return multiple items
    my @items;
    while ((@items < $wanted) && @{$queue}) {
        push(@items, shift(@{$queue}));
    }
    return @items;
}
# $obj->dequeue_nb([$count])
#
# Non-blocking read from the RX queue: returns whatever is available right
# now - one item (or undef) when $count is 1, otherwise a list of at most
# $count items.
sub dequeue_nb {
    my $self = shift;
    lock(${ $self->{'rxlock'} });

    my $wanted = 1;
    $wanted = $self->_validate_count(shift) if (@_);

    my $queue = $self->{'rxQueue'};

    # Return single item
    return shift(@{$queue}) if ($wanted == 1);

    # Return multiple items, stopping early when the queue is exhausted
    my @items;
    while ((@items < $wanted) && @{$queue}) {
        push(@items, shift(@{$queue}));
    }
    return @items;
}
# $obj->end()
#
# Marks the object as ENDED while holding both queue locks (TX first, then
# RX) and signals each lock variable once so a waiter on either queue can
# notice the shutdown.
sub end {
    my $self = shift;
    my ($tx_ref, $rx_ref) = ($self->{'txlock'}, $self->{'rxlock'});

    lock(${$tx_ref});
    lock(${$rx_ref});

    # No more data is coming
    $self->{'ENDED'} = 1;

    # Try to release at least one blocked thread per queue
    cond_signal(${$tx_ref});
    cond_signal(${$rx_ref});
}
# $obj->_txdequeue([$count])
#
# Blocking read from the TX queue (internal counterpart of dequeue()).
# Waits on the TX lock variable until at least $count items (default 1) are
# queued or the object is ENDED.
#
# Returns a single item when $count is 1, otherwise a list of up to $count
# items. The multi-item list is now actually returned; previously the sub
# fell off the end after filling @items, so callers asking for more than
# one item lost the dequeued data.
sub _txdequeue {
    my $self = shift;
    lock(${$self->{'txlock'}});
    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for the requisite number of items (or shutdown)
    cond_wait(${$self->{'txlock'}}) while ((@{$self->{'txQueue'}} < $count) && ! $$self{'ENDED'});
    # Pass the wake-up on when items will remain or when we are shutting down
    cond_signal(${$self->{'txlock'}}) if ((@{$self->{'txQueue'}} > $count) || $$self{'ENDED'});

    # Return single item
    return shift(@{$self->{'txQueue'}}) if ($count == 1);

    # Return multiple items
    my @items;
    for (1..$count) {
        # Running dry can only happen once end() has been called
        last if (! @{$self->{'txQueue'}});
        push(@items, shift(@{$self->{'txQueue'}}));
    }
    return @items;
}
# $obj->_txdequeue_timed([$timeout [, $count]])
#
# Like _txdequeue(), but gives up waiting when the timeout expires.
#   $timeout - seconds; values below 32,000,000 (about one year) are treated
#              as relative to now, larger values as an absolute epoch time.
#              Defaults to -1, i.e. a deadline already in the past.
#   $count   - number of items wanted (default 1).
#
# Returns a single item (or undef) when $count is 1, otherwise a list of up
# to $count items. The multi-item list is now actually returned; previously
# the sub fell off the end after filling @items.
sub _txdequeue_timed {
    my $self = shift;
    lock(${$self->{'txlock'}});

    # Timeout may be relative or absolute
    my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
    # Convert to an absolute time for use with cond_timedwait()
    if ($timeout < 32000000) {   # Less than about one year => relative
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items, or until timeout
    while ((@{$self->{'txQueue'}} < $count) && ! $$self{'ENDED'}) {
        last if (! cond_timedwait(${$self->{'txlock'}}, $timeout));
    }
    # Pass the wake-up on when items will remain or when we are shutting down
    cond_signal(${$self->{'txlock'}}) if ((@{$self->{'txQueue'}} > $count) || $$self{'ENDED'});

    # Return single item
    return shift(@{$self->{'txQueue'}}) if ($count == 1);

    # Return multiple items
    my @items;
    for (1..$count) {
        # Fewer than $count items are returned after a timeout or end()
        last if (! @{$self->{'txQueue'}});
        push(@items, shift(@{$self->{'txQueue'}}));
    }
    return @items;
}
# $obj->_rxenqueue(@items)
#
# Internal producer side of the RX queue: appends shared clones of @items
# and signals the RX lock variable when the queue is non-empty afterwards.
# Ignored once the object has been marked ENDED.
sub _rxenqueue {
    my ($self, @items) = @_;
    unless ($self->{'ENDED'}) {
        lock(${ $self->{'rxlock'} });
        my @shared_items = map { shared_clone($_) } @items;
        # push returns the new queue length, so the signal is skipped only
        # when the queue is still empty.
        push(@{ $self->{'rxQueue'} }, @shared_items)
            and cond_signal(${ $self->{'rxlock'} });
    }
}
# $obj->_validate_timeout($timeout)
#
# Returns $timeout unchanged when it is a defined, numeric-looking value.
# Otherwise croaks, naming the calling method with the object's class prefix
# removed from it.
sub _validate_timeout {
    my ($self, $timeout) = @_;

    # Happy path first: any defined number is accepted as-is.
    return $timeout if (defined($timeout) && looks_like_number($timeout));

    my $method = (caller(1))[3];
    my $class_name = ref($self);
    $method =~ s/$class_name\:://;

    my $shown = defined($timeout) ? $timeout : 'undef';
    croak("Invalid 'timeout' argument ($shown) to '$method' method");
}
# Check value of the requested count
#
# $obj->_validate_count($count)  or  _validate_count(undef, $count)
#
# Returns $count unchanged when it is a defined integer >= 1; otherwise
# croaks with a message naming the calling method.
#
# The class prefix is stripped from the reported method name only when there
# is a class to strip: new() calls this as a plain function with an undef
# invocant, and an empty class name used to turn the pattern into a bare
# "::" match that mangled the name (e.g. "communication_servernew"). The
# class name is also quoted and anchored so it is matched literally at the
# start of the name.
sub _validate_count {
    my $self = shift;
    my $count = shift;
    if (! defined($count) ||
        ! looks_like_number($count) ||
        (int($count) != $count) ||
        ($count < 1))
    {
        require Carp;
        my ($method) = (caller(1))[3];
        # caller(1) is empty when we are not called from inside a sub
        $method = '(unknown)' if (! defined($method));
        my $class_name = ref($self);
        $method =~ s/\A\Q$class_name\E::// if (length($class_name));
        $count = 'undef' if (! defined($count));
        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
    }
    return $count;
}
#QUEUES----------------------------------------------------------------------------------------------
#COMMUNICATION------------------------------------------------------------------------------------------
# _server($self) - body of the detached server thread started by new().
#
# Listens on the configured port and loops until the object is ENDED:
#   RX: accepts new clients (announcing each with an "add" message on the RX
#       queue) and reads newline-terminated chunks from connected clients,
#       accumulating them per peer until the buffer looks like one complete
#       <message>...</message>, which is then pushed onto the RX queue.
#   TX: takes one message from the TX queue (waiting up to about a second),
#       stamps it with from/to addresses per connection and writes it to
#       every connected client.
#
# Changes from the earlier revision:
#   * a failed accept() is reported and skipped instead of being dereferenced;
#   * "would block" is detected through %! (EAGAIN/EWOULDBLOCK/EINTR) as well
#     as the Windows message text, so a partially received line no longer
#     drops the client on non-Windows systems;
#   * a TX item that fails to parse as XML is reported and dropped instead of
#     killing the thread;
#   * direct ->new calls replace indirect object syntax.
sub _server {
    local *__ANON__ = '_server';
    my $self = shift;
    require XML::Simple;
    XML::Simple->import();
    require IO::Socket::INET;
    IO::Socket::INET->import();
    require IO::Select;
    IO::Select->import();
    my $xs = XML::Simple->new(keeproot=>1, forcearray=>1, KeyAttr=>[]);
    print "SERVER PORT ".${$self->{'port'}}."\n";
    my $socket = IO::Socket::INET->new(LocalHost => '0.0.0.0', LocalPort => ${$self->{'port'}}, Proto => 'tcp', Listen => 5, Reuse => 1);
    croak "cannot create socket $!\n" unless $socket;
    print "server waiting for client connection on port ".${$self->{'port'}}."\n";
    my $sockets = IO::Select->new($socket);
    my %buffer;   # partial message text, keyed by "peerhost:peerport"
    while (! $$self{'ENDED'}) {
        #RX
        my ($read) = IO::Select->select($sockets, undef, undef, 0.5);
        foreach my $rh (@{ $read || [] }) {
            if ($rh == $socket) {
                # Listening socket is readable: a client is connecting
                my $ns = $rh->accept();
                unless ($ns) {
                    printf "ERROR (%d)(%s)(%d)(%s)", $!,$!,$^E,$^E;
                    next;
                }
                my $peer = $ns->peerhost().":".$ns->peerport();
                print "Accepting client: ".$peer."\n";
                $buffer{$peer} = "";
                #send initial data
                $self->_rxenqueue("<message><from ip='".$ns->peerhost()."'/><method>add</method></message>");
                $sockets->add($ns);
            } else {
                my $handle = qualify_to_ref($rh, caller( ));
                my $peer = $rh->peerhost().":".$rh->peerport();
                my $bits = '';
                vec($bits, fileno($handle), 1) = 1;
                my $line = ""; #line char buffer
                while (select($bits,undef,undef,0.0)) { #While data in the buffer
                    my $was_blocking = $handle->blocking(0); #switch to non-blocking reads
                    my $reading = 1;
                    # NOTE(review): while a line is incomplete this loop retries
                    # immediately on "would block", i.e. it spins until the
                    # newline arrives.
                    while ($reading == 1) {
                        my $ret = sysread($handle, my $nextbyte, 1);
                        # Classify the error before anything else can touch $!
                        my $would_block = ! defined($ret)
                            && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR}
                                || $! =~ m/a non-blocking socket operation could not be completed immediately/i);
                        print "SYSREAD RET: [$ret]\n" unless ($ret);
                        print "SOCKET STATUS: $!\n" if ($!);
                        if (defined($ret)) {
                            if ($ret == 0) {
                                #CLOSING SOCKET
                                $reading = -1;
                            } else {
                                $line .= $nextbyte;
                                $reading = 0 if $nextbyte eq "\n";
                            }
                        } elsif (! $would_block) {
                            # Real read error (e.g. connection reset): drop the client
                            $reading = -1;
                        }
                    }
                    if ($reading == -1) { #peer closed or read failed: remove host
                        #print "\n\nRemoving client: ".$rh->peerhost().":".$rh->peerport()."\n";
                        delete($buffer{$peer});
                        $sockets->remove($rh);
                        close($rh);
                        last;
                    }
                    $handle->blocking($was_blocking); #restore blocking state
                    if ($line =~ /\n\z/) { #if I have a 'line' of data
                        $buffer{$peer} .= $line;
                        $line = "";
                        if ($buffer{$peer} =~ m/^<message>.+<\/message>$/s) { #if I have a full message
                            $self->_rxenqueue($buffer{$peer});
                            $buffer{$peer} = "";
                            last; #one message at a time per handle
                        }
                    }
                }
            }
        }
        #TX
        if (defined(my $data = $self->_txdequeue_timed(1))) {
            my $ref = eval { $xs->XMLin($data) };
            unless (defined($ref)) {
                print "ERROR: Dropping unparsable outgoing message [$@]\n";
                next;
            }
            foreach my $wh ($sockets->handles()) {
                next if ($socket eq $wh);   # never write to the listening socket
                $ref->{message}->[0]->{from}->[0]->{ip} = $wh->sockhost();
                $ref->{message}->[0]->{to}->[0]->{ip} = $wh->peerhost();
                my $xml = $xs->XMLout($ref);
                print $xml;
                print $wh $xml;
            }
        }
    }
    print "_server EXIT\n";
}
#COMMUNICATION------------------------------------------------------------------------------------------
1;
CLIENT:
# Client half of the client/server messaging pair: a shared-memory object with
# TX/RX queues plus a detached thread (_client) that owns the sockets.
package communication_client;
use Carp;
use threads;
use threads::shared;
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
use Symbol qw(qualify_to_ref);
use strict;
# NOTE(review): warnings::register only registers a warnings category for this
# package; it does not enable warnings here - confirm whether 'use warnings'
# was intended.
use warnings::register;
$| = 1; # autoflush the currently selected output handle (STDOUT by default)
# Carp errors from threads::shared calls should complain about caller
our @CARP_NOT = ("threads::shared");
# Constructor: communication_client->new($port)
#
# Builds the shared object (destination port, TX/RX queues and one lock
# variable per queue), starts the detached '_client' thread with that
# object, and returns the blessed reference. Croaks when no port is given or
# when the port fails the positive-integer check done by _validate_count.
sub new {
    my ($class, @args) = @_;
    croak "Port number required" unless (@args);

    # The count validator doubles as the port check; it is called as a plain
    # function here, so its invocant is undef.
    my $port :shared = _validate_count(undef, $args[0]);

    my @txQueue :shared;
    my @rxQueue :shared;
    my $txlock :shared;
    my $rxlock :shared;

    my %object :shared = (
        'port'    => \$port,
        'txQueue' => \@txQueue,
        'rxQueue' => \@rxQueue,
        'txlock'  => \$txlock,
        'rxlock'  => \$rxlock,
    );
    bless(\%object, $class);

    # The worker thread receives the same shared object.
    my $worker = threads->create('_client', \%object);
    $worker->detach();

    return \%object;
}
#QUEUES----------------------------------------------------------------------------------------------
# $obj->enqueue(@items)
#
# Queues shared clones of @items for the client thread and signals the TX
# lock variable when the queue is non-empty afterwards. Does nothing once
# the object has been marked ENDED.
sub enqueue {
    my ($self, @outgoing) = @_;
    unless ($self->{'ENDED'}) {
        lock(${ $self->{'txlock'} });
        my @cloned = map { shared_clone($_) } @outgoing;
        # push yields the new queue length; no signal while it is still empty
        push(@{ $self->{'txQueue'} }, @cloned)
            and cond_signal(${ $self->{'txlock'} });
    }
}
# $obj->dequeue([$count])
#
# Blocking read of received messages. Waits on the RX lock variable until at
# least $count items (default 1) are queued or the object is ENDED.
# Returns one item when $count is 1, otherwise a list of up to $count items.
sub dequeue {
    my $self = shift;
    lock(${ $self->{'rxlock'} });

    my $need = 1;
    $need = $self->_validate_count(shift) if (@_);

    my $inbox = $self->{'rxQueue'};

    # Sleep until enough items are available or the queue has been ended.
    until ((@{$inbox} >= $need) || $self->{'ENDED'}) {
        cond_wait(${ $self->{'rxlock'} });
    }

    # Hand the wake-up on when items will remain or when shutting down.
    if ((@{$inbox} > $need) || $self->{'ENDED'}) {
        cond_signal(${ $self->{'rxlock'} });
    }

    # Return single item
    return shift(@{$inbox}) if ($need == 1);

    # Return multiple items
    my @batch;
    while ((@batch < $need) && @{$inbox}) {
        push(@batch, shift(@{$inbox}));
    }
    return @batch;
}
# $obj->dequeue_nb([$count])
#
# Non-blocking read of received messages: returns whatever is available
# right now - one item (or undef) when $count is 1, otherwise a list of at
# most $count items.
sub dequeue_nb {
    my $self = shift;
    lock(${ $self->{'rxlock'} });

    my $need = 1;
    $need = $self->_validate_count(shift) if (@_);

    my $inbox = $self->{'rxQueue'};

    # Return single item
    return shift(@{$inbox}) if ($need == 1);

    # Return multiple items, stopping early when the queue is exhausted
    my @batch;
    while ((@batch < $need) && @{$inbox}) {
        push(@batch, shift(@{$inbox}));
    }
    return @batch;
}
# $obj->end()
#
# Marks the object as ENDED while holding both queue locks (TX first, then
# RX) and signals each lock variable once so a waiter on either queue can
# notice the shutdown.
sub end {
    my $self = shift;
    my ($tx_ref, $rx_ref) = ($self->{'txlock'}, $self->{'rxlock'});

    lock(${$tx_ref});
    lock(${$rx_ref});

    # No more data is coming
    $self->{'ENDED'} = 1;

    # Try to release at least one blocked thread per queue
    cond_signal(${$tx_ref});
    cond_signal(${$rx_ref});
}
# $obj->_txdequeue([$count])
#
# Blocking read from the TX queue (internal counterpart of dequeue()).
# Waits on the TX lock variable until at least $count items (default 1) are
# queued or the object is ENDED.
#
# Returns a single item when $count is 1, otherwise a list of up to $count
# items. The multi-item list is now actually returned; previously the sub
# fell off the end after filling @items, unlike _txdequeue_nb().
sub _txdequeue {
    my $self = shift;
    lock(${$self->{'txlock'}});
    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for the requisite number of items (or shutdown)
    cond_wait(${$self->{'txlock'}}) while ((@{$self->{'txQueue'}} < $count) && ! $$self{'ENDED'});
    # Pass the wake-up on when items will remain or when we are shutting down
    cond_signal(${$self->{'txlock'}}) if ((@{$self->{'txQueue'}} > $count) || $$self{'ENDED'});

    # Return single item
    return shift(@{$self->{'txQueue'}}) if ($count == 1);

    # Return multiple items
    my @items;
    for (1..$count) {
        # Running dry can only happen once end() has been called
        last if (! @{$self->{'txQueue'}});
        push(@items, shift(@{$self->{'txQueue'}}));
    }
    return @items;
}
# $obj->_txdequeue_nb([$count])
#
# Non-blocking read from the TX queue, used by the client thread to poll for
# outgoing work: returns one item (or undef) when $count is 1, otherwise a
# list of at most $count items.
sub _txdequeue_nb {
    my $self = shift;
    lock(${ $self->{'txlock'} });

    my $need = 1;
    $need = $self->_validate_count(shift) if (@_);

    my $outbox = $self->{'txQueue'};

    # Return single item
    return shift(@{$outbox}) if ($need == 1);

    # Return multiple items, stopping early when the queue is exhausted
    my @batch;
    while ((@batch < $need) && @{$outbox}) {
        push(@batch, shift(@{$outbox}));
    }
    return @batch;
}
# $obj->_txdequeue_timed([$timeout [, $count]])
#
# Like _txdequeue(), but gives up waiting when the timeout expires.
#   $timeout - seconds; values below 32,000,000 (about one year) are treated
#              as relative to now, larger values as an absolute epoch time.
#              Defaults to -1, i.e. a deadline already in the past.
#   $count   - number of items wanted (default 1).
#
# Returns a single item (or undef) when $count is 1, otherwise a list of up
# to $count items. The multi-item list is now actually returned; previously
# the sub fell off the end after filling @items.
sub _txdequeue_timed {
    my $self = shift;
    lock(${$self->{'txlock'}});

    # Timeout may be relative or absolute
    my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
    # Convert to an absolute time for use with cond_timedwait()
    if ($timeout < 32000000) {   # Less than about one year => relative
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items, or until timeout
    while ((@{$self->{'txQueue'}} < $count) && ! $$self{'ENDED'}) {
        last if (! cond_timedwait(${$self->{'txlock'}}, $timeout));
    }
    # Pass the wake-up on when items will remain or when we are shutting down
    cond_signal(${$self->{'txlock'}}) if ((@{$self->{'txQueue'}} > $count) || $$self{'ENDED'});

    # Return single item
    return shift(@{$self->{'txQueue'}}) if ($count == 1);

    # Return multiple items
    my @items;
    for (1..$count) {
        # Fewer than $count items are returned after a timeout or end()
        last if (! @{$self->{'txQueue'}});
        push(@items, shift(@{$self->{'txQueue'}}));
    }
    return @items;
}
# $obj->_rxenqueue(@items)
#
# Internal producer side of the RX queue: the client thread uses it to hand
# received messages to dequeue()/dequeue_nb(). Appends shared clones of
# @items and signals the RX lock variable when the queue is non-empty
# afterwards. Ignored once the object has been marked ENDED.
sub _rxenqueue {
    my ($self, @incoming) = @_;
    unless ($self->{'ENDED'}) {
        lock(${ $self->{'rxlock'} });
        my @cloned = map { shared_clone($_) } @incoming;
        # push yields the new queue length; no signal while it is still empty
        push(@{ $self->{'rxQueue'} }, @cloned)
            and cond_signal(${ $self->{'rxlock'} });
    }
}
# $obj->_validate_timeout($timeout)
#
# Returns $timeout unchanged when it is a defined, numeric-looking value.
# Otherwise croaks, naming the calling method with the object's class prefix
# removed from it.
sub _validate_timeout {
    my ($self, $timeout) = @_;

    # Happy path first: any defined number is accepted as-is.
    return $timeout if (defined($timeout) && looks_like_number($timeout));

    my $method = (caller(1))[3];
    my $class_name = ref($self);
    $method =~ s/$class_name\:://;

    my $shown = defined($timeout) ? $timeout : 'undef';
    croak("Invalid 'timeout' argument ($shown) to '$method' method");
}
# Check value of the requested count
#
# $obj->_validate_count($count)  or  _validate_count(undef, $count)
#
# Returns $count unchanged when it is a defined integer >= 1; otherwise
# croaks with a message naming the calling method.
#
# The class prefix is stripped from the reported method name only when there
# is a class to strip: new() calls this as a plain function with an undef
# invocant, and an empty class name used to turn the pattern into a bare
# "::" match that mangled the name (e.g. "communication_clientnew"). The
# class name is also quoted and anchored so it is matched literally at the
# start of the name.
sub _validate_count {
    my $self = shift;
    my $count = shift;
    if (! defined($count) ||
        ! looks_like_number($count) ||
        (int($count) != $count) ||
        ($count < 1))
    {
        require Carp;
        my ($method) = (caller(1))[3];
        # caller(1) is empty when we are not called from inside a sub
        $method = '(unknown)' if (! defined($method));
        my $class_name = ref($self);
        $method =~ s/\A\Q$class_name\E::// if (length($class_name));
        $count = 'undef' if (! defined($count));
        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
    }
    return $count;
}
#QUEUES----------------------------------------------------------------------------------------------
#COMMUNICATION------------------------------------------------------------------------------------------
# _client($self) - body of the detached client thread started by new().
#
# Loops until the object is ENDED:
#   RX: reads newline-terminated chunks from every connected server,
#       accumulating them per peer until the buffer looks like one complete
#       <message>...</message>, which is pushed onto the RX queue. When a
#       server closes the connection (or a read fails) a "disconnected"
#       message is queued and the socket is dropped.
#   TX: polls the TX queue. An item that is a bare dotted-quad address opens
#       a connection to that server on the configured port (unless already
#       connected); anything else is parsed as XML and written to the
#       connected server whose address equals the message's <to ip="...">.
#
# Changes from the earlier revision:
#   * "would block" is detected through %! (EAGAIN/EWOULDBLOCK/EINTR) as well
#     as the Windows message text, so a partially received line no longer
#     drops the server connection on non-Windows systems;
#   * the "already connected" test uses grep/eq instead of the smartmatch
#     operator, which is deprecated and absent from recent perls;
#   * a TX item that fails to parse as XML is reported and dropped instead of
#     killing the thread;
#   * direct ->new calls replace indirect object syntax.
sub _client {
    local *__ANON__ = '_client';
    my $self = shift;
    require XML::Simple;
    XML::Simple->import();
    require IO::Socket::INET;
    IO::Socket::INET->import();
    require IO::Select;
    IO::Select->import();
    my $xs = XML::Simple->new(keeproot=>1, forcearray=>1, KeyAttr=>[]);
    my $sockets = IO::Select->new();
    my %buffer;   # partial message text, keyed by "peerhost:peerport"
    while (! $$self{'ENDED'}) {
        #RX
        my ($read) = IO::Select->select($sockets, undef, undef, 0.5);
        foreach my $rh (@{ $read || [] }) {
            my $handle = qualify_to_ref($rh, caller( ));
            my $peer = $rh->peerhost().":".$rh->peerport();
            my $bits = '';
            vec($bits, fileno($handle), 1) = 1;
            my $line = ""; #line char buffer
            while (select($bits,undef,undef,0.0)) { #While data in the buffer
                my $was_blocking = $handle->blocking(0); #switch to non-blocking reads
                my $reading = 1;
                # NOTE(review): while a line is incomplete this loop retries
                # immediately on "would block", i.e. it spins until the
                # newline arrives.
                while ($reading == 1) {
                    my $ret = sysread($handle, my $nextbyte, 1);
                    # Classify the error before anything else can touch $!
                    my $would_block = ! defined($ret)
                        && ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR}
                            || $! =~ m/a non-blocking socket operation could not be completed immediately/i);
                    print "SYSREAD RET: [$ret]\n" unless ($ret);
                    print "SOCKET STATUS: $!\n" if ($!);
                    if (defined($ret)) {
                        if ($ret == 0) {
                            #CLOSING SOCKET
                            $reading = -1;
                        } else {
                            $line .= $nextbyte;
                            $reading = 0 if $nextbyte eq "\n";
                        }
                    } elsif (! $would_block) {
                        # Real read error (e.g. connection reset): drop the server
                        $reading = -1;
                    }
                }
                if ($reading == -1) { #peer closed or read failed: remove host
                    #print "\n\nRemoving client: ".$rh->peerhost()."\n";
                    $self->_rxenqueue("<message><from ip=\"".$rh->peerhost()."\"/><method>disconnected</method></message>\n");
                    delete($buffer{$peer});
                    $sockets->remove($rh);
                    close($rh);
                    last;
                }
                $handle->blocking($was_blocking); #restore blocking state
                if ($line =~ /\n\z/) { #if I have a 'line' of data
                    $buffer{$peer} .= $line;
                    $line = "";
                    if ($buffer{$peer} =~ m/^<message>.+<\/message>$/s) { #if I have a full message
                        $self->_rxenqueue($buffer{$peer});
                        $buffer{$peer} = "";
                        last; #one message at a time per handle
                    }
                }
            }
        }
        #TX
        if (defined(my $data = $self->_txdequeue_nb())) {
            if ($data =~ m/^\d+\.\d+\.\d+\.\d+$/) {
                # Connection request: the item is the server address
                my @connections;
                foreach my $fd ($sockets->handles()) {
                    push(@connections, $fd->peerhost());
                }
                my $already_connected = grep { defined($_) && $_ eq $data } @connections;
                unless ($already_connected) {
                    my $ns = IO::Socket::INET->new(PeerAddr => $data, PeerPort => ${$self->{'port'}}, Proto => 'tcp');
                    if ($ns) {
                        $sockets->add($ns);
                        print "Connected to server [".$ns->peerhost()."]\n";
                    } else {
                        print "ERROR: Failed to connect to server [$data] [$!]\n";
                    }
                } else {
                    print "WARN: Already connected to server [$data]\n";
                }
            } else {
                # Outgoing message: route it by its <to ip="..."> attribute
                my $ref = eval { $xs->XMLin($data) };
                unless (defined($ref)) {
                    print "ERROR: Dropping unparsable outgoing message [$@]\n";
                    next;
                }
                foreach my $wh ($sockets->handles()) {
                    if ($wh->peerhost() eq $ref->{message}->[0]->{to}->[0]->{ip}) {
                        print $wh $xs->XMLout($ref);
                    }
                }
            }
        }
    }
    print "_client EXIT\n";
}
#COMMUNICATION------------------------------------------------------------------------------------------
1;