# Driver script: rows read by readdata() are fanned out to a pool of
# MCE::Hobo workers through a shared input queue, and the workers' results
# are collected from a shared output queue and written with output_row().
my $num_workers = 5;
use SexCoder;
use MCE::Hobo;
use MCE::Shared;

# Queues shared between this process and the workers.
my $input_q = MCE::Shared->queue( fast => 1 );
my $output_q = MCE::Shared->queue( fast => 1 );

# Handles on STDOUT/STDERR opened through mce_open (not used in the code
# shown here).
mce_open my $OUT, ">>", \*STDOUT or die "open error: $!";
mce_open my $ERR, ">>", \*STDERR or die "open error: $!";

# Start the workers; each one runs input_task() and is passed its index.
MCE::Hobo->create({ posix_exit => 1 }, 'input_task', $_) for 1 .. $num_workers;

# open the files, etc... - not shown here

######### MAIN INPUT LOOP
while (my $row = readdata($fhin, \$dbfrownum, \$inbuffer, @sortedfields)) {
    $input_q->enqueue($row);
}
######### END OF INPUT LOOP

# exit worker threads (flag end of input queue)
$input_q->end();

######### MAIN OUTPUT LOOP
while (my $result = $output_q->dequeue()) {
    # input_task() sends each processed row as an ARRAY ref and its
    # end-of-work marker as a HASH ref.  The ref type has to be checked
    # first: a hash subscript on an array ref dies with
    # "Not a HASH reference".
    if (ref($result) eq 'HASH' && exists $result->{finished}) {
        # One worker is done: reap it, and close the output queue once no
        # workers remain so that dequeue() above returns false.
        MCE::Hobo->waitone;
        $output_q->end unless MCE::Hobo->pending;
        next;
    }
    output_row($result); # THIS OUTPUTS A ROW
}
######### END OF MAIN OUTPUT LOOP
exit;
# Worker body run in each MCE::Hobo child.
#
# Opens one SexCoder connection, then maps every row taken from $input_q
# with mapdata() and enqueues [ $reject, $row, $outrow ] on $output_q.
# When the input queue is exhausted it drops the connection and enqueues
# { finished => <pid> } so the parent can account for this worker.
#
# The marker is sent even when the connection or mapdata() dies: the parent
# only closes the output queue after reaping every worker, so a worker that
# vanished without its marker would leave the parent blocked in dequeue().
# The original error is re-raised after the marker has been sent.
sub input_task
{
    my $failure;
    eval {
        # THIS CALLS SOME JAVA CODE TO CREATE THE CONNECTION TO DATA SERVICES
        $sexcoder = SexCoder->new("$ds_server", 4000);
        while (my $row = $input_q->dequeue()) {
            my $outrow = mapdata($row);
            my @result = ($reject, $row, $outrow);
            $output_q->enqueue(\@result);
        }
        1;
    } or $failure = $@ || "input_task failed for an unknown reason\n";

    # THIS DESTROYS THE BLESSED OBJECT, CLOSING THE CONNECTION
    $sexcoder = undef;
    $output_q->enqueue({ finished => $$ });

    die $failure if defined $failure;
    return;
}
####
# Parse the current name ($name, $countrycode) through the SexCoder service.
#
# Only attempted when $dsparsename == 1.  On success the name parts are
# stored in %parsedname, the gender and sex code in $data, and the service's
# formatted name is stored in $dsname and returned.  Any error raised while
# calling the service or copying its reply is trapped by the eval and simply
# leaves the "parsed" flag unset.
sub parsename()
{
    my $parsed_by_ds = 0;

    if ($dsparsename == 1) {
        eval {
            my $reply = $sexcoder->getsexcode($countrycode, $name);

            # %parsedname key => key in the service reply, copied in order.
            my @field_map = (
                [ 'TITLE',   'title'   ],
                [ 'FIRST',   'first'   ],
                [ 'MIDDLE',  'middle'  ],
                [ 'LAST',    'last'    ],
                [ 'SUFFIX',  'suffix'  ],
                [ 'PROFSFX', 'profsfx' ],
            );
            for my $pair (@field_map) {
                my ($parsed_key, $reply_key) = @{$pair};
                $parsedname{$parsed_key} = $reply->{$reply_key};
            }

            $dsname = $reply->{name};
            # The '$' in these keys is literal (single-quoted strings).
            $data->{'out$gender'} = $reply->{gender};
            $data->{'out$sexcode'} = $reply->{sexcode};
            $parsed_by_ds = 1;
        };
    }

    return $dsname if $parsed_by_ds == 1;
    # Fall back to simpler parser if $parsed_by_ds != 1 or $dsparsename != 1
}
##
##
package SexCoder;
use strict;
use warnings;
use Thread::Bless; # NOT SURE IF I NEED THIS
use XML::LibXML;
use XML::Compile::Schema;
use XML::Compile::Util qw(pack_type);
use Encode;
use Data::HexDump;
use Inline::Java qw(caught);
# Exporter setup.  Both public names are exported by default and are also
# available through the ':all' tag (use SexCoder ':all';).
require Exporter;
our @ISA = ('Exporter');

our %EXPORT_TAGS = ( all => [ qw(new getsexcode) ] );
our @EXPORT_OK   = @{ $EXPORT_TAGS{all} };
our @EXPORT      = qw(new getsexcode);

our $VERSION = '1.00';

# Set at compile time so that CLASSPATH is already in %ENV when the
# "use Inline Java" statement below reads it.
# NOTE(review): nothing in this file reads LINK_DIR; presumably the
# rtsClient library does -- confirm.
BEGIN {
    $ENV{CLASSPATH} = '/pas/src/perl/SexCoder/lib/rtsClient.jar';
    $ENV{LINK_DIR}  = '/pas/src/perl/SexCoder/lib';
}
# Java helper class compiled by Inline::Java and exposed to Perl as
# SexCoder::dsclient.  It is a thin wrapper around the vendor
# RTServiceClient: construct, connect(server, port), invoke(service, xml),
# disconnect().
use Inline
    Java => <<'END_OF_JAVA_CODE', CLASSPATH => $ENV{CLASSPATH},
import com.businessobjects.rtsclient.*;
import java.security.Security;

public class dsclient
{
    RTServiceClient rts;
    // NOTE(review): the keystore password is hard-coded in source; consider
    // reading it from configuration instead.
    String keystorePasswd = "dskeystore";
    String keystorePath = "/pas/src/perl/SexCoder/keystore/dskeystore.jks";

    /** Points JSSE at the trust store and creates the (unconnected) client. */
    public dsclient() throws RTServiceException
    {
        // Both trust store settings are JVM *system* properties.  The
        // password used to be set with Security.setProperty(), which writes
        // a security property that JSSE never consults.
        System.setProperty("javax.net.ssl.trustStore", keystorePath);
        System.setProperty("javax.net.ssl.trustStorePassword", keystorePasswd);
        rts = new RTServiceClient();
    }

    /** Connects to the given server and port. */
    public void connect(String server, int port)
        throws RTServiceException
    {
        // NOTE(review): the third argument is presumably the "use SSL"
        // flag -- confirm against the rtsclient API.
        rts.connect(server, port, true);
    }

    /** Sends xmlstring to the named service and returns the reply string. */
    public String invoke(String servicename, String xmlstring)
        throws RTServiceException
    {
        return rts.invoke(servicename, xmlstring);
    }

    /** Closes the connection. */
    public void disconnect()
        throws RTServiceException
    {
        rts.disconnect();
    }
}
END_OF_JAVA_CODE
;
# Have Inline::Java study these classes so they can be used from Perl
# (for example by caught()).
# NOTE(review): "RTServiceClientX" may be a misspelling of RTServiceClient,
# the class the Java wrapper above instantiates -- confirm.
use Inline (
    Java      => 'STUDY',
    STUDY     => [
        'com.businessobjects.rtsclient.RTServiceClientX',
        'com.businessobjects.rtsclient.RTServiceException',
        'java.security.Security',
    ],
    AUTOSTUDY => 1,
);

# Name of the service passed to invoke() by getsexcode().
my $servicename = "SexCoder";

# Connection target, assigned by new() and reused by getsexcode() when it
# has to (re)connect.  File-scoped, hence shared by every SexCoder object.
my ($server, $port);

# Not referenced by the subs visible in this file.
my $rts;

# Compiled XML::Compile schemas, assigned by new().
my ($sexcode_msg_schema, $sexcode_reply_schema);
# Constructor: SexCoder->new([$server [, $port]])
#
# Compiles the request/reply XML schemas into a writer and a reader, opens a
# connection to Data Services and returns the initialised object.
#
#   $server  Data Services host.  Defaults to $ENV{DS_SERVER} when that is
#            set, otherwise to "ds.paslists.com".  An undefined or empty
#            value is treated as "not supplied".
#   $port    Data Services port.  Defaults to 4200 when undefined or empty.
#
# Dies with the underlying error if the client cannot be created or the
# connection fails.
#
# NOTE: the chosen server and port are stored in file-scoped variables, so
# they are shared by every SexCoder object in the process.
sub new {
    my ($class, @args) = @_;
    my $self = bless {}, $class;

    # Service and Port identify the Data Services Server (ex: ds421.paslists.com:4200)
    my $ds_server = "ds.paslists.com";
    $ds_server = $ENV{DS_SERVER} if defined($ENV{DS_SERVER});
    $server = (defined $args[0] && length $args[0]) ? $args[0] : $ds_server;
    $port   = (defined $args[1] && length $args[1]) ? $args[1] : 4200;

    # Compile the XML Schema and create a reader and writer object
    $sexcode_msg_schema = XML::Compile::Schema->new('/pas/src/perl/SexCoder/lib/sexcoder_msg.xsd');
    $sexcode_reply_schema = XML::Compile::Schema->new('/pas/src/perl/SexCoder/lib/sexcoder_reply.xsd');
    $self->{WRITER} = $sexcode_msg_schema->compile(WRITER => '{http://paslists.com/sexcoder}sexcode_msg');
    $self->{READER} = $sexcode_reply_schema->compile(READER => '{http://paslists.com/sexcoder}sexcode_reply');

    # Create a connection to Data Services
    eval {
        $self->{RTS} = SexCoder::dsclient->new();
        $self->{RTS}->connect($server, $port);
    };
    # Propagate every failure, Java exception or plain Perl error alike.
    # Previously only Java exceptions were re-raised and anything else was
    # dropped, leaving a half-initialised object behind.
    die $@ if $@;

    # Mark the object as INIT'd
    $self->{INIT} = 1;
    return $self;
}
# $coder->getsexcode($countrycode, $name)
#
# Sends the country code and name to the Data Services service named in
# $servicename and returns the reply as decoded by the compiled reply-schema
# reader.
#
# If the object has no connection yet, one is opened first.  If the call
# fails with a Java exception, the connection is re-established and the call
# is retried once; a Java exception on the retry is only warned about, and
# the (empty) reply is still handed to the reader.
#
# Dies when the object was not initialised by new(), when no connection can
# be (re)established, or when the call fails with an error that is not a
# Java exception.
sub getsexcode {
    my $self = shift;
    # Arguments are countrycode and name
    my ($countrycode, $name) = @_;

    # Take the codeset part of LANG (the text after the last '.') and decode
    # the name from it unless it is UTF-8.
    # NOTE(review): an unset LANG, or one without a codeset (e.g. "C"),
    # makes decode() die here, and a lowercase "utf8" codeset is treated as
    # non-UTF-8 -- confirm this is intended.
    (my $lang=$ENV{LANG}) =~ s/^(.*)\.(.*)$/$2/;
    $name = decode($lang,$name) if $lang !~ /UTF-8/;

    # basic error checking
    die "SexCoder not Initialized\n" if $self->{INIT} != 1;

    if (!defined($self->{RTS})) {
        eval {
            $self->{RTS} = SexCoder::dsclient->new();
            $self->{RTS}->connect($server, $port);
        };
        die $@ if $@;
    }
    die "No Connection to Data Services defined\n" if !defined($self->{RTS});

    # build a new XML document to send the parameters in
    my $doc = XML::LibXML::Document->new('1.0','UTF-8');
    my $data;
    $data->{countrycode} = $countrycode;
    $data->{name} = $name;
    my $sexcode_msg_xml = $self->{WRITER}->($doc, $data);

    # ask data services for the sex code and fielded name
    my $sexcode_reply_xml = "";
    eval {
        $sexcode_reply_xml = $self->{RTS}->invoke($servicename, $sexcode_msg_xml->toString);
    };
    if ($@) {
        # Not a Java exception after all: nothing a reconnect could fix.
        die $@ if !caught("java.lang.Exception");

        # Java exception: reconnect and try the call one more time.
        eval {
            $self->{RTS}->connect($server, $port);
        };
        die $@ if $@;

        eval {
            $sexcode_reply_xml = $self->{RTS}->invoke($servicename, $sexcode_msg_xml->toString);
        };
        if ($@) {
            die $@ if !caught("java.lang.Exception");
            # Report the failed retry and clear the error.
            warn $@;
            $@ = '';
        }
    }

    # return a hash version of the XML result
    # NOTE(review): the reply is decoded as iso-8859-1 whatever codeset LANG
    # names -- confirm.
    $sexcode_reply_xml = decode("iso-8859-1",$sexcode_reply_xml) if $lang !~ /UTF-8/;
    my $result = $self->{READER}->($sexcode_reply_xml);
    return $result;
}
# Disconnect from the server at the end
# Destructor: disconnect from the server when the object goes away.
#
# A destructor can run at any point, including while another exception is
# propagating, so the global status variables are localised to keep the
# eval below from clobbering the caller's $@.  A failed disconnect is
# reported with a warning; it never raises.
sub DESTROY {
    my $self = shift;
    local ($., $@, $!, $^E, $?);

    return if !defined $self->{RTS};

    eval {
        $self->{RTS}->disconnect();
        1;
    } or warn "SexCoder: disconnect failed during cleanup: $@\n";

    return;
}
1;