PerlMonks  

Inline::Java with MCE::Hobo

by eraskin (Novice)
on May 26, 2021 at 22:08 UTC ( #11133096=perlquestion )

eraskin has asked for the wisdom of the Perl Monks concerning the following question:

Hello all... brand new seeker here.

I am new to using MCE::Hobo and looking for some wisdom. I have a program that makes a web service call to a SAP DataServices system. The code supplied by SAP is written in Java, so I have written a module using Inline::Java to access it. It works fine single-threaded.

Performance on large files is terrible and I know the bottleneck is the call to the service. I am attempting to use MCE::Hobo with workers and a queue to send requests to the server in parallel. When I execute the task, only some records return with valid data. It appears that only some of the workers are calling the service properly.

I have a sneaking suspicion that the problem is with Inline::Java and the JVM it creates. I thought (mistakenly?) that using MCE::Hobo would create individual processes and each would get its own JVM. Perhaps not? I wrote the software this way because I am not sure that the Java code provided by SAP works in a multi-threaded environment - hence the use of multiple processes.

Does anybody have experience running Inline::Java inside MCE::Hobo tasks?

Please let me know what other information you might need. I will post what I can. The code itself is rather long.

Replies are listed 'Best First'.
Re: Inline::Java with MCE::Hobo
by marto (Cardinal) on May 27, 2021 at 09:26 UTC

    Sorry, this is perhaps a bit of a tangent from your question, however IIRC SAP DataServices has a REST API which should remove the complication of using a Java program to achieve this.

      Thanks for the reply. A Data Services real time service is accessed by sending messages to an Access Server, which has Job Servers running behind it. Each Job Server is running the same data flow, which takes an XML message in and replies with a resulting XML message. The Access Server automatically load balances the requests to as many Job Servers as you have configured, automatically starting and stopping them as the load requires. IMO, this is much better than just a REST call, which is why I use it.

      Of course, if I can't get it to work, I can try REST calls as a fallback.

      Thanks for the suggestion!

Re: Inline::Java with MCE::Hobo
by cavac (Curate) on May 27, 2021 at 08:11 UTC

    While I'm not familiar with MCE::Hobo (or Inline::Java, for that matter), it seems to use fork() to create new workers. While this works great in general, there are a few caveats that you have to keep in mind and check for.

    For one, open file handles are shared with child processes. That means you might need to close all file handles and open new ones, including any temporary files Inline::Java might have created. For example, Inline::Java might create some external temporary files that it can execute with the Java engine in the parent process while your module is loading. When the first child process exits, it might clean up (delete) those files, thus preventing other child processes from working correctly. And/or the children might share a network socket to the JVM.
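
    A minimal sketch of the shared-handle caveat, using only core modules and plain fork() as a stand-in for MCE::Hobo. The temporary file and the variable names are made up for the demonstration, and nothing here is specific to Inline::Java. A read in the child on an inherited handle moves the offset the parent sees, while a handle opened after the fork is independent:

```perl
use strict;
use warnings;
use Fcntl qw(SEEK_CUR);
use File::Temp qw(tempfile);
use POSIX ();

# Hypothetical scratch file, only for this demonstration.
my ($tmp, $path) = tempfile(UNLINK => 1);
print {$tmp} "0123456789";
close $tmp or die "close failed: $!";

# Opened BEFORE the fork, so every child inherits this handle.
open(my $shared, '<', $path) or die "open failed: $!";

my $pid = fork();
die "fork failed: $!" unless defined $pid;
if ($pid == 0) {
    # Child: an unbuffered read on the inherited handle moves the shared offset.
    sysread($shared, my $buf, 4);
    POSIX::_exit(0);    # skip END blocks and destructors in the child
}
waitpid($pid, 0);

# The parent never read from the file, yet its offset has moved.
my $offset_after_shared_read = sysseek($shared, 0, SEEK_CUR);

$pid = fork();
die "fork failed: $!" unless defined $pid;
if ($pid == 0) {
    # Child: a handle opened AFTER the fork has its own offset.
    open(my $own, '<', $path) or POSIX::_exit(1);
    sysread($own, my $buf, 4);
    POSIX::_exit(0);
}
waitpid($pid, 0);

my $offset_after_own_read = sysseek($shared, 0, SEEK_CUR);

print "after child read on inherited handle: $offset_after_shared_read\n";   # 4
print "after child read on its own handle:   $offset_after_own_read\n";      # still 4
```

    The children leave through POSIX::_exit so that they skip END blocks and destructors, which is the kind of cleanup that could otherwise remove files the parent or the other workers still need.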

    Not sure how to solve this in Inline::Java, or even if it's solvable in its current form. There is a thread on stackoverflow discussing a similar problem.

    One way to work around this would be to have a master process that forks and then calls up a completely new instance of the worker script. Something along the lines of (untested, written from memory)

    #!/usr/bin/env perl
    use strict;
    use warnings;
    use Time::HiRes qw(sleep);
    use POSIX qw(WNOHANG);

    my $workercount = 0; # Counts active workers

    $SIG{CHLD} = sub { # Child exit detected
        # Reap every finished child; several exits can arrive as one signal
        while (waitpid(-1, WNOHANG) > 0) {
            $workercount--; # remove child from "active" count
        }
        return;
    };

    while(1) {
        if($workercount < 20) {
            my $pid = fork();
            die "fork failed: $!" unless defined $pid;
            if($pid) {
                # parent
                $workercount++; # forked a new child
            } else {
                # child
                my $cmd = 'perl myworkerscript.pl';
                eval {
                    exec($cmd);
                };
                exit(0);
            }
        } else {
            sleep(0.5);
        }
    }

    There is also the funny matter of things like the pseudo-random number generator. When you fork() after the generator has already been seeded in the parent, all child processes will generate the same sequence of random numbers. Some protocols (and other stuff) use randomly generated unique IDs to identify, say, a transaction or a data block. If you launch multiple child processes that now generate the same "unique" IDs, this could lead to all sorts of strange problems. Calling srand() immediately after fork in the child (in whatever function you call from MCE::Hobo) might be a good idea.
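
    A small sketch of the random-number caveat, again with plain fork() as a stand-in for MCE::Hobo workers. The helper run_in_child and the seed value 42 are invented for the demonstration. The parent seeds the generator before forking, so two children that do not reseed draw the same first number, while children that call srand() get a fresh seed each:

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical helper: fork one child, run $code in it, and return
# whatever the child wrote back through a pipe.
sub run_in_child {
    my ($code) = @_;
    pipe(my $reader, my $writer) or die "pipe failed: $!";
    my $pid = fork();
    die "fork failed: $!" unless defined $pid;
    if ($pid == 0) {                  # child
        close $reader;
        print {$writer} $code->();
        close $writer;                # flushes the buffered output
        POSIX::_exit(0);
    }
    close $writer;                    # parent
    my $out = do { local $/; <$reader> };
    close $reader;
    waitpid($pid, 0);
    return $out;
}

# The parent seeds the generator BEFORE forking (any earlier call to
# rand() in the parent would have seeded it implicitly as well).
srand(42);

# These children inherit the parent's generator state unchanged.
my @inherited = map { run_in_child(sub { int(rand(1_000_000)) }) } 1 .. 2;

# These children reseed first, as suggested above.
my @reseeded = map { run_in_child(sub { srand(); int(rand(1_000_000)) }) } 1 .. 2;

print "inherited state: @inherited\n";   # two identical numbers
print "after srand():   @reseeded\n";    # almost certainly two different numbers
```

    If the parent has never called rand() or srand() before the fork, Perl seeds each child on its first rand() call, so the collision only shows up when some code in the parent has already touched the generator.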

    perl -e 'use Crypt::Digest::SHA256 qw[sha256_hex]; print substr(sha256_hex("the Answer To Life, The Universe And Everything"), 6, 2), "\n";'

      Thanks. I wish the children were independent enough to do this, but they are really just an internal part of a much bigger program.

      We are in a business that receives hundreds of files a day, all in different formats. To process them, we convert them to a standard format. I had a tool that I purchased 20 years ago written in COBOL(!). Rather than keeping that up to date, I wrote a perl script to take on the functions we needed.

      The code is basically just a read-process-write loop. Read the input, parse the data, generate the output format, write the output file, track counts and statistics about the data.

      Part of "parse the data" is a call to Data Services to parse the input name and generate titles and gender codes. That's where my bottleneck is. I have taken the internals of the "parse the data" and put them into a sub. This sub reads from a queue and processes the data until the queue is empty. Each worker sub writes to an output queue. An output loop reads each record from the queue, generates the output format and writes the file. Here are some relevant parts:

      my $num_workers = 5;

      use SexCoder;
      use MCE::Hobo;
      use MCE::Shared;

      my $input_q  = MCE::Shared->queue( fast => 1 );
      my $output_q = MCE::Shared->queue( fast => 1 );

      mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
      mce_open my $ERR, ">>", \*STDERR or die "open error: $!";

      MCE::Hobo->create({ posix_exit => 1 }, 'input_task', $_) for 1 .. $num_workers;

      # open the files, etc... - not shown here

      ######### MAIN INPUT LOOP
      while (my $row = readdata($fhin, \$dbfrownum, \$inbuffer, @sortedfields)) {
          $input_q->enqueue($row);
      }
      ######### END OF INPUT LOOP

      # exit worker threads (flag end of input queue)
      $input_q->end();

      ######### MAIN OUTPUT LOOP
      while(my $result = $output_q->dequeue()) {
          if (exists $result->{finished}) {
              MCE::Hobo->waitone;
              $output_q->end unless MCE::Hobo->pending;
              next;
          }
          output_row($result); # THIS OUTPUTS A ROW
      }
      ######### END OF MAIN OUTPUT LOOP

      exit;

      sub input_task {
          # THIS CALLS SOME JAVA CODE TO CREATE THE CONNECTION TO DATA SERVICES
          $sexcoder=SexCoder->new("$ds_server", 4000);

          while (my $row = $input_q->dequeue()) {
              my $outrow = mapdata($row);
              my @result = ($reject, $row, $outrow);
              $output_q->enqueue(\@result);
          }

          # THIS DESTROYS THE BLESSED OBJECT, CLOSING THE CONNECTION
          $sexcoder = undef;
          $output_q->enqueue({ finished => $$ });
      }

      The code that actually calls the service is deep inside the mapdata() function. The relevant piece of code is:

      sub parsename() {
          my $dsparse = 0;
          if ($dsparsename == 1) {
              eval {
                  my $result = $sexcoder->getsexcode($countrycode, $name);
                  $parsedname{'TITLE'}   = $result->{title};
                  $parsedname{'FIRST'}   = $result->{first};
                  $parsedname{'MIDDLE'}  = $result->{middle};
                  $parsedname{'LAST'}    = $result->{last};
                  $parsedname{'SUFFIX'}  = $result->{suffix};
                  $parsedname{'PROFSFX'} = $result->{profsfx};
                  $dsname = $result->{name};
                  $data->{'out$gender'}  = $result->{gender};
                  $data->{'out$sexcode'} = $result->{sexcode};
                  $dsparse = 1;
              };
          }
          return $dsname if $dsparse == 1;

          # Fall back to simpler parser if $dsparse != 1 or $dsparsename != 1
      }

      Finally, here is the complete code from the SexCoder object

      package SexCoder;

      use strict;
      use warnings;

      use Thread::Bless;  # NOT SURE IF I NEED THIS
      use XML::LibXML;
      use XML::Compile::Schema;
      use XML::Compile::Util qw(pack_type);
      use Encode;
      use Data::HexDump;
      use Inline::Java qw(caught);

      require Exporter;

      our @ISA = qw(Exporter);

      # Items to export into callers namespace by default. Note: do not export
      # names by default without a very good reason. Use EXPORT_OK instead.
      # Do not simply export all your public functions/methods/constants.

      # This allows declaration use SexCoder ':all';
      # If you do not need this, moving things directly into @EXPORT or @EXPORT_OK
      # will save memory.
      our %EXPORT_TAGS = ( 'all' => [ qw(
          new getsexcode
      ) ] );

      our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } );

      our @EXPORT = qw(
          new getsexcode
      );

      our $VERSION = '1.00';

      BEGIN {
          $ENV{'CLASSPATH'} = "/pas/src/perl/SexCoder/lib/rtsClient.jar";
          $ENV{'LINK_DIR'} = "/pas/src/perl/SexCoder/lib";
      }

      use Inline Java => <<'END_OF_JAVA_CODE', CLASSPATH=>$ENV{CLASSPATH},
      import com.businessobjects.rtsclient.*;
      import java.security.Security;

      public class dsclient {
          RTServiceClient rts;
          String keystorePasswd = "dskeystore";
          String keystorePath = "/pas/src/perl/SexCoder/keystore/dskeystore.jks";

          public dsclient() throws RTServiceException {
              System.setProperty("javax.net.ssl.trustStore", keystorePath);
              Security.setProperty("javax.net.ssl.trustStorePassword", keystorePasswd);
              rts = new RTServiceClient();
          }

          public void connect(String server, int port) throws RTServiceException {
              rts.connect(server, port, true);
          }

          public String invoke(String servicename, String xmlstring) throws RTServiceException {
              return rts.invoke(servicename, xmlstring);
          }

          public void disconnect() throws RTServiceException {
              rts.disconnect();
          }
      }
      END_OF_JAVA_CODE
      ;

      use Inline Java => 'STUDY',
          STUDY => [
              'com.businessobjects.rtsclient.RTServiceClientX',
              'com.businessobjects.rtsclient.RTServiceException',
              'java.security.Security'
          ],
          AUTOSTUDY => 1;

      my $servicename = "SexCoder";
      my $server;
      my $port;
      my $rts;
      my $sexcode_msg_schema;
      my $sexcode_reply_schema;

      sub new {
          my $self = bless {}, shift;

          # Service and Port identify the Data Services Server (ex: ds421.paslists.com:4200)
          my $ds_server = "ds.paslists.com";
          $ds_server = $ENV{DS_SERVER} if defined($ENV{DS_SERVER});
          $server = exists $_[0] ? $_[0] : $ds_server;
          $port   = exists $_[1] ? $_[1] : 4200;

          # Compile the XML Schema and create a reader and writer object
          $sexcode_msg_schema   = XML::Compile::Schema->new('/pas/src/perl/SexCoder/lib/sexcoder_msg.xsd');
          $sexcode_reply_schema = XML::Compile::Schema->new('/pas/src/perl/SexCoder/lib/sexcoder_reply.xsd');
          $self->{WRITER} = $sexcode_msg_schema->compile(WRITER => '{http://paslists.com/sexcoder}sexcode_msg');
          $self->{READER} = $sexcode_reply_schema->compile(READER => '{http://paslists.com/sexcoder}sexcode_reply');

          # Create a connection to Data Services
          eval {
              $self->{RTS} = new SexCoder::dsclient();
              $self->{RTS}->connect($server, $port);
          };
          if ($@) {
              if (caught("java.lang.Exception")) {
                  die $@;
              }
          }

          # Mark the object as INIT'd
          $self->{INIT} = 1;

          return $self;
      }

      sub getsexcode {
          my $self = shift;

          # Arguments are countrycode and name
          my ($countrycode, $name) = @_;

          (my $lang=$ENV{LANG}) =~ s/^(.*)\.(.*)$/$2/;
          $name = decode($lang,$name) if $lang !~ /UTF-8/;

          # basic error checking
          die "SexCoder not Initialized\n" if $self->{INIT} != 1;
          if (!defined($self->{RTS})) {
              eval {
                  $self->{RTS} = new SexCoder::dsclient();
                  $self->{RTS}->connect($server, $port);
              };
              if ($@) {
                  if (caught("java.lang.Exception")) {
                      die $@;
                  }
              }
          }
          die "No Connection to Data Services defined\n" if !defined($self->{RTS});

          # build a new XML document to send the parameters in
          my $doc = XML::LibXML::Document->new('1.0','UTF-8');
          my $data;
          $data->{countrycode} = $countrycode;
          $data->{name} = $name;
          my $sexcode_msg_xml = $self->{WRITER}->($doc, $data);

          # ask data services for the sex code and fielded name
          my $sexcode_reply_xml = "";
          eval {
              $sexcode_reply_xml = $self->{RTS}->invoke($servicename, $sexcode_msg_xml->toString);
          };
          if ($@) {
              if (caught("java.lang.Exception")) {
                  # first attempt failed: reconnect and retry once
                  eval {
                      $self->{RTS}->connect($server, $port);
                  };
                  if ($@) {
                      if (caught("java.lang.Exception")) {
                          die $@;
                      }
                  } else {
                      eval {
                          $sexcode_reply_xml = $self->{RTS}->invoke($servicename, $sexcode_msg_xml->toString);
                      };
                      if ($@) {
                          if (caught("java.lang.Exception")) {
                              warn $@;
                              undef $@;
                          }
                      }
                  }
              }
          }

          # return a hash version of the XML result
          $sexcode_reply_xml = decode("iso-8859-1",$sexcode_reply_xml) if $lang !~ /UTF-8/;
          my $result = $self->{READER}->($sexcode_reply_xml);

          return $result;
      }

      # Disconnect from the server at the end
      DESTROY {
          my $self = shift;
          eval {
              $self->{RTS}->disconnect() if defined($self->{RTS});
              if (caught("java.lang.Exception")) {
                  die $@;
              }
          }
      }

      1;

      Sorry for the long post

Re: Inline::Java with MCE::Hobo
by Fletch (Chancellor) on May 27, 2021 at 14:31 UTC

    Untried hunch: Haven't diddled with MCE as much as I'd like to have (possibly should have) but if the problem is that your worker children are inheriting too much context, then rather than pulling in Inline::Java in the parent you might try pushing that out into a separate module which you then require in your on_start handler for your MCE::Hobo workers. That way you're going to be sure that a separate JVM instance and all its associated plumbing is coming up only on the child process side of the fork.

    Edit: Derp, I do need to play with MCE more because apparently (re-?)reading the MCE::Hobo docs the on_start callback is called in the parent after a worker is started. Possibly instead then do the require in the sub you pass to the create method instead and that'll still keep it done only in the child.

    The cake is a lie.
    The cake is a lie.
    The cake is a lie.

      I could not make the require work. First, I removed the "use SexCoder" from the top of my script. Then I added this:

      MCE::Hobo->init(on_start => sub { require SexCoder; });
      The error message I got was:
      Hobo 30016 terminated abnormally: reason Can't locate object method "new" via package "SexCoder" (perhaps you forgot to load "SexCoder"?) at ./convert.mce line 721, <__ANONIO__> line 24.

      So, the SexCoder did not load. I then tried removing the Hobo->init and placing "use SexCoder" inside my worker sub {}. That was no different than my original code. I had exactly the same issue, where some workers were able to get results from the service and others were not. That really confuses me. I think you are on the right track, though. There must be something odd in my SexCoder package that I'm not seeing. See my reply to cavac above for the complete code to that package. Maybe you can see something?

      I have one more tidbit for everyone helping me. The code works if $num_workers = 1 (simulating single-threaded code). That's why I suspect an issue with Inline::Java or with the rtsClient itself. I just can't fathom why if each is running in its own process.

        Greetings eraskin,

        Have you tried importing SexCoder inside the input_task Hobo routine? This is where to load the class uniquely per worker child.

        sub input_task {
            require SexCoder;
            SexCoder->import();

            # THIS CALLS JAVA TO CREATE THE CONNECTION TO DATA SERVICES
            my $sexcoder = SexCoder->new("$ds_server", 4000);
            ...
        }

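
        The difference this relies on can be checked with core Perl alone. A `use` statement runs at compile time in the parent no matter where it is written, even inside a worker sub, whereas `require` runs only when execution reaches it. In the sketch below, plain fork() stands in for a Hobo worker and Text::ParseWords stands in for a heavy module such as SexCoder; both substitutions are assumptions made for the demonstration. Whether this alone gives each worker its own JVM with Inline::Java is not something the sketch can show.

```perl
use strict;
use warnings;
use POSIX ();

# Text::ParseWords is only a stand-in for a module like SexCoder.
my $module_file = 'Text/ParseWords.pm';

pipe(my $reader, my $writer) or die "pipe failed: $!";

my $pid = fork();
die "fork failed: $!" unless defined $pid;

if ($pid == 0) {
    # Child (the "worker"): the module is loaded here, at run time.
    close $reader;
    require Text::ParseWords;
    my @words = Text::ParseWords::shellwords('loaded "in the child"');
    syswrite($writer, scalar(@words) . "\n");
    POSIX::_exit(0);
}

# Parent: it never executes the require above.
close $writer;
chomp(my $child_words = <$reader>);
close $reader;
waitpid($pid, 0);

my $loaded_in_parent = exists $INC{$module_file} ? 1 : 0;

print "words parsed in the child: $child_words\n";        # 2
print "module loaded in parent:   $loaded_in_parent\n";   # 0
```

        The child can use the module, yet the parent's %INC never lists it, so whatever the module sets up while loading exists only on the child side of the fork.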
Re: Inline::Java with MCE::Hobo
by karlgoethebier (Abbot) on May 27, 2021 at 08:13 UTC
    «Does anybody have experience running Inline::Java inside MCE::Hobo tasks?»

    No, I don’t. Post some code. In the meantime you may take a look at Discipulus' library, which contains many links about the basic theme. Some more or less useful stuff there is from me - with a little help from my friends here 🤪😎 Regards, Karl

    «The Crux of the Biscuit is the Apostrophe»

      Thanks. Please see my reply to cavac above for code

Re: Inline::Java with MCE::Hobo
by karlgoethebier (Abbot) on May 29, 2021 at 12:35 UTC

    One more thing: cavac mentioned this 9-year-old thread on stackoverflow.

    From ibidem:

    "...Another approach is to forget about forking from Perl and to leverage Java's superior threading model to do parallelization. Design your Java code to perform its tasks in new threads, and start new threads from Perl..."

    Probably not a bad idea. And something I dreamed of and never tried.

    Regards, Karl

    «The Crux of the Biscuit is the Apostrophe»

Approved by philipbailey
Front-paged by Corion