Beefy Boxes and Bandwidth Generously Provided by pair Networks
XP is just a number
 
PerlMonks  

Net::Async::WebSocket::Server in a separate main loop

by Talkabout (Novice)
on Jan 12, 2023 at 17:20 UTC ( [id://11149554]=perlquestion: print w/replies, xml ) Need Help??

Talkabout has asked for the wisdom of the Perl Monks concerning the following question:

Hi

I am trying to achieve a fairly simple thing: I want to run a websocket server which is controlled by a main loop instead of the Async::Loop, like this:

my $sslSocket = IO::Socket::SSL->new( Listen => 5, LocalPort => XXX, Proto => 'tcp', SSL_startHandshake => 0, SSL_cert_file => 'XXX', SSL_key_file => 'XXX', ) or die "failed to listen: $!"; my $server = Net::Async::WebSocket::Server->new( on_client => sub { my ( undef, $client ) = @_; $client->configure( on_text_frame => sub { my ( $self, $frame ) = @_; $self->send_text_frame( $frame ); }, ); }, on_handshake => sub { my ( $self, $client, $hs, $continue ) = @_; $continue->( 1 ); } ); my $loop = IO::Async::Loop->new; $loop->add( $server ); $server->listen( handle => $sslSocket )->get;

In the main loop I am calling "$loop->loop_once" whenever the listening socket "$sslSocket" has data available, like this:

while(1){ // something before if (//data to read) { $loop->loop_once; } // something after }

Unfortunately the websocket connection is never established. None of the callbacks is called. I think I am getting the purpose of "loop_once" wrong but I don't know what the exact issue is.

I already tried to use $loop->run", then the connection can be established correctly. But this is not what I require. I need the main loop to take care of other tasks inbetween.

Thanks for help!

Replies are listed 'Best First'.
Re: Net::Async::WebSocket::Server in a separate main loop
by Corion (Patriarch) on Jan 12, 2023 at 17:57 UTC
    if (//data to read) { $loop->loop_once; }

    That feels weird - I think you should always call ->loop_once from within your other main loop.

    Alternatively, you could set up a timer callback to be called from within IO::Async:

    use IO::Async::Timer::Periodic; use IO::Async::Loop; my $loop = IO::Async::Loop->new; my $timer = IO::Async::Timer::Periodic->new( interval => 1, on_tick => sub { // something before // something after }, ); $timer->start; $loop->add( $timer ); $loop->run;

      That is exactly the problem, I cannot use "loop->run" as this blocks the complete application. I need to find a way to call the websocket server logic from within my main loop only using "loop_once", as this is not a blocking call.

      But it seems I have figured out what the problem is... The check for data on socket seems to "destroy" the async loop logic. I will need to find another way to get the main loop to hook into my logic so I can process websocket messages.

        The main loop should not be checking to see if the socket is ready to read because that's the job of the event system. In your hybrid design, the main loop should just ask the event system every 1ms or so whether it has anything to do.

        But, what is your main loop doing that can't be done in IO::Async?

        Since perl is single-threaded, if you want anything to be event-driven, it should all be event-driven. That means you really do want to use ->run, and find a way to take the "main loop" of your script and re-package it into event-style code.

        Most event systems have an "idle" event that fires whenever the application lacks events to process, and you can use that to continue doing segments of a larger bulk task. But, you have to keep the segments of that large task brief, or it will block the events. For instance, if you want your application to respond to a network message within 1ms, then the 'idle' event should not run for more than 1ms at a time.

      Ok, finally even the main loop concept with "loop_once" does not work, as the main loop is blocked too much. My only current solution is to create the websocket server via threads, which works quite well. I was concerned about memory, as threads are copying a lot of stuff, but as I will create only 1 I think I can live with it.

      Thanks anyways

        Will you, please, if you'll come to solution with threads, post it here too?

        P.S.: I was trying to figure out about the same scheme - I'm already have CGI processed by Mojolicious server, which take HTTP requests, send them through RabbitMQ to backend, listening for answer. But with websocket connections server needs to listen for messages coming through RabbitMQ from backend to send them to websocket, and it's without request from user. So I need Mojolicious to read RabbitMQ queue in a Mojo::IOLoop event loop system while serving CGI and websocket requests, and for now there are two good queue readers - AnyEvent::RabbitMQ (nonblocking, based on other than Mojolicious AnyEvent loop system) and Net::AMQP::RabbitMQ (blocking), so none of them fit to Mojolicious system. Solution not found so far.

Re: Net::Async::WebSocket::Server in a separate main loop
by cavac (Parson) on Jan 13, 2023 at 11:34 UTC

    For my websocket stuff, i usually use Protocol::WebSocket::Frame and implement my own event handling, including accepting new clients. I don't have a complete example, because my webserver spreads protocol handling over multiple modules and processes. But basically, first you listen to the ports you need:

    my @tcpsockets; foreach my $service (@{$config->{external_network}->{service}}) { print '** Service at port ', $service->{port}, ' does ', $serv +ice->{usessl} ? '' : 'NOT', " use SSL/TLS\n"; foreach my $ip (@{$service->{bind_adresses}->{ip}}) { my $tcp = IO::Socket::IP->new( LocalHost => $ip, LocalPort => $service->{port}, Listen => 1, ReuseAddr => 1, Proto => 'tcp', ) or croak("Failed to bind: " . $ERRNO); #binmode($tcp, ':bytes'); push @tcpsockets, $tcp; print " Listening on ", $ip, ":, ", $service->{port}, "/ +tcp\n"; } } my $select = IO::Select->new(@tcpsockets); $self->{select} = $select;

    Then the main loop checks for new connections in the main loop:

    sub run($self) { while(1) { while((my @connections = $self->{select}->can_read)) { foreach my $connection (@connections) { my $client = $connection->accept; #print "**** Connection from ", $client->peerhost(), " + \n"; if(defined($self->{debugip})) { my $peerhost = $client->peerhost(); if($peerhost ne $self->{debugip}) { $client->close; next; } } if($childcount >= $self->{config}->{max_childs}) { #print "Too many children already!\n"; $client->close; next; } my $childpid = fork(); if(!defined($childpid)) { #print "FORK FAILED!\n"; $client->close; next; } elsif($childpid == 0) { # Child $PROGRAM_NAME = $self->{ps_appname}; $self->handleClient($client); #print "Child PID $PID is done, exiting...\n"; $self->endprogram(); } else { # Parent $childcount++; next; } } } } print "run() loop finished.\n"; return; }

    handleClient() upgrades the connection to SSL as required (my frontend server supports http and https and also supports vhosts. It's a bit too complex to explain here, but basically it uses IO::Socket::SSL->start_SSL to upgrade to SSL where required, then uses Net::SSLeay callbacks to change the encryption keys/certificates once we know which vhost (and therefore which cert) is required.

    Then a connection gets established (via Unix Domain Sockets) to the backend, the backend does all the HTTP protocol handling stuff and passes the requests to submodules, yadayadayada. The relevant module here in my software is called "BaseWebSocket" which then gets overloaded to actually implement the webpages using the websockets. You can ignore a lot of the boilerplate stuff. Also ignore that the module uses PageCamel::Helpers::WSockFrame (this is just a slightly modified local version of an older version of Protocol::WebSocket::Frame).

    Basically, the first thing to do is to upgrade the standard HTTP connection to a websocket. The client send an "Upgrade" request with some headers, and we answer according to RFC6455:

    sub socketstart($self, $ua) { my $sysh = $self->{server}->{modules}->{$self->{systemsettings}}; my $upgrade = $ua->{headers}->{"Upgrade"}; my $seckey = $ua->{headers}->{"Sec-WebSocket-Key"}; my $protocol = $ua->{headers}->{"Sec-WebSocket-Protocol"}; my $version = $ua->{headers}->{"Sec-WebSocket-Version"}; if(!defined($upgrade) || !defined($seckey) || !defined($version)) +{ return (status => 400); # BAAAD Request! Sit! Stay! } ... $seckey .= "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; # RFC6455 GUID +for Websockets $seckey = encode_base64(sha1($seckey), ''); my $proto = 'base64'; if($settings{binaryMode}) { $proto = 'binary'; } ... my %result = (status => 101, Upgrade => "websocket", Connection => "Upgrade", "Sec-WebSocket-Accept" => $seckey, "Sec-WebSocket-Protocol" => $proto, ); return %result; }

    Then we are finally ready to handle websocket frames. Be aware that these are async operations. Incoming websocket frames might not be complete yet (which is why you might have to call sysread() and $frame->append() multiple times before you get a new message out). You syswrite function may also need to be called multiple times to completely transfer an outgoing frame (see implementation of webPrint at the bottom).

    sub sockethandler($self, $ua) { ... { local $INPUT_RECORD_SEPARATOR = undef; my $socketclosed = 0; $ua->{realsocket}->blocking(0); binmode($ua->{realsocket}, ':bytes'); my $starttime = time + 10; while(!$socketclosed) { my $workCount = 0; # Read data from websocket my $buf; eval { ## no critic (ErrorHandling::RequireCheckingReturnV +alueOfEval) local $SIG{ALRM} = sub{croak "alarm"}; alarm 0.5; my $status = sysread($ua->{realsocket}, $buf, $setting +s{chunk_size} * 2); if(!$ua->{realsocket}) { #if(0 && defined($status) && $status == 0) { if($self->{isDebugging}) { print STDERR "Websocket closed\n"; } $socketclosed = 1; last; } alarm 0; }; if(defined($buf) && length($buf)) { $frame->append($buf); $workCount++; } while (my $message = $frame->next_bytes) { $workCount++; my $realmsg; my $parseok = 0; eval { ## no critic (ErrorHandling::RequireCheckingRet +urnValueOfEval) $realmsg = decode_json($message); $parseok = 1; }; if(!$parseok || !defined($realmsg) || !defined($realms +g->{type})) { # Broken message next; } if($frame->opcode == 8) { print STDERR "Connection closed by Browser\n"; $socketclosed = 1; last; } if($realmsg->{type} eq 'PING') { $timeout = time + $settings{client_disconnect_time +out}; my %msg = ( type => 'PING', ); if(!$self->wsprint(\%msg)) { print STDERR "Write to socket failed, closing +connection!\n"; $socketclosed = 1; last; } next; } else { if(!$self->wshandlemessage($realmsg)) { $socketclosed = 1; last; } } } # This is OUTSIDE the $frame->next_bytes loop, because a c +lose event never returns a full frame # from WSockFrame if($frame->is_close) { print STDERR "CLOSE FRAME RECIEVED!\n"; $socketclosed = 1; if(!webPrint($ua->{realsocket}, $frame->new(buffer => +'data', type => 'close')->to_bytes)) { print STDERR "Write to socket failed, failed to pr +operly close connection!\n"; } } ... if(!$workCount) { sleep($self->{sleeptime}); } if($timeout < time) { print STDERR "CLIENT TIMEOUT\n"; $socketclosed = 1; } } } ... return 1; }

    wsprint() just creates a new output stream and asks the webPrint helper to send it over the connection:

    sub wsprint($self, $message) { ... my $frametype = 'text'; my $buffer = encode_json($message); my $framedata = $frame->new(buffer => $buffer, type => $frametype) +->to_bytes; if(!webPrint($ua->{realsocket}, $framedata)) { print STDERR "Write to socket failed, closing connection!\n"; return 0; } return 1; }

    webPrint does the actual heavy lifting when it comes to stuffing all the data into the outgoing TCP socket. It also deals with that pesky Unicode-to-UTF8 encoding thing we are also forced to deal with these days. And yes, if you write to a socket, you *might* have to deal with SIGPIPE as well when the outgoing buffer is temporarily full.

    sub webPrint($ofh, @parts) { my $brokenpipe = 0; local $SIG{PIPE} = sub { $brokenpipe = 1;}; local $INPUT_RECORD_SEPARATOR = undef; binmode($ofh, ':bytes'); my $full; foreach my $npart (@parts) { if(!defined($npart)) { #print STDERR "Empty npart in Webprint!\n"; next; } if(is_utf8($npart)) { $full .= encode_utf8($npart); } else { $full .= $npart; } } my $shownlimitmessage = 0; my $timeoutthres = 20; # Need to be able to send at least one byte + per 20 seconds # Output bandwidth-limited stuff, in as big chunks as possible if(!defined($full) || $full eq '') { return 1; } my $written = 0; my $timeout = time + $timeoutthres; $ERRNO = 0; my $needprintdone = 0; while(1) { eval { ## no critic (ErrorHandling::RequireCheckingReturnValue +OfEval) $written = syswrite($ofh, $full); }; if($EVAL_ERROR) { print STDERR "Write error: $EVAL_ERROR\n"; return 0; } if(!defined($written)) { $written = 0; } last if($written == length($full)); #print STDERR "Sent $written bytes (", length($full) - $writte +n, "remaining)\n"; if($!{EWOULDBLOCK} || $!{EAGAIN}) { ## no critic (Variables::P +rohibitPunctuationVars) if(!$shownlimitmessage) { print STDERR "Rate limiting output\n"; $shownlimitmessage = 1; } $timeout = time + $timeoutthres; if(!$written) { sleep(0.01); } } elsif(0 && $brokenpipe) { print STDERR "webPrint write failure: SIGPIPE\n"; return 0; } elsif($ofh->error || $ERRNO ne '') { print STDERR "webPrint write failure: $ERRNO / ", $ofh->op +ened, " / ", $ofh->error, "\n"; return 0; } if($written) { $timeout = time + $timeoutthres; $full = substr($full, $written); $written = 0; next; } if($timeout < time) { print STDERR "***** webPrint TIMEOUT ****** $ERRNO\n"; return 0; } sleep(0.01); $needprintdone = 1; } if($needprintdone) { print STDERR "Webprint Done\n"; } return 1; }

    Hope that (un-)clarifies stuff enough to make or make not sense in the big scheme of things.

    PerlMonks XP is useless? Not anymore: XPD - Do more with your PerlMonks XP
Re: Net::Async::WebSocket::Server in a separate main loop
by Talkabout (Novice) on Mar 11, 2023 at 21:12 UTC

    Hi all,

    thanks for the answers. I finally ended up in doing the following:

    Create socket:

    my $sslSocket = IO::Socket::SSL->new( Listen => 5, LocalPort => XXXX, Proto => 'tcp', SSL_server => 1, SSL_startHandshake => 1, SSL_cert_file => 'CRT', SSL_key_file => 'KEY', ReuseAddr => 1, Blocking => 0 );

    Create Websocket Server via Async Lib:

    my $server = Net::Async::WebSocket::Server->new( ... );

    Create Async Loop and add server to that loop:

    my $loop = IO::Async::Loop->new; $loop->add( $server ); $server->listen( handle => $sslSocket )->get;

    In the main loop watching the socket for new clients, connect clients, watch their socket and if data for reading is there:

    while($loop->loop_once(0.0001) > 0) { };

    The "loop_once" call basically acts as long as there is "something to do", but exists eventually. With that concept my requirement works pretty well.

    Bye

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: perlquestion [id://11149554]
Approved by LanX
Front-paged by Corion
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others examining the Monastery: (2)
As of 2024-04-25 05:39 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found