diff --git a/docs/testing.md b/docs/testing.md
index 46a63d3..c133e86 100644
--- a/docs/testing.md
+++ b/docs/testing.md
@@ -23,7 +23,10 @@ require this storage backend.
 4. Set the `cStorageUrl` in `tests/storage/StorageTestHelper.hpp` to
    `jdbc:mariadb://localhost:3306/?user=<user>&password=<password>`.
 
-## Running tests
+5. Set the `storage_url` in `tests/integration/client.py` to
+   `jdbc:mariadb://localhost:3306/?user=<user>&password=<password>`.
+
+## Running unit tests
 
 You can use the following tasks to run the set of unit tests that's
 appropriate.
@@ -45,4 +48,12 @@ REQUIRE( storage->connect(spider::test::cStorageUrl).success() )
 The [unit_tests.yaml][gh-workflow-unit-tests] GitHub workflow runs the unit tests on push, pull
 requests, and daily. Currently, it only runs unit tests that don't require a storage backend.
 
+## Running integration tests
+
+You can use the following tasks to run integration tests.
+
+| Task               | Description                  |
+|--------------------|------------------------------|
+| `test:integration` | Runs all integration tests.  |
+
 [gh-workflow-unit-tests]: ../.github/workflows/unit-tests.yaml
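For context on the URL format that steps 4 and 5 expect, `tests/integration/client.py` (added later in this diff) parses the storage URL with a named-group regex, which also requires a database segment, as in that module's default `storage_url`. A minimal, runnable sketch of how a filled-in URL decomposes; the host, database, and credential values here are illustrative:

```python
import re

# Same pattern shape as create_connection() in tests/integration/client.py.
pattern = re.compile(
    r"jdbc:mariadb://(?P<host>[^:/]+):(?P<port>\d+)/(?P<database>[^?]+)"
    r"\?user=(?P<user>[^&]+)&password=(?P<password>[^&]+)"
)

match = pattern.match("jdbc:mariadb://localhost:3306/spider_test?user=root&password=password")
assert match is not None
print(match.groupdict())
# {'host': 'localhost', 'port': '3306', 'database': 'spider_test',
#  'user': 'root', 'password': 'password'}
```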
diff --git a/lint-requirements.txt b/lint-requirements.txt
index 06301bd..5897486 100644
--- a/lint-requirements.txt
+++ b/lint-requirements.txt
@@ -1,5 +1,7 @@
+black>=24.4.2
 # Lock to v18.x until we can upgrade our code to meet v19's formatting standards.
 clang-format~=18.1
 clang-tidy>=19.1.0
 gersemi>=0.16.2
+ruff>=0.4.4
 yamllint>=1.35.1
diff --git a/lint-tasks.yaml b/lint-tasks.yaml
index 13db6cc..f42b802 100644
--- a/lint-tasks.yaml
+++ b/lint-tasks.yaml
@@ -9,6 +9,7 @@ tasks:
     cmds:
       - task: "cmake-check"
       - task: "cpp-check"
+      - task: "py-check"
       - task: "yml-check"
 
   fix:
@@ -102,6 +103,34 @@ tasks:
         SRC_DIR: "{{.G_SRC_SPIDER_DIR}}"
         TEST_DIR: "{{.G_TEST_DIR}}"
 
+  py-check:
+    cmds:
+      - task: "py"
+        vars:
+          BLACK_FLAGS: "--check"
+          RUFF_FLAGS: ""
+
+  py-fix:
+    cmds:
+      - task: "py"
+        vars:
+          BLACK_FLAGS: ""
+          RUFF_FLAGS: "--fix"
+
+  py:
+    internal: true
+    requires:
+      vars: ["BLACK_FLAGS", "RUFF_FLAGS"]
+    deps: ["venv"]
+    cmds:
+      - for:
+          - "tests/integration"
+        cmd: |-
+          . "{{.G_LINT_VENV_DIR}}/bin/activate"
+          cd "{{.ITEM}}"
+          black --color --line-length 100 {{.BLACK_FLAGS}} .
+          ruff check {{.RUFF_FLAGS}} .
+
   yml:
     aliases:
       - "yml-check"
diff --git a/ruff.toml b/ruff.toml
new file mode 100644
index 0000000..969421f
--- /dev/null
+++ b/ruff.toml
@@ -0,0 +1,6 @@
+line-length = 100
+lint.select = ["I"]
+
+[lint.isort]
+case-sensitive = false
+order-by-type = false
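These isort settings determine how the integration tests' imports are laid out: with `case-sensitive = false` and `order-by-type = false`, ruff's import-sorting rules (`I`) order names in a single case-insensitive alphabetical run instead of grouping constants, classes, and functions separately. The `from .client import (...)` block in `tests/integration/test_scheduler_worker.py` below follows exactly this order. A quick sketch of the effect:

```python
# How ruff's "I" rules order a mixed import list under this config:
# one alphabetical run, ignoring case, with no separate class grouping.
names = ["TaskGraph", "get_task_state", "Task", "submit_job", "storage_url", "remove_job"]
print(sorted(names, key=str.lower))
# ['get_task_state', 'remove_job', 'storage_url', 'submit_job', 'Task', 'TaskGraph']
```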
diff --git a/test-requirements.txt b/test-requirements.txt
new file mode 100644
index 0000000..ec24b0b
--- /dev/null
+++ b/test-requirements.txt
@@ -0,0 +1,3 @@
+msgpack>=1.1.0
+mysql-connector-python>=8.0.26
+pytest>=8.3.4
diff --git a/test-tasks.yaml b/test-tasks.yaml
index a38fb12..c6dbc7d 100644
--- a/test-tasks.yaml
+++ b/test-tasks.yaml
@@ -1,26 +1,28 @@
 version: "3"
 
 vars:
-  G_TEST_BINARY: "{{.G_BUILD_SPIDER_DIR}}/tests/unitTest"
+  G_UNIT_TEST_BINARY: "{{.G_BUILD_SPIDER_DIR}}/tests/unitTest"
+  G_TEST_VENV_DIR: "{{.G_BUILD_DIR}}/test-venv"
+  G_TEST_VENV_CHECKSUM_FILE: "{{.G_BUILD_DIR}}/test-venv.md5"
 
 tasks:
   non-storage-unit-tests:
     deps:
       - "build-unit-test"
     cmds:
-      - "{{.G_TEST_BINARY}} \"~[storage]\""
+      - "{{.G_UNIT_TEST_BINARY}} \"~[storage]\""
 
   storage-unit-tests:
     deps:
       - "build-unit-test"
     cmds:
-      - "{{.G_TEST_BINARY}} \"[storage]\""
+      - "{{.G_UNIT_TEST_BINARY}} \"[storage]\""
 
   all:
     deps:
      - "build-unit-test"
     cmds:
-      - "{{.G_TEST_BINARY}}"
+      - "{{.G_UNIT_TEST_BINARY}}"
 
   build-unit-test:
     internal: true
@@ -28,3 +30,48 @@
       - task: ":build:target"
         vars:
           TARGETS: ["spider_task_executor", "unitTest", "worker_test"]
+
+  integration:
+    dir: "{{.G_BUILD_SPIDER_DIR}}"
+    deps:
+      - "venv"
+      - task: ":build:target"
+        vars:
+          TARGETS: [
+            "spider_task_executor",
+            "worker_test",
+            "spider_worker",
+            "spider_scheduler",
+            "integrationTest"]
+    cmd: |-
+      . "{{.G_TEST_VENV_DIR}}/bin/activate"
+      pytest tests/integration
+
+  venv:
+    internal: true
+    vars:
+      CHECKSUM_FILE: "{{.G_TEST_VENV_CHECKSUM_FILE}}"
+      OUTPUT_DIR: "{{.G_TEST_VENV_DIR}}"
+    sources:
+      - "{{.ROOT_DIR}}/taskfile.yaml"
+      - "{{.TASKFILE}}"
+      - "test-requirements.txt"
+    generates: ["{{.CHECKSUM_FILE}}"]
+    run: "once"
+    deps:
+      - ":init"
+      - task: ":utils:validate-checksum"
+        vars:
+          CHECKSUM_FILE: "{{.CHECKSUM_FILE}}"
+          DATA_DIR: "{{.OUTPUT_DIR}}"
+    cmds:
+      - task: ":utils:create-venv"
+        vars:
+          LABEL: "test"
+          OUTPUT_DIR: "{{.OUTPUT_DIR}}"
+          REQUIREMENTS_FILE: "test-requirements.txt"
+      # This command must be last
+      - task: ":utils:compute-checksum"
+        vars:
+          DATA_DIR: "{{.OUTPUT_DIR}}"
+          OUTPUT_FILE: "{{.CHECKSUM_FILE}}"
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 494d9ea..4734ff0 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -42,7 +42,18 @@ target_link_libraries(
         Boost::system
         spdlog::spdlog
 )
 
 add_library(worker_test SHARED)
 target_sources(worker_test PRIVATE worker/worker-test.cpp)
 target_link_libraries(worker_test PRIVATE spider_core)
+add_dependencies(unitTest worker_test)
+
+add_custom_target(integrationTest ALL)
+add_custom_command(
+    TARGET integrationTest
+    PRE_BUILD
+    COMMAND
+        ${CMAKE_COMMAND} -E copy_directory ${CMAKE_CURRENT_SOURCE_DIR}/integration
+        ${CMAKE_CURRENT_BINARY_DIR}/integration
+)
+add_dependencies(integrationTest worker_test)
diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/integration/client.py b/tests/integration/client.py
new file mode 100644
index 0000000..bda5acf
--- /dev/null
+++ b/tests/integration/client.py
@@ -0,0 +1,157 @@
+import re
+import uuid
+from dataclasses import dataclass
+from typing import Dict, List, Optional, Tuple
+
+import mysql.connector
+import pytest
+
+
+@dataclass
+class TaskInput:
+    type: str
+    task_output: Optional[Tuple[uuid.UUID, int]] = None
+    value: Optional[str] = None
+    data_id: Optional[uuid.UUID] = None
+
+
+@dataclass
+class TaskOutput:
+    type: str
+    value: Optional[str] = None
+    data_id: Optional[uuid.UUID] = None
+
+
+@dataclass
+class Task:
+    id: uuid.UUID
+    function_name: str
+    inputs: List[TaskInput]
+    outputs: List[TaskOutput]
+    timeout: float = 0.0
+
+
+@dataclass
+class TaskGraph:
+    id: uuid.UUID
+    tasks: Dict[uuid.UUID, Task]
+    dependencies: List[Tuple[uuid.UUID, uuid.UUID]]
+
+
+def create_connection(storage_url: str):
+    pattern = re.compile(
+        r"jdbc:mariadb://(?P<host>[^:/]+):(?P<port>\d+)/(?P<database>[^?]+)\?user=(?P<user>[^&]+)&password=(?P<password>[^&]+)"
+    )
+    match = pattern.match(storage_url)
+    if not match:
+        raise ValueError("Invalid JDBC URL format")
+
+    connection_params = match.groupdict()
+    return mysql.connector.connect(
+        host=connection_params["host"],
+        port=int(connection_params["port"]),
+        database=connection_params["database"],
+        user=connection_params["user"],
+        password=connection_params["password"],
+    )
+
+
+def is_head_task(task_id: uuid.UUID, dependencies: List[Tuple[uuid.UUID, uuid.UUID]]):
+    return not any(dependency[1] == task_id for dependency in dependencies)
+
+
+storage_url = "jdbc:mariadb://localhost:3306/spider_test?user=root&password=password"
+
+
+@pytest.fixture(scope="session")
+def storage():
+    conn = create_connection(storage_url)
+    yield conn
+    conn.close()
+
+
+def submit_job(conn, client_id: uuid.UUID, graph: TaskGraph):
+    cursor = conn.cursor()
+
+    cursor.execute(
+        "INSERT INTO jobs (id, client_id) VALUES (%s, %s)", (graph.id.bytes, client_id.bytes)
+    )
+
+    for task_id, task in graph.tasks.items():
+        if is_head_task(task_id, graph.dependencies):
+            state = "ready"
+        else:
+            state = "pending"
+        cursor.execute(
+            "INSERT INTO tasks (id, job_id, func_name, state, timeout) VALUES (%s, %s, %s, %s, %s)",
+            (task.id.bytes, graph.id.bytes, task.function_name, state, task.timeout),
+        )
+
+        for i, task_input in enumerate(task.inputs):
+            cursor.execute(
+                "INSERT INTO task_inputs (type, task_id, position, output_task_id, output_task_position, value, data_id) VALUES (%s, %s, %s, %s, %s, %s, %s)",
+                (
+                    task_input.type,
+                    task.id.bytes,
+                    i,
+                    task_input.task_output[0].bytes if task_input.task_output is not None else None,
+                    task_input.task_output[1] if task_input.task_output is not None else None,
+                    task_input.value,
+                    task_input.data_id.bytes if task_input.data_id is not None else None,
+                ),
+            )
+
+        for i, task_output in enumerate(task.outputs):
+            cursor.execute(
+                "INSERT INTO task_outputs (task_id, position, type) VALUES (%s, %s, %s)",
+                (task.id.bytes, i, task_output.type),
+            )
+
+    for dependency in graph.dependencies:
+        cursor.execute(
+            "INSERT INTO task_dependencies (parent, child) VALUES (%s, %s)",
+            (dependency[0].bytes, dependency[1].bytes),
+        )
+
+    conn.commit()
+    cursor.close()
+
+
+def get_task_outputs(conn, task_id: uuid.UUID) -> List[TaskOutput]:
+    cursor = conn.cursor()
+
+    cursor.execute(
+        "SELECT type, value, data_id FROM task_outputs WHERE task_id = %s ORDER BY position",
+        (task_id.bytes,),
+    )
+    outputs = []
+    for output_type, value, data_id in cursor.fetchall():
+        if value is not None:
+            outputs.append(TaskOutput(type=output_type, value=value))
+        elif data_id is not None:
+            outputs.append(TaskOutput(type=output_type, data_id=uuid.UUID(bytes=data_id)))
+        else:
+            outputs.append(TaskOutput(type=output_type))
+
+    conn.commit()
+    cursor.close()
+    return outputs
+
+
+def get_task_state(conn, task_id: uuid.UUID) -> str:
+    cursor = conn.cursor()
+
+    cursor.execute("SELECT state FROM tasks WHERE id = %s", (task_id.bytes,))
+    state = cursor.fetchone()[0]
+
+    conn.commit()
+    cursor.close()
+    return state
+
+
+def remove_job(conn, job_id: uuid.UUID):
+    cursor = conn.cursor()
+
+    cursor.execute("DELETE FROM jobs WHERE id = %s", (job_id.bytes,))
+    conn.commit()
+    cursor.close()
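The helpers in `client.py` are plain functions over the storage schema, so a job can be driven end to end from a script. A usage sketch, assuming a reachable storage backend at `storage_url` and that it is run from the repository root:

```python
import uuid

import msgpack

from tests.integration.client import (
    create_connection,
    get_task_state,
    remove_job,
    storage_url,
    submit_job,
    Task,
    TaskGraph,
    TaskInput,
    TaskOutput,
)

conn = create_connection(storage_url)

# A single task with two constant inputs, msgpack-encoded like the fixtures below.
task = Task(
    id=uuid.uuid4(),
    function_name="sum_test",
    inputs=[
        TaskInput(type="int", value=msgpack.packb(1)),
        TaskInput(type="int", value=msgpack.packb(2)),
    ],
    outputs=[TaskOutput(type="int")],
)
graph = TaskGraph(id=uuid.uuid4(), tasks={task.id: task}, dependencies=[])

submit_job(conn, uuid.uuid4(), graph)
# A task with no parents is inserted as "ready"; dependent tasks start "pending".
assert get_task_state(conn, task.id) == "ready"

remove_job(conn, graph.id)
conn.close()
```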
diff --git a/tests/integration/test_scheduler_worker.py b/tests/integration/test_scheduler_worker.py
new file mode 100644
index 0000000..d7a80ab
--- /dev/null
+++ b/tests/integration/test_scheduler_worker.py
@@ -0,0 +1,156 @@
+import subprocess
+import time
+import uuid
+from pathlib import Path
+from typing import Tuple
+
+import msgpack
+import pytest
+
+from .client import (
+    get_task_outputs,
+    get_task_state,
+    remove_job,
+    storage,
+    storage_url,
+    submit_job,
+    Task,
+    TaskGraph,
+    TaskInput,
+    TaskOutput,
+)
+
+
+def start_scheduler_worker(
+    storage_url: str, scheduler_port: int
+) -> Tuple[subprocess.Popen, subprocess.Popen]:
+    # Start the scheduler
+    dir_path = Path(__file__).resolve().parent
+    dir_path = dir_path / ".." / ".." / "src" / "spider"
+    scheduler_cmds = [
+        str(dir_path / "spider_scheduler"),
+        "--port",
+        str(scheduler_port),
+        "--storage_url",
+        storage_url,
+    ]
+    scheduler_process = subprocess.Popen(scheduler_cmds)
+    worker_cmds = [
+        str(dir_path / "spider_worker"),
+        "--storage_url",
+        storage_url,
+        "--libs",
+        "tests/libworker_test.so",
+    ]
+    worker_process = subprocess.Popen(worker_cmds)
+    return scheduler_process, worker_process
+
+
+scheduler_port = 6103
+
+
+@pytest.fixture(scope="class")
+def scheduler_worker(storage):
+    scheduler_process, worker_process = start_scheduler_worker(
+        storage_url=storage_url, scheduler_port=scheduler_port
+    )
+    yield
+    scheduler_process.kill()
+    worker_process.kill()
+
+
+@pytest.fixture(scope="function")
+def success_job(storage):
+    parent_1 = Task(
+        id=uuid.uuid4(),
+        function_name="sum_test",
+        inputs=[
+            TaskInput(type="int", value=msgpack.packb(1)),
+            TaskInput(type="int", value=msgpack.packb(2)),
+        ],
+        outputs=[TaskOutput(type="int")],
+    )
+    parent_2 = Task(
+        id=uuid.uuid4(),
+        function_name="sum_test",
+        inputs=[
+            TaskInput(type="int", value=msgpack.packb(3)),
+            TaskInput(type="int", value=msgpack.packb(4)),
+        ],
+        outputs=[TaskOutput(type="int")],
+    )
+    child = Task(
+        id=uuid.uuid4(),
+        function_name="sum_test",
+        inputs=[
+            TaskInput(type="int", task_output=(parent_1.id, 0)),
+            TaskInput(type="int", task_output=(parent_2.id, 0)),
+        ],
+        outputs=[TaskOutput(type="int")],
+    )
+    graph = TaskGraph(
+        tasks={parent_1.id: parent_1, parent_2.id: parent_2, child.id: child},
+        dependencies=[(parent_1.id, child.id), (parent_2.id, child.id)],
+        id=uuid.uuid4(),
+    )
+
+    submit_job(storage, uuid.uuid4(), graph)
+    assert get_task_state(storage, parent_1.id) == "ready"
+    assert get_task_state(storage, parent_2.id) == "ready"
+    assert get_task_state(storage, child.id) == "pending"
+    print("success job task ids:", parent_1.id, parent_2.id, child.id)
+
+    yield graph, parent_1, parent_2, child
+
+    remove_job(storage, graph.id)
+
+
+@pytest.fixture(scope="function")
+def fail_job(storage):
+    task = Task(
+        id=uuid.uuid4(),
+        function_name="error_test",
+        inputs=[TaskInput(type="int", value=msgpack.packb(1))],
+        outputs=[TaskOutput(type="int")],
+    )
+    graph = TaskGraph(
+        tasks={task.id: task},
+        dependencies=[],
+        id=uuid.uuid4(),
+    )
+
+    submit_job(storage, uuid.uuid4(), graph)
+    print("fail job task id:", task.id)
+
+    yield task
+
+    remove_job(storage, graph.id)
+
+
+class TestSchedulerWorker:
+    def test_job_success(self, scheduler_worker, storage, success_job):
+        graph, parent_1, parent_2, child = success_job
+        # Wait for 2 seconds and check task state and output
+        time.sleep(2)
+        state = get_task_state(storage, parent_1.id)
+        assert state == "success"
+        outputs = get_task_outputs(storage, parent_1.id)
+        assert len(outputs) == 1
+        assert outputs[0].value == msgpack.packb(3).decode("utf-8")
+        state = get_task_state(storage, parent_2.id)
+        assert state == "success"
+        outputs = get_task_outputs(storage, parent_2.id)
+        assert len(outputs) == 1
+        assert outputs[0].value == msgpack.packb(7).decode("utf-8")
+        state = get_task_state(storage, child.id)
+        assert state == "success"
+        outputs = get_task_outputs(storage, child.id)
+        assert len(outputs) == 1
+        assert outputs[0].value == msgpack.packb(10).decode("utf-8")
+
+    def test_job_failure(self, scheduler_worker, storage, fail_job):
+        task = fail_job
+        # Wait for 2 seconds and check task output
+        time.sleep(2)
+        state = get_task_state(storage, task.id)
+        assert state == "fail"
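Both tests sleep for a fixed two seconds before asserting on task state. Where scheduler or worker timing is less predictable, a small polling helper expresses the same wait without over- or under-sleeping. A sketch (hypothetical, not part of this diff), reusing `get_task_state` from `client.py`:

```python
import time
import uuid

from tests.integration.client import get_task_state


def wait_for_task_state(conn, task_id: uuid.UUID, expected: str, timeout: float = 10.0) -> bool:
    """Polls the task's state until it equals `expected` or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_task_state(conn, task_id) == expected:
            return True
        time.sleep(0.1)
    return False


# Usage inside a test, replacing the fixed sleep:
#     assert wait_for_task_state(storage, parent_1.id, "success")
```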