

in reply to Re: Inline::Java with MCE::Hobo
in thread Inline::Java with MCE::Hobo

Thanks. I wish the children were independent enough to do this, but they are really just an internal part of a much bigger program.

We are in a business that receives hundreds of files a day, all in different formats. To process them, we convert them to a standard format. I had been using a tool, written in COBOL(!), that I purchased 20 years ago. Rather than keeping that up to date, I wrote a Perl script to take on the functions we needed.

The code is basically just a read-process-write loop. Read the input, parse the data, generate the output format, write the output file, track counts and statistics about the data.

Part of "parse the data" is a call to Data Services to parse the input name and generate titles and gender codes. That's where my bottleneck is. I have moved the internals of "parse the data" into a sub. Each worker runs this sub, which reads from an input queue and processes rows until the queue is empty, writing each result to an output queue. An output loop in the parent reads each record from the output queue, generates the output format and writes the file. Here are some relevant parts:

    my $num_workers = 5;

    use SexCoder;
    use MCE::Hobo;
    use MCE::Shared;

    my $input_q  = MCE::Shared->queue( fast => 1 );
    my $output_q = MCE::Shared->queue( fast => 1 );

    mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
    mce_open my $ERR, ">>", \*STDERR or die "open error: $!";

    MCE::Hobo->create({ posix_exit => 1 }, 'input_task', $_) for 1 .. $num_workers;

    # open the files, etc... - not shown here

    ######### MAIN INPUT LOOP
    while (my $row = readdata($fhin, \$dbfrownum, \$inbuffer, @sortedfields)) {
        $input_q->enqueue($row);
    }
    ######### END OF INPUT LOOP

    # exit worker threads (flag end of input queue)
    $input_q->end();

    ######### MAIN OUTPUT LOOP
    while (my $result = $output_q->dequeue()) {
        if (exists $result->{finished}) {
            MCE::Hobo->waitone;
            $output_q->end unless MCE::Hobo->pending;
            next;
        }
        output_row($result);    # THIS OUTPUTS A ROW
    }
    ######### END OF MAIN OUTPUT LOOP

    exit;

    sub input_task {
        # THIS CALLS SOME JAVA CODE TO CREATE THE CONNECTION TO DATA SERVICES
        $sexcoder = SexCoder->new("$ds_server", 4000);

        while (my $row = $input_q->dequeue()) {
            my $outrow = mapdata($row);
            my @result = ($reject, $row, $outrow);
            $output_q->enqueue(\@result);
        }

        # THIS DESTROYS THE BLESSED OBJECT, CLOSING THE CONNECTION
        $sexcoder = undef;
        $output_q->enqueue({ finished => $$ });
    }
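
Stripped of the Java and the file handling, the queue plumbing above boils down to something like the sketch below. It is only an illustration: `worker`, the `row` and `worker` hash keys, and the sample words are made up, and it uses a plain queue (without `fast => 1`) to keep things simple. Unlike my excerpt, every message on the output queue here is a hash ref, so the `exists $msg->{finished}` test is valid for data rows as well as for the end-of-worker marker.

```perl
use strict;
use warnings;
use MCE::Hobo;
use MCE::Shared;

my $num_workers = 3;
my $input_q     = MCE::Shared->queue();
my $output_q    = MCE::Shared->queue();

# Hypothetical worker: upper-casing stands in for the real mapdata() call.
sub worker {
    my ($id) = @_;
    while (defined(my $row = $input_q->dequeue())) {
        # Data rows travel as hash refs, the same shape as the marker below.
        $output_q->enqueue({ row => uc $row, worker => $id });
    }
    # Tell the parent that this worker has drained the input queue.
    $output_q->enqueue({ finished => $$ });
}

MCE::Hobo->create({ posix_exit => 1 }, \&worker, $_) for 1 .. $num_workers;

# Input loop: feed the workers, then flag end of input.
$input_q->enqueue($_) for qw(alpha beta gamma delta);
$input_q->end();

# Output loop: collect rows, reap one worker per "finished" marker.
my @rows;
while (defined(my $msg = $output_q->dequeue())) {
    if (exists $msg->{finished}) {
        MCE::Hobo->waitone;
        $output_q->end unless MCE::Hobo->pending;
        next;
    }
    push @rows, $msg->{row};
}

print join(",", sort @rows), "\n";
```

The rows come back in whatever order the workers finish them, which is why the sketch sorts before printing.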

The code that actually calls the service is deep inside the mapdata() function. The relevant piece of code is:

    sub parsename() {
        my $dsparse = 0;

        if ($dsparsename == 1) {
            eval {
                my $result = $sexcoder->getsexcode($countrycode, $name);
                $parsedname{'TITLE'}   = $result->{title};
                $parsedname{'FIRST'}   = $result->{first};
                $parsedname{'MIDDLE'}  = $result->{middle};
                $parsedname{'LAST'}    = $result->{last};
                $parsedname{'SUFFIX'}  = $result->{suffix};
                $parsedname{'PROFSFX'} = $result->{profsfx};
                $dsname = $result->{name};
                $data->{'out$gender'}  = $result->{gender};
                $data->{'out$sexcode'} = $result->{sexcode};
                $dsparse = 1;
            };
        }
        return $dsname if $dsparse == 1;

        # Fall back to simpler parser if $dsparse != 1 or $dsparsename != 1
    }
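
Note that the eval in parsename() discards any exception from getsexcode(): if the call dies, $dsparse stays 0 and the error is never reported. Below is a minimal sketch of the same try-then-fall-back shape that keeps the error visible. `remote_parse` and `simple_parse` are hypothetical stand-ins for the Data Services call and the simpler parser; they are not my real code.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the Data Services call: dies on names starting with "!".
sub remote_parse {
    my ($name) = @_;
    die "service unavailable\n" if $name =~ /^!/;
    return { name => uc $name, via => 'service' };
}

# Hypothetical stand-in for the simpler local parser.
sub simple_parse {
    my ($name) = @_;
    $name =~ s/^!//;
    return { name => $name, via => 'fallback' };
}

sub parse_with_fallback {
    my ($name) = @_;

    # eval returns undef if the block dies; $@ then holds the error.
    my $result = eval { remote_parse($name) };
    return $result if defined $result;

    # Report the failure instead of silently swallowing it, then fall back.
    my $err = $@ || "unknown error\n";
    warn "remote parse failed for '$name': $err";
    return simple_parse($name);
}

print parse_with_fallback('smith')->{via},  "\n";
print parse_with_fallback('!jones')->{via}, "\n";
```

With the warning in place, a failure inside a worker at least shows up on STDERR instead of quietly turning every row into a fallback parse.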

Finally, here is the complete code for the SexCoder module:

    package SexCoder;

    use strict;
    use warnings;
    use Thread::Bless;    # NOT SURE IF I NEED THIS
    use XML::LibXML;
    use XML::Compile::Schema;
    use XML::Compile::Util qw(pack_type);
    use Encode;
    use Data::HexDump;
    use Inline::Java qw(caught);

    require Exporter;

    our @ISA = qw(Exporter);

    # Items to export into callers namespace by default. Note: do not export
    # names by default without a very good reason. Use EXPORT_OK instead.
    # Do not simply export all your public functions/methods/constants.

    # This allows declaration use SexCoder ':all';
    # If you do not need this, moving things directly into @EXPORT or @EXPORT_OK
    # will save memory.
    our %EXPORT_TAGS = ( 'all' => [ qw( new getsexcode ) ] );

    our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } );

    our @EXPORT = qw( new getsexcode );

    our $VERSION = '1.00';

    BEGIN {
        $ENV{'CLASSPATH'} = "/pas/src/perl/SexCoder/lib/rtsClient.jar";
        $ENV{'LINK_DIR'}  = "/pas/src/perl/SexCoder/lib";
    }

    use Inline Java => <<'END_OF_JAVA_CODE', CLASSPATH=>$ENV{CLASSPATH},
    import com.businessobjects.rtsclient.*;
    import java.security.Security;

    public class dsclient {
        RTServiceClient rts;
        String keystorePasswd = "dskeystore";
        String keystorePath = "/pas/src/perl/SexCoder/keystore/dskeystore.jks";

        public dsclient() throws RTServiceException {
            System.setProperty("javax.net.ssl.trustStore", keystorePath);
            // NOTE: JSSE normally reads javax.net.ssl.trustStorePassword as a
            // System property, so this Security.setProperty call may have no effect.
            Security.setProperty("javax.net.ssl.trustStorePassword", keystorePasswd);
            rts = new RTServiceClient();
        }

        public void connect(String server, int port) throws RTServiceException {
            rts.connect(server, port, true);
        }

        public String invoke(String servicename, String xmlstring) throws RTServiceException {
            return rts.invoke(servicename, xmlstring);
        }

        public void disconnect() throws RTServiceException {
            rts.disconnect();
        }
    }
    END_OF_JAVA_CODE
    ;

    use Inline Java => 'STUDY',
        STUDY => [
            'com.businessobjects.rtsclient.RTServiceClientX',
            'com.businessobjects.rtsclient.RTServiceException',
            'java.security.Security'
        ],
        AUTOSTUDY => 1;

    my $servicename = "SexCoder";
    my $server;
    my $port;
    my $rts;
    my $sexcode_msg_schema;
    my $sexcode_reply_schema;

    sub new {
        my $self = bless {}, shift;

        # Service and Port identify the Data Services Server (ex: ds421.paslists.com:4200)
        my $ds_server = "ds.paslists.com";
        $ds_server = $ENV{DS_SERVER} if defined($ENV{DS_SERVER});
        $server = exists $_[0] ? $_[0] : $ds_server;
        $port   = exists $_[1] ? $_[1] : 4200;

        # Compile the XML Schema and create a reader and writer object
        $sexcode_msg_schema   = XML::Compile::Schema->new('/pas/src/perl/SexCoder/lib/sexcoder_msg.xsd');
        $sexcode_reply_schema = XML::Compile::Schema->new('/pas/src/perl/SexCoder/lib/sexcoder_reply.xsd');
        $self->{WRITER} = $sexcode_msg_schema->compile(WRITER => '{http://paslists.com/sexcoder}sexcode_msg');
        $self->{READER} = $sexcode_reply_schema->compile(READER => '{http://paslists.com/sexcoder}sexcode_reply');

        # Create a connection to Data Services
        eval {
            $self->{RTS} = new SexCoder::dsclient();
            $self->{RTS}->connect($server, $port);
        };
        if ($@) {
            if (caught("java.lang.Exception")) {
                die $@;
            }
        }

        # Mark the object as INIT'd
        $self->{INIT} = 1;

        return $self;
    }

    sub getsexcode {
        my $self = shift;

        # Arguments are countrycode and name
        my ($countrycode, $name) = @_;

        (my $lang = $ENV{LANG}) =~ s/^(.*)\.(.*)$/$2/;
        $name = decode($lang, $name) if $lang !~ /UTF-8/;

        # basic error checking
        die "SexCoder not Initialized\n" if $self->{INIT} != 1;
        if (!defined($self->{RTS})) {
            eval {
                $self->{RTS} = new SexCoder::dsclient();
                $self->{RTS}->connect($server, $port);
            };
            if ($@) {
                if (caught("java.lang.Exception")) {
                    die $@;
                }
            }
        }
        die "No Connection to Data Services defined\n" if !defined($self->{RTS});

        # build a new XML document to send the parameters in
        my $doc = XML::LibXML::Document->new('1.0', 'UTF-8');
        my $data;
        $data->{countrycode} = $countrycode;
        $data->{name}        = $name;
        my $sexcode_msg_xml = $self->{WRITER}->($doc, $data);

        # ask data services for the sex code and fielded name
        my $sexcode_reply_xml = "";
        eval {
            $sexcode_reply_xml = $self->{RTS}->invoke($servicename, $sexcode_msg_xml->toString);
        };
        if ($@) {
            if (caught("java.lang.Exception")) {
                # first attempt failed: reconnect, then retry the call once
                eval { $self->{RTS}->connect($server, $port); };
                if ($@) {
                    if (caught("java.lang.Exception")) {
                        die $@;
                    }
                }
                else {
                    eval {
                        $sexcode_reply_xml = $self->{RTS}->invoke($servicename, $sexcode_msg_xml->toString);
                    };
                    if ($@) {
                        if (caught("java.lang.Exception")) {
                            warn $@;
                            undef $@;
                        }
                    }
                }
            }
        }

        # return a hash version of the XML result
        $sexcode_reply_xml = decode("iso-8859-1", $sexcode_reply_xml) if $lang !~ /UTF-8/;
        my $result = $self->{READER}->($sexcode_reply_xml);
        return $result;
    }

    # Disconnect from the server at the end
    sub DESTROY {
        my $self = shift;
        eval {
            $self->{RTS}->disconnect() if defined($self->{RTS});
            if (caught("java.lang.Exception")) {
                die $@;
            }
        }
    }

    1;

Sorry for the long post.