diff --git a/oonipipeline/src/oonipipeline/temporal/activities/observations.py b/oonipipeline/src/oonipipeline/temporal/activities/observations.py
index 0c3e14ad..0dc43cbe 100644
--- a/oonipipeline/src/oonipipeline/temporal/activities/observations.py
+++ b/oonipipeline/src/oonipipeline/temporal/activities/observations.py
@@ -67,12 +67,13 @@ def write_observations_to_db(
 
 @dataclass
 class MakeObservationsFileEntryBatch:
-    file_entry_batch: List[FileEntryBatchType]
+    batch_idx: int
     clickhouse: str
     write_batch_size: int
     data_dir: str
     bucket_date: str
     probe_cc: List[str]
+    test_name: List[str]
     fast_fail: bool
 
 
@@ -80,6 +81,7 @@ class MakeObservationsFileEntryBatch:
 def make_observations_for_file_entry_batch(
     params: MakeObservationsFileEntryBatch,
 ) -> int:
+    day = datetime.strptime(params.bucket_date, "%Y-%m-%d").date()
     data_dir = pathlib.Path(params.data_dir)
     netinfodb = NetinfoDB(datadir=data_dir, download=False)
 
@@ -87,6 +89,14 @@ def make_observations_for_file_entry_batch(
 
     tracer = trace.get_tracer(__name__)
 
+    file_entry_batches, _ = list_file_entries_batches(
+        probe_cc=params.probe_cc,
+        test_name=params.test_name,
+        start_day=day,
+        end_day=day + timedelta(days=1),
+    )
+    file_entry_batch = file_entry_batches[params.batch_idx]
+
     total_failure_count = 0
     current_span = trace.get_current_span()
     with ClickhouseConnection(
@@ -94,7 +104,7 @@ def make_observations_for_file_entry_batch(
     ) as db:
         ccs = ccs_set(params.probe_cc)
         idx = 0
-        for bucket_name, s3path, ext, fe_size in params.file_entry_batch:
+        for bucket_name, s3path, ext, fe_size in file_entry_batch:
             failure_count = 0
             # Nest the traced span within the current span
             with tracer.start_span("MakeObservations:stream_file_entry") as span:
@@ -158,7 +168,8 @@ def make_observations_for_file_entry_batch(
 
 
 ObservationBatches = TypedDict(
-    "ObservationBatches", {"batches": List[List[FileEntryBatchType]], "total_size": int}
+    "ObservationBatches",
+    {"batch_count": int, "total_size": int},
 )
 
 
@@ -174,7 +185,7 @@ def make_observation_batches(params: MakeObservationsParams) -> ObservationBatch
         end_day=day + timedelta(days=1),
     )
     log.info(f"listing {len(file_entry_batches)} batches took {t.pretty}")
-    return {"batches": file_entry_batches, "total_size": total_size}
+    return {"batch_count": len(file_entry_batches), "total_size": total_size}
 
 
 @dataclass
diff --git a/oonipipeline/src/oonipipeline/temporal/workflows/observations.py b/oonipipeline/src/oonipipeline/temporal/workflows/observations.py
index ade8da5e..6649ac07 100644
--- a/oonipipeline/src/oonipipeline/temporal/workflows/observations.py
+++ b/oonipipeline/src/oonipipeline/temporal/workflows/observations.py
@@ -70,7 +70,7 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
         await workflow.execute_activity(
             optimize_all_tables,
             ClickhouseParams(clickhouse_url=params.clickhouse),
-            start_to_close_timeout=timedelta(minutes=5),
+            start_to_close_timeout=timedelta(minutes=20),
             retry_policy=RetryPolicy(maximum_attempts=10),
         )
 
@@ -87,7 +87,7 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
             retry_policy=RetryPolicy(maximum_attempts=10),
         )
 
-        obs_batches = await workflow.execute_activity(
+        batch_res = await workflow.execute_activity(
             make_observation_batches,
             params_make_observations,
             start_to_close_timeout=timedelta(minutes=30),
@@ -95,14 +95,15 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
         )
 
         coroutine_list = []
-        for batch in obs_batches["batches"]:
+        for batch_idx in range(batch_res["batch_count"]):
             batch_params = MakeObservationsFileEntryBatch(
-                file_entry_batch=batch,
+                batch_idx=batch_idx,
                 clickhouse=params.clickhouse,
                 write_batch_size=1_000_000,
                 data_dir=params.data_dir,
                 bucket_date=params.bucket_date,
                 probe_cc=params.probe_cc,
+                test_name=params.test_name,
                 fast_fail=params.fast_fail,
             )
             coroutine_list.append(
@@ -116,7 +117,7 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
         )
 
         total_msmt_count = sum(await asyncio.gather(*coroutine_list))
-        mb_per_sec = round(obs_batches["total_size"] / total_t.s / 10**6, 1)
+        mb_per_sec = round(batch_res["total_size"] / total_t.s / 10**6, 1)
         msmt_per_sec = round(total_msmt_count / total_t.s)
         workflow.logger.info(
             f"finished processing all batches in {total_t.pretty} speed: {mb_per_sec}MB/s ({msmt_per_sec}msmt/s)"
@@ -125,7 +126,7 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
         await workflow.execute_activity(
             optimize_all_tables,
             ClickhouseParams(clickhouse_url=params.clickhouse),
-            start_to_close_timeout=timedelta(minutes=5),
+            start_to_close_timeout=timedelta(minutes=30),
             retry_policy=RetryPolicy(maximum_attempts=10),
         )
 
@@ -141,7 +142,7 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
 
         return {
             "measurement_count": total_msmt_count,
-            "size": obs_batches["total_size"],
+            "size": batch_res["total_size"],
             "mb_per_sec": mb_per_sec,
             "bucket_date": params.bucket_date,
             "msmt_per_sec": msmt_per_sec,
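
A note on the shape of this refactor (not part of the patch): the workflow no longer serializes whole file entry batches into each activity's input; it passes only an integer batch_idx, and every activity re-derives the batch list by calling list_file_entries_batches with the same probe_cc, test_name, and day arguments that make_observation_batches used. The sketch below restates that contract in isolation; select_file_entry_batch and _total_size are hypothetical names introduced here, and the import path for list_file_entries_batches is an assumption (the patch calls it unqualified inside the activities module).

from datetime import datetime, timedelta
from typing import List, Tuple

# Assumed import path; the diff only shows the unqualified call.
from oonidata.dataclient import list_file_entries_batches

# Shape of one file entry as unpacked by the activity loop:
# (bucket_name, s3path, ext, fe_size)
FileEntryBatchType = Tuple[str, str, str, int]


def select_file_entry_batch(
    batch_idx: int,
    bucket_date: str,
    probe_cc: List[str],
    test_name: List[str],
) -> List[FileEntryBatchType]:
    """Hypothetical helper mirroring what the refactored activity does.

    The workflow counts the batches and fans out one activity per index,
    so list_file_entries_batches must return batches in a stable order
    for identical arguments; otherwise batch_idx would select different
    file entries here than the ones the workflow counted.
    """
    day = datetime.strptime(bucket_date, "%Y-%m-%d").date()
    # Re-list the batches exactly as the workflow did when it computed
    # batch_count, then pick the one assigned to this activity.
    file_entry_batches, _total_size = list_file_entries_batches(
        probe_cc=probe_cc,
        test_name=test_name,
        start_day=day,
        end_day=day + timedelta(days=1),
    )
    return file_entry_batches[batch_idx]

The trade-off appears to be one extra listing call per activity in exchange for small, constant-size activity payloads (Temporal caps payload blob sizes), and it holds only if the listing is deterministic between the workflow's call and each activity's call.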