http://qs321.pair.com?node_id=11149562


in reply to Net::Async::WebSocket::Server in a separate main loop

For my websocket stuff, i usually use Protocol::WebSocket::Frame and implement my own event handling, including accepting new clients. I don't have a complete example, because my webserver spreads protocol handling over multiple modules and processes. But basically, first you listen to the ports you need:

my @tcpsockets; foreach my $service (@{$config->{external_network}->{service}}) { print '** Service at port ', $service->{port}, ' does ', $serv +ice->{usessl} ? '' : 'NOT', " use SSL/TLS\n"; foreach my $ip (@{$service->{bind_adresses}->{ip}}) { my $tcp = IO::Socket::IP->new( LocalHost => $ip, LocalPort => $service->{port}, Listen => 1, ReuseAddr => 1, Proto => 'tcp', ) or croak("Failed to bind: " . $ERRNO); #binmode($tcp, ':bytes'); push @tcpsockets, $tcp; print " Listening on ", $ip, ":, ", $service->{port}, "/ +tcp\n"; } } my $select = IO::Select->new(@tcpsockets); $self->{select} = $select;

Then the main loop checks for new connections in the main loop:

sub run($self) { while(1) { while((my @connections = $self->{select}->can_read)) { foreach my $connection (@connections) { my $client = $connection->accept; #print "**** Connection from ", $client->peerhost(), " + \n"; if(defined($self->{debugip})) { my $peerhost = $client->peerhost(); if($peerhost ne $self->{debugip}) { $client->close; next; } } if($childcount >= $self->{config}->{max_childs}) { #print "Too many children already!\n"; $client->close; next; } my $childpid = fork(); if(!defined($childpid)) { #print "FORK FAILED!\n"; $client->close; next; } elsif($childpid == 0) { # Child $PROGRAM_NAME = $self->{ps_appname}; $self->handleClient($client); #print "Child PID $PID is done, exiting...\n"; $self->endprogram(); } else { # Parent $childcount++; next; } } } } print "run() loop finished.\n"; return; }

handleClient() upgrades the connection to SSL as required (my frontend server supports http and https and also supports vhosts. It's a bit too complex to explain here, but basically it uses IO::Socket::SSL->start_SSL to upgrade to SSL where required, then uses Net::SSLeay callbacks to change the encryption keys/certificates once we know which vhost (and therefore which cert) is required.

... if($usessl) { my $defaultdomain = $self->{config}->{sslconfig}->{ssldefa +ultdomain}; my $encrypted; my $ok = 0; eval { $encrypted = IO::Socket::SSL->start_SSL($client, SSL_server => 1, SSL_key_file=> $self->{config}->{sslconfig}->{ssl +domains}->{$defaultdomain}->{sslkey}, SSL_cert_file=> $self->{config}->{sslconfig}->{ssl +domains}->{$defaultdomain}->{sslcert}, SSL_cipher_list => $self->{config}->{sslconfig}->{ +sslciphers}, SSL_create_ctx_callback => sub { my $ctx = shift; #print STDERR "******************* CREATING NE +W CONTEXT ********************\n"; # Enable workarounds for broken clients Net::SSLeay::CTX_set_options($ctx, &Net::SSLea +y::OP_ALL); # Disable session resumption completely Net::SSLeay::CTX_set_session_cache_mode($ctx, +$SSL_SESS_CACHE_OFF); # Disable session tickets Net::SSLeay::CTX_set_options($ctx, &Net::SSLea +y::OP_NO_TICKET); # Load certificate chain my $defaultdomain = $self->{config}->{sslconfi +g}->{ssldefaultdomain}; Net::SSLeay::CTX_use_certificate_chain_file($c +tx, $self->{config}->{sslconfig}->{ssldomains}->{$defaultdomain}->{ss +lcert}); # Check requested server name Net::SSLeay::CTX_set_tlsext_servername_callbac +k($ctx, sub { my $ssl = shift; my $h = Net::SSLeay::get_servername($ssl); if(!defined($h)) { #print STDERR "SSL: No Hostname given +during SSL setup\n"; return; } if(!defined($self->{config}->{sslconfig}-> +{ssldomains}->{$h})) { #print STDERR "SSL: Hostname $h not co +nfigured\n"; #print STDERR Dumper($self->{config}-> +{sslconfig}->{ssldomains}); return; } if(defined($self->{config}->{sslconfig}->{ +ssldomains}->{$h}->{internal_socket})) { # This SSL connection uses a different + backend $selectedbackend = $self->{config}->{s +slconfig}->{ssldomains}->{$h}->{internal_socket}; } if($h eq $self->{config}->{sslconfig}->{ss +ldefaultdomain}) { # Already the correct CTX setting, jus +t return return; } #print STDERR "§§§§§§§§§§§§§§§§§§§§§§§ R +equested Hostname: $h §§§\n"; my $newctx; if(defined($self->{config}->{sslconfig}->{ +ssldomains}->{$h}->{ctx})) { $newctx = $self->{config}->{sslconfig} +->{ssldomains}->{$h}->{ctx}; } else { $newctx = Net::SSLeay::CTX_new or croa +k("Can't create new SSL CTX"); Net::SSLeay::CTX_set_cipher_list($newc +tx, $self->{config}->{sslconfig}->{sslciphers}); Net::SSLeay::set_cert_and_key($newctx, + $self->{config}->{sslconfig}->{ssldomains}->{$h}->{sslcert}, $s +elf->{config}->{sslconfig}->{ssldomains}->{$h}->{sslkey}) or croak("Can't set cert and k +ey file"); Net::SSLeay::CTX_use_certificate_chain +_file($newctx, $self->{config}->{sslconfig}->{ssldomains}->{$h}->{ssl +cert}); #print STDERR "Cert: ", $self->{config +}->{sslconfig}->{ssldomains}->{$h}->{sslcert}, " Key: ", $self->{conf +ig}->{sslconfig}->{ssldomains}->{$h}->{sslkey}, "\n"; $self->{config}->{sslconfig}->{ssldoma +ins}->{$h}->{ctx} = $newctx; } Net::SSLeay::set_SSL_CTX($ssl, $newctx); }); # Prepared/tested for future ALPN needs (e. +g. HTTP/2) ## Advertise supported HTTP versions #Net::SSLeay::CTX_set_alpn_select_cb($ctx, ['h +ttp/1.1', 'http/2.0']); }, ); $ok = 1; }; if(!$ok) { print "EVAL ERROR: ", $EVAL_ERROR, "\n"; $self->endprogram(); } elsif(!$ok || !defined($encrypted) || !$encrypted) { print "startSSL failed: ", $SSL_ERROR, "\n"; $self->endprogram(); } } ...

Then a connection gets established (via Unix Domain Sockets) to the backend, the backend does all the HTTP protocol handling stuff and passes the requests to submodules, yadayadayada. The relevant module here in my software is called "BaseWebSocket" which then gets overloaded to actually implement the webpages using the websockets. You can ignore a lot of the boilerplate stuff. Also ignore that the module uses PageCamel::Helpers::WSockFrame (this is just a slightly modified local version of an older version of Protocol::WebSocket::Frame).

Basically, the first thing to do is to upgrade the standard HTTP connection to a websocket. The client send an "Upgrade" request with some headers, and we answer according to RFC6455:

sub socketstart($self, $ua) { my $sysh = $self->{server}->{modules}->{$self->{systemsettings}}; my $upgrade = $ua->{headers}->{"Upgrade"}; my $seckey = $ua->{headers}->{"Sec-WebSocket-Key"}; my $protocol = $ua->{headers}->{"Sec-WebSocket-Protocol"}; my $version = $ua->{headers}->{"Sec-WebSocket-Version"}; if(!defined($upgrade) || !defined($seckey) || !defined($version)) +{ return (status => 400); # BAAAD Request! Sit! Stay! } ... $seckey .= "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; # RFC6455 GUID +for Websockets $seckey = encode_base64(sha1($seckey), ''); my $proto = 'base64'; if($settings{binaryMode}) { $proto = 'binary'; } ... my %result = (status => 101, Upgrade => "websocket", Connection => "Upgrade", "Sec-WebSocket-Accept" => $seckey, "Sec-WebSocket-Protocol" => $proto, ); return %result; }

Then we are finally ready to handle websocket frames. Be aware that these are async operations. Incoming websocket frames might not be complete yet (which is why you might have to call sysread() and $frame->append() multiple times before you get a new message out). You syswrite function may also need to be called multiple times to completely transfer an outgoing frame (see implementation of webPrint at the bottom).

sub sockethandler($self, $ua) { ... { local $INPUT_RECORD_SEPARATOR = undef; my $socketclosed = 0; $ua->{realsocket}->blocking(0); binmode($ua->{realsocket}, ':bytes'); my $starttime = time + 10; while(!$socketclosed) { my $workCount = 0; # Read data from websocket my $buf; eval { ## no critic (ErrorHandling::RequireCheckingReturnV +alueOfEval) local $SIG{ALRM} = sub{croak "alarm"}; alarm 0.5; my $status = sysread($ua->{realsocket}, $buf, $setting +s{chunk_size} * 2); if(!$ua->{realsocket}) { #if(0 && defined($status) && $status == 0) { if($self->{isDebugging}) { print STDERR "Websocket closed\n"; } $socketclosed = 1; last; } alarm 0; }; if(defined($buf) && length($buf)) { $frame->append($buf); $workCount++; } while (my $message = $frame->next_bytes) { $workCount++; my $realmsg; my $parseok = 0; eval { ## no critic (ErrorHandling::RequireCheckingRet +urnValueOfEval) $realmsg = decode_json($message); $parseok = 1; }; if(!$parseok || !defined($realmsg) || !defined($realms +g->{type})) { # Broken message next; } if($frame->opcode == 8) { print STDERR "Connection closed by Browser\n"; $socketclosed = 1; last; } if($realmsg->{type} eq 'PING') { $timeout = time + $settings{client_disconnect_time +out}; my %msg = ( type => 'PING', ); if(!$self->wsprint(\%msg)) { print STDERR "Write to socket failed, closing +connection!\n"; $socketclosed = 1; last; } next; } else { if(!$self->wshandlemessage($realmsg)) { $socketclosed = 1; last; } } } # This is OUTSIDE the $frame->next_bytes loop, because a c +lose event never returns a full frame # from WSockFrame if($frame->is_close) { print STDERR "CLOSE FRAME RECIEVED!\n"; $socketclosed = 1; if(!webPrint($ua->{realsocket}, $frame->new(buffer => +'data', type => 'close')->to_bytes)) { print STDERR "Write to socket failed, failed to pr +operly close connection!\n"; } } ... if(!$workCount) { sleep($self->{sleeptime}); } if($timeout < time) { print STDERR "CLIENT TIMEOUT\n"; $socketclosed = 1; } } } ... return 1; }

wsprint() just creates a new output stream and asks the webPrint helper to send it over the connection:

sub wsprint($self, $message) { ... my $frametype = 'text'; my $buffer = encode_json($message); my $framedata = $frame->new(buffer => $buffer, type => $frametype) +->to_bytes; if(!webPrint($ua->{realsocket}, $framedata)) { print STDERR "Write to socket failed, closing connection!\n"; return 0; } return 1; }

webPrint does the actual heavy lifting when it comes to stuffing all the data into the outgoing TCP socket. It also deals with that pesky Unicode-to-UTF8 encoding thing we are also forced to deal with these days. And yes, if you write to a socket, you *might* have to deal with SIGPIPE as well when the outgoing buffer is temporarily full.

sub webPrint($ofh, @parts) { my $brokenpipe = 0; local $SIG{PIPE} = sub { $brokenpipe = 1;}; local $INPUT_RECORD_SEPARATOR = undef; binmode($ofh, ':bytes'); my $full; foreach my $npart (@parts) { if(!defined($npart)) { #print STDERR "Empty npart in Webprint!\n"; next; } if(is_utf8($npart)) { $full .= encode_utf8($npart); } else { $full .= $npart; } } my $shownlimitmessage = 0; my $timeoutthres = 20; # Need to be able to send at least one byte + per 20 seconds # Output bandwidth-limited stuff, in as big chunks as possible if(!defined($full) || $full eq '') { return 1; } my $written = 0; my $timeout = time + $timeoutthres; $ERRNO = 0; my $needprintdone = 0; while(1) { eval { ## no critic (ErrorHandling::RequireCheckingReturnValue +OfEval) $written = syswrite($ofh, $full); }; if($EVAL_ERROR) { print STDERR "Write error: $EVAL_ERROR\n"; return 0; } if(!defined($written)) { $written = 0; } last if($written == length($full)); #print STDERR "Sent $written bytes (", length($full) - $writte +n, "remaining)\n"; if($!{EWOULDBLOCK} || $!{EAGAIN}) { ## no critic (Variables::P +rohibitPunctuationVars) if(!$shownlimitmessage) { print STDERR "Rate limiting output\n"; $shownlimitmessage = 1; } $timeout = time + $timeoutthres; if(!$written) { sleep(0.01); } } elsif(0 && $brokenpipe) { print STDERR "webPrint write failure: SIGPIPE\n"; return 0; } elsif($ofh->error || $ERRNO ne '') { print STDERR "webPrint write failure: $ERRNO / ", $ofh->op +ened, " / ", $ofh->error, "\n"; return 0; } if($written) { $timeout = time + $timeoutthres; $full = substr($full, $written); $written = 0; next; } if($timeout < time) { print STDERR "***** webPrint TIMEOUT ****** $ERRNO\n"; return 0; } sleep(0.01); $needprintdone = 1; } if($needprintdone) { print STDERR "Webprint Done\n"; } return 1; }

Hope that (un-)clarifies stuff enough to make or make not sense in the big scheme of things.

PerlMonks XP is useless? Not anymore: XPD - Do more with your PerlMonks XP