Commit

Wait on mutations before doing the delete
hellais committed Sep 5, 2024
1 parent 3de4b67 commit 7a8210f
Showing 2 changed files with 4 additions and 17 deletions.
2 changes: 1 addition & 1 deletion oonipipeline/src/oonipipeline/temporal/common.py
@@ -86,6 +86,7 @@ def maybe_delete_prev_range(db: ClickhouseConnection, prev_range: PrevRange) ->
     if not prev_range.max_created_at or not prev_range.min_created_at:
         return ""
 
+    wait_for_mutations(db, prev_range.table_name)
     # Disabled due to: https://github.com/ClickHouse/ClickHouse/issues/40651
     # db.execute("SET allow_experimental_lightweight_delete = true;")
 
@@ -99,7 +100,6 @@ def maybe_delete_prev_range(db: ClickhouseConnection, prev_range: PrevRange) ->
     q = f"ALTER TABLE {prev_range.table_name} DELETE "
     final_query = q + where
     db.execute(final_query, q_args)
-    wait_for_mutations(db, prev_range.table_name)
     return final_query
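
This change hinges on wait_for_mutations running before the ALTER TABLE ... DELETE is scheduled rather than after it, so a run never queues a new delete on top of a mutation that is still in flight. The helper's body is not part of this diff; below is a minimal sketch of what it could look like, assuming it polls ClickHouse's system.mutations table until no unfinished mutation remains for the table (the polling query and interval are illustrative, not from the repo).

    import time

    def wait_for_mutations(db, table_name: str, poll_interval_s: float = 1.0) -> None:
        # ALTER TABLE ... DELETE runs as an asynchronous mutation, so poll
        # system.mutations until every mutation scheduled against this table
        # reports is_done = 1.
        while True:
            rows = db.execute(
                "SELECT count() FROM system.mutations "
                "WHERE table = %(table)s AND is_done = 0",
                {"table": table_name},
            )
            if rows[0][0] == 0:
                return
            time.sleep(poll_interval_s)

Since the delete is asynchronous, moving the wait ahead of it also lets maybe_delete_prev_range return as soon as the new mutation is scheduled; the waiting cost is only paid when a previous run's mutation is still outstanding.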


19 changes: 3 additions & 16 deletions oonipipeline/src/oonipipeline/temporal/workflows/observations.py
@@ -70,15 +70,10 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
 
     await workflow.execute_activity(
         optimize_tables,
-        OptimizeTablesParams(
-            clickhouse=params.clickhouse, table_names=["buffer_obs_web"]
-        ),
+        OptimizeTablesParams(clickhouse=params.clickhouse, table_names=["obs_web"]),
         start_to_close_timeout=timedelta(minutes=20),
         retry_policy=RetryPolicy(maximum_attempts=10),
     )
-    workflow.logger.info(
-        f"finished optimize_tables for bucket_date={params.bucket_date}"
-    )
 
     previous_ranges = await workflow.execute_activity(
         get_previous_range,
@@ -89,8 +84,8 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
             probe_cc=params.probe_cc,
             tables=["obs_web"],
         ),
-        start_to_close_timeout=timedelta(minutes=20),
-        retry_policy=RetryPolicy(maximum_attempts=10),
+        start_to_close_timeout=timedelta(minutes=2),
+        retry_policy=RetryPolicy(maximum_attempts=4),
     )
     workflow.logger.info(
         f"finished get_previous_range for bucket_date={params.bucket_date}"
@@ -108,14 +103,6 @@ async def run(self, params: ObservationsWorkflowParams) -> dict:
         f"{total_t.pretty} speed: {obs_res['mb_per_sec']}MB/s ({obs_res['measurement_per_sec']}msmt/s)"
     )
 
-    await workflow.execute_activity(
-        optimize_tables,
-        OptimizeTablesParams(
-            clickhouse=params.clickhouse, table_names=["buffer_obs_web"]
-        ),
-        start_to_close_timeout=timedelta(minutes=20),
-        retry_policy=RetryPolicy(maximum_attempts=10),
-    )
     workflow.logger.info(
         f"finished optimize_tables for bucket_date={params.bucket_date}"
     )
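
The observations.py side retargets the optimize step from buffer_obs_web to obs_web, drops the second optimize pass after ingestion, and tightens the Temporal options on get_previous_range. In the Temporal Python SDK, start_to_close_timeout caps a single attempt of an activity and RetryPolicy(maximum_attempts=...) caps how many attempts are made, so the activity now fails fast (at most 4 attempts of up to 2 minutes each) instead of potentially holding the worker for 10 attempts of up to 20 minutes. A minimal self-contained sketch of the same invocation pattern (the workflow class and the string activity name here are illustrative, not from the repo):

    from datetime import timedelta

    from temporalio import workflow
    from temporalio.common import RetryPolicy


    @workflow.defn
    class ExampleWorkflow:
        @workflow.run
        async def run(self, table_name: str) -> None:
            # Each attempt may run for at most 2 minutes; on failure or
            # timeout Temporal retries the activity, up to 4 attempts total.
            await workflow.execute_activity(
                "get_previous_range",  # activity referenced by registered name
                table_name,
                start_to_close_timeout=timedelta(minutes=2),
                retry_policy=RetryPolicy(maximum_attempts=4),
            )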