in reply to Re: Using Thread::Conveyor and Parallel::ForkManager in thread Using Thread::Conveyor and Parallel::ForkManager
Do you mean that I can use more than one "consumer" thread to process the same queue?
For example, if I have this definition, are you suggesting that something like this would be a better solution:
my $archive_queue = Thread::Conveyor->new( {
    maxboxes => 5000,
    minboxes => 4000,
    optimize => 'memory',
} );

my $compress_log_thread_1 = threads->create( \&compress_logs );
my $compress_log_thread_2 = threads->create( \&compress_logs );
my $compress_log_thread_3 = threads->create( \&compress_logs );
my $compress_log_thread_4 = threads->create( \&compress_logs );
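Where compress_logs would be something along these lines; this is only a sketch, and the 'DONE' sentinel is just a placeholder for whatever termination scheme ends up being used:

## Each consumer blocks in take() until a box is available, so all
## four threads can safely drain the same conveyor.
## Assumes each box holds a single filename.
sub compress_logs {
    while ( my ( $logfile ) = $archive_queue->take ) {
        last if $logfile eq 'DONE';    ## placeholder termination signal
        system( 'gzip', $logfile );    ## stand-in for the real compression
    }
}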
Thanks
Asgaroth
Re^3: Using Thread::Conveyor and Parallel::ForkManager
by BrowserUk (Patriarch) on Aug 09, 2004 at 09:42 UTC
I'm only saying that mixing threads and forks is, at best, going to be messy and extremely difficult to debug; at worst, it will never work.
I cannot suggest a better solution to your problem without a clear description of the problem.
So far, I do not understand what you are trying to achieve by using Thread::Conveyor. The example code you have offered is nothing more than the code from the Synopsis of that module with a couple of constants changed to big numbers.
Personally, I am not sure what problem Thread::Conveyor is designed to solve, and as such I have never found a use for the module. I'd also be wary of using it as (as far as I'm aware) its author is no longer developing it.
I am slowly building a library of good uses for threads, and as such I am always willing to try to come up with working solutions to problems using them. Doing so requires a description of the underlying problem, devoid of preconceptions about the best way to solve it.
In this case, you appear to want to run the compression of several log files in parallel. My problem is, I do not see how Thread::Conveyor helps in solving this. I'm also not sure that there will be much of a performance advantage in running this type of process in parallel unless you have multiple CPUs, but until I try it, I may be completely wrong on that.
Summary: Re-state the question without the preconceptions of how to solve it and I will willingly take a crack at it.
Examine what is said, not who speaks.
"Efficiency is intelligent laziness." -David Dunham
"Think for yourself!" - Abigail
"Memory, processor, disk in that order on the hardware side. Algorithm, algorithm, algorithm on the code side." - tachyon
Ok, here's the breakdown:
The system that this particular script is running on is a SUN StarFire which has 8 x 400MHz SOMBRA CPU Modules and 8GB RAM.
This system is constantly receiving database archive log files from several servers; currently we are receiving from 15 database servers. At peak times, a single server can send as many as 12 x 250MB archive logs per minute. Multiply this by 15 and we have a lot of archive logs on this box. The number of servers sending logs is projected to grow drastically.
These logs need to be compressed and rsync'd off onto another box offsite (for recovery purposes).
The script's main objective is to scan a directory structure for newly received archive logs, compress each one, test the compressed archive log, then rsync it to the remote offsite server.
To run the directory scanning and compressions in parallel, I figured using queues would be a good option: the scanning could be continuous, and anything newly received would simply be pushed onto the queue; then, with a thread processing this queue, I could continually compress these logs until the queue is depleted. However, at the rate these archive logs are coming into the system, I need to compress more logs in parallel. Based on the number of CPUs in the system, I was hoping to run 4 simultaneous compressions of the logs, leaving the other 2 available for compression testing (2 threads here) and OS functionality.
I hope this explains it a little better. If you need more info, please don't hesitate to raise it here.
Thanks
Asgaroth
Okay. Here's a skeletal, completely untested, devoid-of-error-checking attempt in 30 lines of code.
The main thread sets up a Queue through which to communicate with the worker threads, and starts 4 workers that sit blocked, waiting for something to do. The main thread then falls into a loop, monitoring the directory for log files and queueing the names of new ones for the workers to process.
As it's not clear from your spec what criteria the process should end under, I've set up a couple of SIG handlers to do this. You'll need to adjust these to your requirements.
#! perl -slw
use strict;
use threads;
use Thread::Queue;

my $LOGDIR  = "./logs";
my $WORKERS = 4;

## The basis of the communications
my $Qwork = Thread::Queue->new;

## Minimal error handling for posting
sub compress_log {
    require Compress::Zlib;
    while( ( my $workitem = $Qwork->dequeue ) ne 'DONE' ) {
        open IN, '<:raw', $workitem
            or warn $! and next;
        open OUT, '>:raw', "$workitem.gz"
            or warn $! and next;

        ## Slurp the file, compress it and write it out.
        print OUT Compress::Zlib::memGzip(
            do{ local $/ = \-s IN; <IN> }
        );

        close OUT;
        close IN;
        unlink $workitem;
    }
}

## Set up a method of terminating the process cleanly.
## The main loop below will terminate on ^C/^Break (on win32).
## Set appropriate SIG values for your OS / usage.
my $monitoring = 1;
$SIG{INT} = $SIG{QUIT} = sub{ $monitoring = 0 };

## Start the compressor threads.
my @workers = map{ threads->new( \&compress_log ) } 1 .. $WORKERS;

## Monitor the directory for new .log files
my %seen;   ## Track which files we have already queued.
while( $monitoring ) {
    ## Grab the latest logs (excluding those we've already queued)
    my @newfiles = grep{ !defined $seen{ $_ }++ } glob( "$LOGDIR/*.log" );

    ## And queue them for compression
    $Qwork->enqueue( @newfiles );

    ## Sleep a while before looking again
    sleep 10;
}

## You could wait for existing work items to be processed here;
## choose a timeout appropriate to your requirements.
my $timeout = 60;
sleep 1 while --$timeout and $Qwork->pending;

## Empty any remaining items from the work queue -- do something with them?
$Qwork->dequeue while $Qwork->pending;

## Post 'DONE' messages, one per worker
$Qwork->enqueue( ( 'DONE' ) x $WORKERS );

## And wait for them to complete
$_->join for @workers;

## Bye.
exit;
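Your description also mentions testing the compressed log and rsyncing it offsite; those steps are not in the skeleton above. A possible (equally untested) follow-on for each worker might look like this, with the remote host and destination path as placeholders:

## Hypothetical extension: verify the .gz, then ship it offsite.
## 'offsite-host:/archive/logs/' is a placeholder, not a real target.
sub verify_and_ship {
    my $gz = shift;

    ## 'gzip -t' exits non-zero if the archive is corrupt.
    if( system( 'gzip', '-t', $gz ) == 0 ) {
        system( 'rsync', '-az', $gz, 'offsite-host:/archive/logs/' );
    }
    else {
        warn "Verification failed for $gz\n";
    }
}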
Examine what is said, not who speaks.
"Efficiency is intelligent laziness." -David Dunham
"Think for yourself!" - Abigail
"Memory, processor, disk in that order on the hardware side. Algorithm, algorithm, algorithm on the code side." - tachyon
Re^3: Using Thread::Conveyor and Parallel::ForkManager
by perrin (Chancellor) on Aug 09, 2004 at 12:25 UTC
It seems like you're not understanding that Parallel::ForkManager does not use threads. It is process-based. Since Thread::Conveyor uses threads, the two don't mix.
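For contrast, a minimal process-based sketch using Parallel::ForkManager might look like this; the directory glob and the gzip call are placeholders, not from the thread:

use strict;
use warnings;
use Parallel::ForkManager;

## Allow at most 4 concurrent child processes.
my $pm = Parallel::ForkManager->new( 4 );

for my $logfile ( glob './logs/*.log' ) {
    $pm->start and next;           ## parent: fork a child and move on
    system( 'gzip', $logfile );    ## child: do the work
    $pm->finish;                   ## child exits here
}
$pm->wait_all_children;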