#!/usr/bin/perl
=foo
Update: added rules
Update: on my darwin i set up the relevant rule with
ipfw add 65500 divert 65500 tcp from me to any via en0
replace with your iface of choice, naturally. I haven't set this up on
linux (yet), but I know divert sockets were ported at some point...
ciao ciao!
This little script was hell to write, because every time i didn't do t
+he
firewall rule right, and had a fix for the script, my netinfo server
couldn't be contacted... this ment that i couldn't run
ipfw flush
regardless, i've finally made it. It now prioritises using a size base
+d
queue, which is really nothing compared to what you can do. Suprisingl
+y
it's not a resource hog, and there's still some stuff to implement, su
+ch
as timeout based dropping using a journal (the purger is written, but
not the logger), and priority queue clipping using mapl's janus heaps.
+..
I guess i'll prioritize the cmp sub so that it puts tcp SYN packets, a
+nd
non piggybacked ACKS, aswell as short ICMP and UDP at the top
automatically. Then I'll prioritise by service...
so far only the heap has made bittorrent and perlmonks chat possible a
+t
the same time, even when running several bts at a time... I must say I
was surprised.
Replace "#print" with ";print" for debugging info.
Unresolved bug: within the SIGIO handler recv will sometimes return
undefined, but the loop check doesn't pick it up, so pack complains. I
have no idea why.
Bah. Enough yak, here's the stuff:
=cut
use strict;
use warnings;
use Socket;
use Time::HiRes qw(time sleep alarm);
use Fcntl qw(F_SETOWN F_GETFL F_SETFL O_ASYNC O_NONBLOCK);
use Array::Heap;
#use Array::Heap2; # same thing, with a different namespacs
use NetPacket::IP qw(IP_PROTO_ICMP IP_PROTO_TCP IP_PROTO_UDP);
use NetPacket::TCP;
use NetPacket::UDP;
use NetPacket::ICMP;
use Scalar::Util qw(dualvar);
### KNOWN BUGS
# this code is very dirty and hackish, may be resolved in the future
### CHANGES
# added real rules to order the heaps. A simple API and examples at th
+e bottom...
# currently aggressively user oriented, with a bit of space for standa
+rd unixish servers
### TODO
# score changes dynamically based on time... Instability in heap could
# be negligeable at this point, as TCP is fault tolerant and UDP
# shouldn't care. ICMP should a have high enough priority
#
# purge using timeout, not just by clipping the queue
#
# both of these are theoretically solveable using references within th
+e queue,
# so that you could undef and change the value of the referants (thing
+ys) in
# the heap itself. I wonder if make_heap instead of push_heap would be
+ much worse.
# This way we could take a reference to an element in the heap.
# aggressive purging minimizes wasted bandwidth:
# have an x second maximum queue life (to be implemented)
# purge heap size:
# if (overflow){
# reverse order
# reverse heapify
# reverse pop
# reverse order
# }
#
# push
# user config
sub PORT () { 65500 }; # the port to bind on
sub WANTED_BYTES_PER_SECOND () { 94 * 128 }; # cap ( * 128 for kilobit
+s, * 1024 for kilobytes )... divide by ERROR
sub SLEEP_INTERVAL () { 0.05 } ## smaller = smoother, more overhead.
+For some perspective:
## 0.01 is usually the size of a time
+slice on unices... (linux lets you get picky)
## 0.03 gives good response on ~10K, 0
+.05 on 100K and more. the question is really
## how many packets do you burst befor
+e delaying for them all. If you send 100 packets
## a second the overhead of calling sl
+eep will cause a significant delay
sub PURGE_INTERVAL () { 2 } ## how often are packets purged
sub PURGE_TIMEOUT () { 20 } ## how long can a packet live in the queue
+... only as accurate as PURGE_INTERVAL
sub QUEUE_LIMIT () { 32 } ## how many packets are allowed to be in the
+ queue at any given time
# constants
sub ERROR () { 0.996 }; # clipped average on my machine, for 12 * 1024
+, 64 * 1024, 128 * 1024
sub BYTES_PER_SECOND () { WANTED_BYTES_PER_SECOND / ERROR };
sub SLICE () { BYTES_PER_SECOND * SLEEP_INTERVAL }; # a slice of data
+corresponding to SLEEP_INTERVAL
sub NOQUEUE () { O_ASYNC | O_NONBLOCK };
sub PACK_QUEUE () { "a16 a*" };
sub PACK_DEPT () { "d S" }; ## F: less overhead, innacurate. d: accura
+te, more overhead,
## D: less overhead, accurate, needs long
+double support (perl -V).
sub PURGE_MARKERS () { int(PURGE_TIMEOUT / PURGE_INTERVAL) }
# variables
my ($dept,@dept) = (0);
my ($qcnt,@qjournal,@queue) = (0,((undef) x PURGE_MARKERS));
#print "Initializing...\n";
#print "SLICE=", SLICE, "\n";
my $rules = new Rules;
install_rules($rules);
### hackery for priority queue implmentation
sub queue (@) { ## use $rules to compute score, push, janusify and pop
+ if needed
my @packets = @_;
foreach my $packet (@packets){
$qcnt++;
$packet = dualvar $rules->evaluate(substr($packet,16)), $packe
+t;
# add journal entry
}
if ($qcnt > QUEUE_LIMIT){
#print "queue has exceeded limit, clipping\n";
@queue = reverse @queue;
make_heap_cmp { $b <=> $a } @queue;
while ($qcnt > QUEUE_LIMIT){
pop_heap_cmp { $b <=> $a } @queue;
$qcnt--;
}
@queue = reverse @queue;
}
push_heap @queue, @packets;
}
sub dequeue (){ ### pop while defined
$qcnt--;
pop_heap @queue;
}
### hackery ends
## set up socket
socket D, PF_INET, SOCK_RAW, getprotobyname("divert") or die "$!";
bind D,sockaddr_in(PORT,inet_aton("0.0.0.0")) or die "$!";
fcntl D, F_SETOWN, $$;
#print "fcntl returned async ", ((fcntl D, F_GETFL, 0 & NOQUEUE) ? "on
+" : "off"), "\n";
#$SIG{ALRM} = sub {
# if ($qcnt){
# while (defined $qjournal[0]){
# undef ${shift @qjournal}; # undefine the reference
# if (--$qcnt == 1){
# @queue = ();
# @qjournal = ((undef) x PURGE_MARKERS);
# return;
# }
# }; shift @qjournal; # take out one marker
# push @qjournal, undef; # put another in
# }
#};
#alarm PURGE_INTERVAL,PURGE_INTERVAL; # purge old packets every n
my ($ts,$p); # lexical for SIGIO temp usage... don't alloc/daelloc eve
+ry time
$SIG{IO} = sub {
#print time(), " SIGIO: queuing up\n";
while (defined($ts = recv D, $p, 65535, 0)){ # we assume packet
+s can come in faster than SIGIO can be called
# (on my machine,
+128 * 1024 cap, this loops usually picks up 3-4
# packets), so we'
+ll save some context switching on high load,
#print "undefined p ($!)\n" unless defined $p;
#print "undefined ts ($!)\n" unless defined $ts;
queue(pack(PACK_QUEUE, $ts, $p));
}
};
#print "Initialization complete, starting read loop\n";
# start loop
my ($to, $t, $s, $l);
#my ($start, $end, $total); # used to compute ERROR
PACKET: { if (defined ($to = recv D, $_, 65535, 0)){ # blocking rea
+d. the queue is empty. $to is reassigned
# because the
+packet could come from various rules. hack at it
# if it ticks
+you off.
#print time(), " received packet\n";
#print "received: " . length($to) . "\n";
if ($dept < SLICE){
#print time(), " dept is $dept - short circuited, should take
+", length($_) / BYTES_PER_SECOND, " seconds to deliver\n";
send D, $_, 0, $to;
$dept += length($_);
push @dept, pack(PACK_DEPT, time(), length($_) );
redo PACKET;
} else {
#print time(), " queued (too much dept: $dept)\n";
queue(pack(PACK_QUEUE, $to, $_)); # pack is about 1.5 times
+faster than refs (benchmark)
}
# the queue is not empty, or dept needs purging
#print time(), " clearing up queue\n";
fcntl D, F_SETFL, ((fcntl D, F_GETFL, 0)|NOQUEUE); # switch to asy
+nc
#print "fcntl is now noqueue ", ((fcntl D, F_GETFL, 0 & NOQUEUE) ?
+ "on" : "off"), "\n";
# use to compute ERROR
#$start = time;
#$total = 0;
until (not $qcnt){ # until the queue is empty
do {
#print time(), " cleaning out and making up for dept\n";
$t = time;
for (my $i = 0; $i < @dept; $i++){
defined $dept[$i] or next;
($s, $l) = unpack(PACK_DEPT, $dept[$i]);
#print time(), " diff is ", time - $s, ", ", ($l / BYT
+ES_PER_SECOND)," diff needed queue length is $#queue, dept joural has
+ $#dept entries ($dept)\n";
if ($t > $s + ($l / BYTES_PER_SECOND) ){
$dept -= $l;
delete($dept[$i]); # faster than splice
}
}
while (@dept and not exists $dept[0]){ shift @dept }; ## c
+lean out those which are easy
#print time(), " dept is now $dept\n";
#print time(), " will sleep for ", $dept / BYTES_PER_SECON
+D,"\n" if $dept > SLICE;
} while (($dept > SLICE) and sleep $dept / BYTES_PER_SECOND);
+ # sleep (one should suffice, but in case a sig came
+ # (IO, ALRM are used)) until we've cleared the dept
#print time(), " dept is now $dept, flushing a packet\n";
my ($to,$p) = unpack(PACK_QUEUE, dequeue() );
$dept += length($p);
push @dept, pack(PACK_DEPT, time(), length($p) );
#$total += length($p); used to compute ERROR
#print time(), " sent one from queue, dept is now $dept, shoul
+d take ", length($p) / BYTES_PER_SECOND, " seconds to deliver (queue
+left: $#queue)\n";
send D, $p, 0, $to;
!$qcnt ? fcntl D, F_SETFL, ((fcntl D, F_GETFL, 0)&!NOQUEUE) :
+redo ; # unset async. checking here will skip checking
+ # until(!queue), up to the time fcntl is called.
+ # Then a double check is made to avoid a packet
+ # getting stuck in the queue while others are
+ # getting short circuited
#print "fcntl is now noqueue ", ((fcntl D, F_GETFL, 0 & NOQUEU
+E) ? "on" : "off"), "\n";
}
# use this code to determine ERROR
#$end = time;
#my $bps = ($total/($end-$start));
# print "during high load we sent $total bytes in ", $end-$start,
+" seconds, which means ", $bps, " bytes per second.\n";
# print "the ratio of actual rate versus cap is ", $bps/BYTES_PER_
+SECOND, "\n";
#print time(), " queue empty, returned to synchronious IO\n";
# the queue is empty
} redo }
1; # Keep your mother happy.
sub install_rules { ## the rules
$_[0]->install(
### DEPENDANCIES
Rule::Dependancy::Simple->new({ # basic (network unrelated) da
+ta
provides => ["basic"],
evaluate => sub {
my $packet = shift;
my $basic = new Dependancy::Simple;
$basic->set("size",length($packet));
{basic => $basic};
},
}),
Rule::Dependancy::Simple->new({ # ip packet data
provides => ["ip"],
evaluate => sub { {ip => NetPacket::IP->decode($_[0]) } }
}),
Rule::Dependancy::Simple->new({ # tcp packet data
needs => ["ip"],
provides => ["tcp"],
evaluate => sub {
#print "providing tcp packet dependancy\n";
##print "got packet: ", unpack("H*",$_[0]), "\n";
##print "Available dependancies:\n\n", do { use Data::
+Dumper; Dumper $_[1] },"\n";
($_[1]{ip}{proto} == IP_PROTO_TCP) ? {tcp => NetPacket
+::TCP->decode($_[1]{ip}{data}) } : {} }
}),
Rule::Dependancy::Simple->new({ # udp packet data
needs => ["ip"],
provides => ["udp"],
evaluate => sub { ($_[1]{ip}{proto} == IP_PROTO_UDP) ? {ud
+p => NetPacket::UDP->decode($_[1]{ip}{data}) } : {} }
}),
Rule::Dependancy::Simple->new({ # icmp packet data
needs => ["ip"],
provides => ["icmp"],
evaluate => sub { ($_[1]{ip}{proto} == IP_PROTO_ICMP) ? {i
+cmp => NetPacket::ICMP->decode($_[1]{ip}{data}) } : {} }
}),
### RULES
Rule::Simple->new({ # handle Type of Service et cetera (delay
++= 8, thoroughput += 5, reliability += 4, cost += 1, congestion += 2)
needs => ["ip"],
evaluate => sub { 0 },
}),
Rule::Simple->new({ # packet size
needs => ["basic"],
evaluate => sub {
#print "evaluating size based score adjustment\n";
length($_[1]{basic}->get("size")) ? (1.5 * log(length(
+$_[1]{basic}->get("size")))) : 0 }
}),
Rule::Simple->new({ # tcp window size
needs => ["tcp"],
evaluate => sub {
#print "evaluating window size score adjustment\n";
$_[1]{tcp}{winsize} ? 0.1 * log($_[1]{tcp}{winsize})
+: 0 }
}),
Rule::Simple->new({ # icmp conditional
needs => ["icmp"],
evaluate => sub {
#print "packet is icmp, -20\n";
-20 },
}),
Rule::Simple->new({ # udp conditional
needs => ["udp"],
evaluate => sub {
#print "packet is UDP, -6\n";
-6 },
}),
Rule::Simple->new({ # tcp flags
needs => ["tcp"],
evaluate => sub {
#print "evaluating tcp flags\n";
my $flags = $_[1]{tcp}{flags};
my $ret = 0;
# tcp messages with special information have varying d
+egrees of additional importance
$ret -= 1 if $flags & FIN;
$ret -= 8 if $flags & SYN;
$ret -= 20 if $flags & RST; # attempt to help prevent
+ waste by pushing as fast as possible. They're pretty rare anyway
$ret -= 5 if $flags & PSH;
$ret -= 2 if $flags & ACK; # packets without acks ar
+en't as urgent
$ret -= 20 if $flags & URG;
# $ret += 0 if $flags & ECE;
# $ret += 0 if $flags & CWR;
#print "final score: $ret\n";
$ret;
}
}),
Rule::Simple->new({ # generic (udp, tcp) port handling
wants => ["tcp","udp"], # we either have tcp, or tcp
evaluate => sub {
#print "evaluating port rules\n";
my $prot = (exists $_[1]->{tcp}) ? $_[1]{tcp} : $_[1]{
+udp};
my $ret = 0;
my $src = $prot->{src_port};
my $dst = $prot->{dest_port};
#print "ports: dest=$dst, src=$src\n";
SWITCH: { # source port
# unpriviliged ports
$src > 1024 and do {
$ret += 2;
#print "source port is unpriviliged\n";
$ret += 18, last if ($src >= 6881 and $src <=
+6888); # bittorrent
$ret += 17, last if $src == 5501;
+ # hotline
$ret += 15, last if $src == 20;
+ # ftp data
last;
};
# important services
$src == 80 and $ret -= 1, last; # http
$src == 443 and $ret -= 1, last; # https
$src == 143 and $ret -= 4, last; # imap
$src == 110 and $ret -= 4, last; # pop3
$src == 25 and $ret -= 5, last; # smtp
$src == 22 and $ret -= 7, last; # ssh
$src == 21 and $ret -= 6, last; # ftp control
}
SWITCH: { # destination port
$dst > 1024 and do {
$ret += 3;
#print "destination port is unpriviliged\n";
$ret += 16, last if ($dst >= 6881 and $dst <=
+6888) and not ($src >= 6881 and $src <= 6888);
$ret += 15, last if $dst == 5501;
$ret += 14, last if $dst == 20;
last;
};
$dst == 80 and $ret -= 6, last; # http
$dst == 443 and $ret -= 6, last; # https
$dst == 143 and $ret -= 4, last; # imap
$dst == 110 and $ret -= 4, last; # pop3
$dst == 25 and $ret -= 2, last; # smtp
$dst == 22 and $ret -= 10, last; # ssh
$dst == 23 and $ret -= 10, last; # telnet
$dst == 21 and $ret -= 6, last; # ftp ctrl
}
#print "port score: $ret\n";
$ret;
}
}),
)
}
package Rules; # API for joint abstraction - rules depend on common sh
+ared data, and may be added and removed.
# rules evaluate recursive, possibly asynchroniously in the future.
# once a dependancy is solved it may not be altered, and all it's chil
+dren may be computed on it with no locking - methods are supposed to
+return static or unrelated data.
# a dependancy gets it's own
# dependancy is either or: (more complexity may be built by creating e
+mpty dependnancy rules)
# needs -> a strong dependancy list. every dependancy must be met (eva
+luated as soon as all are met)
# wants -> a weak dependnancy list, at least one must be met (evaluate
+d as soon as one is met)
# evaluate -> run the rule, and return either a hash of dependancy obj
+ects, or a score modification
# provides -> currently irrelevant. for hinting install in the future
sub new {
bless [],shift; # dependancy tree, inverse dependancy tree, rules
+pending parent, execution tree
}
sub install { # clear rules that will never have all their dependancie
+s met, and then filter for duplicates
my $self = shift;
# filter here
#print "installing score rules\n";
push @$self,@_;
}
sub evaluate { # evaluate all of the rules and return the sum
my $self = shift;
my $packet = shift;
#no warnings; # perl doesn't like me playing with closures
my %offers;
my %deferred;
my @ruleq;
my $score = 0;
#print "evaluating entire ruleset\n";
foreach my $rule (@$self){
my $dep = [ 0, $rule ];
# build dependancy counter
if ($rule->has_deps){
my @needs;
if ($rule->strong_deps){
@needs = grep { not exists $offers{$_} } $rule->needs;
$dep->[0] = scalar @needs;
} else {
$dep->[0] = 1;
@needs = grep { $dep->[0] and (exists $offers{$_} ? ((
+$dep->[0] = 0),undef) : 1) } $rule->needs;
$dep->[0] or @needs = ();
}
#print "this rule needs (@needs)\n";
foreach my $dependancy (@needs){
$deferred{$dependancy} = $dep;
}
}
push @ruleq,try($packet,\$score,\%offers,\%deferred,$dep);
}
my $last = scalar @ruleq;
while(@ruleq){ # finish the loop
#print "attempting to evaluate remaining rules\n";
push @ruleq, try($packet,\$score,\%offers,\%deferred,shift @ru
+leq);
(last == @ruleq) ? last : ($last = @ruleq); # break an infinit
+e loop
}
#print "Final score is $score\n";
return $score;
sub try {
my ($packet,$score,$offers,$deferred,$dep) = (@_);
#print "trying to evaluate rule\n";
if ($dep->[0] < 1){
#print "all dependancies met\n";
my $ret = $dep->[1]->evaluate($packet,$offers);
if (ref $ret){
#print "rule introduced new offerings:";
foreach my $key (keys %{$ret}){
#print " $key,";
$offers->{$key} = $ret->{$key}; # install dependan
+cies
foreach my $dependant (@{$deferred->{$key}}){
$dependant->[0]--; # dependancy count goes dow
+n by one
}
}
#print "\n";
} else {
#print "rule adjusted score by $ret\n";
$$score += $ret;
} # don't forget this is a closure
return (); # we have nothing to requeue
} else {
#print "unmet dependancies\n";
return $dep; # requeue the current one
}
}
}
## base packages for rules
package Rule::Simple; # a rule is something that fits in rules, and wo
+rks via a certain API. a leaf in a dependancy tree
sub new {
my $pkg = shift;
bless shift, $pkg;
}
sub has_deps { (exists $_[0]{needs} or exists $_[0]{wants}) ? 1 : unde
+f };
sub strong_deps { (exists $_[0]{needs}) ? 1 : undef };
sub needs { (exists $_[0]{needs}) ? @{$_[0]{needs}} : @{$_[0]{wants}}
+}
sub evaluate { goto &{shift->{evaluate}} }
package Rule::Dependancy::Simple; # a dependancy rule is something ano
+ther dependancy rule or plain rule needs. a node in a dependancy tree
+.
use base "Rule::Simple"; # a simple rule that also provides();
sub provides { @{$_[0]{provides}} }
package Dependancy::Simple; # a dependancy is something a dependancy r
+ule creates - This is just a base class for dependancy objects to wor
+k on. It contains plain values, and is basically a blessed hash
sub new { bless {},shift }
sub set { # set a value
$_[0]{$_[1]} = $_[2];
}
sub get { # get a value
$_[0]{$_[1]}
}
__END__
|