http://qs321.pair.com?node_id=1229231

IPC::Msg as a fork safe queue. Couple caveats that make it less cool, the by-default maximum queue size seems to be very small. Configurable though by changing /proc/sys/kernel/msgmnb which is the max size in bytes of an individual queue up to 2^31 - 1 on my system at least. Check with  ipcs -l. You also have to delete the queue when finished. Or not if you want to use it later. You can also find the same queue in different scripts with ftok, not shown. You can also see queue sizes / number of messages with ipcs command, add flags to pull off different messages from the queue. You don't always have to do FIFO, you can add different kinds of messages to the same queue, and you can block or not block if you want to have multiple queues being worked at the same time.

For complex data structures you would need to serialize or pack / unpack which other cpan modules may already address and better. Message queues are already documented in perlipc, but a little sugar show show versatile it can be. Basically add or remove from the queue with any child process not worrying about blocking / synchronization issues, but you can accidentally destroy the queue before it is empty, or leave an empty or not-empty queue hanging around that you would have to delete with icprm command.

#!/usr/bin/env perl use strict; use warnings; use feature 'say'; use IPC::SysV qw/IPC_PRIVATE S_IRUSR S_IWUSR IPC_NOWAIT/; use IPC::Msg; $|++; use constant MSG_BYTES => 1024; my $q = IPC::Msg->new(IPC_PRIVATE,S_IRUSR|S_IWUSR); sub quit { $q->remove; } $SIG{INT} = \&quit; sub enqueue { my $msg = shift; my $msg_type = shift // 1; # Used for message selection, see msgsn +d(2) $q->snd($msg_type,$msg,IPC_NOWAIT) || die $!; } sub dequeue { my $type = shift // 1; # See msgop(2) my $msg; $q->rcv($msg,MSG_BYTES,$type,IPC_NOWAIT); #if ($!) { warn "$$ - $!" } return $msg // undef; } sub dequeue_block { my $msg; my $type = shift // 1; # See msgop(2) $q->rcv($msg,MSG_BYTES,$type); #if ($!) { warn "$$ - $!" } return $msg // undef; } sub isEmpty { my $stat = $q->stat; return $stat->qnum == 0; } sub isPrime { my $num = shift; return 1 if ($num < 4); return 0 if ($num %2 == 0); for (my $i=3; $i <= sqrt($num); $i+=2) { return 0 if $num % $i == 0; } return 1; } my $n_workers = shift // die ; enqueue($_) for (1..2_000_000); my $parent = $$; for (my $i=1; $i<$n_workers && $$ == $parent; $i++) { fork // die; } while (my $data = dequeue()) { if (isPrime($data)) { print "Prime: $data\n"; } } END { if ($$ == $parent) { 1 until wait == -1; quit; } }

Update 2019-02-04 - You can't wait on grandchildren, changed code to only fork from parent. Fixed spelling of empty.

Replies are listed 'Best First'.
Re: IPC::Msg Fork queue
by trippledubs (Deacon) on Feb 03, 2019 at 21:49 UTC

    Wow SysV is cool to learn. There is more to it than I thought though. Two things operating out of the same queue will not work without syncing. I *think* the code I wrote is okay since the queue is filled up and then processed, but SysV IPC must be some of the oldest code still in Linux. Of the three books I have that reference it, seems it was still shiny back in 1986, discouraged in 2005, and downright hated by everyone in 2012. First sentence in Programming Perl about System V IPC: "Everyone hates System V IPC."

    I didn't know enough about it to hate it yet. You'd think a queue implementation with fork would be relatively not hard to do. Totally new to me I thought I was breaking new ground about to dance in the graveyard of conventional wisdom -- not so much. You need something like IPC::Semaphore to control access, and then you have to clean that up as well, which presents problems because you want to delete the queue when it is empty, but delete the queue too soon, and it becomes quite difficult to lock. Delete it too late and it's just hanging around somewhere in memory forever. And it doesn't do chunking, nobody wants to dequeue just one item. Okay.. I'm starting to get it now -- at the end of the CPAN road is MCE or you could go cooperative multitasking with Coro.. gotta go but i wanted to post this now

    Update 2019-02-04 - Wait properly and destroy the queue from the parent after all children have been reaped

      Hi trippledubs,

      MCE can process a sequence of numbers efficiently. E.g. 1 to MAX_ITEMS. The bounds_only option tells MCE to pass the next starting and ending elements only. Hence boundaries only. IPC occurs once per each chunk_size.

      #!/usr/bin/env perl use strict; use warnings; use MCE; use constant MAX_ITEMS => 200000; my $n_workers = shift // do { die "usage: $0 n_workers\n"; }; sub isPrime { my $num = shift; return 1 if ($num < 4); return 0 if ($num %2 == 0); for (my $i=3; $i <= sqrt($num); $i+=2) { return 0 if $num % $i == 0; } return 1; } sub task { my ( $mce, $seq, $chunk_id ) = @_; for my $data ( $seq->[0] .. $seq->[1] ) { print "Prime: $data\n" if isPrime($data); } } MCE->new( max_workers => $n_workers, sequence => [ 1, MAX_ITEMS ], bounds_only => 1, chunk_size => 200, user_func => \&task, )->run();

      Regards, Mario

        Hi again,

        If needed, workers may write output orderly and efficiently via MCE::Relay simply by passing the init_relay option. The value to init_relay is not used in the demonstration. Just the block to MCE::relay which runs serially and orderly.

        #!/usr/bin/env perl use strict; use warnings; use MCE; use constant MAX_ITEMS => 200000; $| = 1; # necessary when workers output orderly my $n_workers = shift // do { die "usage: $0 n_workers\n"; }; sub isPrime { my $num = shift; return 1 if ($num < 4); return 0 if ($num %2 == 0); for (my $i=3; $i <= sqrt($num); $i+=2) { return 0 if $num % $i == 0; } return 1; } sub task { my ( $mce, $seq, $chunk_id ) = @_; my $output = ''; for my $data ( $seq->[0] .. $seq->[1] ) { $output .= "Prime: $data\n" if isPrime($data); } MCE::relay { print $output }; } MCE->new( max_workers => $n_workers, sequence => [ 1, MAX_ITEMS ], bounds_only => 1, chunk_size => 200, user_func => \&task, init_relay => 1, )->run();

        Regards, Mario

        Love it! What I wanted to get across is multiple workers operating, enqueing and dequeing, inside same queue. Like Thread::Queue or MCE::Queue. Checking if a number is prime, probably not the best example, but easy to understand. That is not much different than what Parallel:ForkManager can do. Re: Sum to 100 at Rosetta Code is the better example. If you can get your problem into a queue, and keep possible solutions inside the queue, then that design can be made to be concurrent. The best example would probably be an Evolutionary Algorithm or something like that. Queues are awesome anyways, fun to say, rhyme with a lot of other words, lot of vowels.. Not communist and unfair like a stack.

        They are primitives that can be used as building blocks, observable, core perl, been around forever, reentrant, configurable, and persistent. Seems to scale well and does priorities too. And with priorities comes the ability to order the output and sort. API stinks, but documented well. From what I read just a linked list inside the kernel. But it doesn't do chunking for you like MCE, well it doesn't do a lot of things MCE does for you, which as you illustrate is a big bottleneck.