Verified Commit 4d88af3b authored by Mike Jones's avatar Mike Jones 🌶
Browse files

Remove old files

parent 1b2394d5
package App::Netsplit::Ingest;
use Mojo::IOLoop;
use Mojo::IOLoop::Delay;
use Moo;
use Time::Seconds;
use Types::Standard qw(ArrayRef Int);
with qw(
App::Netsplit::Ingest::Role::Config
App::Netsplit::Ingest::Role::Logger
);
################################################################################
has ioloop_tick => (is => 'ro', isa => Int, default => Time::Seconds::ONE_MINUTE);
has processors => (is => 'ro', isa => ArrayRef, lazy => 1, builder => '_build_processors');
################################################################################
sub poll {
my $self = shift;
if ($self->config->{active_polling}) {
$self->logger->info('Active polling');
Mojo::IOLoop->recurring($self->ioloop_tick => sub {
$self->logger->debug('Tick');
return $self->_poll_real();
});
Mojo::IOLoop->start();
} else {
$self->logger->info('Inactive polling');
$self->_poll_real();
}
return 0;
}
################################################################################
sub _poll_real {
my $self = shift;
foreach my $processor (@{$self->processors}) {
unless ($processor->process()) {
$self->logger->warn('Failed to process in '.ref $processor);
}
}
return 1;
}
################################################################################
sub _build_processors {
my $self = shift;
my @processors;
foreach my $processor (@{$self->config->{processors}}) {
my $poller_class = sprintf('App::Netsplit::Ingest::Process::%s', $processor->{source});
# Dynamically require processor modules
eval "require ${poller_class}";
if ($@) {
$self->logger->warn($@);
}
push @processors, $poller_class->new({
destination_name => $processor->{destination},
});
}
return \@processors;
}
################################################################################
1;
package App::Netsplit::Ingest::Config;
use Moo;
use Types::Standard qw(HashRef Str);
use YAML::Syck 'LoadFile';
################################################################################
has config => (is => 'ro', isa => HashRef, lazy => 1, builder => '_build_config');
has filename => (is => 'ro', isa => Str, lazy => 1, builder => '_build_filename');
has default_filename => (is => 'ro', isa => Str, default => 'config.yml');
has filename_env_var => (is => 'ro', isa => Str, default => 'ANI_CONFIG');
################################################################################
sub _build_config {
my $self = shift;
return LoadFile($self->filename);
}
sub _build_filename {
my $self = shift;
# A custom config file location can be set by setting the ANI_CONFIG
# environment variable
return $ENV{$self->filename_env_var} || $self->default_filename;
}
################################################################################
1;
package App::Netsplit::Ingest::Destination::InfluxDB;
use Mojo::UserAgent;
use Moo;
use Types::Standard qw(InstanceOf Str);
use InfluxDB::LineProtocol 'data2line';
with 'App::Netsplit::Ingest::Role::Logger';
################################################################################
has address => (is => 'ro', isa => Str, required => 1);
has database => (is => 'ro', isa => Str, required => 1);
has password => (is => 'ro', isa => Str, required => 0);
has username => (is => 'ro', isa => Str, required => 0);
has scheme => (is => 'ro', isa => Str, default => 'http');
has ua => (is => 'ro', isa => InstanceOf['Mojo::UserAgent'], lazy => 1, builder => '_build_ua');
has url => (is => 'ro', isa => Str, lazy => 1, builder => '_build_url');
################################################################################
sub write_entry {
my $self = shift;
my $table = shift;
my $input = shift;
my $res = $self->ua->post($self->url => {
Accept => '*/*',
} => data2line($table, $input))->res;
unless ($res->is_success) {
$self->logger->warn(sprintf('Bad response: %d - %s', $res->code, $res->message));
return 0;
}
return 1;
}
################################################################################
sub _build_ua {
return Mojo::UserAgent->new();
}
sub _build_url {
my $self = shift;
my $url = sprintf('%s/write?db=%s', $self->address, $self->database);
if ($self->username && $self->password) {
# Basic auth
$url = sprintf('%s:%s@%s', $self->username, $self->password, $url);
}
return sprintf('%s://%s', $self->scheme, $url);
}
################################################################################
1;
package App::Netsplit::Ingest::Exception;
use Moo;
use Types::Standard 'Str';
################################################################################
has message => (is => 'ro', isa => Str, required => 0);
################################################################################
1;
package App::Netsplit::Ingest::Exception::NotImplementedError;
use Moo;
extends 'App::Netsplit::Ingest::Exception';
with 'Throwable';
1;
package App::Netsplit::Ingest::Logger;
use Mojo::Log;
use Moo;
use Types::Standard qw(HashRef InstanceOf);
with 'App::Netsplit::Ingest::Role::Config';
################################################################################
has levels => (is => 'ro', isa => HashRef, lazy => 1, builder => '_build_levels');
has logger => (is => 'ro', isa => InstanceOf['Mojo::Log'], lazy => 1, builder => '_build_logger');
################################################################################
sub _build_levels {
# From Mojo::Log %LEVEL
return {
debug => 1,
info => 1,
warn => 1,
error => 1,
fatal => 1,
};
}
sub _build_logger {
my $self = shift;
my $logger = Mojo::Log->new();
my $level = $self->config->{log_level};
# Check the user-configured log level is valid for Mojo::Log
if ($level && $self->levels->{$level}) {
$logger->level($level);
} else {
$logger->warn(sprintf('%s is not a valid log level', $level));
}
return $logger;
}
################################################################################
1;
package App::Netsplit::Ingest::Process;
use Moo;
use Types::Standard qw(Object Str);
use App::Netsplit::Ingest::Exception::NotImplementedError;
with 'App::Netsplit::Ingest::Role::Config';
################################################################################
has source => (is => 'ro', isa => Object, lazy => 1, builder => '_build_source');
has destination => (is => 'ro', isa => Object, lazy => 1, builder => '_build_destination');
# TODO: when there are more destinations, this will be required
has destination_name => (is => 'ro', isa => Str, required => 0);
################################################################################
sub _build_source {
App::Netsplit::Ingest::Exception::NotImplementedError->throw({
message => '_build_source not overridden in child class',
});
}
sub _build_destination {
App::Netsplit::Ingest::Exception::NotImplementedError->throw({
message => '_build_destination not overridden in child class',
});
}
################################################################################
1;
package App::Netsplit::Ingest::Process::HG612;
use Moo;
use Types::Standard 'Str';
use Switch::Plain;
use App::Netsplit::Ingest::Destination::InfluxDB;
use App::Netsplit::Ingest::Source::HG612;
extends 'App::Netsplit::Ingest::Process';
################################################################################
has influxdb_database => (is => 'ro', isa => Str, lazy => 1, builder => '_build_influxdb_database');
################################################################################
sub process {
my $self = shift;
my $report = $self->source->poll();
foreach my $type (qw(actual_aggregate_tx_power current max)) {
if ($report->{$type}->{up} > 0 && $report->{$type}->{down} > 0) {
$self->destination->write_entry($type, {
upstream => $report->{$type}->{up},
downstream => $report->{$type}->{down},
});
}
}
if ($report->{status}) {
$self->destination->write_entry('status', {
value => $report->{status},
});
$self->destination->write_entry('status_i', {
value => $self->_status_to_i($report->{status}),
});
}
return 1;
}
################################################################################
sub _status_to_i {
my $self = shift;
my $status = shift;
my $i = 0;
sswitch ($status) {
case 'Showtime': { $i = 4 }
case 'Training': { $i = 3 }
case 'Handshake': { $i = 2 }
case 'Idle': { $i = 1 }
}
return $i;
}
################################################################################
sub _build_destination {
my $self = shift;
# TODO: honour $self->destination_name when there are more destinations
return App::Netsplit::Ingest::Destination::InfluxDB->new({
address => $self->config->{destinations}->{InfluxDB}->{address},
database => $self->influxdb_database,
scheme => $self->config->{destinations}->{InfluxDB}->{scheme},
});
}
sub _build_influxdb_database {
my $self = shift;
return $self->config->{sources}->{HG612}->{influxdb_database};
}
sub _build_source {
my $self = shift;
return App::Netsplit::Ingest::Source::HG612->new({
address => $self->config->{sources}->{HG612}->{address},
password => $self->config->{sources}->{HG612}->{password},
username => $self->config->{sources}->{HG612}->{username},
});
}
################################################################################
1;
package App::Netsplit::Ingest::Role::Config;
use Moo::Role;
use Types::Standard 'HashRef';
use App::Netsplit::Ingest::Config;
################################################################################
has config => (is => 'ro', isa => HashRef, lazy => 1, builder => '_build_config');
################################################################################
sub _build_config {
return App::Netsplit::Ingest::Config->new->config;
}
################################################################################
1;
package App::Netsplit::Ingest::Role::Logger;
use Moo::Role;
use Types::Standard 'InstanceOf';
use App::Netsplit::Ingest::Logger;
################################################################################
has logger => (is => 'ro', isa => InstanceOf['Mojo::Log'], lazy => 1, builder => '_build_logger');
################################################################################
sub _build_logger {
return App::Netsplit::Ingest::Logger->new->logger;
}
################################################################################
1;
package App::Netsplit::Ingest::Source::HG612;
use Moo;
use Net::Telnet;
use Types::Standard qw(InstanceOf Str);
################################################################################
has address => (is => 'ro', isa => Str, required => 1);
has password => (is => 'ro', isa => Str, required => 0);
has username => (is => 'ro', isa => Str, required => 0);
has scheme => (is => 'ro', isa => Str, default => 'http');
has prompt => (is => 'ro', isa => Str, default => '/# $/');
has command => (is => 'ro', isa => Str, default => 'xdslcmd info --pbParams');
################################################################################
sub poll {
my $self = shift;
my $telnet = Net::Telnet->new(
prompt => '/[\$%#>]$/',
timeout => 20,
);
# TODO: exceptions
$telnet->open($self->address);
$telnet->login($self->username, $self->password);
$telnet->print('sh');
$telnet->prompt($self->prompt);
$telnet->waitfor($self->prompt);
return $self->_parse_stats([ $telnet->cmd($self->command) ]);
}
################################################################################
sub _clean_value {
my $self = shift;
my $input = shift;
chomp $input;
$input =~ s/(^\s+|\s+$)//s;
my ($stat, $metric) = split /\s+/, $input;
return [ $stat, $metric ];
}
sub _line_to_speeds {
my $self = shift;
my $input = shift;
my ($up, $down) = $_ =~ /Upstream rate = (\d+) Kbps, Downstream rate = (\d+) Kbps/;
return ($up, $down);
}
sub _line_to_parts {
my $self = shift;
my $input = shift;
my @parts = split /\t+/, $input;
shift @parts;
return \@parts;
}
sub _parse_stats {
my $self = shift;
my $lines = shift;
my %stats = (
status => '',
attainable => { up => -1, down => -1 },
actual_aggregate_tx_power => { up => -1, down => -1 },
current => { up => -1, down => -1 },
max => { up => -1, down => -1 },
);
foreach (@{$lines}) {
if (/^Status:/) {
($stats{status}) = $_ =~ /^Status: (.+)/;
} elsif (/^Bearer:\t+0/) {
($stats{current}->{up}, $stats{current}->{down}) = $self->_line_to_speeds($_);
} elsif (/^Max:\t+/) {
($stats{max}->{up}, $stats{max}->{down}) = $self->_line_to_speeds($_);
} elsif (/^Attainable Net Data Rate:/) {
my $parts = $self->_line_to_parts($_);
$stats{attainable}->{up} = $self->_clean_value($parts->[0])->[0];
$stats{attainable}->{down} = $self->_clean_value($parts->[1])->[0];
} elsif (/^Actual Aggregate Tx Power:/) {
my $parts = $self->_line_to_parts($_);
$stats{actual_aggregate_tx_power}->{up} = $self->_clean_value($parts->[0])->[0];
$stats{actual_aggregate_tx_power}->{down} = $self->_clean_value($parts->[1])->[0];
}
}
return \%stats;
}
################################################################################
1;
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment