This is a rough draft of a module I intend to submit to CPAN soon, posted here for review and comments from the community.
The interface is still in development; this "version 0.001" might not be the same code that is eventually uploaded to CPAN and the interface might change in later versions.
package POE::Filter::Tor::TC1; # -*- CPerl -*-
use strict;
use warnings;
use POE::Filter::Line;

our $VERSION = '0.001';
our @ISA = qw(POE::Filter::Line);

# Line terminator of the control protocol ("network" line endings).
use constant CRLF => "\015\012";

# Object slots, allocated after those used by POE::Filter::Line.
use constant LINE_BUFFER  => POE::Filter::Line::FIRST_UNUSED + 0; # lines of a partial reply
use constant LINE_STATE   => POE::Filter::Line::FIRST_UNUSED + 1; # reply state (ST_* below)
use constant FIRST_UNUSED => POE::Filter::Line::FIRST_UNUSED + 2; # first slot free for subclasses

# Reply-assembly states.
use constant ST_IDLE => 0; # after a final response line is received
use constant ST_CONT => 1; # after a continuation line is received
use constant ST_DATA => 2; # collecting to terminating "." on its own line
# Build a filter for the Tor control protocol.  TC1 is line-oriented with
# "network" CRLF line endings, so the filter is simply a POE::Filter::Line
# whose literal line terminator is CRLF.  Arguments after the class name
# are ignored.
sub new {
  my ($class) = @_;
  return $class->SUPER::new(Literal => CRLF);
}
# sub get_one_start inherited from POE::Filter::Line
# Report input that has been received but not yet returned as a record.
# This is the lines of a partially assembled reply, with their CRLF
# restored, followed by whatever POE::Filter::Line itself still holds.
# Returns an arrayref, or undef when nothing is pending.
sub get_pending {
  my ($self) = @_;
  my @pending;
  if (my $partial = $self->[LINE_BUFFER]) {
    push @pending, map { $_ . CRLF } @$partial;
  }
  if (my $unframed = $self->SUPER::get_pending) {
    push @pending, @$unframed;
  }
  return @pending ? \@pending : undef;
}
# Assemble one complete reply from the incoming line stream.
#
# Lines are taken one at a time from POE::Filter::Line and appended to the
# reply buffer.  A small state machine (ST_IDLE / ST_CONT / ST_DATA) tracks
# the position within the reply.  Reading stops as soon as a final reply
# line ("NNN " outside a data block) has been seen; later lines stay queued
# in POE::Filter::Line.
#
# Returns a one-element arrayref holding the parsed record when a reply is
# complete.  Otherwise it saves the partial reply and state in the object
# and returns an empty arrayref.
#
# Dies if a line that should start with "NNN" plus a separator does not, or
# if LINE_STATE holds an unknown value.
# NOTE(review): such a die happens before LINE_BUFFER/LINE_STATE are written
# back, so lines consumed during this call may be lost or left out of step
# with the stored state.
sub get_one {
  my $self = shift;

  my $reply = $self->[LINE_BUFFER] || [];
  my $state = $self->[LINE_STATE]  || ST_IDLE;

  my $batch = $self->SUPER::get_one;    # get one line
  while (@$batch) {
    foreach my $text (@$batch) {
      if ($state == ST_IDLE || $state == ST_CONT) {
        # outside a data block every line starts with a status code and
        # a separator character
        die "invalid response received: $text"
          unless $text =~ m/^[[:digit:]]{3}([- +])/;
        my $separator = $1;
        if    ($separator eq '-') { $state = ST_CONT } # more lines follow
        elsif ($separator eq '+') { $state = ST_DATA } # data block starts
        else                      { $state = ST_IDLE } # final line
      }
      elsif ($state == ST_DATA) {
        # a lone "." closes the data block
        $state = ST_CONT if $text =~ m/^[.]$/;
      }
      else {
        die "invalid internal state $state";
      }
      push @$reply, $text;
    }
    last if $state == ST_IDLE;          # reply complete: stop reading
    $batch = $self->SUPER::get_one;
  }

  if ($state == ST_IDLE && @$reply) {
    # complete reply: clear the object buffers, then validate and parse
    $self->[LINE_BUFFER] = [];
    $self->[LINE_STATE]  = ST_IDLE;
    return [_parse_reply(_check_reply($reply))];
  }

  # partial reply (or nothing yet): keep it for the next call
  $self->[LINE_BUFFER] = $reply;
  $self->[LINE_STATE]  = $state;
  return [];
}
# helper for sub get_one
#
# Validate one complete, buffered reply.
#
# Arguments:
#   $lines - arrayref of reply lines, CRLF already removed
#
# Returns ($lines, $all_codes_match): the same arrayref, and a flag that is
# true when every status line carries the same three-digit code as the
# first line.
#
# Dies if:
#   - the reply is empty or its first line has no status code;
#   - any status line lacks a "NNN" code plus separator;
#   - lines follow the final reply line;
#   - the reply ends before a final line or inside a data block;
#   - asynchronous (6xx) and synchronous status lines are mixed.
sub _check_reply {
  my $lines = shift;

  # The first line fixes the reference status code and decides whether
  # this is an asynchronous (6xx) reply.
  @$lines and $lines->[0] =~ m/^([[:digit:]]{3})/
    or die "invalid first line: "
           .(@$lines ? $lines->[0] : '(empty response)');
  my $first_code = $1;
  my $async_message = (substr($first_code, 0, 1) eq '6');
  my $mixed_async = 0;
  my $all_codes_match = 1;

  my $state = ST_CONT;
  foreach (@$lines) {
    if ($state == ST_CONT) {
      # Check this match explicitly.  When called directly (not via
      # get_one) a malformed line would otherwise leave $1/$2 holding
      # values from an earlier match.
      m/^([[:digit:]]{3})([- +])/
        or die "invalid status line \"$_\" in\n".join("\n", @$lines);
      my ($code, $flag) = ($1, $2);
      # state tracking
      if    ($flag eq '-') { $state = ST_CONT }
      elsif ($flag eq '+') { $state = ST_DATA }
      else                 { $state = ST_IDLE }
      # message validation
      $all_codes_match = 0 unless $code eq $first_code;
      my $line_async = (substr($code, 0, 1) eq '6');
      $mixed_async = 1 if ($line_async xor $async_message);
    } elsif ($state == ST_DATA) {
      $state = ST_CONT if m/^[.]$/;
    } else { # idle state reached but more data follows
      die "returned to idle state with more lines pending\n".
          "at \"$_\" in\n".
          join("\n", @$lines);
    }
  }
  unless ($state == ST_IDLE)
    { die "incomplete response processed:\n".join("\n", @$lines) }
  if ($mixed_async) # explicitly disallowed in protocol spec
    { die "async/sync responses conflated:\n".join("\n", @$lines) }
  return ($lines, $all_codes_match);
}
# helper for sub get_one
#
# Quoted-string token: double quotes around any mix of plain characters and
# backslash escapes.  The "(?:X|\\.)*" form accepts the empty string ""
# (the former "(?:...)+" form did not).  It also always consumes an escaped
# character together with its backslash.
my $QUOTED = qr/"(?:[^"\\]|\\.)*"/;

# Strip the surrounding double quotes from a wholly quoted value and resolve
# its backslash escapes.  Any other value is returned unchanged.
sub _unquote {
  my ($text) = @_;
  $text =~ s/\G([^\\]*)\\(.)/$1$2/g if $text =~ s/^"(.*)"$/$1/;
  return $text;
}

# Parse a reply that _check_reply has accepted.
#
# Arguments:
#   $lines           - arrayref of reply lines without CRLF (not modified)
#   $all_codes_match - true if all status lines share one status code
#
# Returns an arrayref.
#
# If all codes match, the result is
#   [CODE, FIELD, ..., {KEYWORD => VALUE, ...}]
# and the trailing hashref appears only if keyword fields were found.
#   - The first line is split into positional fields followed by optional
#     KEYWORD=VALUE fields.
#   - A later line "TAG K=V ..." (the bi-level form used by PROTOCOLINFO)
#     stores TAG => {K => V, ...}.
#   - A line "KEYWORD=rest" stores the rest of the line.
#   - For 4xx/5xx replies the text of each line becomes a field.
#   - Quoted values are unquoted and unescaped.  Data blocks are left as-is.
#
# If codes vary, the result is [undef, [CODE, TEXT], ...], with one row per
# status line and no further splitting.
#
# A data block (from a "NNN+" line up to a lone ".") becomes one string of
# newline-terminated lines.  One leading "." is removed from any data line
# that starts with "." (the protocol's dot-stuffing).  The string goes to:
#   - the text of its "+" line, when codes vary;
#   - KEYWORD, for a "NNN+...KEYWORD=" line;
#   - otherwise a new positional field.
sub _parse_reply {
  my $lines = shift;
  my $all_codes_match = shift;
  # The loops below strip the code and flag from each line in place, so
  # work on copies instead of the caller's array.
  my @lines = @$lines;

  if ($all_codes_match) {
    my @result = (substr $lines[0], 0, 3);
    my %attrs = ();
    my $target = undef;        # where the current data block is collected
    my $state = ST_IDLE;       # ST_IDLE only while handling the first line
    my $is_error = ($result[0] =~ m/^[45]/);
    foreach (@lines) {
      if ($state == ST_CONT || $state == ST_IDLE) {
        substr $_, 0, 3, '';                  # remove status code
        my $flag = substr $_, 0, 1, '';       # extract continuation flag
        if ($is_error) {
          push @result, _unquote($_);         # copy message for errors
        } elsif ($flag ne '+' && m/^([^"=\s]+)=(.*)$/) {
          # a line beginning with a keyword carries one field to end-of-line
          # (an empty value is kept as '')
          my ($key, $value) = ($1, $2);
          $attrs{$key} = _unquote($value);
        } elsif ($state == ST_CONT && m/^([^"=\s]+)(?=\s[^"=\s]+=)/g) {
          # a line beginning with a space-delimited tag carries bi-level
          # fields
          my $tag = $1;
          while (m/\G\s*([^"=\s]+)=([^"\s]+|$QUOTED)/gc) {
            my ($key, $value) = ($1, $2);
            $attrs{$tag}{$key} = _unquote($value);
          }
        } else {
          # other lines may contain keyword fields after positional fields.
          # /gc keeps pos() on failure, so each step (and the data-block
          # check below) resumes where the previous one stopped.
          my @fields = m/\G\s*([^"=\s]+|$QUOTED)(?=\s|$)/gc;
          push @result, map { _unquote($_) } @fields;
          while (m/\G\s*([^"=\s]+)=([^"\s]+|$QUOTED)/gc) {
            my ($key, $value) = ($1, $2);
            $attrs{$key} = _unquote($value);
          }
        }
        if ($flag eq '+') {
          # a data block follows: collect it under a trailing "KEYWORD=",
          # or else as another positional field
          $state = ST_DATA;
          if (m/\G\s*([^"=\s]+)=$/) {
            my $key = $1;
            $attrs{$key} = '';
            $target = \$attrs{$key};
          } else {
            push @result, '';
            $target = \$result[-1];
          }
        }
        $state = ST_CONT if $state == ST_IDLE;
      } else { # ($state == ST_DATA) # collect data line
        if (m/^[.]$/) { $state = ST_CONT; $target = undef; next }
        (my $data = $_) =~ s/^[.]//;          # undo dot-stuffing
        $$target .= $data."\n";
      }
    }
    push @result, \%attrs if %attrs;
    return \@result;
  } else {
    # return special result structure indicating varying response codes
    my @result = (undef);
    my $state = ST_CONT;
    my $target = undef;
    foreach (@lines) {
      if ($state == ST_CONT) {
        my @row = (substr $_, 0, 3, '');      # extract status code
        my $flag = substr $_, 0, 1, '';       # extract continuation flag
        push @row, $_;                        # omit line-splitting in this mode
        if ($flag eq '+') {
          $state = ST_DATA;
          $target = \$row[-1];
        }
        push @result, \@row;
      } else { # ($state == ST_DATA) # collect data line
        if (m/^[.]$/) { $state = ST_CONT; $target = undef; next }
        (my $data = $_) =~ s/^[.]//;          # undo dot-stuffing
        $$target .= $data."\n";
      }
    }
    return \@result;
  }
}
# sub put inherited from POE::Filter::Line for now
# (the module file must end by returning a true value to require/use)
1;
__END__

=head1 NAME

POE::Filter::Tor::TC1 - Tor Control Protocol 1 filter for POE


=head1 SYNOPSIS

  use Data::Dumper;
  use POE qw(Filter::Tor::TC1 Component::Client::TCP);

  POE::Component::Client::TCP->new(
    RemoteAddress => 'localhost',
    RemotePort    => 9051,
    Filter        => 'POE::Filter::Tor::TC1',
    Connected     => sub {
      $_[HEAP]{server}->put("protocolinfo")
    },
    ServerInput   => sub {
      my $input = $_[ARG0];
      print Data::Dumper->Dump([$input],[qw(ProtocolInfo)]);
      $_[HEAP]{server}->put("quit");
      $_[KERNEL]->yield("shutdown");
    },
  );

  POE::Kernel->run();

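
=head1 DESCRIPTION

POE::Filter::Tor::TC1 is a L<POE::Filter> for sending commands to and
parsing replies from a Tor node on its control interface.

Each complete reply from Tor is returned as a single record.  The record is
an array reference holding the three-digit status code, then the reply's
positional fields (for 4xx and 5xx error replies, the text of each line).
If the reply contained any keyword fields (C<KEYWORD=VALUE>), a final hash
reference maps keywords to values.  The contents of a multi-line data block
are returned as one string.

Replies whose lines carry differing status codes are returned instead as
C<[undef, [CODE, TEXT], ...]>, with one row per reply line.
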

=head1 METHODS

POE::Filter::Tor::TC1 implements the interface defined in
L<POE::Filter/PUBLIC INTERFACE>.


=head1 AUTHOR

Jacob Bachmeyer, E<lt>jcb@cpan.orgE<gt>


=head1 SEE ALSO

L<POE::Filter>

L<https://www.torproject.org/>

L<https://gitweb.torproject.org/torspec.git/tree/control-spec.txt>


=head1 COPYRIGHT AND LICENSE

Copyright (C) 2021 by Jacob Bachmeyer

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut
I note with some amusement that the test file is longer than the module itself. :-)
# Unit tests for POE::Filter::Tor::TC1 module # -*- CPerl -*-
use strict;
use warnings;
use if $ENV{AUTOMATED_TESTING}, Carp => 'verbose';
use Storable qw(dclone);
use Test::More tests
  =>  2     # loading tests
  +   9     # reply validation helper tests
  + 18*2    # reply parser helper tests (later repeated with full filter)
  +   1     # bogus input test
  +   8     # pending buffer tests
  +   2;    # exception test with invalid state

BEGIN {
  use_ok('POE::Filter::Tor::TC1')
    or BAIL_OUT("POE::Filter::Tor::TC1 failed to load");
}
BEGIN {
  # Asking for an impossibly high version must fail with perl's standard
  # "version N required--this is only version M" message.  ->VERSION is
  # what "use Module VERSION" calls, without needing a string eval.
  my $fail = 0;
  eval { POE::Filter::Tor::TC1->VERSION(9999.999); $fail = 1 };
  ok($fail == 0
       && $@ =~ m/POE::Filter::Tor::TC1.* version 9999.*required--this is only/,
     'POE::Filter::Tor::TC1 version check');
}
# ----------------------------------------
BEGIN { *_check_reply = \&POE::Filter::Tor::TC1::_check_reply }

# Run _check_reply on $lines and compare the flags it returns (everything
# after the echoed input arrayref) with $expected_flags.  On mismatch, print
# the actual flags: explain() only builds the dump, so diag() must print it.
sub test_check_reply {
  my ($lines, $expected_flags, $name) = @_;
  my @result = _check_reply($lines);
  shift @result; # discard input array reference
  is_deeply(\@result, $expected_flags, $name)
    or diag explain \@result;
}
test_check_reply
  ['250 OK'], [1],
  'validate simple response';
test_check_reply
  ['250+data follows:',
   'blob blob blob',
   '.',
   '250 OK'], [1],
  'validate simple data block response';
test_check_reply
  ['250-line 1',
   '250-line 2',
   '250 line 3'], [1],
  'validate multi-line response with common status code';
test_check_reply
  ['250-item 1 ok',
   '550-item 2 not ok',
   '250-item 3 ok',
   '550 item 4 not ok'], [0],
  'validate multi-line response with mixed status codes';

# Each malformed reply below must make _check_reply die with a specific
# message.  Each case runs as one two-test subtest:
#   [SUBTEST TITLE, INPUT LINES, EXPECTED ERROR, WHAT IS BEING VALIDATED]
foreach my $case
  (['reject response without initial status code',
    ['bogus bogus bogus', '.', '250 OK'],
    qr/invalid first line:/, 'record without initial status'],
   ['reject incomplete response',
    ['250-BEGIN', '250-MORE'],
    qr/incomplete response/, 'record with incomplete response'],
   ['reject multiple responses in same call',
    ['250-OK', '250 OK', '250-OK', '250 OK'],
    qr/returned to idle state/, 'record with multiple responses'],
   ['reject mixed async/sync response',
    ['650-OK', '250-OK', '650-OK', '250 OK'],
    qr/async\/sync responses conflated:/, 'record with async/sync mix'],
   ['reject mixed sync/async response',
    ['250-OK', '650-OK', '250-OK', '650 OK'],
    qr/async\/sync responses conflated:/, 'record with sync/async mix']) {
  my ($title, $lines, $expected_error, $what) = @$case;
  subtest $title => sub {
    plan tests => 2;
    my $fail = 0;
    eval { test_check_reply $lines, [], ''; $fail = 1 };
    ok(!$fail, 'validating bogus record throws exception');
    like($@, $expected_error,
         "validating $what throws expected exception");
  };
}
# ----------------------------------------
BEGIN { *_parse_reply = \&POE::Filter::Tor::TC1::_parse_reply }

# Sample responses, mostly taken from the Tor control protocol spec.
# Each entry is [INPUT_LINES => EXPECTED_RECORD, DESCRIPTION].  The same
# table is run again through a complete filter object further below.
my @REPLY_PARSE_TESTS =
  ([['250 OK']
    => [250, 'OK'],
    'simple response'],
   [['451 Resource exhausted']
    => [451, 'Resource exhausted'],
    'simple error response'],
   [['650 CIRC 1000 EXTENDED moria1,moria2']
    => [650, CIRC => 1000, EXTENDED => 'moria1,moria2'],
    'sample circuit extension event'],
   [['650-CIRC 1000 EXTENDED moria1,moria2 0xBEEF',
     '650-EXTRAMAGIC=99',
     '650 ANONYMITY=high']
    => [650, CIRC => 1000, EXTENDED => 'moria1,moria2', '0xBEEF',
        {EXTRAMAGIC => 99, ANONYMITY => 'high'}],
    'sample extended circuit extension event'],
   [['650 ADDRMAP ddg.gg 52.149.246.39 "2021-02-11 20:34:04"'
     .' EXPIRES="2021-02-12 02:34:04" CACHED="NO"']
    => [650, ADDRMAP => 'ddg.gg', '52.149.246.39', '2021-02-11 20:34:04',
        {EXPIRES => '2021-02-12 02:34:04', CACHED => 'NO'}],
    'sample ADDRMAP event with quoted strings'],
   [['650 ADDRMAP ddg.gg 52.149.246.39 "2021-02\\\\-11\\" \\"20\\\\:34:04"'
     .' EXPIRES="2021\\\\-02-\\"12 02\\":34\\\\\\:04" CACHED="NO"']
    => [650, ADDRMAP => 'ddg.gg', '52.149.246.39', '2021-02\\-11" "20\\:34:04',
        {EXPIRES => '2021\\-02-"12 02":34\\:04', CACHED => 'NO'}],
    'sample ADDRMAP event with quoted strings and escape handling'],
   [['250 SafeLogging=0']
    => [250, {SafeLogging => 0}],
    'sample GETCONF response (one item)'],
   [['250 SocksPort']
    => [250, 'SocksPort'],
    'sample GETCONF response (one item; default)'],
   [['250-Log=notice stdout',
     '250 SafeLogging=0']
    => [250, {Log => 'notice stdout', SafeLogging => 0}],
    'sample GETCONF response (two items)'],
   [['250-Log=notice stdout',
     '250-SocksPort',
     '250-NATDPort',
     '250 SafeLogging=0']
    => [250, 'SocksPort', 'NATDPort',
        {Log => 'notice stdout', SafeLogging => 0}],
    'sample GETCONF response (four items; two default)'],
   [['250-OldAddress1=NewAddress1',
     '250 OldAddress2=NewAddress2']
    => [250, {OldAddress1 => 'NewAddress1', OldAddress2 => 'NewAddress2'}],
    'sample MAPADDRESS response with common reply code'],
   [['512-syntax error: invalid address \'@@@\'',
     '250 127.199.80.246=bogus1.google.com']
    => [undef,
        [512, 'syntax error: invalid address \'@@@\''],
        [250, '127.199.80.246=bogus1.google.com']],
    'sample MAPADDRESS response with varying reply code'],
   [['250-PROTOCOLINFO 1',
     '250-AUTH METHODS=NULL',
     '250-VERSION Tor="0.2.0.5-alpha"',
     '250 OK']
    => [250, PROTOCOLINFO => 1, 'OK',
        {AUTH => { METHODS => 'NULL' },
         VERSION => { Tor => '0.2.0.5-alpha'}}],
    'sample PROTOCOLINFO response'],
   [['250-PROTOCOLINFO 1',
     '250-AUTH METHODS=NULL',
     '250-VERSION Tor="0.\\\\2\\".0\\\\\\\\.\\"5-alpha\\\\"',
     '250 OK']
    => [250, PROTOCOLINFO => 1, 'OK',
        {AUTH => { METHODS => 'NULL' },
         VERSION => { Tor => '0.\\2".0\\\\."5-alpha\\'}}],
    'sample PROTOCOLINFO response with escape handling'],
   [['250-ServiceID=testboguskeydata',
     '250-PrivateKey=RSA1024:base64/key/data/bits=',
     '250 OK']
    => [250, 'OK',
        {ServiceID => 'testboguskeydata',
         PrivateKey => 'RSA1024:base64/key/data/bits='}],
    'sample ADD_ONION response'],
   [['650+NS',
     'blob blob blob',
     'blob of blob',
     'blob blob blob',
     '.',
     '650 OK']
    => [650, 'NS', <<__BLOB__, 'OK'],
blob blob blob
blob of blob
blob blob blob
__BLOB__
    'async blob response'],
   [['250+desc/name/moria=',
     'moria descriptor line 1',
     'moria descriptor line 2',
     '.',
     '250-version=Tor 0.1.1.0-alpha-cvs',
     '250 OK']
    => [250, 'OK', {version => 'Tor 0.1.1.0-alpha-cvs',
                    'desc/name/moria' => <<__BLOB__}],
moria descriptor line 1
moria descriptor line 2
__BLOB__
    'sample GETINFO response with blob'],
   [['512-invalid name \'@@@\'',
     '250+desc/name/moria=',
     'moria descriptor line 1',
     'moria descriptor line 2',
     '.',
     '250-info/item/1=info item 1',
     '250 OK']
    => [undef,
        [512, 'invalid name \'@@@\''],
        [250, 'desc/name/moria='.<<__BLOB__],
moria descriptor line 1
moria descriptor line 2
__BLOB__
        [250, 'info/item/1=info item 1'],
        [250, 'OK']],
    'sample mixed status code response with blob'],
  );

foreach my $test (@REPLY_PARSE_TESTS) {
  # dclone keeps the table intact for the later full-filter pass even if
  # the parser edits its input
  my $parsed = _parse_reply(_check_reply(dclone($test->[0])));
  is_deeply($parsed, $test->[1], $test->[2])
    or diag explain $parsed;
}
# ----------------------------------------
# A first line without a status code (split here across two input chunks)
# must make get_one die before any record is produced.
subtest 'bogus input' => sub {
  plan tests => 3;
  my $filter = POE::Filter::Tor::TC1->new;
  my @chunks = ("bogus bogus", " bogus\015\012", "250 OK\015\012");
  $filter->get_one_start(\@chunks);
  my $result;
  my $completed = eval { $result = $filter->get_one; 1 };
  ok(!defined $result, 'no result from reading bogus record')
    or diag explain $result;
  ok(!$completed, 'reading bogus record throws exception');
  like($@, qr/invalid response received:/,
       'reading bogus record at idle throws expected exception');
};
# ----------------------------------------
# Buffering: a partial reply is held inside the filter, is reported by
# get_pending (with CRLF restored), and is parsed once its final line
# arrives.
{
  my $filter = POE::Filter::Tor::TC1->new;
  my $result = $filter->get_pending;
  is($result, undef, 'buffer is initially empty')
    or diag explain $result;

  $filter->get_one_start(["250 OK\015\012"]);
  $result = $filter->get_one;
  is_deeply($result, [[250, 'OK']],
            'simple response through filter')
    or diag explain $result;

  $filter->get_one_start(["250-line 1\015\012"]);
  $result = $filter->get_one;
  is_deeply($result, [],
            'empty result with partial response buffered (1)')
    or diag explain $result;
  $result = $filter->get_pending;
  is_deeply($result, ["250-line 1\015\012"],
            'buffer contains initial line')
    or diag explain $result;

  $filter->get_one_start(["250"]);
  $result = $filter->get_one;
  is_deeply($result, [],
            'empty result with partial response buffered (2)')
    or diag explain $result;
  $result = $filter->get_pending;
  is_deeply($result, ["250-line 1\015\012", "250"],
            'buffer contains partial response')
    or diag explain $result;

  $filter->get_one_start([" OK\015\012"]);
  $result = $filter->get_one;
  is_deeply($result, [[250, 'line', '1', 'OK']],
            'sample response parsed when complete')
    or diag explain $result;
  $result = $filter->get_pending;
  is($result, undef, 'buffer empty after reading complete response')
    or diag explain $result;
}
# ----------------------------------------
# Repeat the reply parser tests through a complete filter object, then
# exercise the internal-state sanity check in get_one.
{
  my $filter = POE::Filter::Tor::TC1->new;
  foreach my $test (@REPLY_PARSE_TESTS) {
    my $parsed = $filter->get([map {$_."\015\012"} @{$test->[0]}]);
    is_deeply($parsed, [$test->[1]], 'filtered '.$test->[2])
      or diag explain $parsed;
  }
  # intentionally corrupt the filter state to test last edge case
  $filter->[POE::Filter::Tor::TC1::LINE_STATE] = 42;
  my $fail = 0;
  my $result;
  eval { $result = $filter->get(["bogus bogus bogus\015\012"]); $fail = 1 };
  ok(!$fail, 'exception thrown due to invalid state');
  like($@, qr/invalid internal state 42/,
       'expected exception thrown due to invalid state');
}