Beefy Boxes and Bandwidth Generously Provided by pair Networks
Clear questions and runnable code
get the best and fastest answer
 
PerlMonks  

Re: Improve my tcp-mqtt server

by huck (Parson)
on Jun 20, 2020 at 18:19 UTC ( #11118278=note: print w/replies, xml ) Need Help??


in reply to Improve my tcp-mqtt server

Ah yes, the perils of async processing

The place that "blocks" is $client_socket = $socket->accept();

That wont return until a new socket is ready. That is where i would focus my debug efforts. Put debug prints before and after that line. There are two other datums i would like to see, the absolute number of connections and the number of items in @clients. My guess is that you will see it happens after a certain number of inbound connections.

Why is that blocking point important when you think you are mostly using threads? Because if it cannot get a new connection it will not run your "join-cleanup" section. And if a number of threads are joinable and they are not reaped strange things can happen. If the place it hangs is on the 64th/65th connection suspect this.

The next thing i noticed was that nothing ever leaves @clients. If you dont remove the thread objects from that array they wont ever be DESTROYED and strange things will happen. Since the socket is localed to the thread the socket is never "closed" either. If this was the case again i would expect to see it happen on some number that is some power of 2. But this doesnt matter anyway.

Please explain to me how you expect to reach the line

print "Client exit from ".$user{peer_address}.":".$user{peer_port}."\n +";
Once you realize it cannot be reached you also realize nothing is ever joinable. so even if $client_socket is no longer a operating socket that loop never leaves. Every 5 min it tries to read/write to the socket and just fails. Id expect the prints to throw a warning at least but it is possible they instead block because the socket is not writable.

Even fixing those problems you are still stuck with the accept blocking and preventing any cleanup. fixing that is not so easy. You will find that to solve this problem most use IO::Select;

Ive been playing with this since before IO:Select. some untested cut&paste code for you to look at.

my $server = IO::Socket::INET->new( Proto => 'tcp', PeerAddr => "0.0.0.0", LocalPort => $main::PORT, Listen => SOMAXCONN, Reuse => 1); my $sel_read = IO::Select->new(); my $sel_write = IO::Select->new(); my $sel_error = IO::Select->new(); $sel_read-> add($server); my $timeout=50; my %clients; superloop:while(1) { my ($rd_ptr,$wr_ptr,$er_ptr)=IO::Select->select($sel_read,undef,$sel +_error,$timeout) ; # or die "Error in select: $! "; my $outtime=time; eachfh: for my $fh (@$rd_ptr) { if ($fh == $server) { my $client_socket = $server->accept; $sel_read->add($client_socket); $client_socket->autoflush(1); ## $clients {$client_socket}={ socket=>$client_socket ,thread=>threads->create( \&clientHandler, $client_so +cket ) }; ## } # fh = server ## my @opened=keys(%clients); my @eofable=(); foreach $open ( @opened ) { my $thread=$opened{$open}{thread}; if( $thread->is_joinable() ) { $thread->join(); push @eofable,$open; } } foreach $open ( @eofable ) { my $socket=$opened{$open}{socket}; # magic lost in string +ified key close($socket); $sel_read->remove($socket) if ($sel_read->exists($socket +)) ; $sel_write->remove($socket) if ($sel_write->exists($socke +t)); $sel_error->remove($socket) if ($sel_error->exists($socke +t)); delete $opened{$open}; # destroys socket and thread objec +ts; } ## } # superloop

Replies are listed 'Best First'.
Re^2: Improve my tcp-mqtt server
by leostereo (Beadle) on Jun 29, 2020 at 23:02 UTC
    Dear huck.
    Currently I have only one connection.(I expect to have more in the future).
    I need to request device params and keep this connection for 5min (as polling time) and then release it.
    Device will reconnect inmediatly after closing tcp socket.
    This single connection is crashing my service.
    Im traying to apply all concepts you mentioned but it is really hard for me , besides , the code you provided is not working.
    Any way ; this my new version with my own eval blocks:
    #!/usr/bin/perl use strict; use warnings; use IO::Socket::INET; use threads; use Net::MQTT::Simple; use POSIX qw(strftime); use Sys::Syslog qw(:DEFAULT :standard :macros); my $mqtt; my $client; my $ident='tcp-mqtt_server'; my $logopt='ndelay'; my $facility='LOG_USER'; my $th_id; my $topic; my $value; sub Main { $mqtt = Net::MQTT::Simple->new("localhost:1883"); openlog($ident, $logopt, $facility); # don't forget this # flush after every write $| = 1; my ( $socket, $client_socket ); # Bind to listening address and port $socket = new IO::Socket::INET ( LocalHost => '0.0.0.0', LocalPort => '1001', Proto => 'tcp', Listen => 5, Reuse => 1 ) or die "Could not open socket: ".$!."\n"; print "SERVER Waiting for client connections...\n"; syslog(LOG_INFO,"tcp-mqtt server starting"); my @clients = (); while(1) { # Waiting for new client connection. $client_socket = $socket->accept(); # Push new client connection to it's own thread push ( @clients, threads->create( \&clientHandler, $client_soc +ket ) ); foreach ( @clients ) { if( $_->is_joinable() ) { $_->join(); } } } $socket->close(); return 1; } sub clientHandler { my ($client_socket) = @_; my %user = (); $user{peer_address} = $client_socket->peerhost(); $user{peer_port} = $client_socket->peerport(); $user{local_port} = $client_socket->sockport(); $th_id = threads->tid(); print "Client ".$user{peer_address}.":".$user{peer_port}.":".$user +{local_port}."\n"; syslog(LOG_INFO,"Client $user{peer_address}:$user{peer_port}:$u +ser{local_port}:$th_id is connected"); eval{ my $datestring = strftime "%a %b %e %H:%M:%S %Y", localtim +e; printf("$datestring\n"); print $client_socket "\n"; print $client_socket "VLIN\n"; while( my $buffer = <$client_socket> ) { print $buffer; my $value = substr $buffer, 0,-2; $mqtt->retain("minimon/VLIN" => $buffer); syslog(LOG_INFO,"VLIN: $buffer"); last; } sleep(300); }; if($@){ syslog(LOG_INFO,"tcp-mqtt exception:$@->getErrorMessage()") +; } $client_socket->shutdown(2); #$client_socket->close(); print "Client exit from ".$user{peer_address}.":".$user{peer_port} +."\n"; # Client has exited so thread should exit too threads->exit(); } # Start the Main loop Main();
    Thanks again for your words.

      "besides , the code you provided is not working.". I guess you missed where i mentioned "some untested cut&paste code for you to look at. "

      This is a proxie for your client, you may run as many of these as you like. ive had dozens running at once on multiple computers.

      #!/usr/bin/perl use strict; use warnings; use IO::Socket::INET; # proxy for client use Getopt::Long; my $toip ='127.0.0.1'; my $toport =1001; my @optdef=("toip=s" => \$toip ,"toport=s" => \$toport ); GetOptions ( @optdef ) or die("Error in command line arguments\n"); my $n=0; while (1) { if (my $toserver=IO::Socket::INET->new(Proto=> "tcp",PeerAddr => +$toip.':'.$toport)) { my $local_port = $toserver->sockport(); print " connect $local_port\n"; while (my $in=<$toserver>){ chomp $in; next unless (length($in)); unless (int(rand(20))) {print "dont reply close\n"; last;} # unless (int(rand(20))) {print "dont reply wait\n"; next;} print $toserver $$,' ',$local_port,' ',$in,' ',$n++,"\n"; } close $toserver; my $sleep=int(rand(20)); print "sleep $sleep\n"; sleep $sleep; } # got }
      This is the server that cleans up after itself, does not spawn a new thread for each client every 5 min, and handles all three datapoints. watch it run for a while and figure out what is happening.
      #!/usr/bin/perl use strict; use warnings; use IO::Socket::INET; use threads; #use Net::MQTT::Simple; use POSIX qw(strftime); #use Sys::Syslog qw(:DEFAULT :standard :macros); use IO::Select; my $mqtt; my $client; my $ident='tcp-mqtt_server'; my $logopt='ndelay'; my $facility='LOG_USER'; my $th_id; my $topic; my $value; my $holdtime=20; my $timeout=5; my @wants=qw/TEMP VBAT VLIN/; sub Main { my $sel_read = IO::Select->new(); my $conn=0; # $mqtt = Net::MQTT::Simple->new("localhost:1883"); # openlog($ident, $logopt, $facility); # don't forget this # flush after every write $| = 1; my ( $socket, $client_socket ); # Bind to listening address and port $socket = new IO::Socket::INET ( LocalHost => '0.0.0.0', LocalPort => '1001', Proto => 'tcp', Listen => 5, Reuse => 1 ) or die "Could not open socket: ".$!."\n"; $sel_read-> add($socket); print "SERVER Waiting for client connections...\n"; # syslog(LOG_INFO,"tcp-mqtt server starting"); my %clients; superloop:while(1) { my ($rd_ptr,$wr_ptr,$er_ptr)=IO::Select->select($sel_read,unde +f,undef,$timeout); print "superloop clients ",scalar((keys(%clients))),"\n"; for my $client_socket( sort { $clients{$a}{topen} <=> $clients +{$b}{topen}} keys(%clients) ) { my $pid=$clients{$client_socket}{peer_address}.':'.$client +s{$client_socket}{peer_port}; print ' ',$pid,' ',$clients{$client_socket}{copen},"\n"; } for my $fh (@$rd_ptr) { if ($fh == $socket) { $conn++; # Waiting for new client connection. $client_socket = $socket->accept(); # Push new client connection to it's own thread $clients {$client_socket}={ socket =>$client_socket ,thread =>threads->create( \&clientHan +dler, $client_socket ) ,peer_address =>$client_socket->peerhost() ,peer_port =>$client_socket->peerport() ,local_port =>$client_socket->sockport() ,topen =>time }; $clients{$client_socket}{copen}=strftime "%a %b %e %H: +%M:%S %Y", localtime $clients{$client_socket}{topen}; my $started=$clients{$client_socket}{thread}; my $nclients=scalar((keys(%clients))); my $pid=$clients{$client_socket}{peer_address}.':'.$cl +ients{$client_socket}{peer_port}; print "Connection $conn $pid clients $nclients\n" } # socket } # fh my @opened=keys(%clients); my @eofable=(); for my $open ( @opened ) { my $thread=$clients{$open}{thread}; if( $thread->is_joinable() ) { $thread->join(); my $pid=$clients{$open}{peer_address}.':'.$clients{$op +en}{peer_port}; print "Joined $pid\n"; push @eofable,$open; } # joinable } #opened for my $open ( @eofable ) { # my $socket=$clients{$open}{socket}; # magic lost in strin +gified key # close($socket); my $pid=$clients{$open}{peer_address}.':'.$clients{$open}{p +eer_port}; delete $clients{$open}; # destroys socket and thread object +s; my $nclients=scalar((keys(%clients))); print "Cleaned $pid clients $nclients\n"; } # eofable } # superloop $socket->close(); return 1; } sub clientHandler { my ($client_socket) = @_; my %user = (); $user{peer_address} = $client_socket->peerhost(); $user{peer_port} = $client_socket->peerport(); $user{local_port} = $client_socket->sockport(); $th_id = threads->tid(); my $isopen=1; my $pid=$user{peer_address}.":".$user{peer_port}; print "Client ".$user{peer_address}.":".$user{peer_port}.":".$user +{local_port}."\n"; # syslog(LOG_INFO,"Client $user{peer_address}:$user{peer_port}:$ +user{local_port}:$th_id is connected"); eval{ while ($isopen) { my $datestring = strftime "%a %b %e %H:%M:%S %Y", localtim +e; print "$datestring\n"; print $client_socket "\n"; for my $want (@wants) { $isopen=0; print $client_socket $want,"\n"; while( my $buffer = <$client_socket> ) { $isopen=1; print $pid,' ',$buffer; my $value = substr $buffer, 0,-2; # $mqtt->retain("minimon/VLIN" => $buffer); # syslog(LOG_INFO,"VLIN: $buffer"); last; } last unless ($isopen); } # want sleep($holdtime) if ($isopen); } # isopen }; if($@){ print "tcp-mqtt exception:$@->getErrorMessage()"; # syslog(LOG_INFO,"tcp-mqtt exception:$@->getErrorMessage()" +); } $client_socket->shutdown(2); #$client_socket->close(); print "Client exit from ".$user{peer_address}.":".$user{peer_port} +."\n"; # Client has exited so thread should exit too threads->exit(); } # Start the Main loop Main();

      This method still suffers from some problems. One is in your choice of line ends.

      #perlipc: 
      #The Internet line terminator is "\015\012". 
      #Under ASCII variants of Unix, that could usually be written as "\r\n", 
      #but under other systems, "\r\n" might at times be "\015\015\012", 
      #        "\012\012\015", 
      # or something completely different. 
      # The standards specify writing "\015\012" to be conformant (be strict in what you provide), 
      # but they also recommend accepting a lone "\012" on input (but be lenient in what you require). 
      

      Another is the semantics of closeing both ends of a socket between computers is complicated and and result in the server end seeming to be open with an outstanding blocking read on it, while the other end is plain gone. This will result in a client haning out in the %clients table forever

Re^2: Improve my tcp-mqtt server
by leostereo (Beadle) on Jun 29, 2020 at 22:44 UTC
    Dear , im not following you 100% but thanks for you detailed explanation.
    btw , I tryed your example but does not work.
    Regards.

Log In?
Username:
Password:

What's my password?
Create A New User
Node Status?
node history
Node Type: note [id://11118278]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others meditating upon the Monastery: (3)
As of 2020-08-09 11:35 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?
    Which rocket would you take to Mars?










    Results (54 votes). Check out past polls.

    Notices?