Beefy Boxes and Bandwidth Generously Provided by pair Networks
XP is just a number
 
PerlMonks  

RFC: Bi-directional multi-client non-blocking TCP server/client

by glenn (Scribe)
on Oct 17, 2014 at 15:45 UTC ( [id://1104190]=perlmeditation: print w/replies, xml ) Need Help??

I created these two libraries to handle multiple clients connecting to multiple servers. They are designed so that a client sends data to one specific server, while each server sends its updates to all of its connected clients. In my case the client is the Tk UI for our testing program, which runs on the server and manages the test systems. This allows not only remote control of the server but also keeps all interested people up to date. The data is passed as XML because it gives nice control structures, and the IPs of the sender and receiver can be added from the socket info.

Perhaps someone can enlighten me: in my original design I used two threads, one for RX and the other for TX, each of which blocked until action needed to be taken. To accomplish this I had to deconstruct the IO::Select lib so that the INET socket could be shared between the two threads; however, I was never able to successfully store and share the socket. That design would further reduce CPU usage by allowing the TX queue and RX socket to block until there was data. I would appreciate any insight.

If this can be accomplished without threads...

Update: When the socket is cleanly closed the client does not close the local socket. FIXED

TESTED: 2 servers, 4 clients

USAGE:
# Example client program for communication_client.
use strict;
use warnings;
use threads;
use Thread::Queue;
use XML::Simple;
use communication_client;

# Parser configured the same way the client/server libraries configure
# theirs, so every element is an array and the root element is kept.
my $xs = XML::Simple->new(keeproot => 1, forcearray => 1, KeyAttr => []);

my $com = communication_client->new(40000);    #DST port

# Consumer thread: dequeue() blocks until a complete message has arrived.
my $thr = threads->create(sub {
    while (my $data = $com->dequeue()) {
        print "Received:\n$data\n";
        my $input = $xs->XMLin($data);
        my $ip    = $input->{message}->[0]->{from}->[0]->{ip};
        #WORK

        # Not every message carries this field (e.g. the client's own
        # "disconnected" notice), so default it before comparing.
        my $field = $input->{message}->[0]->{anyfieldsyouwant}->[0];
        if (defined($field) && $field eq "whatever") {
            #WORK
        }
    }
});

# Enqueueing a bare dotted-quad address asks the client thread to connect
# to that server on the port given to new().
$com->enqueue("IPADDR");

# Anything else is parsed as XML and sent to the connected server whose
# address equals the ip attribute of <to>.
$com->enqueue("<message><to ip=\"IPADDR\"/><data>INFO</data><anyfieldsyouwant>whatever</anyfieldsyouwant></message>");
SERVER:
package communication_server;

# communication_server - TCP server running in a detached thread.
#
# new(PORT) starts a listener thread on 0.0.0.0:PORT.  The owner talks to
# that thread through two queues held in shared memory:
#   enqueue(XML)   - message to broadcast to every connected client; the
#                    thread fills in <from ip> / <to ip> per connection.
#   dequeue(), dequeue_nb() - messages received from clients, plus an
#                    "add" notice generated whenever a client connects.
# A received message is complete once the accumulated lines from one
# connection match <message> ... </message> followed by a newline.

use strict;
use warnings;
use warnings::register;

use Carp;
use threads;
use threads::shared;
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
use Symbol qw(qualify_to_ref);

$| = 1;    #disable STDOUT buffering

# Carp errors from threads::shared calls should complain about caller
our @CARP_NOT = ("threads::shared");

# Constructor.
#   communication_server->new($port)
# Croaks when no port is given or the port is not an integer in 1..65535.
# Returns the blessed shared hash after starting the detached server thread.
sub new {
    my $class = shift;
    croak "Port number required" unless (@_);

    my $port :shared = _validate_count(undef, shift);
    croak "Invalid port number ($port)" if ($port > 65535);

    my @txQueue :shared;
    my @rxQueue :shared;
    my $txlock :shared;
    my $rxlock :shared;
    my %self :shared = (
        'port'    => \$port,
        'txQueue' => \@txQueue,
        'rxQueue' => \@rxQueue,
        'txlock'  => \$txlock,
        'rxlock'  => \$rxlock,
    );
    bless(\%self, $class);
    threads->create('_server', \%self)->detach();
    return \%self;
}

#QUEUES----------------------------------------------------------------------------------------------

# Removes items from the front of the array referenced by $queue.
# With $count == 1 the single item (or undef when empty) is returned;
# otherwise up to $count items are returned as a list.  The caller must
# already hold the lock that guards the queue.
sub _take_items {
    my ($queue, $count) = @_;

    # Return single item
    return shift(@{$queue}) if ($count == 1);

    # Return multiple items
    my @items;
    for (1 .. $count) {
        last if (!@{$queue});    #fewer items than requested (timeout or end)
        push(@items, shift(@{$queue}));
    }
    return @items;
}

# Queue one or more messages for transmission to the clients.
# Does nothing once end() has been called.
sub enqueue {
    my $self = shift;
    unless ($$self{'ENDED'}) {
        lock(${$self->{'txlock'}});
        push(@{$self->{'txQueue'}}, map { shared_clone($_) } @_)
            and cond_signal(${$self->{'txlock'}});
    }
}

# Blocking read of received messages.
#   $self->dequeue()        - one message
#   $self->dequeue($count)  - list of $count messages
# Blocks until enough messages are queued or end() has been called.
sub dequeue {
    my $self = shift;
    lock(${$self->{'rxlock'}});

    my $count = @_ ? $self->_validate_count(shift) : 1;

    cond_wait(${$self->{'rxlock'}})
        while ((@{$self->{'rxQueue'}} < $count) && !$$self{'ENDED'});
    # Pass the wake-up on when another waiter could make progress too.
    cond_signal(${$self->{'rxlock'}})
        if ((@{$self->{'rxQueue'}} > $count) || $$self{'ENDED'});

    return _take_items($self->{'rxQueue'}, $count);
}

# Non-blocking variant of dequeue(): returns whatever is available now
# (undef for a single-item request on an empty queue).
sub dequeue_nb {
    my $self = shift;
    lock(${$self->{'rxlock'}});

    my $count = @_ ? $self->_validate_count(shift) : 1;

    return _take_items($self->{'rxQueue'}, $count);
}

# Marks the object as ended: the server thread leaves its loop and blocked
# dequeue calls are released.
sub end {
    my $self = shift;
    lock ${$self->{'txlock'}};
    lock(${$self->{'rxlock'}});

    # No more data is coming
    $$self{'ENDED'} = 1;

    # Try to release at least one blocked thread
    cond_signal(${$self->{'txlock'}});
    cond_signal(${$self->{'rxlock'}});
}

# Blocking read of the transmit queue (same contract as dequeue()).
sub _txdequeue {
    my $self = shift;
    lock(${$self->{'txlock'}});

    my $count = @_ ? $self->_validate_count(shift) : 1;

    cond_wait(${$self->{'txlock'}})
        while ((@{$self->{'txQueue'}} < $count) && !$$self{'ENDED'});
    cond_signal(${$self->{'txlock'}})
        if ((@{$self->{'txQueue'}} > $count) || $$self{'ENDED'});

    return _take_items($self->{'txQueue'}, $count);
}

# Read of the transmit queue that gives up after a timeout.
#   $self->_txdequeue_timed($timeout, $count)
# $timeout is in seconds, relative when below 32000000 and an absolute
# epoch time otherwise; without arguments the call does not wait.
sub _txdequeue_timed {
    my $self = shift;
    lock(${$self->{'txlock'}});

    # Timeout may be relative or absolute
    my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
    # Convert to an absolute time for use with cond_timedwait()
    if ($timeout < 32000000) {    # More than one year
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items, or until timeout
    while ((@{$self->{'txQueue'}} < $count) && !$$self{'ENDED'}) {
        last if (!cond_timedwait(${$self->{'txlock'}}, $timeout));
    }
    cond_signal(${$self->{'txlock'}})
        if ((@{$self->{'txQueue'}} > $count) || $$self{'ENDED'});

    return _take_items($self->{'txQueue'}, $count);
}

# Queue one or more received messages for the owner of the object.
# Does nothing once end() has been called.
sub _rxenqueue {
    my $self = shift;
    unless ($$self{'ENDED'}) {
        lock(${$self->{'rxlock'}});
        push(@{$self->{'rxQueue'}}, map { shared_clone($_) } @_)
            and cond_signal(${$self->{'rxlock'}});
    }
}

# Check value of the requested timeout: croaks unless it is a number.
sub _validate_timeout {
    my $self    = shift;
    my $timeout = shift;

    if (!defined($timeout) || !looks_like_number($timeout)) {
        my $method = (caller(1))[3] // 'main';
        # $self is undef when called as a plain function
        my $class_name = ref($self) || __PACKAGE__;
        $method =~ s/\Q$class_name\E:://;
        $timeout = 'undef' if (!defined($timeout));
        croak("Invalid 'timeout' argument ($timeout) to '$method' method");
    }

    return $timeout;
}

# Check value of the requested count: croaks unless it is an integer >= 1.
sub _validate_count {
    my $self  = shift;
    my $count = shift;

    if (   !defined($count)
        || !looks_like_number($count)
        || (int($count) != $count)
        || ($count < 1))
    {
        my $method = (caller(1))[3] // 'main';
        # $self is undef when called as a plain function (see new())
        my $class_name = ref($self) || __PACKAGE__;
        $method =~ s/\Q$class_name\E:://;
        $count = 'undef' if (!defined($count));
        croak("Invalid 'count' argument ($count) to '$method' method");
    }

    return $count;
}
#QUEUES----------------------------------------------------------------------------------------------

#COMMUNICATION---------------------------------------------------------------------------------------

# Reads the bytes currently available on $rh without blocking.
#   $state - hash ref with keys 'line' (current unterminated line) and
#            'message' (complete lines received so far)
# Stops after queueing one complete message, or when the socket has no
# more data for now; an unterminated line is kept in $state for the next
# call instead of spinning until the newline arrives.
# Returns 1 while the connection is usable, 0 on end-of-file or read error.
sub _read_available {
    my ($self, $rh, $state) = @_;

    my $was_blocking = $rh->blocking(0);    #turn off blocking
    my $open         = 1;

    while (1) {
        my $ret = sysread($rh, my $nextbyte, 1);

        if (!defined($ret)) {
            # Nothing more to read right now.  The message text is kept
            # for perls that do not map the Winsock code to an E* constant.
            last if ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR}
                || $! =~ m/a non-blocking socket operation could not be completed immediately/i);
            print "SOCKET STATUS: $!\n";
            $open = 0;
            last;
        }
        if ($ret == 0) {    #CLOSING SOCKET
            print "SYSREAD RET: [$ret]\n";
            $open = 0;
            last;
        }

        $state->{line} .= $nextbyte;
        next if ($nextbyte ne "\n");

        #I have a 'line' of data
        $state->{message} .= $state->{line};
        $state->{line} = "";
        if ($state->{message} =~ m/^<message>.+<\/message>$/s) {    #if I have a full message
            $self->_rxenqueue($state->{message});
            $state->{message} = "";
            last;    #one message at a time per handle
        }
    }

    $rh->blocking($was_blocking) if ($open);    #restore blocking state
    return $open;
}

# Body of the detached server thread: accepts clients, collects their
# messages into the RX queue and broadcasts TX queue entries, until end()
# is called.
sub _server {
    my $self = shift;

    require XML::Simple;
    XML::Simple->import();
    require IO::Socket::INET;
    IO::Socket::INET->import();
    require IO::Select;
    IO::Select->import();

    my $xs   = XML::Simple->new(keeproot => 1, forcearray => 1, KeyAttr => []);
    my $port = ${$self->{'port'}};

    print "SERVER PORT ".$port."\n";
    my $socket = IO::Socket::INET->new(
        LocalHost => '0.0.0.0',
        LocalPort => $port,
        Proto     => 'tcp',
        Listen    => 5,
        Reuse     => 1,
    );
    croak "cannot create socket $!\n" unless $socket;
    print "server waiting for client connection on port ".$port."\n";

    my $sockets = IO::Select->new($socket);
    my %conn;    # "host:port" => read state for that connection

    while (!$$self{'ENDED'}) {
        #RX
        my ($read) = IO::Select->select($sockets, undef, undef, 0.5);
        foreach my $rh (@{ $read || [] }) {
            if ($rh == $socket) {
                my $ns = $rh->accept();
                unless ($ns) {
                    printf "ERROR (%d)(%s)(%d)(%s)", $!, $!, $^E, $^E;
                    next;    # nothing was accepted, so there is no client to register
                }
                my $peer = $ns->peerhost().":".$ns->peerport();
                print "Accepting client: ".$peer."\n";
                $conn{$peer} = { message => "", line => "" };
                #send initial data
                $self->_rxenqueue("<message><from ip='".$ns->peerhost()."'/><method>add</method></message>");
                $sockets->add($ns);
            } else {
                my $peer = $rh->peerhost().":".$rh->peerport();
                $conn{$peer} ||= { message => "", line => "" };
                unless ($self->_read_available($rh, $conn{$peer})) {
                    #connection closed or failed: remove host
                    delete($conn{$peer});
                    $sockets->remove($rh);
                    close($rh);
                }
            }
        }

        #TX
        if (defined(my $data = $self->_txdequeue_timed(1))) {
            # A message that cannot be parsed or addressed must not take
            # the whole server thread down.
            my $sent = eval {
                my $ref = $xs->XMLin($data);
                foreach my $wh ($sockets->handles()) {
                    next if ($wh == $socket);
                    $ref->{message}->[0]->{from}->[0]->{ip} = $wh->sockhost();
                    $ref->{message}->[0]->{to}->[0]->{ip}   = $wh->peerhost();
                    my $xml = $xs->XMLout($ref);
                    print $xml;
                    print {$wh} $xml;
                }
                1;
            };
            print "ERROR: Failed to send message [$@]\n" unless ($sent);
        }
    }
    print "_server EXIT\n";
}
#COMMUNICATION---------------------------------------------------------------------------------------

1;
CLIENT:
package communication_client;

# communication_client - TCP client running in a detached thread.
#
# new(PORT) starts a worker thread that can hold connections to several
# servers, all on PORT.  The owner talks to that thread through two queues
# held in shared memory:
#   enqueue("a.b.c.d") - connect to the server at that address
#   enqueue(XML)       - send the message to the connected server whose
#                        address equals the ip attribute of <to>
#   dequeue(), dequeue_nb() - messages received from servers, plus a
#                        "disconnected" notice when a connection ends.
# A received message is complete once the accumulated lines from one
# connection match <message> ... </message> followed by a newline.

use strict;
use warnings;
use warnings::register;

use Carp;
use threads;
use threads::shared;
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
use Symbol qw(qualify_to_ref);

$| = 1;    #disable STDOUT buffering

# Carp errors from threads::shared calls should complain about caller
our @CARP_NOT = ("threads::shared");

# Constructor.
#   communication_client->new($port)
# Croaks when no port is given or the port is not an integer in 1..65535.
# Returns the blessed shared hash after starting the detached client thread.
sub new {
    my $class = shift;
    croak "Port number required" unless (@_);

    my $port :shared = _validate_count(undef, shift);
    croak "Invalid port number ($port)" if ($port > 65535);

    my @txQueue :shared;
    my @rxQueue :shared;
    my $txlock :shared;
    my $rxlock :shared;
    my %self :shared = (
        'port'    => \$port,
        'txQueue' => \@txQueue,
        'rxQueue' => \@rxQueue,
        'txlock'  => \$txlock,
        'rxlock'  => \$rxlock,
    );
    bless(\%self, $class);
    threads->create('_client', \%self)->detach();
    return \%self;
}

#QUEUES----------------------------------------------------------------------------------------------

# Removes items from the front of the array referenced by $queue.
# With $count == 1 the single item (or undef when empty) is returned;
# otherwise up to $count items are returned as a list.  The caller must
# already hold the lock that guards the queue.
sub _take_items {
    my ($queue, $count) = @_;

    # Return single item
    return shift(@{$queue}) if ($count == 1);

    # Return multiple items
    my @items;
    for (1 .. $count) {
        last if (!@{$queue});    #fewer items than requested (timeout or end)
        push(@items, shift(@{$queue}));
    }
    return @items;
}

# Queue one or more requests (server address or XML message) for the
# client thread.  Does nothing once end() has been called.
sub enqueue {
    my $self = shift;
    unless ($$self{'ENDED'}) {
        lock(${$self->{'txlock'}});
        push(@{$self->{'txQueue'}}, map { shared_clone($_) } @_)
            and cond_signal(${$self->{'txlock'}});
    }
}

# Blocking read of received messages.
#   $self->dequeue()        - one message
#   $self->dequeue($count)  - list of $count messages
# Blocks until enough messages are queued or end() has been called.
sub dequeue {
    my $self = shift;
    lock(${$self->{'rxlock'}});

    my $count = @_ ? $self->_validate_count(shift) : 1;

    cond_wait(${$self->{'rxlock'}})
        while ((@{$self->{'rxQueue'}} < $count) && !$$self{'ENDED'});
    # Pass the wake-up on when another waiter could make progress too.
    cond_signal(${$self->{'rxlock'}})
        if ((@{$self->{'rxQueue'}} > $count) || $$self{'ENDED'});

    return _take_items($self->{'rxQueue'}, $count);
}

# Non-blocking variant of dequeue(): returns whatever is available now
# (undef for a single-item request on an empty queue).
sub dequeue_nb {
    my $self = shift;
    lock(${$self->{'rxlock'}});

    my $count = @_ ? $self->_validate_count(shift) : 1;

    return _take_items($self->{'rxQueue'}, $count);
}

# Marks the object as ended: the client thread leaves its loop and blocked
# dequeue calls are released.
sub end {
    my $self = shift;
    lock ${$self->{'txlock'}};
    lock(${$self->{'rxlock'}});

    # No more data is coming
    $$self{'ENDED'} = 1;

    # Try to release at least one blocked thread
    cond_signal(${$self->{'txlock'}});
    cond_signal(${$self->{'rxlock'}});
}

# Blocking read of the transmit queue (same contract as dequeue()).
sub _txdequeue {
    my $self = shift;
    lock(${$self->{'txlock'}});

    my $count = @_ ? $self->_validate_count(shift) : 1;

    cond_wait(${$self->{'txlock'}})
        while ((@{$self->{'txQueue'}} < $count) && !$$self{'ENDED'});
    cond_signal(${$self->{'txlock'}})
        if ((@{$self->{'txQueue'}} > $count) || $$self{'ENDED'});

    return _take_items($self->{'txQueue'}, $count);
}

# Non-blocking read of the transmit queue (same contract as dequeue_nb()).
sub _txdequeue_nb {
    my $self = shift;
    lock(${$self->{'txlock'}});

    my $count = @_ ? $self->_validate_count(shift) : 1;

    return _take_items($self->{'txQueue'}, $count);
}

# Read of the transmit queue that gives up after a timeout.
#   $self->_txdequeue_timed($timeout, $count)
# $timeout is in seconds, relative when below 32000000 and an absolute
# epoch time otherwise; without arguments the call does not wait.
sub _txdequeue_timed {
    my $self = shift;
    lock(${$self->{'txlock'}});

    # Timeout may be relative or absolute
    my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
    # Convert to an absolute time for use with cond_timedwait()
    if ($timeout < 32000000) {    # More than one year
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items, or until timeout
    while ((@{$self->{'txQueue'}} < $count) && !$$self{'ENDED'}) {
        last if (!cond_timedwait(${$self->{'txlock'}}, $timeout));
    }
    cond_signal(${$self->{'txlock'}})
        if ((@{$self->{'txQueue'}} > $count) || $$self{'ENDED'});

    return _take_items($self->{'txQueue'}, $count);
}

# Queue one or more received messages for the owner of the object.
# Does nothing once end() has been called.
sub _rxenqueue {
    my $self = shift;
    unless ($$self{'ENDED'}) {
        lock(${$self->{'rxlock'}});
        push(@{$self->{'rxQueue'}}, map { shared_clone($_) } @_)
            and cond_signal(${$self->{'rxlock'}});
    }
}

# Check value of the requested timeout: croaks unless it is a number.
sub _validate_timeout {
    my $self    = shift;
    my $timeout = shift;

    if (!defined($timeout) || !looks_like_number($timeout)) {
        my $method = (caller(1))[3] // 'main';
        # $self is undef when called as a plain function
        my $class_name = ref($self) || __PACKAGE__;
        $method =~ s/\Q$class_name\E:://;
        $timeout = 'undef' if (!defined($timeout));
        croak("Invalid 'timeout' argument ($timeout) to '$method' method");
    }

    return $timeout;
}

# Check value of the requested count: croaks unless it is an integer >= 1.
sub _validate_count {
    my $self  = shift;
    my $count = shift;

    if (   !defined($count)
        || !looks_like_number($count)
        || (int($count) != $count)
        || ($count < 1))
    {
        my $method = (caller(1))[3] // 'main';
        # $self is undef when called as a plain function (see new())
        my $class_name = ref($self) || __PACKAGE__;
        $method =~ s/\Q$class_name\E:://;
        $count = 'undef' if (!defined($count));
        croak("Invalid 'count' argument ($count) to '$method' method");
    }

    return $count;
}
#QUEUES----------------------------------------------------------------------------------------------

#COMMUNICATION---------------------------------------------------------------------------------------

# Reads the bytes currently available on $rh without blocking.
#   $state - hash ref with keys 'line' (current unterminated line) and
#            'message' (complete lines received so far)
# Stops after queueing one complete message, or when the socket has no
# more data for now; an unterminated line is kept in $state for the next
# call instead of spinning until the newline arrives.
# Returns 1 while the connection is usable, 0 on end-of-file or read error.
sub _read_available {
    my ($self, $rh, $state) = @_;

    my $was_blocking = $rh->blocking(0);    #turn off blocking
    my $open         = 1;

    while (1) {
        my $ret = sysread($rh, my $nextbyte, 1);

        if (!defined($ret)) {
            # Nothing more to read right now.  The message text is kept
            # for perls that do not map the Winsock code to an E* constant.
            last if ($!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR}
                || $! =~ m/a non-blocking socket operation could not be completed immediately/i);
            print "SOCKET STATUS: $!\n";
            $open = 0;
            last;
        }
        if ($ret == 0) {    #CLOSING SOCKET
            print "SYSREAD RET: [$ret]\n";
            $open = 0;
            last;
        }

        $state->{line} .= $nextbyte;
        next if ($nextbyte ne "\n");

        #I have a 'line' of data
        $state->{message} .= $state->{line};
        $state->{line} = "";
        if ($state->{message} =~ m/^<message>.+<\/message>$/s) {    #if I have a full message
            $self->_rxenqueue($state->{message});
            $state->{message} = "";
            last;    #one message at a time per handle
        }
    }

    $rh->blocking($was_blocking) if ($open);    #restore blocking state
    return $open;
}

# Body of the detached client thread: collects messages from the connected
# servers into the RX queue and serves TX queue requests (connect to a
# server, or send a message to one), until end() is called.
sub _client {
    my $self = shift;

    require XML::Simple;
    XML::Simple->import();
    require IO::Socket::INET;
    IO::Socket::INET->import();
    require IO::Select;
    IO::Select->import();

    my $xs = XML::Simple->new(keeproot => 1, forcearray => 1, KeyAttr => []);

    my $sockets = IO::Select->new();
    my %conn;    # "host:port" => read state for that connection

    while (!$$self{'ENDED'}) {
        #RX
        my ($read) = IO::Select->select($sockets, undef, undef, 0.5);
        foreach my $rh (@{ $read || [] }) {
            my $peer = $rh->peerhost().":".$rh->peerport();
            $conn{$peer} ||= { message => "", line => "" };
            unless ($self->_read_available($rh, $conn{$peer})) {
                #connection closed or failed: tell the owner and remove host
                $self->_rxenqueue("<message><from ip=\"".$rh->peerhost()."\"/><method>disconnected</method></message>\n");
                delete($conn{$peer});
                $sockets->remove($rh);
                close($rh);
            }
        }

        #TX
        if (defined(my $data = $self->_txdequeue_nb())) {
            if ($data =~ m/^\d+\.\d+\.\d+\.\d+$/) {
                # A bare IPv4 address is a request to connect to that server.
                my @connections = map { $_->peerhost() } $sockets->handles();
                unless (grep { $_ eq $data } @connections) {
                    my $ns = IO::Socket::INET->new(
                        PeerAddr => $data,
                        PeerPort => ${$self->{'port'}},
                        Proto    => 'tcp',
                    );
                    if ($ns) {
                        $sockets->add($ns);
                        print "Connected to server [".$ns->peerhost()."]\n";
                    } else {
                        print "ERROR: Failed to connect to server [$data] [$!]\n";
                    }
                } else {
                    print "WARN: Already connected to server [$data]\n";
                }
            } else {
                # A message that cannot be parsed or has no recipient must
                # not take the whole client thread down.
                my $ref = eval { $xs->XMLin($data) };
                my $to  = $ref ? eval { $ref->{message}->[0]->{to}->[0]->{ip} } : undef;
                if (!defined($to)) {
                    print "ERROR: Discarding message without a usable <to ip=\"...\"> [$data]\n";
                } else {
                    foreach my $wh ($sockets->handles()) {
                        if ($wh->peerhost() eq $to) {
                            print {$wh} $xs->XMLout($ref);
                        }
                    }
                }
            }
        }
    }
    print "_client EXIT\n";
}
#COMMUNICATION---------------------------------------------------------------------------------------

1;

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: perlmeditation [id://1104190]
Approved by hominid
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others sharing their wisdom with the Monastery: (5)
As of 2024-03-19 10:41 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found