Skip to content

Commit

Permalink
Move end2end tests into temporal_e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Sep 4, 2024
1 parent 01f3b80 commit c81422c
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 2 deletions.
2 changes: 0 additions & 2 deletions oonipipeline/src/oonipipeline/temporal/activities/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class ClickhouseParams:
def optimize_all_tables(params: ClickhouseParams):
    """Run ``OPTIMIZE TABLE`` on every table produced by ``make_create_queries()``.

    Forces ClickHouse to merge parts so subsequent reads see fully
    deduplicated/merged data. Uses the ClickHouse URL carried in *params*.

    :param params: connection parameters; only ``clickhouse_url`` is read here.
    """
    with ClickhouseConnection(params.clickhouse_url) as db:
        # make_create_queries() yields (create_sql, table_name) pairs;
        # only the table name is needed for OPTIMIZE.
        for _, table_name in make_create_queries():
            # Buffer tables are flushed automatically; OPTIMIZE on them
            # is skipped. NOTE(review): the surrounding diff suggests this
            # skip was being removed in this commit — confirm intent.
            if table_name.startswith("buffer_"):
                continue
            db.execute(f"OPTIMIZE TABLE {table_name}")


Expand Down
1 change: 1 addition & 0 deletions oonipipeline/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def test_parse_config(tmp_path):
assert defaults["backfill"]["something"] == "other"


@pytest.mark.skip("TODO(art): moved into temporal_e2e")
def test_full_workflow(
db,
cli_runner,
Expand Down
86 changes: 86 additions & 0 deletions oonipipeline/tests/test_temporal_e2e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from concurrent.futures import ThreadPoolExecutor
import pytest

from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
from temporalio import activity

from oonipipeline.temporal.workflows import (
TASK_QUEUE_NAME,
ObservationsWorkflow,
ObservationsWorkflowParams,
)
from oonipipeline.temporal.workers import ACTIVTIES


def _count_by_bucket(db, select_expr: str) -> dict:
    """Return ``{bucket_date: count}`` for BA web observations.

    *select_expr* is the aggregate to compute per bucket, e.g. ``COUNT()``
    or ``COUNT(DISTINCT(measurement_uid))``. Extracted because the same
    query was previously duplicated three times in the test body.
    """
    res = db.execute(
        f"""
        SELECT bucket_date,
        {select_expr}
        FROM obs_web WHERE probe_cc = 'BA'
        GROUP BY bucket_date
        """
    )
    return dict(res)


@pytest.mark.asyncio
async def test_observation_workflow(datadir, db):
    """End-to-end Temporal test for ObservationsWorkflow.

    Runs the workflow twice against a time-skipping test environment and
    checks (1) the reported measurement counts match what lands in
    ClickHouse and (2) re-running the same bucket does not duplicate rows.
    """
    obs_params = ObservationsWorkflowParams(
        probe_cc=["BA"],
        test_name=["web_connectivity"],
        clickhouse=db.clickhouse_url,
        data_dir=str(datadir.absolute()),
        fast_fail=False,
        bucket_date="2022-10-21",
    )
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue=TASK_QUEUE_NAME,
            workflows=[ObservationsWorkflow],
            activities=ACTIVTIES,
            # Activities run in threads; size the pool above the expected
            # parallelism (4 workers + 2 spare).
            activity_executor=ThreadPoolExecutor(max_workers=4 + 2),
        ):
            wf_res = await env.client.execute_workflow(
                ObservationsWorkflow.run,
                obs_params,
                id="obs-wf",
                task_queue=TASK_QUEUE_NAME,
            )
            # Flush the Buffer engine table so rows are visible in obs_web.
            db.execute("OPTIMIZE TABLE buffer_obs_web")
            assert wf_res["measurement_count"] == 613
            assert wf_res["size"] == 11381440
            assert wf_res["bucket_date"] == "2022-10-21"

            # Distinct measurements in the DB must match the workflow's count.
            bucket_dict = _count_by_bucket(db, "COUNT(DISTINCT(measurement_uid))")
            assert bucket_dict[wf_res["bucket_date"]] == wf_res["measurement_count"]

            # Total observation rows (one measurement yields many observations).
            bucket_dict = _count_by_bucket(db, "COUNT()")
            obs_count = bucket_dict[wf_res["bucket_date"]]
            assert obs_count == 2548

            # Re-run the same bucket under a new workflow id: counts must be
            # unchanged, i.e. reprocessing is idempotent.
            wf_res = await env.client.execute_workflow(
                ObservationsWorkflow.run,
                obs_params,
                id="obs-wf-2",
                task_queue=TASK_QUEUE_NAME,
            )
            db.execute("OPTIMIZE TABLE buffer_obs_web")
            # OPTIMIZE obs_web as well so the ReplacingMergeTree-style merge
            # collapses any duplicate parts before counting.
            db.execute("OPTIMIZE TABLE obs_web")
            obs_count_2 = _count_by_bucket(db, "COUNT()")[wf_res["bucket_date"]]

            assert obs_count == obs_count_2

0 comments on commit c81422c

Please sign in to comment.