diff --git a/oonidata/src/oonidata/models/analysis.py b/oonidata/src/oonidata/models/analysis.py index 6da88959..2906bbc3 100644 --- a/oonidata/src/oonidata/models/analysis.py +++ b/oonidata/src/oonidata/models/analysis.py @@ -3,7 +3,7 @@ from datetime import datetime from typing import List, Optional -from .base import table_model, ProcessingMeta +from .base import table_model from oonidata.models.observations import MeasurementMeta, ProbeMeta @@ -20,7 +20,6 @@ class WebAnalysis: probe_meta: ProbeMeta measurement_meta: MeasurementMeta - processing_meta: ProcessingMeta analysis_id: str observation_id: str diff --git a/oonidata/src/oonidata/models/base.py b/oonidata/src/oonidata/models/base.py index 27d150bd..bfb6d4b3 100644 --- a/oonidata/src/oonidata/models/base.py +++ b/oonidata/src/oonidata/models/base.py @@ -14,10 +14,13 @@ class Config(BaseConfig): code_generation_options = [TO_DICT_ADD_OMIT_NONE_FLAG] -def table_model(table_name: str, table_index: Tuple[str, ...]): +def table_model( + table_name: str, table_index: Tuple[str, ...], partition_key: Optional[str] = None +): def decorator(cls): cls.__table_name__ = table_name cls.__table_index__ = table_index + cls.__partition_key__ = partition_key return cls return decorator @@ -28,12 +31,7 @@ def decorator(cls): class TableModelProtocol(Protocol): __table_name__: str __table_index__: Tuple[str, ...] + __partition_key__: Optional[str] probe_meta: Any measurement_meta: Any - - -@dataclass -class ProcessingMeta: - processing_start_time: datetime - processing_end_time: Optional[datetime] = None diff --git a/oonidata/src/oonidata/models/observations.py b/oonidata/src/oonidata/models/observations.py index 22c8866e..cce96c82 100644 --- a/oonidata/src/oonidata/models/observations.py +++ b/oonidata/src/oonidata/models/observations.py @@ -7,7 +7,7 @@ Tuple, ) -from oonidata.models.base import table_model, ProcessingMeta +from oonidata.models.base import table_model from oonidata.models.dataformats import Failure @@ -181,15 +181,20 @@ class TCPObservation: @table_model( table_name="obs_web_ctrl", - table_index=("measurement_uid", "observation_id", "measurement_start_time"), + table_index=( + "measurement_start_time", + "hostname", + "measurement_uid", + "observation_idx", + ), + partition_key="concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2))", ) @dataclass class WebControlObservation: measurement_meta: MeasurementMeta - processing_meta: ProcessingMeta hostname: str - observation_id: str = "" + observation_idx: int = 0 created_at: Optional[datetime] = None @@ -220,18 +225,23 @@ class WebControlObservation: @table_model( table_name="obs_web", - table_index=("measurement_uid", "observation_id", "measurement_start_time"), + table_index=( + "measurement_start_time", + "probe_cc", + "probe_asn", + "measurement_uid", + "observation_idx", + ), + partition_key="concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2))", ) @dataclass class WebObservation: measurement_meta: MeasurementMeta probe_meta: ProbeMeta - processing_meta: ProcessingMeta # These fields are added by the processor - observation_id: str = "" + observation_idx: int = 0 created_at: Optional[datetime] = None - processing_time: Optional[float] = None target_id: Optional[str] = None hostname: Optional[str] = None @@ -338,14 +348,19 @@ class WebObservation: @table_model( table_name="obs_http_middlebox", - table_index=("measurement_uid", "measurement_start_time"), + table_index=( + "measurement_start_time", + "measurement_uid", + "observation_idx", + ), + 
partition_key="concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2))", ) @dataclass class HTTPMiddleboxObservation: measurement_meta: MeasurementMeta probe_meta: ProbeMeta - observation_id: str = "" + observation_idx: int = 0 created_at: Optional[datetime] = None diff --git a/oonipipeline/src/oonipipeline/analysis/datasources.py b/oonipipeline/src/oonipipeline/analysis/datasources.py index a7aa3f18..e052838d 100644 --- a/oonipipeline/src/oonipipeline/analysis/datasources.py +++ b/oonipipeline/src/oonipipeline/analysis/datasources.py @@ -2,7 +2,6 @@ from datetime import date, timedelta from typing import Generator, List, Optional -from oonidata.models.base import ProcessingMeta from oonidata.models.observations import MeasurementMeta, ProbeMeta, WebObservation from ..db.connections import ClickhouseConnection @@ -27,12 +26,10 @@ def iter_web_observations( measurement_meta_cols = [f.name for f in dataclasses.fields(MeasurementMeta)] probe_meta_cols = [f.name for f in dataclasses.fields(ProbeMeta)] - processing_meta_cols = [f.name for f in dataclasses.fields(ProcessingMeta)] obs_cols = [f.name for f in dataclasses.fields(WebObservation)] obs_cols.remove("probe_meta") obs_cols.remove("measurement_meta") - obs_cols.remove("processing_meta") - column_names = measurement_meta_cols + probe_meta_cols + processing_meta_cols + obs_cols + column_names = measurement_meta_cols + probe_meta_cols + obs_cols q = "SELECT (" q += ",\n".join(column_names) @@ -62,7 +59,7 @@ def iter_web_observations( # TODO(art): this is super sketchy. # We need to do this in order to obtain the correct offsets into the queried columns - # Basically probe_meta, measurement_meta and processing_meta are class + # Basically probe_meta, measurement_meta are class # attributes that are composed into the dataclass, however in the # database they need to be stored flat, since nesting is not desirable. 
# What we are doing here is figuring out how to construct the nested @@ -83,18 +80,14 @@ def iter_web_observations( ], ) ) - processing_meta = dict( - zip(processing_meta_cols, row[: len(processing_meta_cols)]) - ) rest = dict( - zip(obs_cols, row[len(measurement_meta_cols) + len(probe_meta_cols) + len(processing_meta_cols) :]) + zip(obs_cols, row[len(measurement_meta_cols) + len(probe_meta_cols) :]) ) obs_group.append( WebObservation( measurement_meta=MeasurementMeta(**measurement_meta), probe_meta=ProbeMeta(**probe_meta), - processing_meta=ProcessingMeta(**processing_meta), **rest, ) ) diff --git a/oonipipeline/src/oonipipeline/analysis/signal.py b/oonipipeline/src/oonipipeline/analysis/signal.py index 11ffbc0c..97f1639f 100644 --- a/oonipipeline/src/oonipipeline/analysis/signal.py +++ b/oonipipeline/src/oonipipeline/analysis/signal.py @@ -60,7 +60,7 @@ def make_signal_experiment_result( outcome_meta["why"] = "dns failure" outcomes.append( Outcome( - observation_id=web_o.observation_id, + observation_id=f"{web_o.measurement_meta.measurement_uid}_{web_o.observation_idx}", scope=BlockingScope.UNKNOWN, subject=f"{web_o.hostname}", category="dns", @@ -112,7 +112,7 @@ def make_signal_experiment_result( outcomes.append( Outcome( - observation_id=web_o.observation_id, + observation_id=f"{web_o.measurement_meta.measurement_uid}_{web_o.observation_idx}", scope=BlockingScope.UNKNOWN, subject=f"{web_o.hostname}", category="dns", @@ -127,7 +127,7 @@ def make_signal_experiment_result( else: outcomes.append( Outcome( - observation_id=web_o.observation_id, + observation_id=f"{web_o.measurement_meta.measurement_uid}_{web_o.observation_idx}", scope=BlockingScope.UNKNOWN, subject=f"{web_o.hostname}", category="dns", @@ -152,7 +152,7 @@ def make_signal_experiment_result( outcomes.append( Outcome( - observation_id=web_o.observation_id, + observation_id=f"{web_o.measurement_meta.measurement_uid}_{web_o.observation_idx}", scope=BlockingScope.UNKNOWN, subject=f"{web_o.ip}:{web_o.port}", category="tcp", @@ -168,7 +168,7 @@ def make_signal_experiment_result( elif not dns_blocked and web_o.tcp_success: outcomes.append( Outcome( - observation_id=web_o.observation_id, + observation_id=f"{web_o.measurement_meta.measurement_uid}_{web_o.observation_idx}", scope=BlockingScope.UNKNOWN, subject=f"{web_o.ip}:{web_o.port}", category="tcp", @@ -197,7 +197,7 @@ def make_signal_experiment_result( outcomes.append( Outcome( - observation_id=web_o.observation_id, + observation_id=f"{web_o.measurement_meta.measurement_uid}_{web_o.observation_idx}", scope=BlockingScope.UNKNOWN, subject=f"{web_o.hostname}", category="tls", @@ -225,7 +225,7 @@ def make_signal_experiment_result( outcomes.append( Outcome( - observation_id=web_o.observation_id, + observation_id=f"{web_o.measurement_meta.measurement_uid}_{web_o.observation_idx}", scope=BlockingScope.UNKNOWN, subject=f"{web_o.hostname}", category="tls", @@ -240,7 +240,7 @@ def make_signal_experiment_result( elif not dns_blocked and not tcp_blocked and web_o.tls_cipher_suite is not None: outcomes.append( Outcome( - observation_id=web_o.observation_id, + observation_id=f"{web_o.measurement_meta.measurement_uid}_{web_o.observation_idx}", scope=BlockingScope.UNKNOWN, subject=f"{web_o.hostname}", category="tls", diff --git a/oonipipeline/src/oonipipeline/analysis/web_analysis.py b/oonipipeline/src/oonipipeline/analysis/web_analysis.py index 7c442949..0d85f81d 100644 --- a/oonipipeline/src/oonipipeline/analysis/web_analysis.py +++ b/oonipipeline/src/oonipipeline/analysis/web_analysis.py @@ 
-12,7 +12,6 @@ List, Dict, ) -from oonidata.models.base import ProcessingMeta from oonidata.models.analysis import WebAnalysis from oonidata.models.observations import WebControlObservation, WebObservation @@ -686,10 +685,7 @@ def make_web_analysis( website_analysis = WebAnalysis( measurement_meta=web_o.measurement_meta, probe_meta=web_o.probe_meta, - processing_meta=ProcessingMeta( - processing_start_time=datetime.now(timezone.utc) - ), - observation_id=web_o.observation_id, + observation_id=f"{web_o.measurement_meta.measurement_uid}_{web_o.observation_idx}", created_at=created_at, analysis_id=f"{web_o.measurement_meta.measurement_uid}_{idx}", target_domain_name=domain_name, @@ -980,7 +976,4 @@ def make_web_analysis( http_analysis.is_http_fp_false_positive ) - website_analysis.processing_meta.processing_start_time = datetime.now( - timezone.utc - ) yield website_analysis diff --git a/oonipipeline/src/oonipipeline/cli/commands.py b/oonipipeline/src/oonipipeline/cli/commands.py index 98ffbe3b..1e063762 100644 --- a/oonipipeline/src/oonipipeline/cli/commands.py +++ b/oonipipeline/src/oonipipeline/cli/commands.py @@ -1,15 +1,24 @@ +import asyncio import logging from pathlib import Path -from typing import List, Optional +from typing import Coroutine, List, Optional from datetime import date, timedelta, datetime, timezone from typing import List, Optional +from oonipipeline.db.maintenance import ( + optimize_all_tables_by_partition, + list_partitions_to_delete, + list_duplicates_in_buckets, +) from oonipipeline.temporal.client_operations import ( TemporalConfig, - run_backfill, - run_create_schedules, - run_status, - run_clear_schedules, + get_status, + temporal_connect, +) +from oonipipeline.temporal.schedules import ( + clear_all_schedules, + schedule_all, + schedule_backfill, ) from oonipipeline.temporal.workers import start_workers @@ -23,6 +32,14 @@ from ..netinfo import NetinfoDB from ..settings import config + +def run_async(main: Coroutine): + try: + asyncio.run(main) + except KeyboardInterrupt: + print("shutting down") + + def _parse_csv(ctx, param, s: Optional[str]) -> List[str]: if s: return s.split(",") @@ -80,8 +97,6 @@ def maybe_create_delete_tables( clickhouse_url: str, create_tables: bool, drop_tables: bool, - clickhouse_buffer_min_time: int = 10, - clickhouse_buffer_max_time: int = 60, ): if create_tables: if drop_tables: @@ -90,9 +105,7 @@ def maybe_create_delete_tables( ) with ClickhouseConnection(clickhouse_url) as db: - for query, table_name in make_create_queries( - min_time=clickhouse_buffer_min_time, max_time=clickhouse_buffer_max_time - ): + for query, table_name in make_create_queries(): if drop_tables: db.execute(f"DROP TABLE IF EXISTS {table_name};") db.execute(query) @@ -117,7 +130,7 @@ def cli(log_level: int): @end_at_option @probe_cc_option @test_name_option -@click.option("--workflow-name", type=str, required=True) +@click.option("--workflow-name", type=str, required=True, default="observations") @click.option( "--create-tables", is_flag=True, @@ -146,8 +159,6 @@ def backfill( clickhouse_url=config.clickhouse_url, create_tables=create_tables, drop_tables=drop_tables, - clickhouse_buffer_min_time=config.clickhouse_buffer_min_time, - clickhouse_buffer_max_time=config.clickhouse_buffer_max_time, ) temporal_config = TemporalConfig( @@ -157,23 +168,30 @@ def backfill( temporal_tls_client_key_path=config.temporal_tls_client_key_path, ) - run_backfill( - workflow_name=workflow_name, - temporal_config=temporal_config, - probe_cc=probe_cc, - test_name=test_name, - 
start_at=start_at, - end_at=end_at, - ) + async def main(): + client = await temporal_connect(temporal_config=temporal_config) + + return await schedule_backfill( + client=client, + probe_cc=probe_cc, + test_name=test_name, + start_at=start_at, + end_at=end_at, + workflow_name=workflow_name, + ) + + run_async(main()) @cli.command() @probe_cc_option @test_name_option -def schedule( - probe_cc: List[str], - test_name: List[str], -): +@click.option( + "--analysis/--no-analysis", + default=True, + help="should we drop tables before creating them", +) +def schedule(probe_cc: List[str], test_name: List[str], analysis: bool): """ Create schedules for the specified parameters """ @@ -184,13 +202,19 @@ def schedule( temporal_tls_client_key_path=config.temporal_tls_client_key_path, ) - run_create_schedules( - probe_cc=probe_cc, - test_name=test_name, - clickhouse_url=config.clickhouse_url, - data_dir=config.data_dir, - temporal_config=temporal_config, - ) + async def main(): + client = await temporal_connect(temporal_config=temporal_config) + + return await schedule_all( + client=client, + probe_cc=probe_cc, + test_name=test_name, + clickhouse_url=config.clickhouse_url, + data_dir=config.data_dir, + schedule_analysis=analysis, + ) + + run_async(main()) @cli.command() @@ -210,11 +234,16 @@ def clear_schedules( temporal_tls_client_key_path=config.temporal_tls_client_key_path, ) - run_clear_schedules( - probe_cc=probe_cc, - test_name=test_name, - temporal_config=temporal_config, - ) + async def main(): + client = await temporal_connect(temporal_config=temporal_config) + + return await clear_all_schedules( + client=client, + probe_cc=probe_cc, + test_name=test_name, + ) + + run_async(main()) @cli.command() @@ -228,7 +257,12 @@ def status(): temporal_tls_client_cert_path=config.temporal_tls_client_cert_path, temporal_tls_client_key_path=config.temporal_tls_client_key_path, ) - run_status(temporal_config=temporal_config) + + run_async( + get_status( + temporal_config=temporal_config, + ) + ) @cli.command() @@ -252,30 +286,77 @@ def startworkers(): @cli.command() @click.option( - "--create-tables", - is_flag=True, + "--create-tables/--no-create-tables", + default=False, help="should we attempt to create the required clickhouse tables", ) @click.option( - "--drop-tables", - is_flag=True, + "--drop-tables/--no-drop-tables", + default=False, help="should we drop tables before creating them", ) +@click.option( + "--print-create/--no-print-create", + default=True, + help="should we print the create table queries", +) +@click.option( + "--print-diff/--no-print-diff", + default=False, + help="should we print the table diff", +) def checkdb( - create_tables: bool, - drop_tables: bool, + create_tables: bool, drop_tables: bool, print_create: bool, print_diff: bool ): """ Check if the database tables require migrations. If the create-tables flag is not specified, it will not perform any operations. 
""" - maybe_create_delete_tables( + if print_create: + for query, table_name in make_create_queries(): + click.echo(f"## Create for {table_name}") + click.echo(query) + + if create_tables or drop_tables: + maybe_create_delete_tables( + clickhouse_url=config.clickhouse_url, + create_tables=create_tables, + drop_tables=drop_tables, + ) + + if print_diff: + with ClickhouseConnection(config.clickhouse_url) as db: + list_all_table_diffs(db) + + +@cli.command() +@start_at_option +@end_at_option +@click.option( + "--optimize/--no-optimize", + default=False, + help="should we perform an optimization of the tables as well", +) +def check_duplicates(start_at: datetime, end_at: datetime, optimize: bool): + """ + Perform checks on the bucket ranges to ensure no duplicate entries are + present. This is useful when backfilling the database to make sure the + optimize operations have converged. + """ + duplicates = list_duplicates_in_buckets( clickhouse_url=config.clickhouse_url, - create_tables=create_tables, - drop_tables=drop_tables, - clickhouse_buffer_min_time=config.clickhouse_buffer_min_time, - clickhouse_buffer_max_time=config.clickhouse_buffer_max_time, + start_bucket=start_at, + end_bucket=end_at, ) - - with ClickhouseConnection(config.clickhouse_url) as db: - list_all_table_diffs(db) + found_duplicates = False + for count, bucket_date in duplicates: + if count > 0: + found_duplicates = True + click.echo(f"* {bucket_date}: {count}") + if not found_duplicates: + click.echo("no duplicates found") + if optimize: + optimize_all_tables_by_partition( + clickhouse_url=config.clickhouse_url, + partition_list=list_partitions_to_delete(duplicates), + ) diff --git a/oonipipeline/src/oonipipeline/dataviz/main.py b/oonipipeline/src/oonipipeline/dataviz/main.py index 6ed6309b..647479c8 100644 --- a/oonipipeline/src/oonipipeline/dataviz/main.py +++ b/oonipipeline/src/oonipipeline/dataviz/main.py @@ -30,19 +30,17 @@ templates = Jinja2Templates(directory=f"{current_dir}/templates") -def extract_meta(orig_obs_list) -> Tuple[List, Dict, Dict, Dict]: +def extract_meta(orig_obs_list) -> Tuple[List, Dict, Dict]: obs_list = [] measurement_meta = {} probe_meta = {} - processing_meta = {} for obs in orig_obs_list: wo_dict = asdict(obs) measurement_meta = wo_dict.pop("probe_meta", None) probe_meta = wo_dict.pop("measurement_meta", None) - processing_meta = wo_dict.pop("processing_meta", None) obs_list.append(wo_dict) - return obs_list, measurement_meta, probe_meta, processing_meta + return obs_list, measurement_meta, probe_meta @app.get("/analysis/") @@ -116,7 +114,7 @@ def analysis_by_msmt( # print(analysis_transcript_list) web_analysis_list, _, _, _ = extract_meta(web_analysis) - web_observations_list, measurement_meta, probe_meta, processing_meta = extract_meta( + web_observations_list, measurement_meta, probe_meta = extract_meta( web_observations ) return templates.TemplateResponse( @@ -132,7 +130,7 @@ def analysis_by_msmt( web_observations=web_observations_list, measurement_meta=measurement_meta, probe_meta=probe_meta, - processing_meta=processing_meta, + processing_meta={}, loni_blocked_dict=dict(zip(wer.loni_blocked_keys, wer.loni_blocked_values)), loni_blocked_value=sum(wer.loni_blocked_values), loni_down_dict=dict(zip(wer.loni_down_keys, wer.loni_down_values)), @@ -157,10 +155,10 @@ def observations_by_msmt( msmt, netinfodb=netinfodb ) - web_observations_list, probe_meta, measurement_meta, processing_meta = extract_meta( + web_observations_list, probe_meta, measurement_meta = extract_meta( web_observations ) - 
web_control_observations, _, _, _ = extract_meta(web_control_observations) + web_control_observations, _, _ = extract_meta(web_control_observations) return templates.TemplateResponse( request=request, name="observations.html", diff --git a/oonipipeline/src/oonipipeline/db/connections.py b/oonipipeline/src/oonipipeline/db/connections.py index 33434a2d..2afe7404 100644 --- a/oonipipeline/src/oonipipeline/db/connections.py +++ b/oonipipeline/src/oonipipeline/db/connections.py @@ -100,9 +100,7 @@ def execute_iter(self, *args, **kwargs): *args, **kwargs, settings={"max_block_size": self.max_block_size} ) - def write_rows(self, table_name, rows, column_names, use_buffer_table=False): - if use_buffer_table: - table_name = f"buffer_{table_name}" + def write_rows(self, table_name, rows, column_names): fields_str = ", ".join(column_names) query_str = f"INSERT INTO {table_name} ({fields_str}) VALUES" self.execute(query_str, rows) @@ -119,8 +117,6 @@ def _consume_rows( d.update(d.pop("probe_meta")) if "measurement_meta" in d: d.update(d.pop("measurement_meta")) - if "processing_meta" in d: - d.update(d.pop("processing_meta")) # TODO(art): this custom_remap should not be here if "loni_list" in d: @@ -137,28 +133,19 @@ def _consume_rows( def write_table_model_rows( self, row_iterator: Union[Iterable, List], - use_buffer_table=True, ): """ Write rows from a TableModelProtocol to the database. - We use two stages of buffering for performance reasons: - 1. Python in memory buffer to batch writes to the underlying database - connection via the max_block_size argument - 2. An optional Buffer table to batch the actual writes at the Clickhouse level. - - The second is needed because we might have multiple connections - hitting the database at a given time and it's desirable to be able to - tune the actual underlying writes at the clickhouse level. 
+ We use buffering for performance reasons via a Python in memory buffer + to batch writes to the underlying database connection via the + max_block_size argument """ row_list, table_name = self._consume_rows(row_iterator) if len(row_list) == 0: return assert table_name is not None, f"no table for {row_list}" - if use_buffer_table: - table_name = f"buffer_{table_name}" - if table_name not in self.row_buffer: self.row_buffer[table_name] = [] self.row_buffer[table_name] += row_list diff --git a/oonipipeline/src/oonipipeline/db/create_tables.py b/oonipipeline/src/oonipipeline/db/create_tables.py index b416672e..ee8efb20 100644 --- a/oonipipeline/src/oonipipeline/db/create_tables.py +++ b/oonipipeline/src/oonipipeline/db/create_tables.py @@ -16,7 +16,7 @@ from dataclasses import Field, fields import typing -from oonidata.models.base import TableModelProtocol, ProcessingMeta +from oonidata.models.base import TableModelProtocol from oonidata.models.experiment_result import ( ExperimentResult, MeasurementExperimentResult, @@ -129,11 +129,6 @@ def iter_table_fields( type_str = typing_to_clickhouse(f.type) yield f, type_str continue - if f.type == ProcessingMeta: - for f in fields(ProcessingMeta): - type_str = typing_to_clickhouse(f.type) - yield f, type_str - continue try: type_str = typing_to_clickhouse(f.type) @@ -157,7 +152,9 @@ def format_create_query( index_str = ",\n".join(model.__table_index__) extra_str = "" if extra: - extra_str = f"ORDER BY ({index_str}) SETTINGS index_granularity = 8192;" + if model.__partition_key__: + extra_str = f"PARTITION BY ({model.__partition_key__})\n" + extra_str += f"ORDER BY ({index_str}) SETTINGS index_granularity = 8192;" return ( f""" CREATE TABLE IF NOT EXISTS {table_name} ( @@ -179,39 +176,13 @@ def format_create_query( ] -def make_create_queries( - num_layers=1, - min_time=10, - max_time=500, - min_rows=10_0000, - max_rows=100_000, - min_bytes=10_000_000, - max_bytes=1_000_000_000, -): +def make_create_queries(): create_queries = [] for model in table_models: table_name = model.__table_name__ create_queries.append( format_create_query(table_name, model), ) - - engine_str = f""" - Buffer( - currentDatabase(), {table_name}, - {num_layers}, - {min_time}, {max_time}, - {min_rows}, {max_rows}, - {min_bytes}, {max_bytes} - ) - """ - create_queries.append( - format_create_query( - f"buffer_{table_name}", - model, - engine=engine_str, - extra=False, - ) - ) return create_queries @@ -323,9 +294,6 @@ def list_all_table_diffs(db: ClickhouseConnection): diff_orig = get_table_column_diff( db=db, base_class=base_class, table_name=table_name ) - diff_buffer = get_table_column_diff( - db=db, base_class=base_class, table_name=f"buffer_{table_name}" - ) except TableDoesNotExistError: print(f"# {table_name} does not exist") print("rerun with --create-tables") @@ -334,11 +302,6 @@ def list_all_table_diffs(db: ClickhouseConnection): print(f"# {table_name} diff") for cd in diff_orig: print(cd.get_sql_migration()) - if len(diff_buffer) > 0: - print(f"# buffer_{table_name} diff") - for cd in diff_buffer: - print(cd.get_sql_migration()) - def main(): for query, table_name in make_create_queries(): diff --git a/oonipipeline/src/oonipipeline/db/maintenance.py b/oonipipeline/src/oonipipeline/db/maintenance.py new file mode 100644 index 00000000..1756284f --- /dev/null +++ b/oonipipeline/src/oonipipeline/db/maintenance.py @@ -0,0 +1,84 @@ +import itertools +from datetime import datetime, timedelta +from typing import List, Optional +from tqdm import tqdm +from collections import 
defaultdict + +from clickhouse_driver import Client as Clickhouse + + +def click_execute(q, params: Optional[dict] = None, clickhouse_url: str = "localhost"): + click = Clickhouse( + clickhouse_url, + connect_timeout=10, + send_receive_timeout=60 * 15, + sync_request_timeout=5, + ) + return click.execute(q, params=params) + + +def batch(iterable, n=1): + l = len(iterable) + for ndx in range(0, l, n): + yield iterable[ndx : min(ndx + n, l)] + + +def list_duplicates_in_buckets( + clickhouse_url: str, + start_bucket: datetime, + end_bucket: datetime, + target_table: str = "obs_web", +) -> List: + end_delta = (end_bucket - start_bucket).days + bucket_range = [ + (start_bucket + timedelta(days=offset)).strftime("%Y-%m-%d") + for offset in range(end_delta) + ] + result_list = [] + for bucket_dates in tqdm(list(batch(bucket_range, n=5))): + res = click_execute( + f""" + SELECT + countIf(uid_cnt > 1) as duplicate_uids, + bucket_date + FROM ( + SELECT + CONCAT(measurement_uid, '-', toString(observation_idx)) as uid, + COUNT() as uid_cnt, + bucket_date + FROM {target_table} + WHERE bucket_date IN %(bucket_date)s + GROUP BY bucket_date, uid + ) GROUP BY bucket_date + """, + params={"bucket_date": bucket_dates}, + clickhouse_url=clickhouse_url, + ) + result_list += res + return sorted(result_list, key=lambda x: x[1]) + + +def list_partitions_to_delete(result_list): + partitions_to_clear = [] + + sum_by_partition = defaultdict(lambda: 0) + for count, bucket_date in result_list: + partition_id = bucket_date.replace("-", "")[:6] + sum_by_partition[partition_id] += count + for partition_id, count in sum_by_partition.items(): + if count > 0: + partitions_to_clear.append(partition_id) + return partitions_to_clear + + +def optimize_all_tables_by_partition( + clickhouse_url: str, + partition_list, + tables=["obs_web", "obs_web_ctrl", "obs_http_middlebox"], +): + for table_name, partition in tqdm(list(itertools.product(tables, partition_list))): + print(f"optimizing {table_name} partition {partition}") + click_execute( + f"OPTIMIZE TABLE {table_name} PARTITION '{partition}'", + clickhouse_url=clickhouse_url, + ) diff --git a/oonipipeline/src/oonipipeline/settings.py b/oonipipeline/src/oonipipeline/settings.py index 4fe5f073..7868dce8 100644 --- a/oonipipeline/src/oonipipeline/settings.py +++ b/oonipipeline/src/oonipipeline/settings.py @@ -16,8 +16,6 @@ class Settings(BaseSettings): data_dir: str = "tests/data/datadir" clickhouse_url: str = "clickhouse://localhost" - clickhouse_buffer_min_time: int = 10 - clickhouse_buffer_max_time: int = 60 clickhouse_write_batch_size: int = 200_000 telemetry_endpoint: Optional[str] = None diff --git a/oonipipeline/src/oonipipeline/temporal/activities/analysis.py b/oonipipeline/src/oonipipeline/temporal/activities/analysis.py index 36756d7d..77f85967 100644 --- a/oonipipeline/src/oonipipeline/temporal/activities/analysis.py +++ b/oonipipeline/src/oonipipeline/temporal/activities/analysis.py @@ -192,7 +192,9 @@ def make_analysis_in_a_day(params: MakeAnalysisParams) -> dict: ) except: - web_obs_ids = ",".join(map(lambda wo: wo.observation_id, web_obs)) + web_obs_ids = ",".join( + map(lambda wo: wo.measurement_meta.measurement_uid, web_obs) + ) log.error( f"failed to generate analysis for {web_obs_ids}", exc_info=True ) diff --git a/oonipipeline/src/oonipipeline/temporal/activities/common.py b/oonipipeline/src/oonipipeline/temporal/activities/common.py index 72cb78ce..c79df876 100644 --- a/oonipipeline/src/oonipipeline/temporal/activities/common.py +++ 
b/oonipipeline/src/oonipipeline/temporal/activities/common.py @@ -29,28 +29,25 @@ class ClickhouseParams: def optimize_all_tables(params: ClickhouseParams): with ClickhouseConnection(params.clickhouse_url) as db: table_names = [table_name for _, table_name in make_create_queries()] - # We first flush the buffer_ tables and then the non-buffer tables - for table_name in filter(lambda x: x.startswith("buffer_"), table_names): - db.execute(f"OPTIMIZE TABLE {table_name}") - for table_name in filter(lambda x: not x.startswith("buffer_"), table_names): - db.execute(f"OPTIMIZE TABLE {table_name}") + for tn in table_names: + db.execute(f"OPTIMIZE TABLE {tn}") @dataclass class OptimizeTablesParams: clickhouse: str table_names: List[str] + partition_str: str @activity.defn def optimize_tables(params: OptimizeTablesParams): with ClickhouseConnection(params.clickhouse) as db: for table_name in params.table_names: - # Wait for mutation to complete so that we don't run into out of - # space issues while doing the batch inserts - wait_for_mutations(db, table_name=table_name) - log.info(f"waiting for mutations to finish on {table_name}") - db.execute(f"OPTIMIZE TABLE {table_name}") + log.info(f"OPTIMIZING {table_name} for partition {params.partition_str}") + db.execute( + f"OPTIMIZE TABLE {table_name} PARTITION '{params.partition_str}'" + ) def update_assets( diff --git a/oonipipeline/src/oonipipeline/temporal/activities/observations.py b/oonipipeline/src/oonipipeline/temporal/activities/observations.py index 6f1a0645..df957738 100644 --- a/oonipipeline/src/oonipipeline/temporal/activities/observations.py +++ b/oonipipeline/src/oonipipeline/temporal/activities/observations.py @@ -97,7 +97,7 @@ def make_observations_for_file_entry( bucket_date=bucket_date, ) for obs_list in obs_tuple: - db.write_table_model_rows(obs_list, use_buffer_table=False) + db.write_table_model_rows(obs_list) measurement_count += 1 except Exception as exc: log.error(f"failed at idx: {measurement_count} ({msmt_str})", exc_info=True) diff --git a/oonipipeline/src/oonipipeline/temporal/client_operations.py b/oonipipeline/src/oonipipeline/temporal/client_operations.py index 3a6589f1..f8194623 100644 --- a/oonipipeline/src/oonipipeline/temporal/client_operations.py +++ b/oonipipeline/src/oonipipeline/temporal/client_operations.py @@ -1,15 +1,7 @@ -import asyncio import logging from dataclasses import dataclass -from datetime import datetime from typing import List, Optional, Tuple -from oonipipeline.temporal.schedules import ( - ScheduleIdMap, - schedule_all, - schedule_backfill, - clear_schedules, -) from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import SERVICE_NAME, Resource @@ -112,64 +104,6 @@ async def temporal_connect( return client -async def execute_backfill( - probe_cc: List[str], - test_name: List[str], - start_at: datetime, - end_at: datetime, - workflow_name: str, - temporal_config: TemporalConfig, -): - log.info(f"creating all schedules") - - client = await temporal_connect(temporal_config=temporal_config) - - return await schedule_backfill( - client=client, - probe_cc=probe_cc, - test_name=test_name, - start_at=start_at, - end_at=end_at, - workflow_name=workflow_name, - ) - - -async def create_schedules( - probe_cc: List[str], - test_name: List[str], - clickhouse_url: str, - data_dir: str, - temporal_config: TemporalConfig, -) -> ScheduleIdMap: - log.info(f"creating all schedules") - - client = await temporal_connect(temporal_config=temporal_config) - - 
return await schedule_all( - client=client, - probe_cc=probe_cc, - test_name=test_name, - clickhouse_url=clickhouse_url, - data_dir=data_dir, - ) - - -async def execute_clear_schedules( - probe_cc: List[str], - test_name: List[str], - temporal_config: TemporalConfig, -) -> List[str]: - log.info(f"rescheduling everything") - - client = await temporal_connect(temporal_config=temporal_config) - - return await clear_schedules( - client=client, - probe_cc=probe_cc, - test_name=test_name, - ) - - async def get_status( temporal_config: TemporalConfig, ) -> Tuple[List[WorkflowExecution], List[WorkflowExecution]]: @@ -205,83 +139,3 @@ async def get_status( print(f" execution_time={workflow.execution_time}") print(f" execution_time={workflow.execution_time}") return active_observation_workflows, active_observation_workflows - - -def run_backfill( - temporal_config: TemporalConfig, - probe_cc: List[str], - test_name: List[str], - workflow_name: str, - start_at: datetime, - end_at: datetime, -): - try: - asyncio.run( - execute_backfill( - temporal_config=temporal_config, - workflow_name=workflow_name, - probe_cc=probe_cc, - test_name=test_name, - start_at=start_at, - end_at=end_at, - ) - ) - except KeyboardInterrupt: - print("shutting down") - - -def run_create_schedules( - probe_cc: List[str], - test_name: List[str], - clickhouse_url: str, - data_dir: str, - temporal_config: TemporalConfig, -): - try: - asyncio.run( - create_schedules( - probe_cc=probe_cc, - test_name=test_name, - clickhouse_url=clickhouse_url, - data_dir=data_dir, - temporal_config=temporal_config, - ) - ) - except KeyboardInterrupt: - print("shutting down") - - -def run_clear_schedules( - probe_cc: List[str], - test_name: List[str], - temporal_config: TemporalConfig, -): - try: - asyncio.run( - execute_clear_schedules( - probe_cc=probe_cc, - test_name=test_name, - temporal_config=temporal_config, - ) - ) - except KeyboardInterrupt: - print("shutting down") - - -def start_event_loop(async_task): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(async_task()) - - -def run_status( - temporal_config: TemporalConfig, -): - try: - asyncio.run( - get_status( - temporal_config=temporal_config, - ) - ) - except KeyboardInterrupt: - print("shutting down") diff --git a/oonipipeline/src/oonipipeline/temporal/schedules.py b/oonipipeline/src/oonipipeline/temporal/schedules.py index abee47b2..f1f8db7b 100644 --- a/oonipipeline/src/oonipipeline/temporal/schedules.py +++ b/oonipipeline/src/oonipipeline/temporal/schedules.py @@ -85,6 +85,7 @@ async def schedule_all( test_name: List[str], clickhouse_url: str, data_dir: str, + schedule_analysis: bool = True, ) -> ScheduleIdMap: schedule_id_map = ScheduleIdMap() filter_id = gen_schedule_filter_id(probe_cc, test_name) @@ -136,50 +137,51 @@ async def schedule_all( ) schedule_id_map.observations = sched_handle.id - analysis_params = AnalysisWorkflowParams( - probe_cc=probe_cc, - test_name=test_name, - clickhouse=clickhouse_url, - data_dir=data_dir, - fast_fail=False, - ) - sched_handle = await client.create_schedule( - id=f"{ANALYSIS_SCHED_PREFIX}-{filter_id}-{ts}", - schedule=Schedule( - action=ScheduleActionStartWorkflow( - AnalysisWorkflow.run, - analysis_params, - id=f"{ANALYSIS_WF_PREFIX}-{filter_id}-{ts}", - task_queue=TASK_QUEUE_NAME, - execution_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, - task_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, - run_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, + if schedule_analysis == True: + analysis_params = 
AnalysisWorkflowParams( + probe_cc=probe_cc, + test_name=test_name, + clickhouse=clickhouse_url, + data_dir=data_dir, + fast_fail=False, + ) + sched_handle = await client.create_schedule( + id=f"{ANALYSIS_SCHED_PREFIX}-{filter_id}-{ts}", + schedule=Schedule( + action=ScheduleActionStartWorkflow( + AnalysisWorkflow.run, + analysis_params, + id=f"{ANALYSIS_WF_PREFIX}-{filter_id}-{ts}", + task_queue=TASK_QUEUE_NAME, + execution_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, + task_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, + run_timeout=MAKE_ANALYSIS_START_TO_CLOSE_TIMEOUT, + ), + spec=ScheduleSpec( + intervals=[ + ScheduleIntervalSpec( + # We offset the Analysis workflow by 4 hours assuming + # that the observation generation will take less than 4 + # hours to complete. + # TODO(art): it's probably better to refactor this into some + # kind of DAG + every=timedelta(days=1), + offset=timedelta(hours=6), + ) + ], + ), + policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL), + state=ScheduleState( + note="Run the analysis workflow every day with an offset of 6 hours to ensure the observation workflow has completed" + ), ), - spec=ScheduleSpec( - intervals=[ - ScheduleIntervalSpec( - # We offset the Analysis workflow by 4 hours assuming - # that the observation generation will take less than 4 - # hours to complete. - # TODO(art): it's probably better to refactor this into some - # kind of DAG - every=timedelta(days=1), - offset=timedelta(hours=6), - ) - ], - ), - policy=SchedulePolicy(overlap=ScheduleOverlapPolicy.ALLOW_ALL), - state=ScheduleState( - note="Run the analysis workflow every day with an offset of 6 hours to ensure the observation workflow has completed" - ), - ), - ) + ) schedule_id_map.analysis = sched_handle.id return schedule_id_map -async def clear_schedules( +async def clear_all_schedules( client: TemporalClient, probe_cc: List[str], test_name: List[str], diff --git a/oonipipeline/src/oonipipeline/temporal/workflows/analysis.py b/oonipipeline/src/oonipipeline/temporal/workflows/analysis.py index 97c653b9..14715dab 100644 --- a/oonipipeline/src/oonipipeline/temporal/workflows/analysis.py +++ b/oonipipeline/src/oonipipeline/temporal/workflows/analysis.py @@ -16,6 +16,7 @@ ) from oonipipeline.temporal.activities.common import ( ClickhouseParams, + OptimizeTablesParams, ObsCountParams, get_obs_count_by_cc, optimize_all_tables, diff --git a/oonipipeline/src/oonipipeline/temporal/workflows/observations.py b/oonipipeline/src/oonipipeline/temporal/workflows/observations.py index d9482865..9482f826 100644 --- a/oonipipeline/src/oonipipeline/temporal/workflows/observations.py +++ b/oonipipeline/src/oonipipeline/temporal/workflows/observations.py @@ -14,15 +14,10 @@ optimize_tables, ) from oonipipeline.temporal.activities.observations import ( - DeletePreviousRangeParams, - GetPreviousRangeParams, MakeObservationsParams, - delete_previous_range, - get_previous_range, make_observations, ) from oonipipeline.temporal.workflows.common import ( - TASK_QUEUE_NAME, get_workflow_start_time, ) @@ -34,6 +29,7 @@ class ObservationsWorkflowParams: clickhouse: str data_dir: str fast_fail: bool + is_reprocessing: bool = True bucket_date: Optional[str] = None @@ -55,30 +51,6 @@ async def run(self, params: ObservationsWorkflowParams) -> dict: fast_fail=params.fast_fail, bucket_date=params.bucket_date, ) - - await workflow.execute_activity( - optimize_tables, - OptimizeTablesParams(clickhouse=params.clickhouse, table_names=["obs_web"]), - start_to_close_timeout=timedelta(minutes=20), - 
retry_policy=RetryPolicy(maximum_attempts=10), - ) - - previous_ranges = await workflow.execute_activity( - get_previous_range, - GetPreviousRangeParams( - clickhouse=params.clickhouse, - bucket_date=params.bucket_date, - test_name=params.test_name, - probe_cc=params.probe_cc, - tables=["obs_web"], - ), - start_to_close_timeout=timedelta(minutes=2), - retry_policy=RetryPolicy(maximum_attempts=4), - ) - workflow.logger.info( - f"finished get_previous_range for bucket_date={params.bucket_date}" - ) - obs_res = await workflow.execute_activity( make_observations, params_make_observations, @@ -91,19 +63,23 @@ async def run(self, params: ObservationsWorkflowParams) -> dict: f"{total_t.pretty} speed: {obs_res['mb_per_sec']}MB/s ({obs_res['measurement_per_sec']}msmt/s)" ) - workflow.logger.info( - f"finished optimize_tables for bucket_date={params.bucket_date}" - ) - - await workflow.execute_activity( - delete_previous_range, - DeletePreviousRangeParams( - clickhouse=params.clickhouse, - previous_ranges=previous_ranges, - ), - start_to_close_timeout=timedelta(minutes=10), - retry_policy=RetryPolicy(maximum_attempts=10), - ) + # Force the recreation of all parts when reprocessing, this is not + # needed for a daily run. + if params.is_reprocessing: + partition_str = params.bucket_date.replace("-", "")[:6] + await workflow.execute_activity( + optimize_tables, + OptimizeTablesParams( + clickhouse=params.clickhouse, + table_names=["obs_web", "obs_web_ctrl", "obs_http_middlebox"], + partition_str=partition_str, + ), + start_to_close_timeout=timedelta(minutes=30), + retry_policy=RetryPolicy(maximum_attempts=10), + ) + workflow.logger.info( + f"finished optimize_tables for bucket_date={params.bucket_date}" + ) return { "measurement_count": obs_res["measurement_count"], diff --git a/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py b/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py index 43ceec32..30e668fd 100644 --- a/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py +++ b/oonipipeline/src/oonipipeline/transforms/measurement_transformer.py @@ -16,7 +16,6 @@ ) from collections import defaultdict -from oonidata.models.base import ProcessingMeta from oonidata.models.dataformats import ( DNSAnswer, DNSQuery, @@ -502,6 +501,7 @@ def make_web_observation( msmt_meta: MeasurementMeta, probe_meta: ProbeMeta, netinfodb: NetinfoDB, + observation_idx: int = 0, dns_o: Optional[DNSObservation] = None, tcp_o: Optional[TCPObservation] = None, tls_o: Optional[TLSObservation] = None, @@ -518,9 +518,8 @@ def make_web_observation( probe_analysis=probe_analysis, measurement_meta=msmt_meta, probe_meta=probe_meta, - processing_meta=ProcessingMeta( - processing_start_time=datetime.now(timezone.utc) - ), + observation_idx=observation_idx, + created_at=datetime.now(timezone.utc), ) dns_ip = None if dns_o and dns_o.answer: @@ -565,7 +564,6 @@ def make_web_observation( maybe_set_web_fields( src_obs=http_o, prefix="http_", web_obs=web_obs, field_names=WEB_OBS_FIELDS ) - web_obs.processing_meta.processing_end_time = datetime.now(timezone.utc) return web_obs @@ -753,6 +751,7 @@ def __init__( msmt=measurement, bucket_date=bucket_date ) self.probe_meta = make_probe_meta(msmt=measurement, netinfodb=netinfodb) + self.observation_idx = 1 def make_http_observations( self, @@ -903,6 +902,7 @@ def consume_web_observations( http_o=http_o, target_id=target_id, probe_analysis=probe_analysis, + observation_idx=self.observation_idx, ) ) if tcp_o: @@ -911,6 +911,7 @@ def 
consume_web_observations( tls_observations.remove(tls_o) if http_o: http_observations.remove(http_o) + self.observation_idx += 1 for tcp_o in tcp_observations: _, tls_o, http_o = find_relevant_observations( @@ -933,8 +934,10 @@ def consume_web_observations( http_o=http_o, target_id=target_id, probe_analysis=probe_analysis, + observation_idx=self.observation_idx, ) ) + self.observation_idx += 1 for tls_o in tls_observations: _, _, http_o = find_relevant_observations( @@ -953,8 +956,10 @@ def consume_web_observations( http_o=http_o, target_id=target_id, probe_analysis=probe_analysis, + observation_idx=self.observation_idx, ) ) + self.observation_idx += 1 for http_o in http_observations: web_obs_list.append( @@ -965,14 +970,10 @@ def consume_web_observations( http_o=http_o, target_id=target_id, probe_analysis=probe_analysis, + observation_idx=self.observation_idx, ) ) - - for idx, obs in enumerate(web_obs_list): - obs.observation_id = f"{obs.measurement_meta.measurement_uid}_{idx}" - obs.created_at = datetime.now(timezone.utc).replace( - microsecond=0, tzinfo=None - ) + self.observation_idx += 1 return web_obs_list diff --git a/oonipipeline/src/oonipipeline/transforms/nettests/browser_web.py b/oonipipeline/src/oonipipeline/transforms/nettests/browser_web.py index 8d30f771..db16a9e0 100644 --- a/oonipipeline/src/oonipipeline/transforms/nettests/browser_web.py +++ b/oonipipeline/src/oonipipeline/transforms/nettests/browser_web.py @@ -1,6 +1,5 @@ from datetime import datetime, timezone from typing import List, Tuple -from oonidata.models.base import ProcessingMeta from oonidata.models.nettests import BrowserWeb from oonidata.models.observations import WebObservation @@ -12,10 +11,6 @@ def make_observations(self, msmt: BrowserWeb) -> Tuple[List[WebObservation]]: bw_obs = WebObservation( measurement_meta=self.measurement_meta, probe_meta=self.probe_meta, - processing_meta=ProcessingMeta( - processing_start_time=datetime.now(timezone.utc), - processing_end_time=datetime.now(timezone.utc), - ), http_failure=msmt.test_keys.result, http_runtime=msmt.test_keys.load_time_ms, ) diff --git a/oonipipeline/src/oonipipeline/transforms/nettests/http_header_field_manipulation.py b/oonipipeline/src/oonipipeline/transforms/nettests/http_header_field_manipulation.py index 8f906c46..11e6841b 100644 --- a/oonipipeline/src/oonipipeline/transforms/nettests/http_header_field_manipulation.py +++ b/oonipipeline/src/oonipipeline/transforms/nettests/http_header_field_manipulation.py @@ -14,7 +14,7 @@ def make_observations( ) -> Tuple[List[HTTPMiddleboxObservation]]: mb_obs = HTTPMiddleboxObservation( hfm_success=True, - observation_id=f"{msmt.measurement_uid}_0", + observation_idx=1, created_at=datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None), measurement_meta=self.measurement_meta, probe_meta=self.probe_meta, diff --git a/oonipipeline/src/oonipipeline/transforms/nettests/http_invalid_request_line.py b/oonipipeline/src/oonipipeline/transforms/nettests/http_invalid_request_line.py index f3184f5c..c7ff91a3 100644 --- a/oonipipeline/src/oonipipeline/transforms/nettests/http_invalid_request_line.py +++ b/oonipipeline/src/oonipipeline/transforms/nettests/http_invalid_request_line.py @@ -54,7 +54,7 @@ def make_observations( ) -> Tuple[List[HTTPMiddleboxObservation]]: mb_obs = HTTPMiddleboxObservation( hirl_success=True, - observation_id=f"{msmt.measurement_uid}_0", + observation_idx=1, created_at=datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None), measurement_meta=self.measurement_meta, 
probe_meta=self.probe_meta, diff --git a/oonipipeline/src/oonipipeline/transforms/nettests/web_connectivity.py b/oonipipeline/src/oonipipeline/transforms/nettests/web_connectivity.py index 82433e86..e4716ce3 100644 --- a/oonipipeline/src/oonipipeline/transforms/nettests/web_connectivity.py +++ b/oonipipeline/src/oonipipeline/transforms/nettests/web_connectivity.py @@ -3,7 +3,6 @@ from typing import Dict, List, Tuple from urllib.parse import urlparse from oonidata.datautils import is_ip_bogon -from oonidata.models.base import ProcessingMeta from oonidata.models.nettests import WebConnectivity from oonidata.models.observations import ( MeasurementMeta, @@ -44,9 +43,6 @@ def make_web_control_observations( if msmt.test_keys.control.dns and msmt.test_keys.control.dns.failure: obs = WebControlObservation( measurement_meta=measurement_meta, - processing_meta=ProcessingMeta( - processing_start_time=datetime.now(timezone.utc) - ), hostname=hostname, created_at=created_at, ) @@ -64,9 +60,6 @@ def make_web_control_observations( obs = WebControlObservation( measurement_meta=measurement_meta, - processing_meta=ProcessingMeta( - processing_start_time=datetime.now(timezone.utc) - ), hostname=hostname, created_at=created_at, ) @@ -89,9 +82,6 @@ def make_web_control_observations( measurement_meta=measurement_meta, hostname=p.hostname, port=p.port, - processing_meta=ProcessingMeta( - processing_start_time=datetime.now(timezone.utc) - ), created_at=created_at, ) @@ -127,9 +117,6 @@ def make_web_control_observations( for ip in dns_ips - mapped_dns_ips: obs = WebControlObservation( measurement_meta=measurement_meta, - processing_meta=ProcessingMeta( - processing_start_time=datetime.now(timezone.utc) - ), hostname=hostname, created_at=created_at, ) @@ -140,9 +127,6 @@ def make_web_control_observations( if msmt.test_keys.control.http_request: obs = WebControlObservation( measurement_meta=measurement_meta, - processing_meta=ProcessingMeta( - processing_start_time=datetime.now(timezone.utc) - ), hostname=hostname, created_at=created_at, ) @@ -153,8 +137,7 @@ def make_web_control_observations( web_ctrl_obs.append(obs) for idx, obs in enumerate(web_ctrl_obs): - obs.observation_id = f"{obs.measurement_meta.measurement_uid}_{idx}" - obs.processing_meta.processing_end_time = datetime.now(timezone.utc) + obs.observation_idx = idx return web_ctrl_obs diff --git a/oonipipeline/src/oonipipeline/transforms/observations.py b/oonipipeline/src/oonipipeline/transforms/observations.py index f0ce3a2a..5002c830 100644 --- a/oonipipeline/src/oonipipeline/transforms/observations.py +++ b/oonipipeline/src/oonipipeline/transforms/observations.py @@ -1,10 +1,26 @@ -from typing import List, Tuple, Union +from typing import List, Optional, Tuple, Union, overload +from datetime import datetime, timezone from oonidata.models.observations import ( HTTPMiddleboxObservation, WebControlObservation, WebObservation, ) + +from oonidata.models.nettests import ( + Signal, + SupportedDataformats, + Whatsapp, + Telegram, + StunReachability, + Tor, + FacebookMessenger, + HTTPHeaderFieldManipulation, + UrlGetter, + WebConnectivity, + HTTPInvalidRequestLine, +) + from .nettests.dnscheck import DNSCheckTransformer from .nettests.http_header_field_manipulation import ( HTTPHeaderFieldManipulationTransformer, @@ -46,6 +62,40 @@ TypeHTTPMiddleboxObservations = Tuple[List[HTTPMiddleboxObservation]] +@overload +def measurement_to_observations( + msmt: Union[HTTPHeaderFieldManipulation, HTTPInvalidRequestLine], + netinfodb: NetinfoDB, + bucket_date: str, +) -> 
TypeHTTPMiddleboxObservations: ... + + +@overload +def measurement_to_observations( + msmt: WebConnectivity, + netinfodb: NetinfoDB, + bucket_date: str, +) -> TypeWebConnectivityObservations: ... + + +@overload +def measurement_to_observations( + msmt: Union[ + Signal, Whatsapp, Telegram, StunReachability, Tor, FacebookMessenger, UrlGetter + ], + netinfodb: NetinfoDB, + bucket_date: str, +) -> TypeWebObservations: ... + + +@overload +def measurement_to_observations( + msmt: SupportedDataformats, + netinfodb: NetinfoDB, + bucket_date: str, +) -> TypeWebObservations: ... + + def measurement_to_observations( msmt, netinfodb: NetinfoDB, diff --git a/oonipipeline/tests/conftest.py b/oonipipeline/tests/conftest.py index f0daec0b..0c107935 100644 --- a/oonipipeline/tests/conftest.py +++ b/oonipipeline/tests/conftest.py @@ -143,7 +143,7 @@ def create_db_for_fixture(conn_url): db.execute("SELECT 1") except: pytest.skip("no database connection") - for query, _ in make_create_queries(min_time=1, max_time=2): + for query, _ in make_create_queries(): db.execute(query) return db @@ -156,8 +156,6 @@ def db_notruncate(clickhouse_server): @pytest.fixture def db(clickhouse_server): db = create_db_for_fixture(clickhouse_server) - for _, table_name in make_create_queries(min_time=1, max_time=2): - if table_name.startswith("buffer_"): - continue + for _, table_name in make_create_queries(): db.execute(f"TRUNCATE TABLE {table_name};") yield db diff --git a/oonipipeline/tests/test_cli.py b/oonipipeline/tests/test_cli.py index 745753e5..4c2ff715 100644 --- a/oonipipeline/tests/test_cli.py +++ b/oonipipeline/tests/test_cli.py @@ -1,12 +1,10 @@ import asyncio -from multiprocessing import Process +from unittest import mock from pathlib import Path import time -import textwrap from oonipipeline.cli.commands import cli from oonipipeline.temporal.client_operations import TemporalConfig, get_status -import pytest def wait_for_mutations(db, table_name): @@ -37,40 +35,23 @@ def __init__(self): self.default_map = {} -@pytest.mark.skip("TODO(art): maybe test new settings parsing") -def test_parse_config(tmp_path): - ctx = MockContext() - - config_content = """[options] - something = other - [options.subcommand] - otherthing = bar - [options.subcommand2] - spam = ham - """ - config_path = tmp_path / "config.ini" - with config_path.open("w") as out_file: - out_file.write(textwrap.dedent(config_content)) - defaults = parse_config_file(ctx, str(config_path)) - assert defaults["something"] == "other" - assert defaults["subcommand"]["otherthing"] == "bar" - assert defaults["subcommand2"]["spam"] == "ham" - assert defaults["schedule"]["something"] == "other" - assert defaults["backfill"]["something"] == "other" - - -@pytest.mark.skip("TODO(art): moved into temporal_e2e") +@mock.patch("oonipipeline.cli.commands.make_create_queries") +@mock.patch("oonipipeline.cli.commands.list_all_table_diffs") +@mock.patch("oonipipeline.cli.commands.maybe_create_delete_tables") +@mock.patch("oonipipeline.cli.commands.clear_all_schedules") +@mock.patch("oonipipeline.cli.commands.schedule_backfill") +@mock.patch("oonipipeline.cli.commands.schedule_all") +@mock.patch("oonipipeline.cli.commands.temporal_connect") def test_full_workflow( - db, + temporal_connect_mock, + schedule_all_mock, + schedule_backfill_mock, + clear_all_schedules_mock, + maybe_create_delete_tables_mock, + list_all_table_diffs, + make_create_queries_mock, cli_runner, - fingerprintdb, - netinfodb, - datadir, - tmp_path: Path, - temporal_dev_server, - temporal_workers, ): - 
print(f"running schedule observations") result = cli_runner.invoke( cli, [ @@ -79,20 +60,12 @@ def test_full_workflow( "BA", "--test-name", "web_connectivity", - "--create-tables", - "--data-dir", - datadir, - "--clickhouse", - db.clickhouse_url, - "--clickhouse-buffer-min-time", - 1, - "--clickhouse-buffer-max-time", - 2, - # "--archives-dir", - # tmp_path.absolute(), ], ) assert result.exit_code == 0 + assert temporal_connect_mock.called + assert schedule_all_mock.called + temporal_connect_mock.reset_mock() result = cli_runner.invoke( cli, [ @@ -101,173 +74,42 @@ def test_full_workflow( "2022-10-21", "--end-at", "2022-10-22", - "--clickhouse", - db.clickhouse_url, - "--clickhouse-buffer-min-time", - 1, - "--clickhouse-buffer-max-time", - 2, - "--schedule-id", - "oonipipeline-observations-schedule-ba-web_connectivity", - # "--archives-dir", - # tmp_path.absolute(), + "--workflow-name", + "observations", ], ) assert result.exit_code == 0 + assert temporal_connect_mock.called + assert schedule_backfill_mock.called - wait_for_backfill() - # We wait on the table buffers to be flushed - db.execute("OPTIMIZE TABLE buffer_obs_web") - # assert len(list(tmp_path.glob("*.warc.gz"))) == 1 - res = db.execute( - "SELECT bucket_date, COUNT(DISTINCT(measurement_uid)) FROM obs_web WHERE probe_cc = 'BA' GROUP BY bucket_date" - ) - bucket_dict = dict(res) - assert "2022-10-20" in bucket_dict, bucket_dict - assert bucket_dict["2022-10-20"] == 200, bucket_dict - obs_count = bucket_dict["2022-10-20"] - - print("running backfill") + temporal_connect_mock.reset_mock() result = cli_runner.invoke( cli, [ - "backfill", - "--start-at", - "2022-10-21", - "--end-at", - "2022-10-22", - "--clickhouse", - db.clickhouse_url, - "--clickhouse-buffer-min-time", - 1, - "--clickhouse-buffer-max-time", - 2, - "--schedule-id", - "oonipipeline-observations-schedule-ba-web_connectivity", - # "--archives-dir", - # tmp_path.absolute(), + "clear-schedules", ], ) assert result.exit_code == 0 + assert temporal_connect_mock.called + assert clear_all_schedules_mock.called - wait_for_backfill() - # We wait on the table buffers to be flushed - db.execute("OPTIMIZE TABLE buffer_obs_web") - - # Wait for the mutation to finish running - wait_for_mutations(db, "obs_web") - res = db.execute( - "SELECT bucket_date, COUNT(DISTINCT(measurement_uid)) FROM obs_web WHERE probe_cc = 'BA' GROUP BY bucket_date" - ) - bucket_dict = dict(res) - assert "2022-10-20" in bucket_dict, bucket_dict - # By re-running it against the same date, we should still get the same observation count - assert bucket_dict["2022-10-20"] == obs_count, bucket_dict - - # result = cli_runner.invoke( - # cli, - # [ - # "fphunt", - # "--data-dir", - # datadir, - # "--archives-dir", - # tmp_path.absolute(), - # ], - # ) - # assert result.exit_code == 0 - - # print("running mkanalysis") - # result = cli_runner.invoke( - # cli, - # [ - # "mkanalysis", - # "--probe-cc", - # "BA", - # "--start-day", - # "2022-10-20", - # "--end-day", - # "2022-10-21", - # "--test-name", - # "web_connectivity", - # "--data-dir", - # datadir, - # "--clickhouse", - # db.clickhouse_url, - # "--clickhouse-buffer-min-time", - # 1, - # "--clickhouse-buffer-max-time", - # 2, - # "--parallelism", - # 1, - # ], - # ) - # assert result.exit_code == 0 - # time.sleep(3) - # res = db.execute( - # "SELECT COUNT(DISTINCT(measurement_uid)) FROM measurement_experiment_result WHERE measurement_uid LIKE '20221020%' AND location_network_cc = 'BA'" - # ) - # assert res[0][0] == 200 # type: ignore - # print("finished ALL") - 
# # We wait on the table buffers to be flushed - - print("running schedule analysis") result = cli_runner.invoke( cli, - [ - "schedule", - "--probe-cc", - "BA", - "--test-name", - "web_connectivity", - "--create-tables", - "--data-dir", - datadir, - "--clickhouse", - db.clickhouse_url, - "--clickhouse-buffer-min-time", - 1, - "--clickhouse-buffer-max-time", - 2, - "--no-observations", - "--analysis", - # "--archives-dir", - # tmp_path.absolute(), - ], + ["checkdb", "--print-create", "--create-tables", "--print-diff"], ) assert result.exit_code == 0 + assert maybe_create_delete_tables_mock.called + assert list_all_table_diffs.called + assert make_create_queries_mock.called + + maybe_create_delete_tables_mock.reset_mock() + list_all_table_diffs.reset_mock() + make_create_queries_mock.reset_mock() result = cli_runner.invoke( cli, - [ - "backfill", - "--start-at", - "2022-10-21", - "--end-at", - "2022-10-22", - "--clickhouse", - db.clickhouse_url, - "--clickhouse-buffer-min-time", - 1, - "--clickhouse-buffer-max-time", - 2, - "--schedule-id", - "oonipipeline-analysis-schedule-ba-web_connectivity", - # "--archives-dir", - # tmp_path.absolute(), - ], + ["checkdb", "--print-create"], ) assert result.exit_code == 0 - - # We wait on the table buffers to be flushed - wait_for_backfill() - # assert len(list(tmp_path.glob("*.warc.gz"))) == 1 - db.execute("OPTIMIZE TABLE measurement_experiment_result") - db.execute("OPTIMIZE TABLE buffer_measurement_experiment_result") - wait_for_mutations(db, "measurement_experiment_result") - - # TODO(art): find a better way than sleeping to get the tables to be flushed - time.sleep(10) - res = db.execute( - "SELECT COUNT(DISTINCT(measurement_uid)) FROM measurement_experiment_result WHERE measurement_uid LIKE '20221020%' AND location_network_cc = 'BA'" - ) - assert res[0][0] == 200 # type: ignore - print("finished ALL") + assert not maybe_create_delete_tables_mock.called + assert not list_all_table_diffs.called + assert make_create_queries_mock.called diff --git a/oonipipeline/tests/test_ctrl.py b/oonipipeline/tests/test_ctrl.py index a073397b..f976091b 100644 --- a/oonipipeline/tests/test_ctrl.py +++ b/oonipipeline/tests/test_ctrl.py @@ -1,7 +1,6 @@ from datetime import date, datetime, timezone import time -from oonidata.models.base import ProcessingMeta from oonidata.models.observations import MeasurementMeta, ProbeMeta, WebObservation from oonipipeline.analysis.datasources import iter_web_observations @@ -90,17 +89,13 @@ def test_web_ground_truth_from_clickhouse(db, datadir, netinfodb, tmp_path): WebObservation( probe_meta=DUMMY_PROBE_META, measurement_meta=DUMMY_MEASUREMENT_META, - processing_meta=ProcessingMeta( - processing_start_time=datetime.now(timezone.utc), - processing_end_time=datetime.now(timezone.utc), - ), # The only things we look at to find the groundtruth are hostname, ip, http_request_url hostname="explorer.ooni.org", ip="37.218.242.149", port=443, http_request_url="https://explorer.ooni.org/", created_at=datetime(2023, 11, 17, 10, 35, 34), - observation_id="TEST", + observation_idx=1, target_id=None, transaction_id=None, ip_asn=54113, diff --git a/oonipipeline/tests/test_temporal_e2e.py b/oonipipeline/tests/test_temporal_e2e.py index 63526add..d425cf7a 100644 --- a/oonipipeline/tests/test_temporal_e2e.py +++ b/oonipipeline/tests/test_temporal_e2e.py @@ -3,7 +3,7 @@ from oonipipeline.temporal.schedules import ( list_existing_schedules, schedule_all, - clear_schedules, + clear_all_schedules, ) import pytest @@ -36,7 +36,7 @@ async def 
test_scheduling(datadir, db): scheduled_ids = [sched_res.analysis, sched_res.observations] while len(scheduled_ids) > 0: - cleared_schedule_ids = await clear_schedules( + cleared_schedule_ids = await clear_all_schedules( client=env.client, probe_cc=[], test_name=[], @@ -91,7 +91,6 @@ async def test_observation_workflow(datadir, db): id="obs-wf", task_queue=TASK_QUEUE_NAME, ) - db.execute("OPTIMIZE TABLE buffer_obs_web") assert wf_res["measurement_count"] == 613 assert wf_res["size"] == 11381440 assert wf_res["bucket_date"] == "2022-10-21" diff --git a/oonipipeline/tests/test_workflows.py b/oonipipeline/tests/test_workflows.py index df0534c9..ebc82443 100644 --- a/oonipipeline/tests/test_workflows.py +++ b/oonipipeline/tests/test_workflows.py @@ -174,8 +174,6 @@ def test_make_file_entry_batch(datadir, db): ) assert obs_msmt_count == 453 - # Flush buffer table - db.execute("OPTIMIZE TABLE buffer_obs_web") make_ground_truths_in_day( MakeGroundTruthsParams( day=date(2023, 10, 31).strftime("%Y-%m-%d"), @@ -220,10 +218,11 @@ def test_write_observations(measurements, netinfodb, db): for obs_list in measurement_to_observations( msmt=msmt, netinfodb=netinfodb, bucket_date=bucket_date ): + # Ensure observation IDS do not clash + obs_idxs = list(map(lambda x: x.observation_idx, obs_list)) + assert len(obs_idxs) == len(set(obs_idxs)) db.write_table_model_rows(obs_list) db.close() - # Flush buffer table - db.execute("OPTIMIZE TABLE buffer_obs_web") cnt_by_cc = get_obs_count_by_cc( ObsCountParams( clickhouse_url=db.clickhouse_url, @@ -232,9 +231,9 @@ def test_write_observations(measurements, netinfodb, db): ) ) assert cnt_by_cc["CH"] == 2 - assert cnt_by_cc["GR"] == 4 + assert cnt_by_cc["GR"] == 20 assert cnt_by_cc["US"] == 3 - assert cnt_by_cc["RU"] == 3 + assert cnt_by_cc["RU"] == 47 def test_hirl_observations(measurements, netinfodb):
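
For context on the partitioning scheme this patch introduces: the observation tables are now partitioned by a YYYYMM key derived from bucket_date (concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2))), and the new check-duplicates path only re-optimizes partitions that still contain duplicate (measurement_uid, observation_idx) pairs. The standalone Python sketch below is not part of the patch; the function names are illustrative, but the logic mirrors list_partitions_to_delete() in oonipipeline/db/maintenance.py, the partition_str derivation in workflows/observations.py, and the observation_id composition used in signal.py and web_analysis.py now that observation_id is no longer stored on the observation rows.

# Illustrative sketch (not part of the patch); function names are hypothetical.
from collections import defaultdict
from typing import Dict, List, Tuple


def bucket_to_partition_id(bucket_date: str) -> str:
    # "2022-10-21" -> "202210", matching the SQL partition key
    # concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2))
    # and partition_str = params.bucket_date.replace("-", "")[:6] in the workflow.
    return bucket_date.replace("-", "")[:6]


def partitions_to_optimize(duplicates: List[Tuple[int, str]]) -> List[str]:
    # duplicates is a list of (duplicate_count, bucket_date) rows, the shape
    # returned by list_duplicates_in_buckets(); only partitions with at least
    # one duplicate need an OPTIMIZE TABLE ... PARTITION pass.
    by_partition: Dict[str, int] = defaultdict(int)
    for count, bucket_date in duplicates:
        by_partition[bucket_to_partition_id(bucket_date)] += count
    return [pid for pid, count in sorted(by_partition.items()) if count > 0]


def observation_id(measurement_uid: str, observation_idx: int) -> str:
    # With observation_id dropped from the tables, analysis code derives it as
    # f"{measurement_uid}_{observation_idx}"; this stays unique per measurement
    # because the MeasurementTransformer increments observation_idx for every
    # observation it emits.
    return f"{measurement_uid}_{observation_idx}"


if __name__ == "__main__":
    rows = [(0, "2022-10-20"), (3, "2022-10-21"), (2, "2022-11-01")]
    print(partitions_to_optimize(rows))  # ['202210', '202211']
    print(observation_id("20221021T123456Z_webconnectivity_BA_XXXX", 1))

The same YYYYMM derivation is what ObservationsWorkflow passes to optimize_tables when is_reprocessing is set, so a reprocessed bucket only triggers OPTIMIZE on the single monthly partition it touched rather than on the whole table.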