diff --git a/oonipipeline/src/oonipipeline/temporal/activities/common.py b/oonipipeline/src/oonipipeline/temporal/activities/common.py
index dfef2082..20b5dffe 100644
--- a/oonipipeline/src/oonipipeline/temporal/activities/common.py
+++ b/oonipipeline/src/oonipipeline/temporal/activities/common.py
@@ -25,8 +25,6 @@ class ClickhouseParams:
 
 def optimize_all_tables(params: ClickhouseParams):
     with ClickhouseConnection(params.clickhouse_url) as db:
         for _, table_name in make_create_queries():
-            if table_name.startswith("buffer_"):
-                continue
             db.execute(f"OPTIMIZE TABLE {table_name}")
 
diff --git a/oonipipeline/tests/test_cli.py b/oonipipeline/tests/test_cli.py
index 03a6b001..57aaf45c 100644
--- a/oonipipeline/tests/test_cli.py
+++ b/oonipipeline/tests/test_cli.py
@@ -59,6 +59,7 @@ def test_parse_config(tmp_path):
     assert defaults["backfill"]["something"] == "other"
 
 
+@pytest.mark.skip("TODO(art): moved into temporal_e2e")
 def test_full_workflow(
     db,
     cli_runner,
diff --git a/oonipipeline/tests/test_temporal_e2e.py b/oonipipeline/tests/test_temporal_e2e.py
new file mode 100644
index 00000000..b1b56979
--- /dev/null
+++ b/oonipipeline/tests/test_temporal_e2e.py
@@ -0,0 +1,86 @@
+from concurrent.futures import ThreadPoolExecutor
+import pytest
+
+from temporalio.testing import WorkflowEnvironment
+from temporalio.worker import Worker
+from temporalio import activity
+
+from oonipipeline.temporal.workflows import (
+    TASK_QUEUE_NAME,
+    ObservationsWorkflow,
+    ObservationsWorkflowParams,
+)
+from oonipipeline.temporal.workers import ACTIVTIES
+
+
+@pytest.mark.asyncio
+async def test_observation_workflow(datadir, db):
+    obs_params = ObservationsWorkflowParams(
+        probe_cc=["BA"],
+        test_name=["web_connectivity"],
+        clickhouse=db.clickhouse_url,
+        data_dir=str(datadir.absolute()),
+        fast_fail=False,
+        bucket_date="2022-10-21",
+    )
+    async with await WorkflowEnvironment.start_time_skipping() as env:
+        async with Worker(
+            env.client,
+            task_queue=TASK_QUEUE_NAME,
+            workflows=[ObservationsWorkflow],
+            activities=ACTIVTIES,
+            activity_executor=ThreadPoolExecutor(max_workers=4 + 2),
+        ):
+            wf_res = await env.client.execute_workflow(
+                ObservationsWorkflow.run,
+                obs_params,
+                id="obs-wf",
+                task_queue=TASK_QUEUE_NAME,
+            )
+            db.execute("OPTIMIZE TABLE buffer_obs_web")
+            assert wf_res["measurement_count"] == 613
+            assert wf_res["size"] == 11381440
+            assert wf_res["bucket_date"] == "2022-10-21"
+
+            res = db.execute(
+                """
+                SELECT bucket_date,
+                COUNT(DISTINCT(measurement_uid))
+                FROM obs_web WHERE probe_cc = 'BA'
+                GROUP BY bucket_date
+                """
+            )
+            bucket_dict = dict(res)
+            assert bucket_dict[wf_res["bucket_date"]] == wf_res["measurement_count"]
+            res = db.execute(
+                """
+                SELECT bucket_date,
+                COUNT()
+                FROM obs_web WHERE probe_cc = 'BA'
+                GROUP BY bucket_date
+                """
+            )
+            bucket_dict = dict(res)
+            obs_count = bucket_dict[wf_res["bucket_date"]]
+            assert obs_count == 2548
+
+            wf_res = await env.client.execute_workflow(
+                ObservationsWorkflow.run,
+                obs_params,
+                id="obs-wf-2",
+                task_queue=TASK_QUEUE_NAME,
+            )
+            db.execute("OPTIMIZE TABLE buffer_obs_web")
+            db.execute("OPTIMIZE TABLE obs_web")
+            res = db.execute(
+                """
+                SELECT bucket_date,
+                COUNT()
+                FROM obs_web WHERE probe_cc = 'BA'
+                GROUP BY bucket_date
+                """
+            )
+            bucket_dict = dict(res)
+            obs_count_2 = bucket_dict[wf_res["bucket_date"]]
+
+            assert obs_count == obs_count_2
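
For context on the OPTIMIZE calls above: buffer_obs_web is a ClickHouse Buffer-engine table, which holds freshly written rows in memory and flushes them to the destination table (obs_web) asynchronously, so a COUNT() issued right after the workflow finishes can undercount. Running OPTIMIZE TABLE against a Buffer table forces an immediate flush, which is why the test flushes before each assertion and why optimize_all_tables no longer skips buffer_* tables. A minimal sketch of the flush-then-count pattern used by the test; the count_obs_web_rows helper and the ClickhouseConnection import path are illustrative assumptions, not part of this patch:

# Sketch, assuming ClickhouseConnection lives at the path below; adjust the
# import to wherever common.py actually imports it from.
from oonipipeline.db.connections import ClickhouseConnection  # assumed path


def count_obs_web_rows(clickhouse_url: str, probe_cc: str) -> int:
    """Hypothetical helper mirroring the flush-then-count pattern in the test."""
    with ClickhouseConnection(clickhouse_url) as db:
        # OPTIMIZE on a Buffer table flushes buffered rows into obs_web so the
        # following COUNT() sees everything the workflow wrote; without the
        # flush, the count can race the asynchronous buffer flush.
        db.execute("OPTIMIZE TABLE buffer_obs_web")
        rows = db.execute(f"SELECT COUNT() FROM obs_web WHERE probe_cc = '{probe_cc}'")
        # db.execute returns a list of row tuples (as the dict(res) calls in
        # the test imply); the single row holds the count.
        return rows[0][0]

The second workflow run in the test follows the same reasoning: after re-running with id="obs-wf-2", it optimizes obs_web as well so that part merges (and any deduplication the table engine performs) settle before comparing obs_count_2 against obs_count, asserting that re-processing the same bucket is idempotent.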