my $num_workers = 5; use SexCoder; use MCE::Hobo; use MCE::Shared; my $input_q = MCE::Shared->queue( fast => 1 ); my $output_q = MCE::Shared->queue( fast => 1 ); mce_open my $OUT, ">>", \*STDOUT or die "open error: $!"; mce_open my $ERR, ">>", \*STDERR or die "open error: $!"; MCE::Hobo->create({ posix_exit => 1 }, 'input_task', $_) for 1 .. $num_workers; # open the files, etc... - not shown here ######### MAIN INPUT LOOP while (my $row = readdata($fhin, \$dbfrownum, \$inbuffer, @sortedfields)) { $input_q->enqueue($row); } ######### END OF INPUT LOOP # exit worker threads (flag end of input queue) $input_q->end(); ######### MAIN OUTPUT LOOP while(my $result = $output_q->dequeue()) { if (exists $result->{finished}) { MCE::Hobo->waitone; $output_q->end unless MCE::Hobo->pending; next; } output_row($result); # THIS OUTPUTS A ROW } ######### END OF MAIN OUTPUT LOOP exit; sub input_task { # THIS CALLS SOME JAVA CODE TO CREATE THE CONNECTION TO DATA SERVICES $sexcoder=SexCoder->new("$ds_server", 4000); while (my $row = $input_q->dequeue()) { my $outrow = mapdata($row); my @result = ($reject, $row, $outrow); $output_q->enqueue(\@result); } # THIS DESTROYS THE BLESSED OBJECT, CLOSING THE CONNECTION $sexcoder = undef; $output_q->enqueue({ finished => $$ }); } #### sub parsename() { my $dsparse = 0; if ($dsparsename == 1) { eval { my $result = $sexcoder->getsexcode($countrycode, $name); $parsedname{'TITLE'} = $result->{title}; $parsedname{'FIRST'} = $result->{first}; $parsedname{'MIDDLE'} = $result->{middle}; $parsedname{'LAST'} = $result->{last}; $parsedname{'SUFFIX'} = $result->{suffix}; $parsedname{'PROFSFX'} = $result->{profsfx}; $dsname = $result->{name}; $data->{'out$gender'} = $result->{gender}; $data->{'out$sexcode'} = $result->{sexcode}; $dsparse = 1; }; } return $dsname if $dsparse == 1; # Fall back to simpler parser if $dsparse != 1 or $dsparsename != 1 } #### package SexCoder; use strict; use warnings; use Thread::Bless; # NOT SURE IF I NEED THIS use XML::LibXML; use XML::Compile::Schema; use XML::Compile::Util qw(pack_type); use Encode; use Data::HexDump; use Inline::Java qw(caught); require Exporter; our @ISA = qw(Exporter); # Items to export into callers namespace by default. Note: do not export # names by default without a very good reason. Use EXPORT_OK instead. # Do not simply export all your public functions/methods/constants. # This allows declaration use SexCoder ':all'; # If you do not need this, moving things directly into @EXPORT or @EXPORT_OK # will save memory. our %EXPORT_TAGS = ( 'all' => [ qw( new getsexcode ) ] ); our @EXPORT_OK = ( @{ $EXPORT_TAGS{'all'} } ); our @EXPORT = qw( new getsexcode ); our $VERSION = '1.00'; BEGIN { $ENV{'CLASSPATH'} = "/pas/src/perl/SexCoder/lib/rtsClient.jar"; $ENV{'LINK_DIR'} = "/pas/src/perl/SexCoder/lib"; } use Inline Java => <<'END_OF_JAVA_CODE', CLASSPATH=>$ENV{CLASSPATH}, import com.businessobjects.rtsclient.*; import java.security.Security; public class dsclient { RTServiceClient rts; String keystorePasswd = "dskeystore"; String keystorePath = "/pas/src/perl/SexCoder/keystore/dskeystore.jks"; public dsclient() throws RTServiceException { System.setProperty("javax.net.ssl.trustStore", keystorePath); Security.setProperty("javax.net.ssl.trustStorePassword", keystorePasswd); rts = new RTServiceClient(); } public void connect(String server, int port) throws RTServiceException { rts.connect(server, port, true); } public String invoke(String servicename, String xmlstring) throws RTServiceException { return rts.invoke(servicename, xmlstring); } public void disconnect() throws RTServiceException { rts.disconnect(); } } END_OF_JAVA_CODE ; use Inline Java => 'STUDY', STUDY => [ 'com.businessobjects.rtsclient.RTServiceClientX', 'com.businessobjects.rtsclient.RTServiceException', 'java.security.Security' ], AUTOSTUDY => 1; my $servicename = "SexCoder"; my $server; my $port; my $rts; my $sexcode_msg_schema; my $sexcode_reply_schema; sub new { my $self = bless {}, shift; # Service and Port identify the Data Services Server (ex: ds421.paslists.com:4200) my $ds_server = "ds.paslists.com"; $ds_server = $ENV{DS_SERVER} if defined($ENV{DS_SERVER}); $server = exists $_[0] ? $_[0] : $ds_server; $port = exists $_[1] ? $_[1] : 4200; # Compile the XML Schema and create a reader and writer object $sexcode_msg_schema = XML::Compile::Schema->new('/pas/src/perl/SexCoder/lib/sexcoder_msg.xsd'); $sexcode_reply_schema = XML::Compile::Schema->new('/pas/src/perl/SexCoder/lib/sexcoder_reply.xsd'); $self->{WRITER} = $sexcode_msg_schema->compile(WRITER => '{http://paslists.com/sexcoder}sexcode_msg'); $self->{READER} = $sexcode_reply_schema->compile(READER => '{http://paslists.com/sexcoder}sexcode_reply'); # Create a connection to Data Services eval { $self->{RTS} = new SexCoder::dsclient(); $self->{RTS}->connect($server, $port); }; if ($@) { if (caught("java.lang.Exception")) { die $@; } } # Mark the object as INIT'd $self->{INIT} = 1; return $self; } sub getsexcode { my $self = shift; # Arguments are countrycode and name my ($countrycode, $name) = @_; (my $lang=$ENV{LANG}) =~ s/^(.*)\.(.*)$/$2/; $name = decode($lang,$name) if $lang !~ /UTF-8/; # basic error checking die "SexCoder not Initialized\n" if $self->{INIT} != 1; if (!defined($self->{RTS})) { eval { $self->{RTS} = new SexCoder::dsclient(); $self->{RTS}->connect($server, $port); }; if ($@) if (caught("java.lang.Exception")) { die $@; } } } die "No Connection to Data Services defined\n" if !defined($self->{RTS}); # build a new XML document to send the parameters in my $doc = XML::LibXML::Document->new('1.0','UTF-8'); my $data; $data->{countrycode} = $countrycode; $data->{name} = $name; my $sexcode_msg_xml = $self->{WRITER}->($doc, $data); # ask data services for the sex code and fielded name my $sexcode_reply_xml = ""; eval { $sexcode_reply_xml = $self->{RTS}->invoke($servicename, $sexcode_msg_xml->toString); }; if ($@) { if (caught("java.lang.Exception")) { eval { $self->{RTS}->connect($server, $port); }; if ($@) { if (caught("java.lang.Exception")) { die $@; } } else { eval { $sexcode_reply_xml = $self->{RTS}->invoke($servicename, $sexcode_msg_xml->toString); }; if ($@) { if (caught("java.lang.Exception")) { warn $@; unset $@; } } } } } # return a hash version of the XML result $sexcode_reply_xml = decode("iso-8859-1",$sexcode_reply_xml) if $lang !~ /UTF-8/; my $result = $self->{READER}->($sexcode_reply_xml); return $result; } # Disconnect from the server at the end DESTROY { my $self = shift; eval { $self->{RTS}->disconnect() if defined($self->{RTS}); if (caught("java.lang.Exception")) { die $@; } } } 1;