Beefy Boxes and Bandwidth Generously Provided by pair Networks
Perl Monk, Perl Meditation
 
PerlMonks  

comment on

( #3333=superdoc: print w/replies, xml ) Need Help??

I created these two libraries to handle multiple clients connecting to multiple servers. It is designed where the client will send data to a specific server while the server sends updates to all clients. In my case the client is the Tk UI for our testing program which is running on the server and managing test systems. This allows not only remote control of the server but keeps all interested people up to date. The data is passed as XML as it gives nice control structures and the IPs for the sender and receiver can be added from the socket info.

Perhaps someone can enlighten me, in my original design I used two threads one for RX the other for TX and blocked until action needed to be taken. To accomplish this I had to deconstruct the IO::Select lib so that the INET socket should be shared between the two threads; however, I was never able to successfully store and share the socket. This would further reduce CPU usage by allowing the TX queue and RX socket to block until there was data. I appreciate appreciate any insight.

If this can be accomplished without threads...

Update: When the socket is cleanly closed the client does not close the local socket. FIXED

TESTED: 2 servers, 4 clients

USAGE:
use threads; use Thread::Queue; use xml::simple; use communication_client; my $com = communication_client->new(40000); #DST port my $thr = threads->create(sub{ while (my $data = $com->dequeue()) { print "Received:\n$data\n"; my $input = $xs->XMLin($data); my $ip = $input->{message}->[0]->{from}->[0]->{ip}; #WORK if ($input->{message}->[0]->{anyfieldsyouwant}->[0] eq "whatev +er") { #WORK } } }); $com->enqueue("<message><to ip=\"IPADDR\"><data>INFO</data><anyfieldsy +ouwant>whatever</anyfieldsyouwant></message>");
SERVER:
package communication_server; use Carp; use threads; use threads::shared; use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); use Symbol qw(qualify_to_ref); use strict; use warnings::register; $| = 1; #disable STDOUT buffering # Carp errors from threads::shared calls should complain about caller our @CARP_NOT = ("threads::shared"); sub new { my $class = shift; croak "Port number required" unless(@_); my $port :shared = _validate_count(undef, shift); my @txQueue :shared; my @rxQueue :shared; my $txlock :shared; my $rxlock :shared; my %self :shared = ( 'port' => \$port, 'txQueue' => \@txQueue, 'rxQueue' => \@rxQueue, 'txlock' => \$txlock, 'rxlock' => \$rxlock, ); bless(\%self, $class); threads->create('_server', \%self)->detach(); return \%self; } #QUEUES--------------------------------------------------------------- +------------------------------- sub enqueue { my $self = shift; unless ($$self{'ENDED'}) { lock(${$self->{'txlock'}}); push(@{$self->{'txQueue'}}, map { shared_clone($_) } @_) and c +ond_signal(${$self->{'txlock'}}); } } sub dequeue { my $self = shift; lock(${$self->{'rxlock'}}); my $count = @_ ? $self->_validate_count(shift) : 1; cond_wait(${$self->{'rxlock'}}) while ((@{$self->{'rxQueue'}} < $c +ount) && ! $$self{'ENDED'}); cond_signal(${$self->{'rxlock'}}) if ((@{$self->{'rxQueue'}} > $co +unt) || $$self{'ENDED'}); # Return single item return shift(@{$self->{'rxQueue'}}) if ($count == 1); # Return multiple items my @items; for (1..$count) { last if (! @{$self->{'rxQueue'}}); #this should only happen in + the event there are either enough values or end has been called. push(@items, shift(@{$self->{'rxQueue'}})); } return @items; } sub dequeue_nb { my $self = shift; lock(${$self->{'rxlock'}}); my $count = @_ ? $self->_validate_count(shift) : 1; # Return single item return shift(@{$self->{'rxQueue'}}) if ($count == 1); # Return multiple items my @items; for (1..$count) { last if (! @{$self->{'rxQueue'}}); #this should only happen in + the event there are either enough values or end has been called. push(@items, shift(@{$self->{'rxQueue'}})); } return @items; } sub end { my $self = shift; lock ${$self->{'txlock'}}; lock(${$self->{'rxlock'}}); # No more data is coming $$self{'ENDED'} = 1; # Try to release at least one blocked thread cond_signal(${$self->{'txlock'}}); cond_signal(${$self->{'rxlock'}}); } sub _txdequeue { my $self = shift; lock(${$self->{'txlock'}}); my $count = @_ ? $self->_validate_count(shift) : 1; cond_wait(${$self->{'txlock'}}) while ((@{$self->{'txQueue'}} < $c +ount) && ! $$self{'ENDED'}); cond_signal(${$self->{'txlock'}}) if ((@{$self->{'txQueue'}} > $co +unt) || $$self{'ENDED'}); # Return single item return shift(@{$self->{'txQueue'}}) if ($count == 1); # Return multiple items my @items; for (1..$count) { last if (! @{$self->{'txQueue'}}); #this should only happen in + the event there are either enough values or end has been called. push(@items, shift(@{$self->{'txQueue'}})); } } sub _txdequeue_timed { my $self = shift; lock(${$self->{'txlock'}}); # Timeout may be relative or absolute my $timeout = @_ ? $self->_validate_timeout(shift) : -1; # Convert to an absolute time for use with cond_timedwait() if ($timeout < 32000000) { # More than one year $timeout += time(); } my $count = @_ ? $self->_validate_count(shift) : 1; # Wait for requisite number of items, or until timeout while ((@{$self->{'txQueue'}} < $count) && ! $$self{'ENDED'}) { last if (! cond_timedwait(${$self->{'txlock'}}, $timeout)); } cond_signal(${$self->{'txlock'}}) if ((@{$self->{'txQueue'}} > $co +unt) || $$self{'ENDED'}); # Return single item return shift(@{$self->{'txQueue'}}) if ($count == 1); # Return multiple items my @items; for (1..$count) { last if (! @{$self->{'txQueue'}}); #this should only happen in + the event there are either enough values or end has been called. push(@items, shift(@{$self->{'txQueue'}})); } } sub _rxenqueue { my $self = shift; unless ($$self{'ENDED'}) { lock(${$self->{'rxlock'}}); push(@{$self->{'rxQueue'}}, map { shared_clone($_) } @_) and c +ond_signal(${$self->{'rxlock'}}); } } sub _validate_timeout { my $self = shift; my $timeout = shift; if (! defined($timeout) || ! looks_like_number($timeout)) { my ($method) = (caller(1))[3]; my $class_name = ref($self); $method =~ s/$class_name\:://; $timeout = 'undef' if (! defined($timeout)); croak("Invalid 'timeout' argument ($timeout) to '$method' meth +od"); } return $timeout; }; # Check value of the requested count sub _validate_count { my $self = shift; my $count = shift; if (! defined($count) || ! looks_like_number($count) || (int($count) != $count) || ($count < 1)) { require Carp; my ($method) = (caller(1))[3]; my $class_name = ref($self); $method =~ s/$class_name\:://; $count = 'undef' if (! defined($count)); Carp::croak("Invalid 'count' argument ($count) to '$method' me +thod"); } return $count; } #QUEUES--------------------------------------------------------------- +------------------------------- #COMMUNICATION-------------------------------------------------------- +---------------------------------- sub _server { local *__ANON__ = '_server'; my $self = shift; require XML::Simple; XML::Simple->import(); require IO::Socket::INET; IO::Socket::INET->import(); require IO::Select; IO::Select->import(); my $xs = new XML::Simple(keeproot=>1, forcearray=>1, KeyAttr=>[]); print "SERVER PORT ".${$self->{'port'}}."\n"; my $socket = new IO::Socket::INET (LocalHost => '0.0.0.0', LocalPo +rt => ${$self->{'port'}}, Proto => 'tcp', Listen => 5, Reuse => 1); croak "cannot create socket $!\n" unless $socket; print "server waiting for client connection on port ".${$self->{'p +ort'}}."\n"; my $sockets = IO::Select->new($socket); my %buffer; while (! $$self{'ENDED'}) { #RX my ($read) = IO::Select->select($sockets, undef, undef, 0.5); foreach my $rh (@$read) { if ($rh == $socket) { my $ns = $rh->accept() or printf "ERROR (%d)(%s)(%d)(% +s)", $!,$!,$^E,$^E; print "Accepting client: ".$ns->peerhost().":".$ns->pe +erport()."\n"; $buffer{$ns->peerhost().":".$ns->peerport()} = ""; #send initial data $self->_rxenqueue("<message><from ip='".$ns->peerhost( +)."'/><method>add</method></message>"); $sockets->add($ns); } else { my $handle = qualify_to_ref($rh, caller( )); my $bits = ''; vec($bits, fileno($handle), 1) = 1; my $line = ""; #line char buffer while (select($bits,undef,undef,0.0)) { #While data in + the buffer my $was_blocking = $handle->blocking(0); #turn on +blocking my $reading = 1; while ($reading == 1) { my $ret = sysread($handle, my $nextbyte, 1); print "SYSREAD RET: [$ret]\n" unless ($ret); print "SOCKET STATUS: $!\n" if ($!); if (defined($ret)) { if ($ret == 0) { #CLOSING SOCKET $reading = -1; } else { $line .= $nextbyte; $reading = 0 if $nextbyte eq "\n"; } } elsif ($! !~ m/a non-blocking socket operati +on could not be completed immediately/i) { $reading = -1; } } if ($reading == -1) { #if read error is 'An existi +ng connection was forcibly closed by the remote host.' remove host #print "\n\nRemoving client: ".$rh->peerhost() +.":".$rh->peerport()."\n"; delete($buffer{$rh->peerhost().":".$rh->peerpo +rt()}); $sockets->remove($rh); close($rh); last; } $handle->blocking($was_blocking); #restore blockin +g state if ($line =~ /\n\z/) { #if I have a 'line' of data $buffer{$rh->peerhost().":".$rh->peerport()} . += $line; $line = ""; if ($buffer{$rh->peerhost().":".$rh->peerport( +)} =~ m/^<message>.+<\/message>$/s) { #if I have a full message $self->_rxenqueue($buffer{$rh->peerhost(). +":".$rh->peerport()}); $buffer{$rh->peerhost().":".$rh->peerport( +)} = ""; last; #one message at a time per handle } } } } } #TX if (defined(my $data = $self->_txdequeue_timed(1))) { my $ref = $xs->XMLin($data); foreach my $wh ($sockets->handles()) { next if ($socket eq $wh); $ref->{message}->[0]->{from}->[0]->{ip} = $wh->sockhos +t(); $ref->{message}->[0]->{to}->[0]->{ip} = $wh->peerhost( +); my $xml = $xs->XMLout($ref); print $xml; print $wh $xml; } } } print "_server EXIT\n"; } #COMMUNICATION-------------------------------------------------------- +---------------------------------- 1;
CLIENT:
package communication_client; use Carp; use threads; use threads::shared; use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); use Symbol qw(qualify_to_ref); use strict; use warnings::register; $| = 1; #disable STDOUT buffering # Carp errors from threads::shared calls should complain about caller our @CARP_NOT = ("threads::shared"); sub new { my $class = shift; croak "Port number required" unless(@_); my $port :shared = _validate_count(undef, shift); my @txQueue :shared; my @rxQueue :shared; my $txlock :shared; my $rxlock :shared; my %self :shared = ( 'port' => \$port, 'txQueue' => \@txQueue, 'rxQueue' => \@rxQueue, 'txlock' => \$txlock, 'rxlock' => \$rxlock, ); bless(\%self, $class); threads->create('_client', \%self)->detach(); return \%self; } #QUEUES--------------------------------------------------------------- +------------------------------- sub enqueue { my $self = shift; unless ($$self{'ENDED'}) { lock(${$self->{'txlock'}}); push(@{$self->{'txQueue'}}, map { shared_clone($_) } @_) and c +ond_signal(${$self->{'txlock'}}); } } sub dequeue { my $self = shift; lock(${$self->{'rxlock'}}); my $count = @_ ? $self->_validate_count(shift) : 1; cond_wait(${$self->{'rxlock'}}) while ((@{$self->{'rxQueue'}} < $c +ount) && ! $$self{'ENDED'}); cond_signal(${$self->{'rxlock'}}) if ((@{$self->{'rxQueue'}} > $co +unt) || $$self{'ENDED'}); # Return single item return shift(@{$self->{'rxQueue'}}) if ($count == 1); # Return multiple items my @items; for (1..$count) { last if (! @{$self->{'rxQueue'}}); #this should only happen in + the event there are either enough values or end has been called. push(@items, shift(@{$self->{'rxQueue'}})); } return @items; } sub dequeue_nb { my $self = shift; lock(${$self->{'rxlock'}}); my $count = @_ ? $self->_validate_count(shift) : 1; # Return single item return shift(@{$self->{'rxQueue'}}) if ($count == 1); # Return multiple items my @items; for (1..$count) { last if (! @{$self->{'rxQueue'}}); #this should only happen in + the event there are either enough values or end has been called. push(@items, shift(@{$self->{'rxQueue'}})); } return @items; } sub end { my $self = shift; lock ${$self->{'txlock'}}; lock(${$self->{'rxlock'}}); # No more data is coming $$self{'ENDED'} = 1; # Try to release at least one blocked thread cond_signal(${$self->{'txlock'}}); cond_signal(${$self->{'rxlock'}}); } sub _txdequeue { my $self = shift; lock(${$self->{'txlock'}}); my $count = @_ ? $self->_validate_count(shift) : 1; cond_wait(${$self->{'txlock'}}) while ((@{$self->{'txQueue'}} < $c +ount) && ! $$self{'ENDED'}); cond_signal(${$self->{'txlock'}}) if ((@{$self->{'txQueue'}} > $co +unt) || $$self{'ENDED'}); # Return single item return shift(@{$self->{'txQueue'}}) if ($count == 1); # Return multiple items my @items; for (1..$count) { last if (! @{$self->{'txQueue'}}); #this should only happen in + the event there are either enough values or end has been called. push(@items, shift(@{$self->{'txQueue'}})); } } sub _txdequeue_nb { my $self = shift; lock(${$self->{'txlock'}}); my $count = @_ ? $self->_validate_count(shift) : 1; # Return single item return shift(@{$self->{'txQueue'}}) if ($count == 1); # Return multiple items my @items; for (1..$count) { last if (! @{$self->{'txQueue'}}); #this should only happen in + the event there are either enough values or end has been called. push(@items, shift(@{$self->{'txQueue'}})); } return @items; } sub _txdequeue_timed { my $self = shift; lock(${$self->{'txlock'}}); # Timeout may be relative or absolute my $timeout = @_ ? $self->_validate_timeout(shift) : -1; # Convert to an absolute time for use with cond_timedwait() if ($timeout < 32000000) { # More than one year $timeout += time(); } my $count = @_ ? $self->_validate_count(shift) : 1; # Wait for requisite number of items, or until timeout while ((@{$self->{'txQueue'}} < $count) && ! $$self{'ENDED'}) { last if (! cond_timedwait(${$self->{'txlock'}}, $timeout)); } cond_signal(${$self->{'txlock'}}) if ((@{$self->{'txQueue'}} > $co +unt) || $$self{'ENDED'}); # Return single item return shift(@{$self->{'txQueue'}}) if ($count == 1); # Return multiple items my @items; for (1..$count) { last if (! @{$self->{'txQueue'}}); #this should only happen in + the event there are either enough values or end has been called. push(@items, shift(@{$self->{'txQueue'}})); } } sub _rxenqueue { my $self = shift; unless ($$self{'ENDED'}) { lock(${$self->{'rxlock'}}); push(@{$self->{'rxQueue'}}, map { shared_clone($_) } @_) and c +ond_signal(${$self->{'rxlock'}}); } } sub _validate_timeout { my $self = shift; my $timeout = shift; if (! defined($timeout) || ! looks_like_number($timeout)) { my ($method) = (caller(1))[3]; my $class_name = ref($self); $method =~ s/$class_name\:://; $timeout = 'undef' if (! defined($timeout)); croak("Invalid 'timeout' argument ($timeout) to '$method' meth +od"); } return $timeout; }; # Check value of the requested count sub _validate_count { my $self = shift; my $count = shift; if (! defined($count) || ! looks_like_number($count) || (int($count) != $count) || ($count < 1)) { require Carp; my ($method) = (caller(1))[3]; my $class_name = ref($self); $method =~ s/$class_name\:://; $count = 'undef' if (! defined($count)); Carp::croak("Invalid 'count' argument ($count) to '$method' me +thod"); } return $count; } #QUEUES--------------------------------------------------------------- +------------------------------- #COMMUNICATION-------------------------------------------------------- +---------------------------------- sub _client { local *__ANON__ = '_client'; my $self = shift; require XML::Simple; XML::Simple->import(); require IO::Socket::INET; IO::Socket::INET->import(); require IO::Select; IO::Select->import(); my $xs = new XML::Simple(keeproot=>1, forcearray=>1, KeyAttr=>[]); my $sockets = IO::Select->new(); my %buffer; while (! $$self{'ENDED'}) { #RX my ($read) = IO::Select->select($sockets, undef, undef, 0.5); foreach my $rh (@$read) { my $handle = qualify_to_ref($rh, caller( )); my $bits = ''; vec($bits, fileno($handle), 1) = 1; my $line = ""; #line char buffer while (select($bits,undef,undef,0.0)) { #While data in the + buffer my $was_blocking = $handle->blocking(0); #turn on bloc +king my $reading = 1; while ($reading == 1) { my $ret = sysread($handle, my $nextbyte, 1); print "SYSREAD RET: [$ret]\n" unless ($ret); print "SOCKET STATUS: $!\n" if ($!); if (defined($ret)) { if ($ret == 0) { #CLOSING SOCKET $reading = -1; } else { $line .= $nextbyte; $reading = 0 if $nextbyte eq "\n"; } } elsif ($! !~ m/a non-blocking socket operation c +ould not be completed immediately/i) { $reading = -1; } } if ($reading == -1) { #if read error is 'An existing c +onnection was forcibly closed by the remote host.' remove host #print "\n\nRemoving client: ".$rh->peerhost()."\n +"; $self->_rxenqueue("<message><from ip=\"".$rh->peer +host()."\"/><method>disconnected</method></message>\n"); delete($buffer{$rh->peerhost().":".$rh->peerport() +}); $sockets->remove($rh); close($rh); last; } $handle->blocking($was_blocking); #restore blocking st +ate if ($line =~ /\n\z/) { #if I have a 'line' of data $buffer{$rh->peerhost().":".$rh->peerport()} .= $l +ine; $line = ""; if ($buffer{$rh->peerhost().":".$rh->peerport()} = +~ m/^<message>.+<\/message>$/s) { #if I have a full message $self->_rxenqueue($buffer{$rh->peerhost().":". +$rh->peerport()}); $buffer{$rh->peerhost().":".$rh->peerport()} = + ""; last; #one message at a time per handle } } } } #TX if (defined(my $data = $self->_txdequeue_nb())) { if ($data =~ m/^\d+\.\d+\.\d+\.\d+$/) { my @connections; foreach my $fd ($sockets->handles()) { push(@connections, $fd->peerhost()); } unless ($data ~~ @connections) { my $ns = new IO::Socket::INET (PeerAddr => $data, +PeerPort => ${$self->{'port'}}, Proto => 'tcp'); if ($ns) { $sockets->add($ns); print "Connected to server [".$ns->peerhost(). +"]\n"; } else { print "ERROR: Failed to connect to server [$da +ta] [$!]\n"; } } else { print "WARN: Already connected to server [$data]\n +"; } } else { my $ref = $xs->XMLin($data); foreach my $wh ($sockets->handles()) { if ($wh->peerhost() eq $ref->{message}->[0]->{to}- +>[0]->{ip}) { print $wh $xs->XMLout($ref); } } } } } print "_client EXIT\n"; } #COMMUNICATION-------------------------------------------------------- +---------------------------------- 1;

In reply to RFC: Bi-directional multi-client non-blocking TCP server/client by glenn

Title:
Use:  <p> text here (a paragraph) </p>
and:  <code> code here </code>
to format your post; it's "PerlMonks-approved HTML":



  • Are you posting in the right place? Check out Where do I post X? to know for sure.
  • Posts may use any of the Perl Monks Approved HTML tags. Currently these include the following:
    <code> <a> <b> <big> <blockquote> <br /> <dd> <dl> <dt> <em> <font> <h1> <h2> <h3> <h4> <h5> <h6> <hr /> <i> <li> <nbsp> <ol> <p> <small> <strike> <strong> <sub> <sup> <table> <td> <th> <tr> <tt> <u> <ul>
  • Snippets of code should be wrapped in <code> tags not <pre> tags. In fact, <pre> tags should generally be avoided. If they must be used, extreme care should be taken to ensure that their contents do not have long lines (<70 chars), in order to prevent horizontal scrolling (and possible janitor intervention).
  • Want more info? How to link or or How to display code and escape characters are good places to start.
Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others examining the Monastery: (2)
As of 2021-10-24 23:23 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?
    My first memorable Perl project was:







    Results (89 votes). Check out past polls.

    Notices?