PerlMonks

IPC::Open, Parallel::ForkManager, or Sockets::IO for parallelizing?

by mldvx4 (Friar)
on Sep 04, 2023 at 09:55 UTC ( [id://11154241] : perlquestion )

mldvx4 has asked for the wisdom of the Perl Monks concerning the following question:

Greetings,

I have a task where I fetch RSS and Atom feeds from different sites. Each URL queries a different host, and there are pushing a thousand hosts, so doing this in parallel would greatly speed things up without troubling the remote servers. Going one site at a time, the best approach seems to be a subroutine that uses LWP to fetch each feed and XML::Feed to process each successful response.

Which direction should I be looking in for an efficient way to run a dozen or two such subroutines concurrently? I want to limit the number of concurrent queries to fewer than two dozen, since larger numbers seem to trigger some kind of outgoing throttling from my ISP. Should I have the main script launch separate LWP scripts and communicate with them using IPC or sockets? Or should I try something like Parallel::ForkManager or similar? Or something else entirely?
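
A minimal sketch of the Parallel::ForkManager route, assuming a placeholder feed list, a cap of 20 workers, and a 10-second timeout; the XML::Feed step is only hinted at in a comment:

#!/usr/bin/perl
use strict;
use warnings;
use Parallel::ForkManager;
use LWP::UserAgent;

my @feeds = (
    'https://example.org/feed.xml',   # placeholder URLs
    'https://example.com/atom.xml',
);

my $pm = Parallel::ForkManager->new(20);   # fewer than two dozen workers

# Results come back to the parent via the run_on_finish callback.
my %status;
$pm->run_on_finish(sub {
    my ($pid, $exit, $ident, $signal, $core, $data) = @_;
    $status{$ident} = $data->{code} if ref $data;
});

for my $url (@feeds) {
    $pm->start($url) and next;             # parent: move on to the next URL
    my $res = LWP::UserAgent->new(timeout => 10)->get($url);
    # a real worker would hand $res->decoded_content to XML::Feed here
    $pm->finish(0, { code => $res->code });
}
$pm->wait_all_children;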

Thanks for any tips or advice.


Replies are listed 'Best First'.
Re: IPC::Open, Parallel::ForkManager, or Sockets::IO for parallelizing?
by Corion (Patriarch) on Sep 04, 2023 at 10:02 UTC
Re: IPC::Open, Parallel::ForkManager, or Sockets::IO for parallelizing?
by hippo (Bishop) on Sep 04, 2023 at 10:03 UTC

    What's easiest will depend on your existing code but it could well be that LWP::Parallel could just slot in nicely. Fetch in batches of "a dozen or two" in parallel, then process the results before moving on to the next batch. It won't be the absolute most efficient but it will get you from serial to parallel for what I expect to be the slow part of the process with minimal fuss.
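
    If it helps, the batch-then-process loop could be as simple as the sketch below, where fetch_batch() and process_feed() are hypothetical stand-ins for whatever LWP::Parallel call and XML::Feed handling you end up with:

    my @feed_urls  = ('https://example.org/feed.xml');   # placeholder for the ~1000 URLs
    my $batch_size = 20;                                  # "a dozen or two"
    while (my @batch = splice @feed_urls, 0, $batch_size) {
        my $responses = fetch_batch(@batch);    # parallel fetch, e.g. via LWP::Parallel
        process_feed($_) for @$responses;       # XML::Feed handling, serially per batch
    }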


    🦛

      Thanks. I took a closer look at LWP::Parallel and it may come in handy later¹. From the documentation, I see how to fetch batches of links, and I wonder if there is any way to parallelize the processing of the results in the same move. That way fetch+process runs continuously, rather than fetching in parallel, waiting, and then processing in parallel.

      I'll be digging into the options mentioned in the other threads, too.

      #!/usr/bin/perl
      use strict;
      use warnings;
      use LWP::Parallel::UserAgent;

      my @feeds = (
          'http://localhost/feed1.xml',  # rss
          'http://localhost/feed2.xml',  # atom
          'http://localhost/foo',        # 404
      );

      my $requests = prepare_requests(@feeds);
      my $entries  = fetch_feeds($requests);

      foreach my $k (keys %$entries) {
          my $res = $entries->{$k}->response;
          print "Answer for '", $res->request->url, "' was \t",
              $res->code, ": ", $res->message, "\n";  # $res->content,"\n";
      }
      exit(0);

      sub prepare_requests {
          my (@feeds) = (@_);
          my $requests;
          foreach my $url (@feeds) {
              push(@$requests, HTTP::Request->new('GET', $url));
          }
          return($requests);
      }

      sub fetch_feeds {
          my ($requests) = (@_);
          my $pua = LWP::Parallel::UserAgent->new();
          $pua->in_order  (0);  # do not insist on results in order of registration
          $pua->duplicates(1);  # ignore duplicates
          $pua->timeout   (9);  # in seconds
          $pua->redirect  (1);  # follow redirects
          $pua->max_hosts (3);  # max locations accessed in parallel
          foreach my $req (@$requests) {
              print "Registering '" . $req->url . "'\n";
              if (my $res = $pua->register($req, \&handle_answer, 8192, 1)) {
                  # print STDERR $res->error_as_HTML;
                  print $res->error_as_HTML;
              }
              else {
                  print qq(ok\n);
              }
          }
          my $entries = $pua->wait();
          return($entries);
      }

      sub handle_answer {
          my ($content, $response, $protocol, $entry) = @_;
          if (length($content)) {
              $response->add_content($content);
          }
          else {
              1;
          }
          return(undef);
      }

      ¹ That's the thing about CPAN: there are so many useful modules with great accompanying documentation that discovery can be a challenge. So I am very appreciative of everyone's input here.

        wonder if there is any way to parallelize the processing of the results in the same move

        Have you tried just putting your processing in the handle_answer callback? That may be all you need. But I would still profile it first because it would be a big surprise if the feed processing step weren't dwarfed by the fetch times.
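
        Whether it ends up inside the callback or after $pua->wait, the XML::Feed step itself is small. A rough sketch, with process_entry() as a hypothetical stand-in for whatever is done per item:

        use XML::Feed;

        sub process_feed_xml {
            my ($xml) = @_;
            my $feed = XML::Feed->parse(\$xml);
            unless ($feed) {
                warn "feed parse failed: " . XML::Feed->errstr . "\n";
                return;
            }
            # hypothetical per-entry handler
            process_entry($_->title, $_->link) for $feed->entries;
        }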


        🦛

      Thanks. I've taken a closer look at LWP::Parallel now and have some questions about how it handles many (most?) HTTPS sites. For now, it seems to return HTTP status "503 Service Unavailable" for sites that exist and are accessible via other agents. Here is one example:

      #!/usr/bin/perl
      use strict;
      use warnings;
      use LWP::Parallel::UserAgent;
      use LWP::Debug qw(+);

      my $headers = HTTP::Headers->new(
          'User-Agent' => "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.66 Safari/537.36",
      );

      my @requests;
      foreach my $url ('https://blog.arduino.cc/feed/') {
          push(@requests, HTTP::Request->new('GET', $url, $headers));
      }

      # new parallel agent
      my $pua = LWP::Parallel::UserAgent->new();
      $pua->in_order  (0);
      $pua->duplicates(1);
      $pua->timeout   (9);
      $pua->redirect  (0);
      $pua->max_hosts (5);
      $pua->nonblock  (0);

      foreach my $req (@requests) {
          if (my $res = $pua->register($req, \&handle_answer, 8192)) {
              print $res->error_as_HTML;
          }
          else {
              print qq(ok\n);
          }
      }

      my $entries = $pua->wait();

      foreach my $k (keys %$entries) {
          my $res = $entries->{$k}->response;
          my $url = $res->request->url;
          print $res->code, qq(\t $url\n);
      }
      exit(0);

      sub handle_answer {
          my ($content, $response, $protocol, $entry) = @_;
          if (length($content)) {
              $response->add_content($content);
          }
          return(undef);
      }

      As one can see with various browsers, the feed in question is there, yet it is one of the feeds that LWP::Parallel is choking on.
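
      For comparison, a quick cross-check with plain LWP::UserAgent (which speaks HTTPS via LWP::Protocol::https, assuming that module is installed) can show whether the 503 comes from the server or from inside LWP::Parallel:

      #!/usr/bin/perl
      use strict;
      use warnings;
      use LWP::UserAgent;

      # same URL as above, fetched with the stock user agent for comparison
      my $ua  = LWP::UserAgent->new(timeout => 9);
      my $res = $ua->get('https://blog.arduino.cc/feed/');
      print $res->code, ' ', $res->message, "\n";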

        have some questions about how it should handle many (most?) HTTPS sites.

        Yeah, it seems to be pretty much all of them, which is a real shame. I guess it must have been about 6 or 7 years ago that I last used LWP::Parallel for anything serious, and back then this wasn't really an issue. In the meantime the heavy hand of Google has de facto forced most of the web over onto HTTPS, and now this is a major consideration.

        Having tested this briefly against one of my own sites, it does actually appear to be downloading the content, in that the server receives, accepts, and serves the request OK. It's just that the user agent has some sort of internal problem with the response.

        It might be worth raising a ticket although there are plenty open. Still, it would alert other users to the problem.


        🦛

Re: IPC::Open, Parallel::ForkManager, or Sockets::IO for parallelizing?
by 1nickt (Canon) on Sep 04, 2023 at 11:37 UTC

    Hi, I gave an example of solving this problem with MCE and MCE::Shared that you might like to look at.
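
    A very rough sketch of the MCE shape (not the linked example itself; the URL list and the worker cap of 20 are placeholders):

    use strict;
    use warnings;
    use MCE::Loop;
    use LWP::UserAgent;

    MCE::Loop::init { max_workers => 20, chunk_size => 1 };

    my @feeds = ('https://example.org/feed.xml');    # placeholder list

    # one URL per worker iteration; gather key/value pairs back to the parent
    my %status = mce_loop {
        my $url = $_;
        my $res = LWP::UserAgent->new(timeout => 10)->get($url);
        MCE->gather($url, $res->code);
    } @feeds;

    print "$_ => $status{$_}\n" for sort keys %status;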

    Hope this helps!

    The way forward always starts with a minimal test.
Re: IPC::Open, Parallel::ForkManager, or Sockets::IO for parallelizing?
by tybalt89 (Monsignor) on Sep 04, 2023 at 20:37 UTC

    A sort-of example with only partial processing (because you didn't show any). You can also pass back all the data in the child's return hash; I didn't, only because I didn't know how you wanted to process it.

    See Re^3: Pre-Forking Daemon with Parallel::ForkManager for the Forking::Amazing module.

    #!/usr/bin/perl
    use strict; # https://www.perlmonks.org/?node_id=11154241
    use warnings;
    use Forking::Amazing;
    use Time::HiRes qw( time );
    use LWP::UserAgent;

    my $ua = LWP::UserAgent->new;
    my %answers;

    Forking::Amazing::run
      23,                  # maxforks, less than two dozen
      sub                  # runs in child
      {
        my $starttime = time;
        my ($url) = @_;
        print "child debug starting $url\n";
        $ua->agent($url);
        my $req = HTTP::Request->new(GET => $url);
        my $res = $ua->request($req);
        my $status = $res->status_line;
        my $data = $res->content;
        print "child debug ending $url\n";
        return { len => length $data, status => $status,
          took => sprintf "%.3f seconds", time - $starttime };
      },
      sub                  # runs in parent with results from child
      {
        my ($url, $hashref) = @_;              # url of child, hashref from child
        $answers{$url} = $hashref;
      },
      map 'https://' . tr/\n//dr, <DATA>;      # list of URLs to process

    use Data::Dump 'dd';
    dd \%answers;

    __DATA__
    gap.com
    amazon.com
    ebay.com
    wunderground.com
    imdb.com
    google.com
    nosuchurl.com
    underarmour.com
    disney.com
    espn.com

    Outputs:

    child debug starting https://gap.com
    child debug starting https://amazon.com
    child debug starting https://ebay.com
    child debug starting https://wunderground.com
    child debug starting https://imdb.com
    child debug starting https://google.com
    child debug starting https://nosuchurl.com
    child debug starting https://underarmour.com
    child debug starting https://disney.com
    child debug starting https://espn.com
    child debug ending https://nosuchurl.com
    child debug ending https://wunderground.com
    child debug ending https://google.com
    child debug ending https://disney.com
    child debug ending https://gap.com
    child debug ending https://espn.com
    child debug ending https://ebay.com
    child debug ending https://underarmour.com
    child debug ending https://amazon.com
    child debug ending https://imdb.com
    {
      "https://amazon.com"       => { len => 724660,  status => "200 OK", took => "1.575 seconds" },
      "https://disney.com"       => { len => 548346,  status => "200 OK", took => "0.547 seconds" },
      "https://ebay.com"         => { len => 468521,  status => "200 OK", took => "0.832 seconds" },
      "https://espn.com"         => { len => 1474156, status => "200 OK", took => "0.649 seconds" },
      "https://gap.com"          => { len => 495420,  status => "200 OK", took => "0.624 seconds" },
      "https://google.com"       => { len => 19623,   status => "200 OK", took => "0.352 seconds" },
      "https://imdb.com"         => { len => 947107,  status => "200 OK", took => "2.467 seconds" },
      "https://nosuchurl.com"    => {
                                      len => 168,
                                      status => "500 Can't connect to nosuchurl.com:443 (Name or service not known)",
                                      took => "0.048 seconds",
                                    },
      "https://underarmour.com"  => { len => 491776,  status => "200 OK", took => "1.015 seconds" },
      "https://wunderground.com" => { len => 168647,  status => "200 OK", took => "0.257 seconds" },
    }

    Of course, after it's working the debug lines can be removed :)

Re: IPC::Open, Parallel::ForkManager, or Sockets::IO for parallelizing?
by alexander_lunev (Pilgrim) on Sep 04, 2023 at 14:31 UTC