Skip to content

Commit

Permalink
Remove all get_previous_range calls and rely on optimize tables to deduplicate
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Sep 11, 2024
1 parent 2a23d20 commit b790744
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 47 deletions.
8 changes: 3 additions & 5 deletions oonipipeline/src/oonipipeline/temporal/activities/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,15 @@ def optimize_all_tables(params: ClickhouseParams):
class OptimizeTablesParams:
clickhouse: str
table_names: List[str]
partition_str: str


@activity.defn
def optimize_tables(params: OptimizeTablesParams):
with ClickhouseConnection(params.clickhouse) as db:
for table_name in params.table_names:
# Wait for mutation to complete so that we don't run into out of
# space issues while doing the batch inserts
wait_for_mutations(db, table_name=table_name)
log.info(f"waiting for mutations to finish on {table_name}")
db.execute(f"OPTIMIZE TABLE {table_name}")
log.info(f"OPTIMIZING {table_name} for partition {params.partition_str}")
db.execute(f"OPTIMIZE TABLE {table_name} PARTITION {params.partition_str}")


def update_assets(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
)
from oonipipeline.temporal.activities.common import (
ClickhouseParams,
OptimizeTablesParams,
ObsCountParams,
get_obs_count_by_cc,
optimize_all_tables,
Expand Down
60 changes: 18 additions & 42 deletions oonipipeline/src/oonipipeline/temporal/workflows/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,10 @@
optimize_tables,
)
from oonipipeline.temporal.activities.observations import (
DeletePreviousRangeParams,
GetPreviousRangeParams,
MakeObservationsParams,
delete_previous_range,
get_previous_range,
make_observations,
)
from oonipipeline.temporal.workflows.common import (
TASK_QUEUE_NAME,
get_workflow_start_time,
)

Expand All @@ -34,6 +29,7 @@ class ObservationsWorkflowParams:
clickhouse: str
data_dir: str
fast_fail: bool
is_reprocessing: bool = True
bucket_date: Optional[str] = None


Expand All @@ -55,30 +51,6 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
fast_fail=params.fast_fail,
bucket_date=params.bucket_date,
)

await workflow.execute_activity(
optimize_tables,
OptimizeTablesParams(clickhouse=params.clickhouse, table_names=["obs_web"]),
start_to_close_timeout=timedelta(minutes=20),
retry_policy=RetryPolicy(maximum_attempts=10),
)

previous_ranges = await workflow.execute_activity(
get_previous_range,
GetPreviousRangeParams(
clickhouse=params.clickhouse,
bucket_date=params.bucket_date,
test_name=params.test_name,
probe_cc=params.probe_cc,
tables=["obs_web"],
),
start_to_close_timeout=timedelta(minutes=2),
retry_policy=RetryPolicy(maximum_attempts=4),
)
workflow.logger.info(
f"finished get_previous_range for bucket_date={params.bucket_date}"
)

obs_res = await workflow.execute_activity(
make_observations,
params_make_observations,
Expand All @@ -91,19 +63,23 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
f"{total_t.pretty} speed: {obs_res['mb_per_sec']}MB/s ({obs_res['measurement_per_sec']}msmt/s)"
)

workflow.logger.info(
f"finished optimize_tables for bucket_date={params.bucket_date}"
)

await workflow.execute_activity(
delete_previous_range,
DeletePreviousRangeParams(
clickhouse=params.clickhouse,
previous_ranges=previous_ranges,
),
start_to_close_timeout=timedelta(minutes=10),
retry_policy=RetryPolicy(maximum_attempts=10),
)
# Force the recreation of all parts when reprocessing, this is not
# needed for a daily run.
if params.is_reprocessing:
partition_str = params.bucket_date.replace("-", "")[:6]
await workflow.execute_activity(
optimize_tables,
OptimizeTablesParams(
clickhouse=params.clickhouse,
table_names=["obs_web", "obs_web_ctrl", "obs_http_middlebox"],
partition_str=partition_str,
),
start_to_close_timeout=timedelta(minutes=30),
retry_policy=RetryPolicy(maximum_attempts=10),
)
workflow.logger.info(
f"finished optimize_tables for bucket_date={params.bucket_date}"
)

return {
"measurement_count": obs_res["measurement_count"],
Expand Down

0 comments on commit b790744

Please sign in to comment.