Greetings,
During my internship I had to set up a network probe. They basically gave me root on a server with three network interfaces and 0 budget.
I set up the following solution:
- Ntopng did the monitoring
- Elasticsearch retained ntopng logs for easy retrieval and analysis (one month were retained given the number of flows)
- Kibana was used for visualisation
Once the setup and the documentation was done I started writing a script to automate as many thing as I could. Namely database export, backup and restoration, service monitoring, interface monitoring (I spent a day wondering WHY an interface would go down by itself until someone told me that a maintenance crew did some voodoo on the routers and unplugged things) and such.
So here is the code, I hope it will be useful to you, since I had most of en ELK stack there (except for the Logstash part) this script should be easily adapted to other situations.
#!/usr/bin/perl -- use strict; use warnings; use autodie; use Log::Log4perl; use JSON::Parse 'parse_json'; use Date::Parse; use Date::Calc qw(Add_Delta_Days Date_to_Days); use Data::Dumper; use Email::Sender::Simple qw(sendmail); use Email::Simple; use Email::Simple::Creator; use Email::Sender::Transport::SMTP; my $interface = "ethWhatEver"; #monitoring interface my $serveur_ip = "123.123.123.123"; my $techs_email='engineers.monitoring@techsupport.local';#mail address + to send request for help my $from_addr='automated_solution@monitoring.local'; my $nb_mois_conserves = 1;#how many months of logs should we keep? my $elasticsearch_logs_location="/var/logs/el_nt.log"; my $nom_index = "ntopng";#index prefix, they are supposed to be like t +his: $nom_index-yyyy.mm.dd my $transport = Email::Sender::Transport::SMTP->new({#sendmail smtp co +nfiguration host => 'smtp.example.local', port => 25, }); Log::Log4perl->init("thingie.logger"); my $err_logger = Log::Log4perl->get_logger(); #*********************************************************# #REGEXP section # #*********************************************************# my $snapshot_number = qr/_?(?<number>\d+)\z/; my $date_indice = qr/ntopng-(?<date>\d\d\d\d\.[0-1][0-9]\.[0-3][0-9])/ +; my $monitoring_interface = qr/\A$interface/; my $link_regex = qr/\A\d+: $interface/; my $link_state_regex = qr/state (?<status>(DOWN|UP))/; my $interface_state_regex = qr/PROMISC/; my $elasticsearch_log_regex = qr/elasticsearch\.log\.(?<date>\d\d\d\d- +\d\d-\d\d)(?<tarred>\.tar\.gz)?/; #************************************************************# #export functions # #************************************************************# sub elasticsearch_export{ my $cmd = curl_this("-XGET","http://localhost:9200/_snapshot/m +y_backup/_all"); my $backup=parse_json(`$cmd`); my $snapshots = $backup->{snapshots}; my $last_snapshot_name = $snapshots->[$#$snapshots]->{snapshot +}; $last_snapshot_name =~ $snapshot_number;#extract the snapshot +number my $new_number= $+{number}+1; $cmd = curl_this("-XPUT","http://localhost:9200/_snapshot/my_b +ackup/snapshot_$new_number?wait_for_completion=true"); my $output = parse_json(`$cmd`); if($output->{snapshot}->{state} ne 'SUCCESS'){ $err_logger->error("Could not save snapshot"); send_email($techs_email,"I have been unable to save a +snapshot","Backup error"); } #let's check if the snapshot was correctly saved $cmd = curl_this("-XGET","http://localhost:9200/_snapshot/my_b +ackup/_all"); $backup=parse_json(`$cmd`); $snapshots = $backup->{snapshots}; $last_snapshot_name = $snapshots->[$#$snapshots]->{snapshot}; unless($last_snapshot_name eq "snapshot_$new_number"){ $err_logger->error("Backup error, last record snapshot + = $last_snapshot_name "); send_email($techs_email,"I could not save a snapshot". " Maybe there is not enough free space or we c +ant reach the backup destination? Response dump:\n". Dumper($output), "backup error"); } elsif($#$snapshots >= 2){#everything went fine, lets delete th +e oldest snapshot if we have two other snapshots $cmd=curl_this("-XDELETE","http://localhost:9200/_snap +shot/my_backup/".$snapshots->[0]->{snapshot}); $output = parse_json(`$cmd`); if($output->{acknowledged} ne 'true' && $output->{ackn +owledged} != 1){ $err_logger->error("I was not able to delete t +he snapshot $snapshots->[0]->{snapshot}"); send_email($techs_email,"Could not delete the +snapshot ".$snapshots->[0]->{snapshot}. " from the elasticsearch database, response du +mp: \n".Dumper($output), "backup error"); } } } #************************# #restore functions # #************************# sub restore{ my $cmd = curl_this('-XGET',"http://localhost:9200/_snapshot/m +y_backup/_all"); my $backup=parse_json(`$cmd`); my $snapshots = $backup->{snapshots}; my $last_snapshot_name = $snapshots->[$#$snapshots]->{snapshot +}; $cmd = curl_this('-XPOST', 'localhost:9200/_all/_close'); my $output = parse_json(`$cmd`);#ferme tous les index pour la +restoration if($output->{acknowledged} ne 'true'&& $output->{acknowledged} + != 1){ $err_logger->error("Could not close indices for restor +ation, I'm still going to try"); } $cmd = curl_this('-XPOST',"http://localhost:9200/_snapshot/my_ +backup/$last_snapshot_name/_restore"); $output = parse_json(`$cmd`); if($output->{accepted} ne 'true'){ $err_logger->error("restore procedure failed in part o +r completely, let's send an email"); send_email($techs_email,"restore failed, response dump +:". " from elasticsearch server: \n".Dumper($outpu +t),"restore procedure fail"); } $cmd = curl_this('-XPOST', 'localhost:9200/_all/_open'); $output = parse_json(`$cmd`);#rouvre tous les index après la r +estoration if($output->{acknowledged} ne 'true'&& $output->{acknowledged} + != 1){ $err_logger->error("indices reopening procedure failed +, sending an email"); send_email($techs_email,"could not reopen indices, ple +ase try doing so manually". " to keep recording data. response dump: \n".Dumper($o +utput), "indices reopening error"); } } sub send_email{ my ($dest,$body,$error) = @_; my $email = Email::Simple->create( header =>[ Subject=>$error ], body => $body, ); unless(sendmail($email, { transport => $transport, from=>$from +_addr, to=>$dest})){ $err_logger->error("I could not send any email, all th +at's left is to keep logging and hope..."); } } sub elasticsearch_check{ #Returns 1 if backup and database have the sa +me age, 2 if database is more recent #3 if database is older than the backup my $cmd = curl_this('-XGET',"http://localhost:9200/_snapshot/m +y_backup/_all"); my $backup_list = parse_json(`$cmd `); if(!defined($backup_list)){ $err_logger->error("Could not retrieve indices list, p +lease check that". "the backup partition is mounted and accessibl +e"); send_email($techs_email,"Could not retrieve the saved +indices list ". "is backup partition mounted?","Could not retr +ieve backed up indices"); die; } my $snapshots = $backup_list->{snapshots}; my $backed_up_indices = $snapshots->[$#$snapshots]->{indices}; + #retrieves last snapshot indices $cmd = curl_this('http://localhost:9200/_cat/indices?v'); my $indices =`$cmd`; my $most_recent_backup = bk_get_most_recent_date($backed_up_in +dices); my $most_recent_index = db_most_recent_date($indices); if($most_recent_index == $most_recent_backup){ return 1; } elsif($most_recent_index > $most_recent_backup){ return 2; } else{ return 3; } } #*****************************************************************# #These functions return the most recent index's age in unix format# #either in the back up or the current database # #*****************************************************************# sub bk_get_most_recent_date{ my $indices = shift; my @dates; my $j = 0; foreach my $i (@$indices){ if($i =~ $date_indice){ $dates[$j] = str2time($+{date}); $j++; } } my @eldest = sort { $b <=> $a } @dates; return $eldest[0]; } sub db_most_recent_date{ my $input =shift; my @lines = split(/\n/,$input); my @indice_dates; my $j = 0; foreach my $l (@lines){ if($l =~ $date_indice){ $indice_dates[$j] = str2time($+{date}); $j++; } } my @eldest = sort { $b <=> $a } @indice_dates; return $eldest[0]; } #*******************************************************************# #Get max index age from current date and value in $nb_mois_conserves# #*******************************************************************# sub get_relative_date{ my($day,$month,$year) = (localtime)[3,4,5];#date du jour $year += 1900;#calcule la date réelle $month++;#remember that january is month 0! my $canonical_date = Date_to_Days($year,$month,$day); ($year,$month,$day) = Add_Delta_Days($year,$month,$day,-($nb_m +ois_conserves * 31)); return str2time("$year/$month/$day"); } #********************************************# #this function gets rid of the oldest indices# #********************************************# sub elasticsearch_clean{ my $cmd = curl_this('http://localhost:9200/_cat/indices?v'); my $indices =`$cmd`; my (@indice_dates,@lines,$j); @lines = split(/\n/,$indices); $j = 0; foreach my $l (@lines){ if($l =~ $date_indice){ $indice_dates[$j] = $+{date}; $j++; } } foreach my $i (@indice_dates){ my $cmd = curl_this("-XDELETE","http://localhost:9200/ +$nom_index"."-$i"); if(str2time($i) <= get_relative_date()){ my $output = parse_json(`$cmd`); if($output->{acknowledged} ne 'true'&& $output +->{acknowledged} != 1){ $err_logger->error("could not delete i +ndex $nom_index"."-$i"); send_email($techs_email,"unable to del +ete index $nom_index"."-$i". "Dump: ".Dumper($output), "index deletion error"); } } } } sub curl_this{ my($arg,$address) = @_; my $cmd = "curl --silent "; if(defined($address)){ $cmd = $cmd.$arg." $address"; } else{ $cmd = $cmd.$arg; } return $cmd; } #*********************# #check interface is up# #*********************# sub if_up{ my $linking = `ip link`; my @linklines = split(/\n/,$linking); foreach my $link (@linklines){ if($link =~ $link_regex ){ if($link =~ $link_state_regex && $link =~ $int +erface_state_regex){ return 1; } } } return 0; } sub is_running{ my $service = shift; my $out = `ps -aux|grep $service`; if($service eq 'elasticsearch'){ $service = 'elastic\+'; } my @lines = split(/\n/,$out); foreach my $l(@lines){ if($l =~ qr/\A$service\s+\d+/){ return 1 } } return 0; } sub service_check{ my $out; for my $s ('elasticsearch','kibana','ntopng'){ unless(is_running($s)){ $out = `service $s restart`; $out = `service $s status`; sleep 30; unless(is_running($s)){ $err_logger->error("je n'ai pas réussi + à redemarrer $s... j'appelle à l'aide\n"); send_email($techs_email,"impossible de + redemarrer $s, output:\n".Dumper(\$out), "core service down"); } } } } sub elasticsearch_logrotate{ opendir(my $logdir,$elasticsearch_logs_location); while (my $file = readdir($logdir)) { if($file =~ $elasticsearch_log_regex){ if(get_relative_date() >= str2time($+{date})){ unlink $elasticsearch_logs_location.$f +ile; } elsif(!defined($+{tarred})){ my $cmd = "tar cvzf $elasticsearch_log +s_location"."$file.tar.gz $elasticsearch_logs_location"."$file"; `$cmd`; unlink($elasticsearch_logs_location.$f +ile); } } } closedir($logdir); } #*******************************************************# #Fonction principale, tourne chaque nuit et au démarrage# #*******************************************************# sub main{ elasticsearch_logrotate(); service_check(); my $el_bk_status = elasticsearch_check(); if($el_bk_status>2){#la base est moins récente que le backup $err_logger->error("Database is younger older than bac +kup, starting restoration"); send_email($techs_email,"Database most recent index is + less recent than backup most recent one, starting restoration". "more mails might follow if the procedure goes + wrong","Database error, restore procedure starting"); restore(); } elsif($el_bk_status== 2){ elasticsearch_export(); elasticsearch_clean(); } if(!if_up()){ $err_logger->error("$interface is down, trying to brin +g it back up"); `ifconfig $interface promisc up`; sleep 10; if(!if_up()){ my $link = `ip link`; my $ifconfig = `ifconfig`; send_email($techs_email,"$interface not linkin +g, ip link output: \n$link,". "ifconfig output: $ifconfig","interfac +e issues"); } } } #add backup repository (utile en cas de réinstallation complète du ser +veur): my $cmd = curl_this("-XPUT", "'http://localhost:9200/_snapshot/my_back +up' -d '{". '"type": "fs",'. '"settings": {'. '"location": "/backup/whataver/",'. '"compress": true'. '}'. '}'); `$cmd`; main(); =head $cmd = curl_this('-XGET',"http://localhost:9200/_snapshot/my_backup/_a +ll"); my $backup_list = parse_json(`$cmd `); print Dumper($backup_list)."\n"x3; $cmd = curl_this('http://localhost:9200/_cat/indices?v'); my $indices =`$cmd`; print $indices; print "\nis running elasticsearch : ".is_running('elasticsearch'); print "\nidem kibana: ".is_running('kibana')."\n"."\n"; service_check(); print "$interface is up? answer: ".if_up()."\n"; =cut
Back to
Cool Uses for Perl