Beefy Boxes and Bandwidth Generously Provided by pair Networks
P is for Practical
 
PerlMonks  

Elasticsearch and ntopng

by QuillMeantTen (Friar)
on Aug 11, 2016 at 17:49 UTC ( #1169597=CUFP: print w/replies, xml ) Need Help??

Greetings,
During my internship I had to set up a network probe. They basically gave me root on a server with three network interfaces and 0 budget.
I set up the following solution:

  1. Ntopng did the monitoring
  2. Elasticsearch retained the ntopng logs for easy retrieval and analysis (one month of logs was retained, given the number of flows)
  3. Kibana was used for visualisation
My orders were quite simple, they needed a network probe that would "just work", they needed a nice way to display the probe's data on any computer inside the department AND on the nice big screen in the IT den.

Once the setup and the documentation were done, I started writing a script to automate as many things as I could: namely database export, backup and restoration, service monitoring, interface monitoring (I spent a day wondering WHY an interface would go down by itself until someone told me that a maintenance crew did some voodoo on the routers and unplugged things) and such.
So here is the code; I hope it will be useful to you. Since I had most of an ELK stack there (except for the Logstash part), this script should be easily adapted to other situations.
#!/usr/bin/perl --
use strict;
use warnings;
use autodie;
use Log::Log4perl;
use JSON::Parse 'parse_json';
use Date::Parse;
use Date::Calc qw(Add_Delta_Days Date_to_Days);
use Data::Dumper;
use Email::Sender::Simple qw(sendmail);
use Email::Simple;
use Email::Simple::Creator;
use Email::Sender::Transport::SMTP;
use File::Spec;

#*********************************************************#
#Configuration section                                    #
#*********************************************************#
my $interface   = "ethWhatEver";        # monitoring interface
my $serveur_ip  = "123.123.123.123";
my $techs_email = 'engineers.monitoring@techsupport.local';    # mail address to send request for help
my $from_addr   = 'automated_solution@monitoring.local';
my $nb_mois_conserves = 1;              # how many months of logs should we keep?
my $elasticsearch_logs_location = "/var/logs/el_nt.log";
my $nom_index = "ntopng";               # index prefix, indices are supposed to look like $nom_index-yyyy.mm.dd

# Base URLs used by every curl call below.
my $elasticsearch_url = "http://localhost:9200";
my $snapshot_repo_url = "$elasticsearch_url/_snapshot/my_backup";

my $transport = Email::Sender::Transport::SMTP->new({    # sendmail smtp configuration
    host => 'smtp.example.local',
    port => 25,
});

Log::Log4perl->init("thingie.logger");
my $err_logger = Log::Log4perl->get_logger();

#*********************************************************#
#REGEXP section                                           #
#*********************************************************#
my $snapshot_number         = qr/_?(?<number>\d+)\z/;
my $date_indice             = qr/\Q$nom_index\E-(?<date>\d\d\d\d\.[0-1][0-9]\.[0-3][0-9])/;
my $monitoring_interface    = qr/\A\Q$interface\E/;
my $link_regex              = qr/\A\d+: \Q$interface\E/;
my $link_state_regex        = qr/state (?<status>(DOWN|UP))/;
my $interface_state_regex   = qr/PROMISC/;
my $elasticsearch_log_regex = qr/elasticsearch\.log\.(?<date>\d\d\d\d-\d\d-\d\d)(?<tarred>\.tar\.gz)?/;

#*********************************************************#
#curl helpers                                             #
#*********************************************************#

# Builds a curl command line. With two arguments the result is
# "curl --silent $arg $address", with one argument "curl --silent $arg".
# Nothing is quoted here: callers are responsible for shell quoting.
sub curl_this {
    my ($arg, $address) = @_;
    my $cmd = "curl --silent ";
    return defined($address) ? $cmd . $arg . " $address" : $cmd . $arg;
}

# Wraps one word in single quotes so the shell passes it to curl verbatim
# (URLs contain '?', which the shell would otherwise treat as a glob).
sub shell_quote {
    my ($word) = @_;
    $word =~ s/'/'\\''/g;
    return "'$word'";
}

# Runs curl with the given words (each one shell-quoted) and returns the
# raw standard output of the command as a single string.
sub run_curl {
    my @words  = @_;
    my $cmd    = curl_this(join ' ', map { shell_quote($_) } @words);
    my $output = `$cmd`;
    return $output;
}

# Runs curl and decodes the answer as JSON. Returns the decoded structure,
# or undef when the answer is empty or not valid JSON (parse_json throws in
# that case, which would otherwise kill the script before any alert is sent).
sub curl_json {
    my @words   = @_;
    my $raw     = run_curl(@words);
    my $decoded = eval { parse_json($raw) };
    if (!defined($decoded)) {
        $err_logger->error("No usable JSON answer for curl @words");
    }
    return $decoded;
}

# True when a decoded JSON boolean is set. JSON::Parse decodes "true" to 1,
# the string 'true' is accepted as well to match the original checks.
sub is_true {
    my ($value) = @_;
    return 0 unless defined($value);
    return ($value eq 'true' || $value eq '1') ? 1 : 0;
}

# True when $response is a hash whose $key entry is a true JSON boolean.
sub response_ok {
    my ($response, $key) = @_;
    return 0 unless ref($response) eq 'HASH';
    return is_true($response->{$key});
}

# Returns an array reference holding the snapshots of the backup repository
# (possibly empty), or undef when the list could not be retrieved.
sub list_snapshots {
    my $listing = curl_json('-XGET', "$snapshot_repo_url/_all");
    return unless ref($listing) eq 'HASH' && ref($listing->{snapshots}) eq 'ARRAY';
    return $listing->{snapshots};
}

#************************************************************#
#export functions                                            #
#************************************************************#

# Creates snapshot_<n+1> in the backup repository, checks that it is now the
# last snapshot listed and, if so and at least three snapshots exist, deletes
# the oldest one. Every failure is logged and mailed to $techs_email.
sub elasticsearch_export {
    my $snapshots = list_snapshots();
    if (!defined($snapshots)) {
        $err_logger->error("Could not list existing snapshots, export aborted");
        send_email($techs_email, "I could not list the existing snapshots, export aborted", "Backup error");
        return;
    }

    # extract the number of the last snapshot; start at 1 when there is none
    my $last_number = 0;
    if (@$snapshots && ($snapshots->[-1]->{snapshot} // '') =~ $snapshot_number) {
        $last_number = $+{number};
    }
    my $new_number = $last_number + 1;

    my $output = curl_json('-XPUT', "$snapshot_repo_url/snapshot_$new_number?wait_for_completion=true");
    my $state  = (ref($output) eq 'HASH' && ref($output->{snapshot}) eq 'HASH')
        ? $output->{snapshot}->{state}
        : undef;
    if (($state // '') ne 'SUCCESS') {
        $err_logger->error("Could not save snapshot");
        send_email($techs_email, "I have been unable to save a snapshot", "Backup error");
    }

    # let's check if the snapshot was correctly saved
    $snapshots = list_snapshots() // [];
    my $last_snapshot_name = @$snapshots ? ($snapshots->[-1]->{snapshot} // '') : '';
    if ($last_snapshot_name ne "snapshot_$new_number") {
        $err_logger->error("Backup error, last record snapshot = $last_snapshot_name ");
        send_email($techs_email, "I could not save a snapshot".
            " Maybe there is not enough free space or we cant reach the backup destination? Response dump:\n".
            Dumper($output), "backup error");
    }
    elsif ($#$snapshots >= 2) {
        # everything went fine, let's delete the oldest snapshot if we have two other snapshots
        my $oldest = $snapshots->[0]->{snapshot};
        $output = curl_json('-XDELETE', "$snapshot_repo_url/$oldest");
        if (!response_ok($output, 'acknowledged')) {
            $err_logger->error("I was not able to delete the snapshot $oldest");
            send_email($techs_email, "Could not delete the snapshot ".$oldest.
                " from the elasticsearch database, response dump: \n".Dumper($output), "backup error");
        }
    }
    return;
}

#************************#
#restore functions       #
#************************#

# Restores the most recent snapshot: closes all indices, asks elasticsearch
# to restore, then reopens the indices. Failures are logged and mailed.
sub restore {
    my $snapshots = list_snapshots() // [];
    my $last_snapshot_name = @$snapshots ? $snapshots->[-1]->{snapshot} : undef;
    if (!defined($last_snapshot_name)) {
        # nothing to restore from: do not close the indices for nothing
        $err_logger->error("restore procedure failed, no snapshot available");
        send_email($techs_email, "restore failed, no snapshot could be found in the backup repository",
            "restore procedure fail");
        return;
    }

    # close every index for the restoration
    my $output = curl_json('-XPOST', "$elasticsearch_url/_all/_close");
    if (!response_ok($output, 'acknowledged')) {
        $err_logger->error("Could not close indices for restoration, I'm still going to try");
    }

    $output = curl_json('-XPOST', "$snapshot_repo_url/$last_snapshot_name/_restore");
    if (!response_ok($output, 'accepted')) {
        $err_logger->error("restore procedure failed in part or completely, let's send an email");
        send_email($techs_email, "restore failed, response dump:".
            " from elasticsearch server: \n".Dumper($output), "restore procedure fail");
    }

    # reopen every index after the restoration
    $output = curl_json('-XPOST', "$elasticsearch_url/_all/_open");
    if (!response_ok($output, 'acknowledged')) {
        $err_logger->error("indices reopening procedure failed, sending an email");
        send_email($techs_email, "could not reopen indices, please try doing so manually".
            " to keep recording data. response dump: \n".Dumper($output), "indices reopening error");
    }
    return;
}

# Sends a plain-text mail to $dest with $body as content and $error as
# subject. sendmail() throws on failure, so it is run inside eval: a mail
# problem is logged and must never abort the monitoring run.
sub send_email {
    my ($dest, $body, $error) = @_;
    my $email = Email::Simple->create(
        header => [
            From    => $from_addr,
            To      => $dest,
            Subject => $error,
        ],
        body => $body,
    );
    my $sent = eval {
        sendmail($email, { transport => $transport, from => $from_addr, to => $dest });
    };
    unless ($sent) {
        $err_logger->error("I could not send any email, all that's left is to keep logging and hope...");
    }
    return;
}

# Compares the most recent index of the last snapshot with the most recent
# index of the live database.
# Returns 1 if backup and database have the same age, 2 if database is more
# recent, 3 if database is older than the backup.
# Dies (after logging and mailing) when the snapshot list cannot be read.
sub elasticsearch_check {
    my $snapshots = list_snapshots();
    if (!defined($snapshots)) {
        $err_logger->error("Could not retrieve indices list, please check that".
            "the backup partition is mounted and accessible");
        send_email($techs_email, "Could not retrieve the saved indices list ".
            "is backup partition mounted?", "Could not retrieve backed up indices");
        die "Could not retrieve the snapshot list\n";
    }

    # retrieves last snapshot indices
    my $backed_up_indices = @$snapshots ? ($snapshots->[-1]->{indices} // []) : [];
    my $indices = run_curl("$elasticsearch_url/_cat/indices?v");

    # a side without any dated index counts as "epoch 0"
    my $most_recent_backup = bk_get_most_recent_date($backed_up_indices) // 0;
    my $most_recent_index  = db_most_recent_date($indices) // 0;

    return 1 if $most_recent_index == $most_recent_backup;
    return 2 if $most_recent_index > $most_recent_backup;
    return 3;
}

#*****************************************************************#
#These functions return the most recent index's age in unix format#
#either in the back up or the current database                    #
#*****************************************************************#

# Shared implementation: takes a list of strings, keeps those containing an
# index name, and returns the latest date found as a unix timestamp (undef
# when no string contains a parsable index date).
sub most_recent_index_date {
    my @candidates = @_;
    my @dates;
    for my $candidate (@candidates) {
        next unless defined($candidate) && $candidate =~ $date_indice;
        my $stamp = str2time($+{date});
        push @dates, $stamp if defined($stamp);
    }
    my ($latest) = sort { $b <=> $a } @dates;
    return $latest;
}

# $indices: array reference of index names taken from a snapshot.
sub bk_get_most_recent_date {
    my $indices = shift;
    return most_recent_index_date(@{ $indices // [] });
}

# $input: raw text output of the _cat/indices call, one index per line.
sub db_most_recent_date {
    my $input = shift;
    return most_recent_index_date(split(/\n/, $input // ''));
}

#*******************************************************************#
#Get max index age from current date and value in $nb_mois_conserves#
#*******************************************************************#

# Returns the unix timestamp of the day lying $nb_mois_conserves * 31 days
# before today; anything dated at or before it is considered expired.
sub get_relative_date {
    my ($day, $month, $year) = (localtime)[3, 4, 5];    # today's date
    $year += 1900;    # localtime counts years from 1900
    $month++;         # remember that january is month 0!
    ($year, $month, $day) = Add_Delta_Days($year, $month, $day, -($nb_mois_conserves * 31));
    return str2time("$year/$month/$day");
}

#********************************************#
#this function gets rid of the oldest indices#
#********************************************#

# Deletes every index of the live database whose date is at or before the
# retention limit. Failed deletions are logged and mailed.
sub elasticsearch_clean {
    my $indices = run_curl("$elasticsearch_url/_cat/indices?v");
    my $limit   = get_relative_date();

    for my $line (split(/\n/, $indices // '')) {
        next unless $line =~ $date_indice;
        my $date  = $+{date};
        my $stamp = str2time($date);
        next unless defined($stamp) && $stamp <= $limit;

        my $output = curl_json('-XDELETE', "$elasticsearch_url/$nom_index"."-$date");
        if (!response_ok($output, 'acknowledged')) {
            $err_logger->error("could not delete index $nom_index"."-$date");
            send_email($techs_email, "unable to delete index $nom_index"."-$date".
                "Dump: ".Dumper($output),
                "index deletion error");
        }
    }
    return;
}

#*********************#
#check interface is up#
#*********************#

# Returns 1 when `ip link` shows the monitoring interface with state UP and
# the PROMISC flag, 0 otherwise.
sub if_up {
    my $linking = `ip link`;
    for my $link (split(/\n/, $linking // '')) {
        next unless $link =~ $link_regex;
        next unless $link =~ $link_state_regex;
        # read the capture now: the next match would reset %+
        my $status = $+{status};
        return 1 if $status eq 'UP' && $link =~ $interface_state_regex;
    }
    return 0;
}

# Returns 1 when `ps` lists a process that mentions $service and whose first
# column (the owner) is the service name, 0 otherwise.
sub is_running {
    my $service = shift;
    # ps shortens the elasticsearch user name to "elastic+"
    my $owner = $service eq 'elasticsearch' ? 'elastic+' : $service;
    my $out   = `ps -aux`;
    for my $line (split(/\n/, $out // '')) {
        next if index($line, $service) < 0;
        return 1 if $line =~ /\A\Q$owner\E\s+\d+/;
    }
    return 0;
}

# Restarts every core service that is not running, waits 30 seconds, and
# calls for help by mail when the service is still down.
sub service_check {
    my $out;
    for my $s ('elasticsearch', 'kibana', 'ntopng') {
        next if is_running($s);
        $out = `service $s restart`;
        $out = `service $s status`;
        sleep 30;
        unless (is_running($s)) {
            $err_logger->error("je n'ai pas réussi à redemarrer $s... j'appelle à l'aide\n");
            send_email($techs_email, "impossible de redemarrer $s, output:\n".Dumper(\$out),
                "core service down");
        }
    }
    return;
}

# Walks the elasticsearch log directory: logs older than the retention limit
# are deleted, more recent logs that are not compressed yet are tarred and
# the plain file is removed once the archive has been written.
sub elasticsearch_logrotate {
    my $limit = get_relative_date();
    opendir(my $logdir, $elasticsearch_logs_location);
    while (defined(my $file = readdir($logdir))) {
        next unless $file =~ $elasticsearch_log_regex;
        my $date   = $+{date};
        my $tarred = $+{tarred};
        # join directory and file name with a separator
        my $path  = File::Spec->catfile($elasticsearch_logs_location, $file);
        my $stamp = str2time($date);

        if (defined($stamp) && $limit >= $stamp) {
            unlink $path;
        }
        elsif (!defined($tarred)) {
            # list form: no shell involved; keep the log when tar fails
            if (system('tar', 'czf', "$path.tar.gz", $path) == 0) {
                unlink($path);
            }
            else {
                $err_logger->error("could not compress $path, keeping the uncompressed log");
            }
        }
    }
    closedir($logdir);
    return;
}

#*******************************************************#
#Main function, runs every night and at startup         #
#*******************************************************#
sub main {
    elasticsearch_logrotate();
    service_check();

    my $el_bk_status = elasticsearch_check();
    if ($el_bk_status > 2) {    # the database is less recent than the backup
        $err_logger->error("Database is older than backup, starting restoration");
        send_email($techs_email, "Database most recent index is less recent than backup most recent one, starting restoration".
            "more mails might follow if the procedure goes wrong", "Database error, restore procedure starting");
        restore();
    }
    elsif ($el_bk_status == 2) {
        elasticsearch_export();
        elasticsearch_clean();
    }

    if (!if_up()) {
        $err_logger->error("$interface is down, trying to bring it back up");
        `ifconfig $interface promisc up`;
        sleep 10;
        if (!if_up()) {
            my $link     = `ip link`;
            my $ifconfig = `ifconfig`;
            send_email($techs_email, "$interface not linking, ip link output: \n$link,".
                "ifconfig output: $ifconfig", "interface issues");
        }
    }
    return;
}

# add backup repository (useful after a complete reinstallation of the server):
my $repository_settings = '{'.
    '"type": "fs",'.
    '"settings": {'.
    '"location": "/backup/whataver/",'.
    '"compress": true'.
    '}'.
    '}';
run_curl('-XPUT', $snapshot_repo_url, '-d', $repository_settings);

main();

=begin comment

Manual debugging snippets, kept out of the program flow:

my $cmd = curl_this('-XGET',"http://localhost:9200/_snapshot/my_backup/_all");
my $backup_list = parse_json(`$cmd `);
print Dumper($backup_list)."\n"x3;
$cmd = curl_this('http://localhost:9200/_cat/indices?v');
my $indices =`$cmd`;
print $indices;
print "\nis running elasticsearch : ".is_running('elasticsearch');
print "\nidem kibana: ".is_running('kibana')."\n"."\n";
service_check();
print "$interface is up? answer: ".if_up()."\n";

=end comment

=cut

Replies are listed 'Best First'.
Re: Elasticsearch and ntopng
by Anonymous Monk on Aug 11, 2016 at 22:18 UTC
    I would replace all backticks with curl_this, and have curl_this use Capture::Tiny to avoid shell interpolation

      thanks for the tip, I did not know about this module! One upvote to you sir/madam :)

Log In?
Username:
Password:

What's my password?
Create A New User
Domain Nodelet?
Node Status?
node history
Node Type: CUFP [id://1169597]
Front-paged by Arunbear
help
Chatterbox?
and the web crawler heard nothing...

How do I use this? | Other CB clients
Other Users?
Others lurking in the Monastery: (2)
As of 2022-05-25 04:26 GMT
Sections?
Information?
Find Nodes?
Leftovers?
    Voting Booth?
    Do you prefer to work remotely?



    Results (84 votes). Check out past polls.

    Notices?