Beefy Boxes and Bandwidth Generously Provided by pair Networks
We don't bite newbies here... much
 
PerlMonks  

What is the correct way to use Thread::Queue::Any ?

by fx (Pilgrim)
on Jan 07, 2004 at 16:09 UTC ( [id://319494] : perlquestion . print w/replies, xml ) Need Help??

fx has asked for the wisdom of the Perl Monks concerning the following question:

I am trying to use Thread::Queue::Any 0.07 by liz but need some help.

The following code works fine:

#!/opt/perl-5.8.2/bin/perl -w use strict; use Thread::Queue::Any; my $value = 1234; my $queue = Thread::Queue::Any->new; $queue->enqueue( $value ); my ($returned_value) = $queue->dequeue; printf "We have returned %i\n", $returned_value;

and prints We have returned 1234 as expected.

The power of a queue, however, comes into its own when used with threads as it allows data to be passed around. It is here that I am having problems.

The Thread::Queue::Any documentation describes the usage of each object method but doesn't give an example using threads.

The following code was my first attempt to get Thread::Queue::Any working with threads:

#!/opt/perl-5.8.2/bin/perl -w use strict; use threads; use Thread::Queue::Any; my $queue = Thread::Queue::Any->new; my $prod_thread = new threads(\&producer); my $cons_thread = new threads(\&consumer); $prod_thread->join; $cons_thread->join; sub producer { my $value = 1234; $queue->enqueue( $value ); $queue->enqueue( undef ); } sub consumer { while( my($returned_value) = $queue->dequeue ) { printf "I have been returned: %i\n", $returned_value; } }

but running it produces:

I have been returned: 1234 Use of uninitialized value in printf at ./2.pl line 23. I have been returned: 0

and then it just sits there doing nothing.

The Perl 5.8.0 thread tutorial uses the idiom of enqueuing an undef to a queue so that the while loop handling dequeuing exits when the undef is dequeued. This does not seem to work with Thread::Queue::Any. Should it?

In an example given in the above tutorial, the queue is passed to the thread as a parameter for the sub. I thought I might need something like that with Thread::Queue::Any so I tried the next piece of code:

#!/opt/perl-5.8.2/bin/perl -w use strict; use threads; use Thread::Queue::Any; my $queue = Thread::Queue::Any->new; my $prod_thread = new threads(\&producer, $queue); my $cons_thread = new threads(\&consumer, $queue); $prod_thread->join; $cons_thread->join; sub producer { my $queue = $_[0]; my $value = 1234; $queue->enqueue( $value ); $queue->enqueue( undef ); } sub consumer { my $queue = $_[0]; while( my($returned_value) = $queue->dequeue ) { printf "I have been returned: %i\n", $returned_value; } }

but this too produced:

I have been returned: 1234 Use of uninitialized value in printf at ./3.pl line 25. I have been returned: 0

and then just sat there too.

And so we reach my question - how should I really be using Thread::Queue::Any across threads?

== fx, Infinity Is Colourless

Replies are listed 'Best First'.
Re: What is the correct way to use Thread::Queue::Any ?
by liz (Monsignor) on Jan 07, 2004 at 16:35 UTC
    sub consumer { while( my($returned_value) = $queue->dequeue ) { printf "I have been returned: %i\n", $returned_value; } }

    I think you're being fooled by printf(). The "%i" turns an undef value into 0.

    Liz

      I thought that enqueuing an undef would have stopped that while loop and hence didn't much care about how it would interact with printf.

      Is the behaviour of enqueuing (or perhaps more specifically dequeuing) undef's under Thread::Queue::Any different than under Thread::Queue for some reason that I'm not thinking of?

      Many thanks.

        while( my($returned_value) = $queue->dequeue ) {

        You forget that $queue->dequeue always returns an array with at least 1 element. So the condition being checked is always 1 or higher. Something like:

        while( my ($returned_value) = $queue->dequeue ) { last unless defined $returned_value;
        should do the trick, I think.

        Liz

        I thought that enqueuing an undef would have stopped that while loop
        Nope, that's a list assignment in scalar context, which evaluates to the number of items on the right of the assignment.
Re: What is the correct way to use Thread::Queue::Any ?
by Molt (Chaplain) on Jan 07, 2004 at 16:15 UTC
    Try Thread::Conveyor.. it's the module that replaced Thread::Queue::Any. I've not yet tried to actually use this module, that's tomorrow's goal (Well timed question!), but you may as well at least try the later version and it's docs.
      Further to this, I've just added the following to the documentation of Thread::Conveyor and uploaded it to CPAN:


      Why would you use Thread::Conveyor over Thread::Queue::Any? Well, Thread::Conveyor has the following extra features:
      It works with Perl 5.8.0
      Shared arrays leak memory very badly in Perl 5.8.0. Therefore, you cannot really use Thread::Queue in Perl 5.8.0, and consequently cannot use Thread::Queue::Any in any type of production environment.

      It provides throttling
      A thread that enqueues very many values quickly, can cause a large amount of memory to be used. With throttling, any thread that enqueues will have to wait until there is "room" on the belt again before continuing. See methods "minboxes" and "maxboxes".

      You can check for a new value without removing it from the belt
      Sometimes it can be nice to check whether there is a new value on the belt without actually removing it from the belt. See the "peek" and "peek_dontwait" methods.

      You can reset the entire belt
      Sometimes you want to be able to reset the contents of the belt. See the "clean" and "clean_dontwait" methods for that.

      You can get everything from the belt in one go
      Sometimes you want everything that's on the belt in one go. That can also ba accomplished with the "clean" and "clean_dontwait" methods.

      Liz

Re: What is the correct way to use Thread::Queue::Any ?
by BrowserUk (Patriarch) on Jan 07, 2004 at 16:37 UTC

    Remove the brackets from the my statement in the while condition.

    while( my $returned_value = $queue->dequeue ) { # ^ ^

    The code will then behave as you expect.

    Update: Sorry! Wrong answer. I hadn't reinstalled Thread::Queue::Any since ungrading and tried it with Thread::Queue instead.

    P:\test>type 319494.pl8 use strict; use threads; use Thread::Queue; my $queue = Thread::Queue->new; my $prod_thread = new threads(\&producer, $queue); my $cons_thread = new threads(\&consumer, $queue); $prod_thread->join; $cons_thread->join; sub producer { my $queue = $_[0]; my $value = 1234; $queue->enqueue( $value ); $queue->enqueue( undef ); } sub consumer { my $queue = $_[0]; while( my $returned_value = $queue->dequeue ) { printf "I have been returned: %i\n", $returned_value; } } P:\test>319494 I have been returned: 1234 P:\test>

    It worked! ...but not with Thread::Queued::Any :(


    Examine what is said, not who speaks.
    "Efficiency is intelligent laziness." -David Dunham
    "Think for yourself!" - Abigail
    Hooray!

      The reason for Thread::Queue::Any over Thread::Queue (if it matters) is that I need to be queuing references to data structures.

      From a previous post of mine it turns out that I cannot do what I need with Thread::Queue otherwise I'd be using it :)

      Hey ho, 'tis a shame :)

      Many thanks.

Re: What is the correct way to use Thread::Queue::Any ?
by fx (Pilgrim) on Jan 07, 2004 at 18:13 UTC

    Thanks to Molt and to liz - Thread::Conveyor seems to be working with the following piece of code:

    #!/opt/perl-5.8.2/bin/perl -w use strict; use threads; use Thread::Conveyor; my $queue = Thread::Conveyor->new; my $prod_thread = new threads(\&producer); my $cons_thread = new threads(\&consumer); $prod_thread->join; $cons_thread->join; sub producer { my $value = 1234; $queue->put( $value ); $queue->put( undef ); } sub consumer { while( my $returned_value = $queue->take ) { printf "I have been returned: %i\n", $returned_value; } }