Skip to content

Commit

Permalink
Re-enable workflow run tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Apr 15, 2024
1 parent bd3ae86 commit 667389a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 26 deletions.
45 changes: 22 additions & 23 deletions oonipipeline/src/oonipipeline/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,28 +405,27 @@ def checkdb(
list_all_table_diffs(db)


async def run_workers():
client = await Client.connect("localhost:7233")
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
worker = Worker(
client,
task_queue=TASK_QUEUE_NAME,
workflows=[
ObservationsWorkflow,
GroundTruthsWorkflow,
AnalysisWorkflow,
],
activities=[
make_observation_in_day,
make_ground_truths_in_day,
make_analysis_in_a_day,
],
activity_executor=activity_executor,
)
await worker.run()


@cli.command()
def start_workers():
async def run():
client = await Client.connect("localhost:7233")
with concurrent.futures.ThreadPoolExecutor(
max_workers=100
) as activity_executor:
worker = Worker(
client,
task_queue=TASK_QUEUE_NAME,
workflows=[
ObservationsWorkflow,
GroundTruthsWorkflow,
AnalysisWorkflow,
],
activities=[
make_observation_in_day,
make_ground_truths_in_day,
make_analysis_in_a_day,
],
activity_executor=activity_executor,
)
await worker.run()

asyncio.run(run())
asyncio.run(run_workers())
14 changes: 13 additions & 1 deletion oonipipeline/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import os
import subprocess
from pathlib import Path
from datetime import date
from click.testing import CliRunner
import time


import pytest

import orjson

from click.testing import CliRunner
from clickhouse_driver import Client as ClickhouseClient

from oonidata.dataclient import sync_measurements
Expand Down Expand Up @@ -43,6 +46,15 @@ def clickhouse_server(docker_ip, docker_services):
yield url


@pytest.fixture(scope="session")
def temporal_dev_server(request):
proc = subprocess.Popen(["temporal", "server", "start-dev"])
time.sleep(2)
assert not proc.poll()
yield proc
request.addfinalizer(proc.kill)


@pytest.fixture
def datadir():
return DATA_DIR
Expand Down
32 changes: 30 additions & 2 deletions oonipipeline/tests/test_workflows.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,37 @@
import asyncio
from multiprocessing import Process
from pathlib import Path
import time

from oonipipeline.cli.commands import cli, run_workers

def _test_full_workflow(
db, cli_runner, fingerprintdb, netinfodb, datadir, tmp_path: Path

def wait_for_mutations(db, table_name):
while True:
res = db.execute(
f"SELECT * FROM system.mutations WHERE is_done=0 AND table='{table_name}';"
)
if len(res) == 0: # type: ignore
break
time.sleep(1)


def start_workers():
asyncio.run(run_workers())


def test_full_workflow(
db,
cli_runner,
fingerprintdb,
netinfodb,
datadir,
tmp_path: Path,
temporal_dev_server,
):
# simulate the starting of workers
Process(target=start_workers, args=(), daemon=True).start()

result = cli_runner.invoke(
cli,
[
Expand Down

0 comments on commit 667389a

Please sign in to comment.