From b3891b47718e29abc2d5c274b2a941f01f693ac8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maty=C3=A1=C5=A1=20Kopp?=
Date: Mon, 15 Nov 2021 10:53:16 +0100
Subject: [PATCH] add aggregations to log2sql conversion #34

---
 scripts/log2sql.pl | 120 ++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 114 insertions(+), 6 deletions(-)

diff --git a/scripts/log2sql.pl b/scripts/log2sql.pl
index 4c1707a..dd86c21 100755
--- a/scripts/log2sql.pl
+++ b/scripts/log2sql.pl
@@ -10,6 +10,7 @@ use File::Spec;
 use File::Basename;
 use Digest::MD5;
+use DateTime::Format::Strptime;
@@ -20,6 +21,11 @@ my @services;
 my @print_sql;
+my $strp = DateTime::Format::Strptime->new(
+  pattern => '%d/%b/%Y:%H:%M:%S %z',
+  on_error => 'croak',
+);
+
 GetOptions (
   'in-files=s{1,}' => \@logfiles,
   'out-dir=s' => \$outdir,
@@ -83,6 +89,11 @@
   my ($first_line_checksum, $last_read_line_checksum, $lines_read, $lines_valid) = (undef, undef, 0, 0);
   open LOG, "<$log_file_path" or die "Could not open $log_file_path: $!";
   my $prev_date='';
+  my $prev_time='';
+  my $aggr_data = {};
+  $aggr_data->{month}={};
+  my $first_datetime;
+  my $last_datetime;
   while(my $line = <LOG>){
     $lines_read++;
     $line =~ s/\n$//;
@@ -102,18 +113,26 @@
         }
       }
       next unless defined $service_id;
-      my ($act_date) = join('-',split('/',substr($time_local,0,11)));
-      unless($act_date eq $prev_date){
+      my $act_date = $time_local;
+      $act_date =~ s/\d{2}:\d{2}:\d{2} \+/00:00:00 \+/;
+      my $act_time = $time_local;
+      $act_time =~ s/:\d{2}:\d{2} \+/:00:00 \+/;
+      unless($act_date eq $prev_date){
         unless($lines_valid){
           close DUMP;
+          close DUMP_AGGR;
         }
-        my $dump_file = "$file_name.$act_date.dump";
+        print_aggregated_data(\*DUMP_AGGR, $aggr_data,$prev_date,'day');
+        $aggr_data->{day}={};
+        my $date_filename = join('-',split('/',substr($act_date,0,11)));
+        my $dump_file = "$file_name.$date_filename.dump";
+        my $dump_aggr_file = "$file_name.$date_filename.aggr.dump";
         push @print_sql,"
 DO \$\$
 BEGIN
-  RAISE NOTICE '\%', NOW();
+  RAISE NOTICE '--------------[\%]----------------', NOW();
   RAISE NOTICE 'starting importing from $dump_file';
   RAISE NOTICE 'log file line: $lines_read';
 END;
@@ -121,19 +140,53 @@ BEGIN
 ";
         push @print_sql,"\\copy log_file_entries(file_id, service_id, line_number, line_checksum, remote_addr, remote_user, time_local, method, request, protocol, status, body_bytes_sent, http_referer, http_user_agent, unit) from '$dump_file'";
         open DUMP, ">".File::Spec->catfile($outdir,$dump_file) or die "Could not open $dump_file: $!";
+
+        push @print_sql,"DO \$\$ BEGIN RAISE NOTICE 'IMPORTING HOUR AND DAY AGGR--------------[\%]----------------', NOW();END;\$\$;";
+        push @print_sql,"\\copy log_ip_aggr(period_start_date, period_end_date, period_level, ip, service_id, cnt_requests, cnt_units, cnt_body_bytes_sent) from '$dump_aggr_file'";
+        open DUMP_AGGR, ">".File::Spec->catfile($outdir,$dump_aggr_file) or die "Could not open $dump_aggr_file: $!";
+
         $prev_date = $act_date;
+        $first_datetime = $time_local unless $first_datetime;
+        $last_datetime = $time_local;
+      }
+      unless($act_time eq $prev_time){
+        print_aggregated_data(\*DUMP_AGGR, $aggr_data, $prev_time,'hour');
+        $aggr_data->{hour}={};
+        $prev_time = $act_time;
       }
+
       $lines_valid++;
       ## print STDERR "$service_id: $request\n";
       print DUMP join("\t", ($file_id,$service_id,$lines_valid,$last_read_line_checksum,$remote_addr,$remote_user,$time_local, $method, $request, $protocol, $status, $body_bytes_sent,$http_referer, $http_user_agent, $unit)),"\n";
+      for my $ip ($remote_addr, '\N') {
+        for my $service ($service_id, '\N'){
+          aggregate_data($aggr_data, $ip, $service, 1, $unit, $body_bytes_sent);
+        }
+      }
     }
   }
+
+  print_aggregated_data(\*DUMP_AGGR, $aggr_data,$prev_time,'hour');
+  print_aggregated_data(\*DUMP_AGGR, $aggr_data,$prev_date,'day');
   close DUMP;
+  close DUMP_AGGR;
   close LOG;
+  my $dump_aggr_file = "$file_name.aggr.dump";
+  open DUMP_AGGR, ">".File::Spec->catfile($outdir,$dump_aggr_file) or die "Could not open $dump_aggr_file: $!";
+  my $act_month = $prev_date; $act_month =~ s/^../01/;
+  print_aggregated_data(\*DUMP_AGGR, $aggr_data,$act_month,'month');
+  push @print_sql,"
+DO \$\$
+BEGIN RAISE NOTICE 'IMPORTING MONTH AGGR--------------[\%]----------------', NOW();
+RAISE NOTICE 'importing month aggregations $act_month'; END; \$\$;";
+  push @print_sql,"\\copy log_ip_aggr(period_start_date, period_end_date, period_level, ip, service_id, cnt_requests, cnt_units, cnt_body_bytes_sent) from '$dump_aggr_file'";
+  close DUMP_AGGR;
+
   open SQL, ">".File::Spec->catfile($outdir,$sql_file) or die "Could not open $sql_file: $!";
   print SQL "
 -- $log_file sql dump
+DO \$\$ BEGIN RAISE NOTICE 'STARTED--------------[\%]----------------', NOW(); END; \$\$;
 ALTER TABLE log_file_entries DISABLE TRIGGER log_files_lines_read;
 ALTER TABLE log_file_entries DISABLE TRIGGER log_files_lines_read_aggr;
@@ -147,9 +200,64 @@ BEGIN
   print SQL "
 ALTER TABLE log_file_entries ENABLE TRIGGER log_files_lines_read;
 ALTER TABLE log_file_entries ENABLE TRIGGER log_files_lines_read_aggr;
+DO \$\$ BEGIN RAISE NOTICE 'FINISHED--------------[\%]----------------', NOW(); END; \$\$;
+
 ";
-  print SQL "DO \$\$ BEGIN RAISE NOTICE 'TODO: run aggregations for::: SELECT * FROM log_files_entries WHERE file_id=$file_id'; END; \$\$; \n";
+
+  print STDERR "imported period: $first_datetime -- $last_datetime\n";
   close SQL;
+
+  $first_datetime =~ s/:\d{2}:\d{2} \+/:00:00 \+/;
+  $last_datetime =~ s/:\d{2}:\d{2} \+/:00:00 \+/;
+  $last_datetime = $strp->parse_datetime($last_datetime)->add(hours => 1)->strftime('%d/%b/%Y:%H:%M:%S %z');
+  print STDERR "aggregated period: $first_datetime -- $last_datetime\n";
+
+  for my $bound (['lower', $first_datetime],['upper',$last_datetime]){
+    print STDERR "TESTING ",$bound->[0],": ",$bound->[1],"\n";
+    my $dt = $bound->[1];
+    for my $level (qw/hour day month/){
+      $sql="
+SELECT count(1) AS cnt
+FROM log_ip_aggr
+WHERE
+  period_start_date <= '$dt'
+  AND period_end_date > '$dt'
+  AND period_level = '$level'::period_levels;
+";
+      $sth = $dbi->prepare($sql);
+      $sth->execute;
+      if(my $result = $sth->fetchrow_hashref){
+        print STDERR "  WARNING: ",$result->{cnt}," records cross the $level-level boundary in the existing database!\n\t-> IMPORTING THIS DUMP WILL PRODUCE DUPLICATES\n" if $result->{cnt};
+      }
+    }
+  }
+}
+
+
+$dbi->disconnect();
+
+
+sub print_aggregated_data{
+  my ($fh, $aggr_data, $start_time, $period) = @_;
+  return unless $start_time;
+  my $end_time = $strp->parse_datetime($start_time);
+  $end_time->add("${period}s" => 1);
+  $end_time = $end_time->strftime('%d/%b/%Y:%H:%M:%S %z');
+  for my $ip (keys %{$aggr_data->{$period}}) {
+    for my $service (keys %{$aggr_data->{$period}->{$ip}}){
+      print $fh join("\t",$start_time, $end_time, $period,$ip,$service,@{$aggr_data->{$period}->{$ip}->{$service}}),"\n";
+    }
+  }
 }
 
-$dbi->disconnect();
\ No newline at end of file
+sub aggregate_data{
+  my ($data, $ip, $service, $request, $unit, $body_bytes_sent) = @_;
+  for my $period (qw/hour day month/){
+    $data->{$period}->{$ip} //= {};
+    $data->{$period}->{$ip}->{$service} //= [0, 0, 0];
+    $data->{$period}->{$ip}->{$service}->[0] += 1;                  ## requests
+    $data->{$period}->{$ip}->{$service}->[1] += $unit;              ## units
+    $data->{$period}->{$ip}->{$service}->[2] += $body_bytes_sent;   ## body_bytes_sent
+  }
+}
\ No newline at end of file
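-- 
Reviewer note, not part of the patch: a minimal standalone sketch of the
*.aggr.dump row format that print_aggregated_data() writes and that
\copy log_ip_aggr(...) ingests. The IP 192.0.2.1, service id 42, the counts
[2, 5, 1024], and the timestamp are invented example values.

#!/usr/bin/env perl
use strict;
use warnings;
use DateTime::Format::Strptime;

## The same $time_local pattern the patch registers at the top of log2sql.pl.
my $strp = DateTime::Format::Strptime->new(
  pattern  => '%d/%b/%Y:%H:%M:%S %z',
  on_error => 'croak',
);

## One (ip, service) bucket as aggregate_data() accumulates it:
## [cnt_requests, cnt_units, cnt_body_bytes_sent].
my $aggr_data = { hour => { '192.0.2.1' => { 42 => [2, 5, 1024] } } };

## An hour-truncated start time, the shape produced by s/:\d{2}:\d{2} \+/:00:00 \+/.
my $start_time = '15/Nov/2021:10:00:00 +0100';

## Exclusive end of the period, derived the way print_aggregated_data() does it:
## parse, add one period, format back to the nginx timestamp layout.
my $end_time = $strp->parse_datetime($start_time);
$end_time->add(hours => 1);
$end_time = $end_time->strftime('%d/%b/%Y:%H:%M:%S %z');

## Emit the tab-separated row in the column order of
## \copy log_ip_aggr(period_start_date, period_end_date, period_level,
## ip, service_id, cnt_requests, cnt_units, cnt_body_bytes_sent).
for my $ip (keys %{$aggr_data->{hour}}) {
  for my $service (keys %{$aggr_data->{hour}{$ip}}) {
    print join("\t", $start_time, $end_time, 'hour', $ip, $service,
               @{$aggr_data->{hour}{$ip}{$service}}), "\n";
  }
}

Run directly, this prints one tab-separated row: 15/Nov/2021:10:00:00 +0100,
15/Nov/2021:11:00:00 +0100, hour, 192.0.2.1, 42, 2, 5, 1024.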