diff --git a/oonipipeline/src/oonipipeline/temporal/activities/common.py b/oonipipeline/src/oonipipeline/temporal/activities/common.py index c7a34fcc..72cb78ce 100644 --- a/oonipipeline/src/oonipipeline/temporal/activities/common.py +++ b/oonipipeline/src/oonipipeline/temporal/activities/common.py @@ -46,7 +46,9 @@ class OptimizeTablesParams: def optimize_tables(params: OptimizeTablesParams): with ClickhouseConnection(params.clickhouse) as db: for table_name in params.table_names: - wait_for_mutations(db, params.table_names) + # Wait for mutation to complete so that we don't run into out of + # space issues while doing the batch inserts + wait_for_mutations(db, table_name=table_name) log.info(f"waiting for mutations to finish on {table_name}") db.execute(f"OPTIMIZE TABLE {table_name}") diff --git a/oonipipeline/src/oonipipeline/temporal/common.py b/oonipipeline/src/oonipipeline/temporal/common.py index fd7889d3..4ece122b 100644 --- a/oonipipeline/src/oonipipeline/temporal/common.py +++ b/oonipipeline/src/oonipipeline/temporal/common.py @@ -86,6 +86,7 @@ def maybe_delete_prev_range(db: ClickhouseConnection, prev_range: PrevRange) -> if not prev_range.max_created_at or not prev_range.min_created_at: return "" + # Before deleting, we need to wait for all the mutations to be done wait_for_mutations(db, prev_range.table_name) where, q_args = prev_range.format_query()