PerlMonks  

Improve my tcp-mqtt server

by leostereo (Beadle)
on Jun 20, 2020 at 15:20 UTC

leostereo has asked for the wisdom of the Perl Monks concerning the following question:

Hi guys:
Some days ago I wrote a multithreaded TCP server.
It should accept incoming TCP connections in a while loop, then request some data and push it to my Mosquitto broker every 5 minutes.
It works OK, but after some random time (2, 3, 10 hours) it stops working and I cannot find out why.
So, my questions are:
How can I find out what is making my server exit?
Where/how should I use an eval block to handle the exception?
Any other advice would be welcome, of course.
Thanks!!!
This is my code:
#!/usr/bin/perl
use strict;
use warnings;
use IO::Socket::INET;
use threads;
use Net::MQTT::Simple;
use POSIX qw(strftime);
use Sys::Syslog qw(:DEFAULT :standard :macros);

my $mqtt;
my $ident='tcp-mqtt_server';
my $logopt='ndelay';
my $facility='LOG_USER';
my $th_id;
print "parent $$\n";

sub Main {
    $mqtt = Net::MQTT::Simple->new("localhost:1883");
    openlog($ident, $logopt, $facility); # don't forget this
    # flush after every write
    $| = 1;
    my ( $socket, $client_socket );
    $socket = new IO::Socket::INET (
        LocalHost => '0.0.0.0',
        LocalPort => '1001',
        Proto => 'tcp',
        Listen => 5,
        Reuse => 1
    ) or die "Could not open socket: ".$!."\n";
    print "SERVER Waiting for client connections...\n";
    syslog(LOG_INFO,"tcp-mqtt server starting");
    my @clients = ();
    while(1) {
        $client_socket = $socket->accept();
        push ( @clients, threads->create( \&clientHandler, $client_socket ) );
        foreach ( @clients ) {
            if( $_->is_joinable() ) {
                $_->join();
            }
        }
    }
    $socket->close();
    return 1;
}

sub clientHandler {
    my ($client_socket) = @_;
    my %user = ();
    $user{peer_address} = $client_socket->peerhost();
    $user{peer_port} = $client_socket->peerport();
    $user{local_port} = $client_socket->sockport();
    $th_id = threads->tid();
    print "Client ".$user{peer_address}.":".$user{peer_port}.":".$user{local_port}."\n";
    syslog(LOG_INFO,"Client $user{peer_address}:$user{peer_port}:$user{local_port}:$th_id is connected");
    while(1){
        my $datestring = strftime "%a %b %e %H:%M:%S %Y", localtime;
        printf("$datestring\n");
        print $client_socket "TEMP\n";
        while( my $buffer = <$client_socket> ) {
            print $buffer;
            $mqtt->retain("minimon/temp" => $buffer);
            syslog(LOG_INFO,"temperature: $buffer");
            last;
        }
        print $client_socket "VBAT\n";
        while( my $buffer = <$client_socket> ) {
            print $buffer;
            $mqtt->retain("minimon/vbat" => $buffer);
            last;
        }
        print $client_socket "VLIN\n";
        while( my $buffer = <$client_socket> ) {
            print $buffer;
            $mqtt->retain("minimon/vlin" => $buffer);
            last;
        }
        sleep(300);
    }
    print "Client exit from ".$user{peer_address}.":".$user{peer_port}."\n";
    threads->exit();
}

# Start the Main loop
Main();
BTW, I'm launching it with:
nohup ./tcp-mqtt_server_multiclient.pl 2> nohup.out &
so I can check for errors in nohup.out.

Re: Improve my tcp-mqtt server
by huck (Parson) on Jun 20, 2020 at 18:19 UTC

    Ah yes, the perils of async processing

    The place that "blocks" is $client_socket = $socket->accept();

    That won't return until a new connection is ready. That is where I would focus my debugging efforts: put debug prints before and after that line. There are two other data points I would like to see: the total number of connections and the number of items in @clients. My guess is that you will see it happen after a certain number of inbound connections.
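The debug prints suggested above could look roughly like this minimal sketch. `debug` and `logged_accept` are hypothetical helper names, and the timestamped STDERR format is an assumption. The helper logs the connection count and the size of `@clients` right before the blocking `accept()`, and logs the peer right after it returns, so the log shows whether the server is stuck waiting.

```perl
use strict;
use warnings;
use IO::Handle;
use IO::Socket::INET;
use POSIX qw(strftime);

STDERR->autoflush(1);

# Hypothetical helper: one timestamped debug line on STDERR.
sub debug {
    my ($msg) = @_;
    my $ts = strftime "%Y-%m-%d %H:%M:%S", localtime;
    print STDERR "$ts [$$] $msg\n";
}

my $conn_count = 0;    # total connections accepted so far

# Hypothetical wrapper around the blocking accept(): prints before and
# after, so a hang shows up as a "before accept" line with no "after".
sub logged_accept {
    my ($listener, $clients) = @_;
    debug(sprintf "before accept: %d connections so far, %d entries in \@clients",
          $conn_count, scalar @$clients);
    my $client = $listener->accept()
        or do { debug("accept failed: $!"); return };
    $conn_count++;
    debug(sprintf "after accept: connection #%d from %s:%d",
          $conn_count, $client->peerhost, $client->peerport);
    return $client;
}
```

In the original main loop, this would replace the bare accept with something like `$client_socket = logged_accept($socket, \@clients) or next;`.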

    Why is that blocking point important when you think you are mostly using threads? Because if it cannot get a new connection, it will never run your "join-cleanup" section. And if a number of threads are joinable but are never reaped, strange things can happen. If it hangs on the 64th/65th connection, suspect this.

    The next thing I noticed was that nothing ever leaves @clients. If you don't remove the thread objects from that array, they will never be DESTROYed and strange things will happen. Since the socket is local to the thread, the socket is never "closed" either. If this were the case, again I would expect to see it happen at some number that is a power of 2. But this doesn't matter anyway.
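One way to avoid an ever-growing `@clients` array is to let the `threads` module do the bookkeeping. The following is a minimal sketch, assuming a threaded perl. `threads->list(threads::joinable)` returns only threads that have finished but have not yet been joined. `reap_finished_threads` is a hypothetical helper name. A thread only becomes joinable once its handler actually returns, and the loop still has to get a chance to call this helper, which is the `accept()` problem described above.

```perl
use strict;
use warnings;
use threads;

# Hypothetical helper: join every thread that has finished running.
# No separate @clients array is kept, so nothing can pile up in it.
sub reap_finished_threads {
    my $reaped = 0;
    for my $thr (threads->list(threads::joinable)) {
        $thr->join();
        $reaped++;
    }
    return $reaped;
}
```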

    Please explain to me how you expect to reach the line

    print "Client exit from ".$user{peer_address}.":".$user{peer_port}."\n";
    Once you realize it cannot be reached, you also realize that nothing is ever joinable. So even if $client_socket is no longer a working socket, that loop never exits. Every 5 minutes it tries to write to and read from the socket and just fails. I'd expect the prints to throw a warning at least, but it is possible they block instead because the socket is not writable.
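Here is a minimal sketch of a handler loop that can actually end, so the "Client exit" line becomes reachable. `query` and `poll_until_closed` are hypothetical names, and the `$on_value` callback stands in for the `$mqtt->retain(...)` calls. Ignoring SIGPIPE is an added assumption. By default, writing to a socket whose peer has gone away raises SIGPIPE, which kills the whole process without a Perl-level error message.

```perl
use strict;
use warnings;
use IO::Handle;

# Assumption: ignore SIGPIPE so a write to a vanished peer makes print()
# return false (EPIPE) instead of terminating the process. This relies on
# the handle being autoflushed (IO::Socket handles are, by default).
$SIG{PIPE} = 'IGNORE';

# Hypothetical helper: send one command line, return the reply line,
# or undef if the write fails or readline hits EOF.
sub query {
    my ($sock, $cmd) = @_;
    print {$sock} "$cmd\n" or return;
    return scalar <$sock>;
}

# Hypothetical polling loop: ask for each value, then sleep. It returns
# as soon as the device stops answering, instead of looping forever.
sub poll_until_closed {
    my ($sock, $cmds, $interval, $on_value) = @_;
    while (1) {
        for my $cmd (@$cmds) {
            my $reply = query($sock, $cmd);
            return unless defined $reply;    # peer gone: leave the loop
            $on_value->($cmd, $reply);
        }
        sleep $interval;
    }
}
```

After `poll_until_closed` returns, the thread can log its exit and return, which makes it joinable.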

    Even after fixing those problems, you are still stuck with accept() blocking and preventing any cleanup. Fixing that is not so easy. You will find that most people use IO::Select to solve this problem.

    I've been playing with this since before IO::Select. Here is some untested cut&paste code for you to look at.

    my $server = IO::Socket::INET->new( Proto     => 'tcp',
                                        PeerAddr  => "0.0.0.0",
                                        LocalPort => $main::PORT,
                                        Listen    => SOMAXCONN,
                                        Reuse     => 1);
    my $sel_read  = IO::Select->new();
    my $sel_write = IO::Select->new();
    my $sel_error = IO::Select->new();
    $sel_read-> add($server);
    my $timeout=50;
    my %clients;
    superloop:while(1) {
      my ($rd_ptr,$wr_ptr,$er_ptr)=IO::Select->select($sel_read,undef,$sel_error,$timeout) ; # or die "Error in select: $! ";
      my $outtime=time;
      eachfh: for my $fh (@$rd_ptr) {
        if ($fh == $server) {
          my $client_socket = $server->accept;
          $sel_read->add($client_socket);
          $client_socket->autoflush(1);
    ##
          $clients {$client_socket}={ socket=>$client_socket
                                     ,thread=>threads->create( \&clientHandler, $client_socket ) };
    ##
          } # fh = server
    ##
        my @opened=keys(%clients);
        my @eofable=();
        foreach $open ( @opened ) {
          my $thread=$opened{$open}{thread};
          if( $thread->is_joinable() ) {
            $thread->join();
            push @eofable,$open;
          }
        }
        foreach $open ( @eofable ) {
          my $socket=$opened{$open}{socket}; # magic lost in stringified key
          close($socket);
          $sel_read->remove($socket)  if ($sel_read->exists($socket)) ;
          $sel_write->remove($socket) if ($sel_write->exists($socket));
          $sel_error->remove($socket) if ($sel_error->exists($socket));
          delete $opened{$open}; # destroys socket and thread objects;
        }
    ##
      } # superloop
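The core of the IO::Select idea can be reduced to a single helper. The following is a minimal sketch, not the snippet above. `accept_with_timeout` is a hypothetical name. It waits at most `$timeout` seconds via `IO::Select->can_read`, so the main loop regains control regularly and can run its cleanup even when no new client connects.

```perl
use strict;
use warnings;
use IO::Socket::INET;
use IO::Select;

# Hypothetical helper: returns an accepted client socket, or undef if
# nobody connected within $timeout seconds.
sub accept_with_timeout {
    my ($listener, $timeout) = @_;
    my $sel = IO::Select->new($listener);
    return unless $sel->can_read($timeout);
    return $listener->accept();
}

# Hypothetical main loop using it (not executed here):
#   while (1) {
#       if (my $client = accept_with_timeout($listener, 5)) {
#           threads->create(\&clientHandler, $client);
#       }
#       $_->join for threads->list(threads::joinable);  # runs at least every ~5 s
#   }
```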

      Dear huck,
      Currently I have only one connection (I expect to have more in the future).
      I need to request device parameters, keep this connection open for 5 min (the polling interval) and then release it.
      The device will reconnect immediately after the TCP socket is closed.
      This single connection is crashing my service.
      I'm trying to apply all the concepts you mentioned, but it is really hard for me; besides, the code you provided is not working.
      Anyway, this is my new version with my own eval blocks:
      #!/usr/bin/perl
      use strict;
      use warnings;
      use IO::Socket::INET;
      use threads;
      use Net::MQTT::Simple;
      use POSIX qw(strftime);
      use Sys::Syslog qw(:DEFAULT :standard :macros);

      my $mqtt;
      my $client;
      my $ident='tcp-mqtt_server';
      my $logopt='ndelay';
      my $facility='LOG_USER';
      my $th_id;
      my $topic;
      my $value;

      sub Main {
          $mqtt = Net::MQTT::Simple->new("localhost:1883");
          openlog($ident, $logopt, $facility); # don't forget this
          # flush after every write
          $| = 1;
          my ( $socket, $client_socket );
          # Bind to listening address and port
          $socket = new IO::Socket::INET (
              LocalHost => '0.0.0.0',
              LocalPort => '1001',
              Proto => 'tcp',
              Listen => 5,
              Reuse => 1
          ) or die "Could not open socket: ".$!."\n";
          print "SERVER Waiting for client connections...\n";
          syslog(LOG_INFO,"tcp-mqtt server starting");
          my @clients = ();
          while(1) {
              # Waiting for new client connection.
              $client_socket = $socket->accept();
              # Push new client connection to it's own thread
              push ( @clients, threads->create( \&clientHandler, $client_socket ) );
              foreach ( @clients ) {
                  if( $_->is_joinable() ) {
                      $_->join();
                  }
              }
          }
          $socket->close();
          return 1;
      }

      sub clientHandler {
          my ($client_socket) = @_;
          my %user = ();
          $user{peer_address} = $client_socket->peerhost();
          $user{peer_port} = $client_socket->peerport();
          $user{local_port} = $client_socket->sockport();
          $th_id = threads->tid();
          print "Client ".$user{peer_address}.":".$user{peer_port}.":".$user{local_port}."\n";
          syslog(LOG_INFO,"Client $user{peer_address}:$user{peer_port}:$user{local_port}:$th_id is connected");
          eval{
              my $datestring = strftime "%a %b %e %H:%M:%S %Y", localtime;
              printf("$datestring\n");
              print $client_socket "\n";
              print $client_socket "VLIN\n";
              while( my $buffer = <$client_socket> ) {
                  print $buffer;
                  my $value = substr $buffer, 0,-2;
                  $mqtt->retain("minimon/VLIN" => $buffer);
                  syslog(LOG_INFO,"VLIN: $buffer");
                  last;
              }
              sleep(300);
          };
          if($@){
              syslog(LOG_INFO,"tcp-mqtt exception:$@->getErrorMessage()");
          }
          $client_socket->shutdown(2);
          #$client_socket->close();
          print "Client exit from ".$user{peer_address}.":".$user{peer_port}."\n";
          # Client has exited so thread should exit too
          threads->exit();
      }

      # Start the Main loop
      Main();
      Thanks again for your words.
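For the eval question, a common Perl idiom is `eval { ...; 1 } or do { ... }`. The following is a minimal sketch of it. `run_guarded` is a hypothetical helper, and it uses `warn` where the real server would call `syslog(LOG_ERR, ...)`. Note that in the version above, `"$@->getErrorMessage()"` is not a method call. Inside a double-quoted string, Perl interpolates `$@` and then emits the literal text `->getErrorMessage()`. Also, an error raised with `die "..."` is a plain string with no methods.

```perl
use strict;
use warnings;

# Hypothetical helper: run $code, return undef on success or the error
# text on failure. The trailing "1" makes a successful eval return true,
# so failure is detected from the return value, not only from $@.
sub run_guarded {
    my ($what, $code) = @_;
    my $ok = eval { $code->(); 1 };
    return if $ok;
    my $err = $@ || 'unknown error';
    chomp $err;
    warn "$what failed: $err\n";    # real server: syslog(LOG_ERR, '%s', ...)
    return $err;
}
```

Wrapping each poll cycle, for example `run_guarded('poll', sub { ... })`, would record what went wrong before the thread gives up.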

        "besides, the code you provided is not working." I guess you missed where I mentioned "some untested cut&paste code for you to look at."

        This is a proxy for your client; you may run as many of these as you like. I've had dozens running at once on multiple computers.

        #!/usr/bin/perl
        use strict;
        use warnings;
        use IO::Socket::INET;
        # proxy for client
        use Getopt::Long;

        my $toip   ='127.0.0.1';
        my $toport =1001;
        my @optdef=("toip=s"   => \$toip
                   ,"toport=s" => \$toport
                   );
        GetOptions ( @optdef ) or die("Error in command line arguments\n");
        my $n=0;
        while (1) {
            if (my $toserver=IO::Socket::INET->new(Proto=> "tcp",PeerAddr => $toip.':'.$toport)) {
                my $local_port = $toserver->sockport();
                print " connect $local_port\n";
                while (my $in=<$toserver>){
                    chomp $in;
                    next unless (length($in));
                    unless (int(rand(20))) {print "dont reply close\n"; last;}
                    # unless (int(rand(20))) {print "dont reply wait\n"; next;}
                    print $toserver $$,' ',$local_port,' ',$in,' ',$n++,"\n";
                }
                close $toserver;
                my $sleep=int(rand(20));
                print "sleep $sleep\n";
                sleep $sleep;
            } # got
        }
        This is a server that cleans up after itself, does not spawn a new thread for each client every 5 min, and handles all three data points. Watch it run for a while and figure out what is happening.
        #!/usr/bin/perl
        use strict;
        use warnings;
        use IO::Socket::INET;
        use threads;
        #use Net::MQTT::Simple;
        use POSIX qw(strftime);
        #use Sys::Syslog qw(:DEFAULT :standard :macros);
        use IO::Select;

        my $mqtt;
        my $client;
        my $ident='tcp-mqtt_server';
        my $logopt='ndelay';
        my $facility='LOG_USER';
        my $th_id;
        my $topic;
        my $value;
        my $holdtime=20;
        my $timeout=5;
        my @wants=qw/TEMP VBAT VLIN/;

        sub Main {
            my $sel_read = IO::Select->new();
            my $conn=0;
            # $mqtt = Net::MQTT::Simple->new("localhost:1883");
            # openlog($ident, $logopt, $facility); # don't forget this
            # flush after every write
            $| = 1;
            my ( $socket, $client_socket );
            # Bind to listening address and port
            $socket = new IO::Socket::INET (
                LocalHost => '0.0.0.0',
                LocalPort => '1001',
                Proto => 'tcp',
                Listen => 5,
                Reuse => 1
            ) or die "Could not open socket: ".$!."\n";
            $sel_read-> add($socket);
            print "SERVER Waiting for client connections...\n";
            # syslog(LOG_INFO,"tcp-mqtt server starting");
            my %clients;
            superloop:while(1) {
                my ($rd_ptr,$wr_ptr,$er_ptr)=IO::Select->select($sel_read,undef,undef,$timeout);
                print "superloop clients ",scalar((keys(%clients))),"\n";
                for my $client_socket( sort { $clients{$a}{topen} <=> $clients{$b}{topen}} keys(%clients) ) {
                    my $pid=$clients{$client_socket}{peer_address}.':'.$clients{$client_socket}{peer_port};
                    print ' ',$pid,' ',$clients{$client_socket}{copen},"\n";
                }
                for my $fh (@$rd_ptr) {
                    if ($fh == $socket) {
                        $conn++;
                        # Waiting for new client connection.
                        $client_socket = $socket->accept();
                        # Push new client connection to it's own thread
                        $clients {$client_socket}={ socket       =>$client_socket
                                                   ,thread       =>threads->create( \&clientHandler, $client_socket )
                                                   ,peer_address =>$client_socket->peerhost()
                                                   ,peer_port    =>$client_socket->peerport()
                                                   ,local_port   =>$client_socket->sockport()
                                                   ,topen        =>time
                                                   };
                        $clients{$client_socket}{copen}=strftime "%a %b %e %H:%M:%S %Y", localtime $clients{$client_socket}{topen};
                        my $started=$clients{$client_socket}{thread};
                        my $nclients=scalar((keys(%clients)));
                        my $pid=$clients{$client_socket}{peer_address}.':'.$clients{$client_socket}{peer_port};
                        print "Connection $conn $pid clients $nclients\n"
                    } # socket
                } # fh
                my @opened=keys(%clients);
                my @eofable=();
                for my $open ( @opened ) {
                    my $thread=$clients{$open}{thread};
                    if( $thread->is_joinable() ) {
                        $thread->join();
                        my $pid=$clients{$open}{peer_address}.':'.$clients{$open}{peer_port};
                        print "Joined $pid\n";
                        push @eofable,$open;
                    } # joinable
                } #opened
                for my $open ( @eofable ) {
                    # my $socket=$clients{$open}{socket}; # magic lost in stringified key
                    # close($socket);
                    my $pid=$clients{$open}{peer_address}.':'.$clients{$open}{peer_port};
                    delete $clients{$open}; # destroys socket and thread objects;
                    my $nclients=scalar((keys(%clients)));
                    print "Cleaned $pid clients $nclients\n";
                } # eofable
            } # superloop
            $socket->close();
            return 1;
        }

        sub clientHandler {
            my ($client_socket) = @_;
            my %user = ();
            $user{peer_address} = $client_socket->peerhost();
            $user{peer_port} = $client_socket->peerport();
            $user{local_port} = $client_socket->sockport();
            $th_id = threads->tid();
            my $isopen=1;
            my $pid=$user{peer_address}.":".$user{peer_port};
            print "Client ".$user{peer_address}.":".$user{peer_port}.":".$user{local_port}."\n";
            # syslog(LOG_INFO,"Client $user{peer_address}:$user{peer_port}:$user{local_port}:$th_id is connected");
            eval{
                while ($isopen) {
                    my $datestring = strftime "%a %b %e %H:%M:%S %Y", localtime;
                    print "$datestring\n";
                    print $client_socket "\n";
                    for my $want (@wants) {
                        $isopen=0;
                        print $client_socket $want,"\n";
                        while( my $buffer = <$client_socket> ) {
                            $isopen=1;
                            print $pid,' ',$buffer;
                            my $value = substr $buffer, 0,-2;
                            # $mqtt->retain("minimon/VLIN" => $buffer);
                            # syslog(LOG_INFO,"VLIN: $buffer");
                            last;
                        }
                        last unless ($isopen);
                    } # want
                    sleep($holdtime) if ($isopen);
                } # isopen
            };
            if($@){
                print "tcp-mqtt exception:$@->getErrorMessage()";
                # syslog(LOG_INFO,"tcp-mqtt exception:$@->getErrorMessage()");
            }
            $client_socket->shutdown(2);
            #$client_socket->close();
            print "Client exit from ".$user{peer_address}.":".$user{peer_port}."\n";
            # Client has exited so thread should exit too
            threads->exit();
        }

        # Start the Main loop
        Main();

        This method still suffers from some problems. One is your choice of line endings.

        #perlipc: 
        #The Internet line terminator is "\015\012". 
        #Under ASCII variants of Unix, that could usually be written as "\r\n", 
        #but under other systems, "\r\n" might at times be "\015\015\012", 
        #        "\012\012\015", 
        # or something completely different. 
        # The standards specify writing "\015\012" to be conformant (be strict in what you provide), 
        # but they also recommend accepting a lone "\012" on input (but be lenient in what you require). 
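Here is a small sketch of the "lenient on input, strict on output" advice quoted above. `strip_eol` and the `CRLF` constant are hypothetical names. The helper is meant to replace calls like `substr $buffer, 0,-2` in the code above. That expression assumes every reply ends in exactly two characters, so with a bare LF it would cut off the last digit of the value.

```perl
use strict;
use warnings;

# Hypothetical constant: the Internet line terminator, spelled out
# explicitly for anything written to the socket.
use constant CRLF => "\015\012";

# Hypothetical helper: remove a trailing CRLF or a lone LF from a line
# read off the socket. Returns undef for undef (e.g. readline at EOF).
sub strip_eol {
    my ($line) = @_;
    return undef unless defined $line;
    $line =~ s/\015?\012\z//;
    return $line;
}
```

A request would then be sent as `print $client_socket "VLIN", CRLF;` and a reply cleaned with `my $value = strip_eol($buffer);`.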
        

        Another is that the semantics of closing both ends of a socket between computers are complicated. They can result in the server end appearing to be open, with an outstanding blocking read on it, while the other end is simply gone. This will result in a client hanging around in the %clients table forever.
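One partial mitigation for such half-open connections is to stop blocking forever on the read. The following is a minimal sketch, assuming a reply is expected within `$timeout` seconds. `read_line_with_timeout` and `enable_keepalive` are hypothetical helpers. `can_read` checks the OS socket, not PerlIO's read buffer. This is fine for a one-request/one-reply protocol like this one, but the caveats are noted in the comments.

```perl
use strict;
use warnings;
use IO::Select;
use Socket qw(SOL_SOCKET SO_KEEPALIVE);

# Hypothetical helper: return one line, or undef on timeout or EOF, instead
# of blocking forever on a peer that vanished without closing.
# Caveats: if the peer sends half a line and goes silent, readline can still
# block; and data already buffered by PerlIO is invisible to can_read.
sub read_line_with_timeout {
    my ($sock, $timeout) = @_;
    my $sel = IO::Select->new($sock);
    return undef unless $sel->can_read($timeout);
    return scalar <$sock>;
}

# Hypothetical helper: ask the kernel to probe idle TCP connections so a
# dead peer is eventually reported as an error. On Linux the default idle
# time before the first probe is two hours.
sub enable_keepalive {
    my ($sock) = @_;
    return setsockopt($sock, SOL_SOCKET, SO_KEEPALIVE, 1);
}
```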

      Dear huck, I'm not following you 100%, but thanks for your detailed explanation.
      BTW, I tried your example but it does not work.
      Regards.
Re: Improve my tcp-mqtt server
by hippo (Chancellor) on Jun 22, 2020 at 09:36 UTC
    nohup ./tcp-mqtt_server_multiclient.pl  2> nohup.out &

    In addition to what perlfan said about this, I would just point out that, when run from a terminal, nohup sends both stdout and stderr to nohup.out by default, so this clobbering is entirely unnecessary. Simply

    $ nohup ./tcp-mqtt_server_multiclient.pl &

    will work as you intend and without clobbering.

Re: Improve my tcp-mqtt server
by perlfan (Priest) on Jun 22, 2020 at 06:22 UTC
    nohup ./tcp-mqtt_server_multiclient.pl 2> nohup.out &

    Probably not your issue, but make sure you are not clobbering nohup.out:

    nohup ./tcp-mqtt_server_multiclient.pl 2> nohup.out.errs &
    You can still check nohup.out for messages sent to STDOUT.

    You can skip nohup and catch the signal in the Perl code, then do something actually useful, like reread a config or reset the service, etc:

    $SIG{HUP} = sub { warn "never gonna give you up!" };
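To make that handler "do something actually useful", one common pattern is to have it only set a flag and let the main loop act on it at a safe point. The following is a minimal sketch, assuming a reload is the desired action. `handle_pending_signals` and the reload callback are hypothetical. The callback is where a real server might re-read a config file or reconnect to the MQTT broker.

```perl
use strict;
use warnings;

# Keep the handler trivial: just record that a HUP arrived.
my $reload_requested = 0;
$SIG{HUP} = sub { $reload_requested = 1 };

# Hypothetical hook, called from the main loop on each pass: runs the
# reload callback once per received HUP and clears the flag.
sub handle_pending_signals {
    my ($on_reload) = @_;
    return 0 unless $reload_requested;
    $reload_requested = 0;
    $on_reload->();
    return 1;
}
```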

Node Type: perlquestion [id://11118272]
Approved by haukex