Beefy Boxes and Bandwidth Generously Provided by pair Networks
There's more than one way to do things
 
PerlMonks  

Google Spreadsheet Distributed Agent System

by dmlond (Acolyte)
on Sep 30, 2009 at 11:59 UTC ( [id://798311]=sourcecode: print w/replies, xml ) Need Help??
Category: Utility Scripts
Author/Contact Info
Description: This object sets up a system to allow a user to configure a set of scripts to use a single Google Spreadsheet as a control panel when running across many servers. See The RFC for more information.
package IGSP::GoogleAgent;

use FindBin;
use YAML::Any qw/LoadFile/;
use Net::Google::Spreadsheets;
use Net::SMTP::TLS;
use IO::CaptureOutput qw/capture/;
use Sys::Hostname;
use Moose;
use Carp;

our $VERSION = '0.01';

sub BUILD {
    my $self = shift;

    my @required_key_fields = grep { $self->config->{key_fields}->{$_}
+->{required} } keys %{$self->config->{key_fields}};
    die ("Your configuration must have at least one required key_field
+s key!\n") unless (@required_key_fields);

    foreach my $required_query_field (@required_key_fields) {
        croak ("You must provide a bind_key_fields ${required_query_fi
+eld} key - value pair!\n")
          unless ($self->bind_key_fields->{$required_query_field});
    }
}

has 'bind_key_fields' => (
                            is => 'ro',
                            isa => 'HashRef',
                            required => 1
                    );

has 'agent_name' => ( 
                      is => 'ro',
                      isa => 'Str',
                      required => 1
                      );

has 'page_name' => (
                    is => 'ro',
                    required => 1
                    );

has 'prerequisites' => (
                        is => 'ro',
                        isa => 'ArrayRef'
                        );

has 'debug' => (
                is => 'ro',
                isa => 'Bool'
                );

has 'max_selves' => (
                     is => 'ro',
                     isa => 'Int'
                     );

has 'subsumed_by' => (
                      is => 'ro',
                      isa => 'HashRef'
                      );

has 'config_file' => (
                      is => 'ro',
                      isa => 'Str',
                      );

has 'config' => (
                 is => 'ro',
                 builder => '_build_config'
                 );

has 'google_db' => (
                    is => 'ro',
                    builder => '_build_google_db',
                    lazy => 1, # depends on config
                    init_arg => undef # google_db cannot be overridden
                    );

#### BUILDERS

sub _build_config {
    my $self = shift;
    my $config_file = $self->config_file || $FindBin::Bin.'/../config/
+agent.conf.yml';
    croak "Config ${config_file} not found!\n" unless (-e $config_file
+);
    return YAML::Any::LoadFile($config_file);
}

sub _build_google_db {
    my $self = shift;
    my $service = Net::Google::Spreadsheets->new(
                                                 username => $self->co
+nfig->{guser},
                                                 password => $self->co
+nfig->{gpass},
                                                 );
    return $service->spreadsheet({
        title => $self->config->{spreadsheet_name}
    });
}

#### METHODS

around 'run_my' => sub {
    my ($orig, $self, @args) = @_;

    if ($self->debug) {
        return $self->$orig(@args);
    }
    else {
        my $capture_output;
        my $no_problems = capture {
            my $ret;
            eval {
                $ret = $self->$orig(@args);
            };
            if ($@) {
                print STDERR $@;
                return;
            }
            return $ret;
        } \$capture_output, \$capture_output;
        $self->mail_error($capture_output) unless ($no_problems);
        return $no_problems;
    }
};

sub run_my {
    my ($self, $agent_code) = @_;
    return 1 if ($self->is_subsumed);
    my $entry = $self->run_entry();
    
    return unless ($entry);
    return 1 if ($entry->{'not_runnable'}); # this is one that is not 
+ready, already running, or already run

    my ($success, $update_entry) = $agent_code->($entry->content);
    if ($success) {
        $self->complete_entry($update_entry);
        return 1;
    }
    else {
        $self->fail_entry($update_entry);
        return;
    }
}

sub is_subsumed {
    my $self = shift;

    return unless ($self->max_selves || $self->subsumed_by); # nothing
+ to subsume here

    my $subsumed;
    my %running_subsumers;

    my $subsume_opened = open (my $subsuming_in, '-|', 'ps', '-eo', 'p
+id,command');
    unless ($subsume_opened) {
        print STDERR "Couldnt check subsumption $!\n";
        return 1; # subsume to be safe
    }

    SUBIN: while (my $in = <$subsuming_in>) {
        next if ($in =~ m/emacs|vi|screen|SCREEN/); # skip editing and
+ screen
        next if ($in =~ m/\s*$$/); # skip this agent
        next if ($in =~ m/(\[|\])/); # skip daemons

        my $self_name = $self->agent_name;
        if ($self->max_selves 
            && $in =~ m/$self_name/) {
            $running_subsumers{$self->agent_name}++;
            if ($running_subsumers{$self->agent_name} == $self->max_se
+lves) {
                print STDERR "max_selves limit reached\n";
                $subsumed = 1;
                last SUBIN;
            }
        }

        if ($self->subsumed_by) {
            foreach my $subsumer (keys %{$self->subsumed_by}) {
                if ($in =~ m/$subsumer/) {
                    $running_subsumers{$subsumer}++;
                    if ($running_subsumers{$subsumer} == $self->subsum
+ed_by->{$subsumer}) {
                        print STDERR "subsumed by ${subsumer}\n";
                        $subsumed = 1;
                        last SUBIN;
                    }
                }
            }
        }
    }
    close $subsuming_in;

    return $subsumed;
}

sub get_entry {
    my $self = shift;
    my $entry;

    my $worksheet = $self->google_db->worksheet({
        title => $self->page_name
    });

    # note, the Google Spreadsheet Data API does supply an sq query op
+erator
    # which could be used here, but, as of 0.04 of Net::Google::Spread
+sheets
    # this did not prove to be reliable during the tests.  This may be
+ 
    # a limitation of the Google API rather than Net::Google::Spreadsh
+eets
    # as it appeared that Net::Google::Spreadsheets was submitting val
+id,
    # url encoded queries that the Google system rejected. Instead thi
+s software
    # conducts a full table scan to ensure the correct row is returned
    if ($worksheet) {
        my @rows = $worksheet->rows();
        ROW: foreach my $row (@rows) {
            ARG: foreach my $arg (keys %{$self->config->{key_fields}})
+ {
                next ARG if (
                             !($self->config->{key_fields}->{$arg}->{r
+equired}) 
                             && !($self->bind_key_fields->{$arg})
                             ); # skip args that are not required and 
+not bound
                next ROW unless ($row->content->{$arg} eq $self->bind_
+key_fields->{$arg});
            }
            $entry = $row;
            last ROW;
        }
    }

    return $entry;
}

# this call initiates a race resistant attempt to make sure that there
+ is only 1 clear 'winner' among N potential
# agents attempting to run the same goal on the same spreadsheet agent
+'s cell
sub run_entry {
    my $self = shift;

    my $entry = $self->get_entry();

    my $output = '';
    foreach my $bound_arg (keys %{$self->bind_key_fields}) {
        next if (!($self->config->{key_fields}->{$bound_arg}) && !($se
+lf->bind_key_fields->{$bound_arg}));
        $output .= join(' ', $bound_arg, $self->bind_key_fields->{$bou
+nd_arg})." ";
    }

    unless ($entry) {
        print STDERR $output." is not supported on ".$self->page_name.
+"\n";
        return;
    }

    unless ($entry->content->{ready}) {
        print STDERR $output." is not ready to run ".$self->agent_name
+."\n";
        return {'not_runnable' => 1};
    }

    if ($entry->content->{$self->agent_name}) {
        my ($status, $running_hostname) = split /\:/, $entry->content-
+>{$self->agent_name};
        if ($status eq 'r') {
            print STDERR $output." is already running ".$self->agent_n
+ame." on ${running_hostname}\n";
            return {'not_runnable' => 1};
        }
        
        if ($status == 1) {
            print STDERR $output." has already run ".$self->agent_name
+."\n";
            return {'not_runnable' => 1};
        }

        if ($status eq 'F') {
            print STDERR $output." has already Failed ".$self->agent_n
+ame." on a previous run and must be investigated on ${running_hostnam
+e}\n";
            return {'not_runnable' => 1};
        }
    }

    if ($self->prerequisites) {
        foreach my $prereq_field (@{$self->prerequisites}) {
            unless ($entry->content->{$prereq_field} == 1) {
                print STDERR $output." has not finished ${prereq_field
+}\n";
                return {'not_runnable' => 1};
            }
        }
    }

    my $content = $entry->content;

    # first attempt to set the hostname of the machine as the value of
+ the agent
    my $hostname = Sys::Hostname::hostname;
    $content->{$self->agent_name} = 'r:'.$hostname;
    eval { 
        $entry->content($content); 
    };
    if ($@) {
        # this is a collision, which is to be treated as if it is not 
+runnable
        print STDERR $output." lost ".$self->agent_name." on ${hostnam
+e}\n";
        return {'not_runnable' => 1};
    }

    sleep 3;
    my $nentry;
    eval {
        $nentry = $self->get_entry();
    };
    if ($@) {
        # this is a collision, which is to be treated as if it is not 
+runnable
        print STDERR $output." lost ".$self->agent_name." on ${hostnam
+e}\n";
        return {'not_runnable' => 1};
    }

    my $check = $nentry->content->{$self->agent_name};
    my ($status, $running_hostname) = split /\:/, $check;
    return $nentry if ($hostname eq $running_hostname);
    print STDERR $output." lost ".$self->agent_name." on ${hostname}\n
+";
    return {'not_runnable' => 1};
}

sub fail_entry {
    my $self = shift;
    my $update_entry = shift;

    my $entry = $self->get_entry();
    my $hostname = Sys::Hostname::hostname;
    my $content = $entry->content;
    if ($update_entry) {
        print STDERR "Updating entry\n";
        foreach my $key (keys %{$update_entry}) {
            $content->{$key} = $update_entry->{$key};
        }
    }

    $content->{$self->agent_name} = 'F:'.$hostname;
    $entry->content($content);
}

sub complete_entry {
    my $self = shift;
    my $update_entry = shift;

    print STDERR "All Complete\n";
    my $entry = $self->get_entry();
    my $content = $entry->content;
    if ($update_entry) {
        print STDERR "Updating entry\n";
        foreach my $key (keys %{$update_entry}) {
            $content->{$key} = $update_entry->{$key};
        }
    }
    $content->{$self->agent_name} = 1;
    $entry->content($content);
}

sub mail_error {
    my ($self, $error) = @_;

    my $output = '';
    foreach my $bound_arg (keys %{$self->bind_key_fields}) {
        $output .= join(' ', $bound_arg, $self->bind_key_fields->{$bou
+nd_arg})." ";
    }

    my $prefix = join(' ', Sys::Hostname::hostname, $output, $self->ag
+ent_name);
    eval {
        my $mailer = new Net::SMTP::TLS(  
                                          'smtp.gmail.com',  
                                          Hello   =>      'smtp.gmail.
+com',  
                                          Port    =>      587,  
                                          User    =>      $self->confi
+g->{guser},  
                                          Password =>      $self->conf
+ig->{gpass});
        $mailer->mail($self->config->{reply_email});  
        $mailer->to($self->config->{send_to});  
        $mailer->data;  
        $mailer->datasend(join("\n", $prefix,$error));
        $mailer->dataend;  
        $mailer->quit;
    };
}

1;  # End of IGSP::GoogleAgent
__END__

=head1 NAME

IGSP::GoogleAgent - A Distributed Agent System using Google Spreadshee
+ts

=head1 VERSION

Version 0.01

=head1 SYNOPSIS

  use IGSP::GoogleAgent;

  my $google_agent = IGSP::GoogleAgent->new(
                                          agent_name => $goal,
                                          page_name => $google_page,
                                          debug => $debug,
                                          max_selves => $max, 
                                          bind_key_fields => {
                                               'foo' => 'this_particul
+ar_foo'
                                          },
                                          prerequisites => [ 'isitdone
+', 'isthisone' ],
                                          subsumed_by => {
                                                           'someother_
+agent.pl' => 3,
                                                           'someother_
+process' => 1
                                                         }
                                          );

  $google_agent->run_my(sub {
                               print STDERR "THIS ONE PASSES!!!";
                               return 1;
                        });


  $google_agent->run_my(sub {
                               print STDERR "THIS ONE FAILS AND EITHER
+ EMAILS OR PRINTS THIS ERROR TO STDERR (depending on debug)!!!";
                               return;
                        });

  $google_agent->run_my(sub {
                               print STDERR "THIS ONE PASSES AND UPDAT
+ES THE 'cool' field in the spreadsheet!!!";
                               return (1, {'cool' => 'really cool'});
                        });


=head1 DESCRIPTION

  IGSP::GoogleAgent is a framework for creating massively distributed 
+pipelines
  across many different servers, each using the same google spreadshee
+t as a
  control panel.  It is extensible, and flexible.  It doesnt specify w
+hat
  goals any pipeline should be working towards, or which goals are pre
+requisites
  for other goals, but it does provide logic for easily defining these
+ relationships
  based on your own needs.  It does this by providing a subsumption ar
+chitecture,
  whereby many small, highly focused agents are written to perform spe
+cific goals,
  and also know what resources they require to perform them.  In addit
+ion, it is
  designed from the beginning to support the creation of simple human-
+computational
  workflows.

=head1 CONFIGURATION

  Scripts which use IGSP::GoogleAgents must run in the 'agent_bin' dir
+ectory
  within the same agent root as the 'config' directory where the agent
+.conf.yaml
  file is contained.  See that file for more details on what is config
+ured.

=head1 METHODS

=head2 new

 This method constructs a new instance of an IGSP::GoogleAgent.  An in
+stance must
 specify its name, the name of the Worksheet within the spreadsheet th
+at it is
 working off, and values for the required key_field(s) within the conf
+iguration
 which will result in a single row being returned from the given sprea
+dsheet.
 Optionally, you can specify an ArrayRef of prerequisite fields in the
+ spreadsheet
 which must be true before the agent can run, whether to print out deb
+ug information
 to the terminal, or email the errors using the configured email only 
+on errors (default),
 the maximum number of agents of this name to allow to run on the give
+n machine,
 and a HashRef of processes which, if a certain number are already run
+ning on the machine,
 should cause the agent to exit without running.

 required:
  agent_name => Str
  page_name => Str
  bind_key_fields => HashRef { key_field_name => bound_value, ... }

 optional:
  prerequisites => []
  debug => Bool
  max_selves => Int
  subsumed_by => { process_name => max_allowed, ... }

  This method will throw an exception if bind_key_fields are
  not supplied for required key_fields, as specified in the
  configuration.

  Also, there must be a field in the spreadsheet name for the agent_na
+me.
  This field will be filled in with the status of the agent for a part
+icular
  row, e.g. 1 for finished, r:hostname for running, or f:hostname for 
+failure.

=head2 run_my

  This method takes a subroutine codeRef as an argument.  It then chec
+ks to determine
  if the agent needs to run for the given bind_key_field(s) specified 
+row (it must
  have a 1 in the 'ready' field for the row, and the agent_name field 
+must be empty),
  whether any prerequisite fields are true, whether the agent is subsu
+med by something
  else running on the machine, and whether there are not already max_s
+elves other
  instances of the agent running on the machine.  If all of these are 
+true, it then
  attempts to fill its hostname into the field for the agent_name.  If
+ it succeeds,
  it will then run the code_ref.  If it does not succeed (such as if a
+n instance 
  running on another server already chose that job and won the field) 
+it exits.

  The coderef can do almost anything it wants to do, but it must retur
+n one of the following:

=over 3

=item return true

  This instructs IGSP::GoogleAgent to place a 1 (true) value in the fi
+eld for the agent on
  the spreadsheet, signifying that it has been completed.

=item return false

  This instructs IGSP::GoogleAgent to place F:hostname into the field 
+for the agent on the
  spreadsheet, signifying that it has failed.  It will not run again f
+or this job until the
  failure is cleared from the spreadsheet (by any other agent).

=item return (true|false, HashRef)

  This does what returning true or false does, as well as allowing spe
+cific fields in the 
  spreadsheet to also be modified by the calling code.  The HashRef sh
+ould contain keys
  only for those fields to be updated (it should not attempt to update
+ the field for the
  agent_name itself, as this will be ignored).

=back

  In addition, the coderef can print to STDOUT and STDERR.  If the age
+nt was instantiated in
  debug mode (true), it will print these to their normal destination. 
+ If the agent was
  instantiated without debug mode (the default), STDOUT and STDERR are
+ captured, and, if
  the codeRef returned false, emailed to the address specified in the 
+configuration using the
  same google account that configures access to the google spreadsheet
+.

  One thing the agent must try at all costs to avoid is dying during t
+he subref (e.g. use
  eval for anything that you dont have control over).  It should alway
+s try to return one
  of the valid return states so that the spreadsheet status can be upd
+ated correctly.

=head2 agent_name

 This returns the name of the agent, in case it is needed by the calli
+ng code for other reasons.

=head2 debug

 This returns the debug state specified in the constructor.

=head2 google_db

 This returns the actual Net::Google::Spreadsheet object used
 by the agent, in case other types of queries,  or modifications
 need to be made that do not fit within this system.

=head1 AUTHOR

Darin London, C<< <darin.london at duke.edu> >>

=head1 BUGS

Please report any bugs or feature requests to C<bug-igsp-googleagent a
+t rt.cpan.org>, or through
the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=
+IGSP-GoogleAgent>.  I will be notified, and then you'll
automatically be notified of progress on your bug as I make changes.


=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc IGSP::GoogleAgent


You can also look for information at:

=over 4

=item * RT: CPAN's request tracker

L<http://rt.cpan.org/NoAuth/Bugs.html?Dist=IGSP-GoogleAgent>

=item * AnnoCPAN: Annotated CPAN documentation

L<http://annocpan.org/dist/IGSP-GoogleAgent>

=back

=head1 SEE ALSO

L<Net::Google::Spreadsheets>
L<Moose>

=head1 COPYRIGHT & LICENSE

Copyright 2009 Darin London.

This program is free software; you can redistribute it and/or modify i
+t
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.

=head1 PerlMonks Nodes

RFC: 798154
Code: 798311

=cut
Replies are listed 'Best First'.
Re: Google Spreadsheet Distributed Agent System
by dmlond (Acolyte) on Oct 13, 2009 at 19:23 UTC

    updated to allow config to be overridden by constructor parameters.

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: sourcecode [id://798311]
help
Chatterbox?
and the web crawler heard nothing...

How do I use this?Last hourOther CB clients
Other Users?
Others musing on the Monastery: (8)
As of 2024-03-28 09:58 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?

    No recent polls found