Programming
multithreading perl queue
Updated Sat, 21 May 2022 14:48:55 GMT

Perl Queues and Threading


I'm trying to accomplish the following:

  1. Have a thread that reads data from a very large file (say, about 10GB) and pushes it onto a queue. (I don't want the queue to grow very large, either.)

  2. While the buildQueue thread is pushing data onto the queue, have about 5 worker threads dequeue and process the data at the same time.

I've made an attempt, but my other threads are unreachable because of a continuous loop in my buildQueue thread.

My approach may be totally wrong. Thanks for any help, it's much appreciated.

Here's the code for buildQueue:

sub buildQueue {
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);
    open DICT_FILE, $dict_path or die("Sorry, could not open file!");
    while (1) {
        if (<DICT_FILE>) {
            if ($queue->pending() < 100) {
                 my $query = <DICT_FILE>;
                 chomp($query);
                 $queue->enqueue($query);
                 my $count = $queue->pending();
                 print "Queue Size: $count Query: $query\n";
            }
        }
    }
}

And, as I expected, when this thread gets executed nothing after it will run, because this thread never finishes.

my $builder = new Thread(&buildQueue);

Since the builder thread will be running for a long time I never get to create worker threads.

Here's the entire code:

#!/usr/bin/perl -w
use strict;
use Thread;
use Thread::Queue;
my $queue = new Thread::Queue();
my @threads;
sub buildQueue {
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);
    open dict_file, $dict_path or die("Sorry, could not open file!");
    while (1) {
        if (<dict_file>) {
            if ($queue->pending() < 100) {
                 my $query = <dict_file>;
                 chomp($query);
                 $queue->enqueue($query);
                 my $count = $queue->pending();
                 print "Queue Size: $count Query: $query\n";
            }
        }
    }
}
sub processor {
    my $query;
    while (1) {
        if ($query = $queue->dequeue) {
            print "$query\n";
        }
    }
}
my $builder = new Thread(&buildQueue);
push @threads, new Thread(&processor) for 1..5;



Solution

You'll need to mark when you want your threads to exit (via either join or detach). The fact that you have infinite loops with no last statements to break out of them is also a problem.

Edit: I also forgot a very important part! Each worker thread will block, waiting for another item off the queue, until it gets an undef from the queue. That's why we specifically enqueue undef once for each worker thread after the queue builder is done.

Try:

#!/usr/bin/perl -w
use strict;
use threads;
use Thread::Queue;
my $queue = new Thread::Queue();
our @threads; #Do you really need our instead of my?
sub buildQueue
{
    print "Enter a file name: ";
    my $dict_path = <STDIN>;
    chomp($dict_path);
    #Three-argument open, please!
    open my $dict_file, "<", $dict_path or die("Sorry, could not open file!");
    while(my $query=<$dict_file>)
    {
        chomp($query);
    while (1)
    {   #Wait to see if our queue has < 100 items...
        if ($queue->pending() < 100)
        {
            $queue->enqueue($query);
            print "Queue Size: " . $queue->pending . "\n";
            last; #This breaks out of the infinite loop
        }
        threads->yield(); #Don't peg a CPU while we wait for room
    }
    }
    close($dict_file);
    foreach(1..5)
    {
        $queue->enqueue(undef);
    }
}
sub processor 
{
    my $query;
    while ($query = $queue->dequeue) 
    {
        print "Thread " . threads->tid . " got $query\n";
    }
}
my $builder = threads->create(\&buildQueue);
push @threads, threads->create(\&processor) for 1..5;
#Waiting for our threads to finish.
$builder->join;
foreach(@threads)
{
    $_->join;
}
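
As an aside, if your Thread::Queue is version 3.01 or newer, the module can do the bounded-queue bookkeeping for you: the limit attribute makes enqueue block when the queue is full, and end() wakes every blocked worker with undef, so neither the busy-wait loop nor the per-worker undef sentinels are needed. A sketch of the same pattern on that API (the file name passed to the builder is just a placeholder for illustration):

```perl
#!/usr/bin/perl
use strict;
use warnings;
use threads;
use Thread::Queue 3.01;    #Need 3.01+ for limit and end()

my $queue = Thread::Queue->new();
$queue->limit = 100;    #enqueue() now blocks once 100 items are pending

sub buildQueue {
    my ($path) = @_;
    open my $fh, '<', $path or die "Could not open $path: $!";
    while (my $query = <$fh>) {
        chomp $query;
        $queue->enqueue($query);    #Blocks automatically if the queue is full
    }
    close $fh;
    $queue->end();    #Blocked dequeue() calls now return undef
}

sub processor {
    while (defined(my $query = $queue->dequeue)) {
        print "Thread " . threads->tid . " got $query\n";
    }
}

my $builder = threads->create(\&buildQueue, 'words.txt');    #Placeholder file name
my @workers = map { threads->create(\&processor) } 1 .. 5;
$_->join for $builder, @workers;
```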




Comments (1)

  • +1 – It seems that the problem was the deprecated Thread module. I switched to the threads module instead, and my code works as it should now. Thank you Jack Many for pointing me in the correct direction. — Feb 13, 2012 at 13:06
