#!/usr/bin/perl
use strict;
use warnings;

use IO::Handle;
use IO::Select;
use IPC::Open3;
use POSIX ':sys_wait_h';
use Socket;

# Test script: launch several child perl processes through IPC::Open3,
# multiplex their STDOUT with IO::Select, and count the lines received from
# each child.
#
# IPC::Open3::xpipe is replaced below so that the handles handed to the
# children are TCP loopback sockets instead of pipes.
# NOTE(review): presumably this is needed because select() on Win32 only
# works on sockets -- confirm before relying on this on other platforms.

# Winsock error codes. Only WSAEINVAL is referenced, and only from the
# disabled port sanity check inside _win32_socketpair().
use constant {
    WSAEINVAL       => 10022,
    WSAEWOULDBLOCK  => 10035,
    WSAEINPROGRESS  => 10036,
};

# _win32_socketpair()
#
# Cribbed from AnyEvent::Util. Builds a connected pair of TCP sockets on the
# loopback interface, because perl's own socketpair emulation fails on many
# Vista machines (Vista returns bogus port numbers).
#
# Returns ($r, $w): the connecting socket and the accepted socket.
# Returns the empty list if all ten attempts fail; $! holds the last error.
sub _win32_socketpair {
    for my $attempt (1 .. 10) {
        socket(my $listener, AF_INET, SOCK_STREAM, 0)
            or next;

        # "\x7f\x00\x00\x01" is the packed form of 127.0.0.1; port 0 lets
        # the OS pick a free port.
        bind($listener, pack_sockaddr_in(0, "\x7f\x00\x00\x01"))
            or next;

        my $listen_addr = getsockname($listener)
            or next;

        listen($listener, 1)
            or next;

        socket(my $r, AF_INET, SOCK_STREAM, 0)
            or next;

        bind($r, pack_sockaddr_in(0, "\x7f\x00\x00\x01"))
            or next;

        connect($r, $listen_addr)
            or next;

        accept(my $w, $listener)
            or next;

        # Vista has completely broken peername/sockname that return bogus
        # ports. The following check seemed to work, but is disabled:
        #
        # (Socket::unpack_sockaddr_in getpeername $r)[0] == (Socket::unpack_sockaddr_in getsockname $w)[0]
        #     or (($! = WSAEINVAL), next);
        #
        # Vista example of the values actually observed:
        # (Socket::unpack_sockaddr_in getsockname $r)[0] == 53364
        # (Socket::unpack_sockaddr_in getpeername $r)[0] == 53363
        # (Socket::unpack_sockaddr_in getsockname $w)[0] == 53363
        # (Socket::unpack_sockaddr_in getpeername $w)[0] == 53365

        return ($r, $w);
    }

    return;
}

BEGIN {
    # We replace IPC::Open3's pipe factory on purpose.
    no warnings 'redefine';

    # xpipe(READ, WRITE): same calling convention as pipe(), but backed by a
    # loopback socket pair made unidirectional with shutdown().
    *IPC::Open3::xpipe = sub {
        use Socket qw(AF_UNIX SOCK_STREAM PF_UNSPEC);
        use IO::Handle;

        #socketpair $_[0], $_[1], AF_UNIX, SOCK_STREAM, PF_UNSPEC
        #    or die "socketpair: $!";

        # A list assignment in scalar context yields the number of elements
        # on the right-hand side, so an empty (failed) result is false.
        my ($read_sock, $write_sock) = _win32_socketpair()
            or die "socketpair: $!";
        (*{$_[0]}, *{$_[1]}) = ($read_sock, $write_sock);

        # stop reading on the write handle:
        shutdown $_[1], 0
            or die "shutdown: $!";

        # stop writing on the read handle:
        shutdown $_[0], 1
            or die "shutdown: $!";
    };
}

# Number of lines each child is asked to print.
my $expected = 100;

# Earlier variants of the child command, kept for reference:
#my $cat_self = q{"%s" -ple "$|++;sleep(rand(0));$_=qq{%s_$_};END{sleep 5}" %s};
#my $cat_self = q{"%s" -ple "$|++;sleep(rand(0));$_=qq{%s_$_};" %s};

# Child command template: perl interpreter, child name, first and last
# counter value. Each child prints "<name> <counter>" lines unbuffered,
# sleeping rand(3) seconds before every line.
my $proc = q{"%s" -le "$|=1;$n=shift;for(shift..shift){sleep(rand(3));print+qq($n $_)};" %s %s %s};

my @procs = map { sprintf($proc, $^X, "child_$_", 1, $expected) } 1 .. 3;

# PIDs of children that have been launched but not yet reaped.
my @kill_pids;

END {
    # Only signal children we have not reaped ourselves; a PID that was
    # already waited for may since have been reused by an unrelated process.
    if (@kill_pids) {
        print "Cleaning up children @kill_pids\n";
        kill 9 => @kill_pids;
    }
}

# Maps the stringified STDOUT handle of a child to its PID.
my %children;
my $select = IO::Select->new;

for my $child (@procs) {
    # open3 needs a real handle for STDERR; a false value would make it
    # share the STDOUT handle instead.
    my $child_err = IO::Handle->new();

    my $pid = open3(my $child_in, my $child_out, $child_err, $child)
        or die "Launching $child: $!";

    push @kill_pids, $pid;
    $children{$child_out} = $pid;
    print "[$pid] $child launched\n";
    $select->add($child_out);
}

print "Launched children, waiting for things to become readable\n";
print "Expecting $expected lines\n";

my %buffer;      # pending, not yet counted data per handle
my %received;    # number of lines counted per handle

$SIG{CHLD} = sub { warn "Child: " . wait };

# Signal handlers receive the signal name as their first argument.
$SIG{PIPE} = sub { warn "Child: $_[0]" };

warn $select->count() . " children to read from.";

while ($select->count) {
    my @ready = $select->can_read();
    #warn "Got " . scalar(@ready) . " handles ready.\n";

    for my $fh (@ready) {
        $buffer{$fh} = "" unless exists $buffer{$fh};

        # Append to whatever partial data is already buffered.
        my $bytesread
            = sysread($fh, $buffer{$fh}, 1024, length($buffer{$fh}));

        if (!defined $bytesread) {
            # The SIGCHLD handler can interrupt the read; that is not a
            # real error, so keep the handle and try again later.
            next if $!{EINTR};

            #print "\nError on reading from $fh: $! / $^E\n";
            $select->remove($fh);
        }
        elsif ($bytesread == 0) {
            # EOF. waitpid with WNOHANG returns 0 while the child is still
            # running, its PID once it has been reaped here, and -1 if it
            # was already reaped elsewhere (e.g. by the SIGCHLD handler).
            my $state = waitpid($children{$fh}, WNOHANG);
            if ($state != 0) {
                print "\n$children{ $fh } is done\n";
            }
            else {
                #print "\n$children{ $fh } is still alive :(\n";
            }
            $select->remove($fh);
        }
        elsif ($buffer{$fh} =~ /\n$/) {
            # Only count once the buffer ends on a line boundary, so a line
            # split across two reads is not counted twice.
            $received{$fh}++ for split /\n/, $buffer{$fh};
            #print $buffer{$fh};
            $buffer{$fh} = "";
        }
    }

    # Progress report after every round of reads.
    for my $handle (sort keys %received) {
        print "[$children{ $handle }]\t$received{$handle} lines\t";
    }
    print "\n";
}

print "Waitpid-dding for children\n";
waitpid($_, 0) for @kill_pids;

# Everything has been reaped; nothing left for the END block to kill.
@kill_pids = ();

for my $handle (sort keys %received) {
    print "$children{$handle}\t$received{$handle}\n";
}
print "Done.";