feat: Add pytest integration test for scheduler and worker #39
Conversation
Walkthrough
The pull request introduces comprehensive updates to the project's testing infrastructure, focusing on integration testing capabilities. The changes include adding new dependencies for testing and linting, creating configuration files for Python linting, and developing integration test frameworks. A new integration test client and test suite have been implemented, along with updates to documentation, build configurations, and task management for running tests.
Actionable comments posted: 4
🧹 Nitpick comments (9)
lint-requirements.txt (1)
Line range hint
1-8
: Consider adding tool-specific configuration files
Since you're adding Black and Ruff, it would be beneficial to add their configuration files (pyproject.toml or .ruff.toml) to ensure consistent behaviour across different development environments.
Would you like me to help generate the configuration files with recommended settings for your project?
lint-tasks.yaml (2)
106-112
: Consider adding explicit error handling for the py-check task
While the task structure is sound, it might be helpful to add error handling and clear error messages when the linting checks fail.
  py-check:
    cmds:
-     - task: "py"
+     - |
+       echo "Running Python linting checks..."
+       task: "py"
        vars:
          BLACK_FLAGS: "--check"
          RUFF_FLAGS: ""
+       || { echo "Python linting checks failed. Please run 'task lint:py-fix' to attempt automatic fixes."; exit 1; }
129-132
: Consider adding output formatting for better readability
The linting commands could benefit from clearer output formatting to help developers quickly identify issues.
. "{{.G_LINT_VENV_DIR}}/bin/activate" cd "{{.ITEM}}" + echo "=== Running Black formatter on {{.ITEM}} ===" black --color --line-length 100 {{.BLACK_FLAGS}} . + echo "=== Running Ruff linter on {{.ITEM}} ===" ruff check {{.RUFF_FLAGS}} .tests/integration/client.py (1)
15-15
: Clarify the type for data_id
Currently, data_id is declared as Optional[Tuple[uuid.UUID]]. This could be confusing since it suggests a single-element tuple. Consider changing it to Optional[uuid.UUID] if the intended usage is for one UUID, or List[uuid.UUID] if multiple UUIDs are needed.
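As a rough sketch of the two options (assuming a dataclass holder in client.py; the class name below is illustrative, not the actual one):

from dataclasses import dataclass, field
from typing import List, Optional
import uuid

@dataclass
class TaskRecord:  # hypothetical container; the real class in client.py may differ
    # Option 1: exactly zero or one UUID
    data_id: Optional[uuid.UUID] = None
    # Option 2: any number of UUIDs
    data_ids: List[uuid.UUID] = field(default_factory=list)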
tests/CMakeLists.txt (1)
51-59
: Optional: Remove the 'ALL' keyword for integrationTest
Marking integrationTest with ALL means it is always built, which can increase build times when running partial builds. If you want integration tests only on request, consider removing ALL or adding a separate CTest label.

test-tasks.yaml (2)
19-19
: Consistent naming
Likewise, make sure the references to "{{.G_UNIT_TEST_BINARY}}" are consistent throughout the file (e.g., also in lines 25, 13, etc.) to avoid confusion.
34-49
: Integration test task
Adding the integration task is great for a comprehensive testing pipeline. As a further enhancement, consider parameterising the tests to handle different environments or to skip certain test suites if needed.tests/integration/test_scheduler_worker.py (2)
tests/integration/test_scheduler_worker.py (2)
49-49
: Consider using dynamic port allocation.
Using a hardcoded port number (6103) could cause conflicts in CI/CD environments or when running multiple test instances simultaneously.
Consider using a utility function to find an available port:
def get_free_port():
    import socket
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('', 0))
        s.listen(1)
        port = s.getsockname()[1]
    return port

scheduler_port = get_free_port()
62-106
: Consider parameterizing test inputs and adding timing validation.
The success_job fixture has hardcoded test values and lacks timing validation.
Consider these improvements:
- Define test inputs as constants:
TEST_INPUTS = {
    'parent_1': (1, 2, 3),  # input1, input2, expected_sum
    'parent_2': (3, 4, 7),
    'child': (3, 7, 10)  # parent1_sum, parent2_sum, expected_sum
}
- Add timing validation:
  yield graph, parent_1, parent_2, child
+ # Ensure job cleanup doesn't take too long
+ start_time = time.monotonic()
  remove_job(storage, graph.id)
+ cleanup_time = time.monotonic() - start_time
+ assert cleanup_time < 1.0, f"Job cleanup took too long: {cleanup_time:.2f}s"
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (9)
docs/testing.md (2 hunks)
lint-requirements.txt (1 hunks)
lint-tasks.yaml (2 hunks)
ruff.toml (1 hunks)
test-requirements.txt (1 hunks)
test-tasks.yaml (1 hunks)
tests/CMakeLists.txt (1 hunks)
tests/integration/client.py (1 hunks)
tests/integration/test_scheduler_worker.py (1 hunks)
✅ Files skipped from review due to trivial changes (2)
- ruff.toml
- test-requirements.txt
🔇 Additional comments (7)
lint-requirements.txt (1)
1-1
: Excellent choice of Python linting tools!
The addition of Black and Ruff is a solid choice for Python code formatting and linting. These tools are well-maintained, widely adopted, and complement each other nicely.
Also applies to: 5-5
lint-tasks.yaml (2)
12-12
: Well-integrated into the existing check workflow
The Python linting task is properly integrated into the main check workflow, maintaining consistency with other language checks.
120-133
: Review the directory targeting strategy
The current implementation only targets the tests/integration directory. Consider whether other Python files in the project should also be linted.
Let's check for other Python files that might need linting:
Consider parameterizing the target directories:
  py:
    internal: true
    requires:
      vars: ["BLACK_FLAGS", "RUFF_FLAGS"]
    deps: ["venv"]
+   vars:
+     PYTHON_DIRS:
+       - "tests/integration"
+       # Add other Python directories here
    cmds:
      - for:
-         - "tests/integration"
+         - "{{.PYTHON_DIRS}}"
        cmd: |-
          . "{{.G_LINT_VENV_DIR}}/bin/activate"
          cd "{{.ITEM}}"
          black --color --line-length 100 {{.BLACK_FLAGS}} .
          ruff check {{.RUFF_FLAGS}} .
✅ Verification successful
The current linting strategy is appropriate
The verification shows that there are no Python files outside of the tests/integration directory (excluding build artifacts and virtual environments). This confirms that the current implementation's focus on tests/integration is correct and comprehensive for the project's Python files.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find all Python files in the project
# excluding common build and virtual environment directories
echo "Python files not covered by current linting:"
fd -e py -E 'tests/integration' -E '*venv*' -E 'build*' -E 'dist*'
Length of output: 165
tests/CMakeLists.txt (1)
45-45
: Consider if 'ALL' is needed
The line add_dependencies(unitTest worker_test) ensures worker_test is built before unitTest, which is likely acceptable. Double-check if any additional dependencies or ordering constraints are needed to maintain a stable build.
✅ Verification successful
Dependencies are correctly specified
The dependency chain is properly set up:
- unitTest depends on worker_test (shared library)
- Both unitTest and integrationTest targets depend on worker_test
- The ALL flag is not needed here as it's already used appropriately on the integrationTest target
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Verifies if an additional target depends on worker_test or unitTest
ast-grep --pattern $'add_library($_) {\n $$$\n}\n'
Length of output: 48
Script:
#!/bin/bash
# Search for CMake targets and dependencies
rg -A 5 "add_(library|executable|dependencies|test)" --type cmake
Length of output: 5111
docs/testing.md (2)
26-28
: Ensure matching credentials
These lines instruct users to set storage_url in client.py. Confirm the database credentials match those specified in step 3 for correct connectivity, especially if a non-root user is used.
51-58
: Excellent documentation update
This section on integration tests clearly describes how to run them, improving the project’s usability for new contributors.
tests/integration/test_scheduler_worker.py (1)
1-21
: LGTM! Well-organized imports.
The imports are properly organized with clear separation between standard library, third-party, and local imports.
return True

storage_url = "jdbc:mariadb://localhost:3306/spider_test?user=root&password=password"
Avoid hard-coding credentials
Storing credentials (user/password) directly in source code can pose a security risk. Consider using environment variables or a secure secrets manager.
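For example, client.py could assemble the URL from environment variables with local-only defaults (a sketch; the variable names are placeholders rather than anything defined in this PR):

import os

def get_storage_url() -> str:
    # Prefer a full URL from the environment; otherwise build one from individual settings.
    explicit_url = os.environ.get("SPIDER_STORAGE_URL")
    if explicit_url:
        return explicit_url
    user = os.environ.get("SPIDER_DB_USER", "root")
    password = os.environ.get("SPIDER_DB_PASSWORD", "password")
    host = os.environ.get("SPIDER_DB_HOST", "localhost")
    port = os.environ.get("SPIDER_DB_PORT", "3306")
    return f"jdbc:mariadb://{host}:{port}/spider_test?user={user}&password={password}"

storage_url = get_storage_url()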
 tasks:
   non-storage-unit-tests:
     deps:
       - "build-unit-test"
     cmds:
-      - "{{.G_TEST_BINARY}} \"~[storage]\""
+      - "{{.G_UNIT_TEST_BINARY}} \"~[storage]\""
💡 Codebase verification
Variable G_UNIT_TEST_BINARY is used but not defined
The codebase search confirms that G_UNIT_TEST_BINARY is referenced in multiple test commands but is not defined in the variables section where G_TEST_BINARY is defined. This mismatch needs to be addressed to prevent potential runtime errors.
- In test-tasks.yaml: G_TEST_BINARY is defined but never used
- In test-tasks.yaml: G_UNIT_TEST_BINARY is used in three test commands but not defined
🔗 Analysis chain
Double-check usage of G_UNIT_TEST_BINARY
The command references "{{.G_UNIT_TEST_BINARY}}" but the variable name G_TEST_BINARY is defined at the top. Confirm that G_UNIT_TEST_BINARY is similarly defined and that no mismatch occurs.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Attempt to locate references to G_UNIT_TEST_BINARY and G_TEST_BINARY
rg "G_UNIT_TEST_BINARY|G_TEST_BINARY" -A 3
Length of output: 699
class TestSchedulerWorker:
    def test_job_success(self, scheduler_worker, storage, success_job):
        graph, parent_1, parent_2, child = success_job
        # Wait for 2 seconds and check task state and output
        time.sleep(2)
        state = get_task_state(storage, parent_1.id)
        assert state == "success"
        outputs = get_task_outputs(storage, parent_1.id)
        assert len(outputs) == 1
        assert outputs[0].value == msgpack.packb(3).decode("utf-8")
        state = get_task_state(storage, parent_2.id)
        assert state == "success"
        outputs = get_task_outputs(storage, parent_2.id)
        assert len(outputs) == 1
        assert outputs[0].value == msgpack.packb(7).decode("utf-8")
        state = get_task_state(storage, child.id)
        assert state == "success"
        outputs = get_task_outputs(storage, child.id)
        assert len(outputs) == 1
        assert outputs[0].value == msgpack.packb(10).decode("utf-8")

    def test_job_failure(self, scheduler_worker, storage, fail_job):
        task = fail_job
        # Wait for 2 seconds and check task output
        time.sleep(2)
        state = get_task_state(storage, task.id)
        assert state == "fail"
🛠️ Refactor suggestion
Enhance test robustness and coverage.
The test implementation could be improved in several ways:
- Replace hardcoded sleep with polling
- Add concurrent job testing
- Validate error messages in failure case
Here's a suggested improvement:
+    def wait_for_task_completion(self, storage, task_id, timeout=5):
+        """Wait for task completion with timeout."""
+        start_time = time.monotonic()
+        while time.monotonic() - start_time < timeout:
+            state = get_task_state(storage, task_id)
+            if state in ["success", "fail"]:
+                return state
+            time.sleep(0.1)
+        raise TimeoutError(f"Task {task_id} did not complete within {timeout}s")
+
     def test_job_success(self, scheduler_worker, storage, success_job):
         graph, parent_1, parent_2, child = success_job
-        # Wait for 2 seconds and check task state and output
-        time.sleep(2)
-        state = get_task_state(storage, parent_1.id)
+        state = self.wait_for_task_completion(storage, parent_1.id)
         assert state == "success"
         # ... rest of the assertions ...

     def test_job_failure(self, scheduler_worker, storage, fail_job):
         task = fail_job
-        # Wait for 2 seconds and check task output
-        time.sleep(2)
-        state = get_task_state(storage, task.id)
+        state = self.wait_for_task_completion(storage, task.id)
         assert state == "fail"
+        # Validate error message
+        task_info = get_task_info(storage, task.id)
+        assert "error_test failure" in task_info.error_message
+
+    def test_concurrent_jobs(self, scheduler_worker, storage):
+        """Test multiple jobs running concurrently."""
+        jobs = [success_job(storage) for _ in range(3)]
+        try:
+            for job in jobs:
+                graph, *_ = job
+                for task_id in graph.tasks:
+                    state = self.wait_for_task_completion(storage, task_id)
+                    assert state == "success"
+        finally:
+            for job in jobs:
+                graph, *_ = job
+                remove_job(storage, graph.id)
Committable suggestion skipped: line range outside the PR's diff.
def start_scheduler_worker(
    storage_url: str, scheduler_port: int
) -> Tuple[subprocess.Popen, subprocess.Popen]:
    # Start the scheduler
    dir_path = Path(__file__).resolve().parent
    dir_path = dir_path / ".." / ".." / "src" / "spider"
    scheduler_cmds = [
        str(dir_path / "spider_scheduler"),
        "--port",
        str(scheduler_port),
        "--storage_url",
        storage_url,
    ]
    scheduler_process = subprocess.Popen(scheduler_cmds)
    worker_cmds = [
        str(dir_path / "spider_worker"),
        "--storage_url",
        storage_url,
        "--libs",
        "tests/libworker_test.so",
    ]
    worker_process = subprocess.Popen(worker_cmds)
    return scheduler_process, worker_process
🛠️ Refactor suggestion
Add error handling and health checks for process management.
The current implementation has several areas for improvement:
- No error handling if processes fail to start
- No verification that processes are running successfully
- Hardcoded path to test library could cause issues in different environments
Consider implementing these improvements:
 def start_scheduler_worker(
     storage_url: str, scheduler_port: int
 ) -> Tuple[subprocess.Popen, subprocess.Popen]:
     # Start the scheduler
     dir_path = Path(__file__).resolve().parent
     dir_path = dir_path / ".." / ".." / "src" / "spider"
+    test_lib_path = Path(__file__).resolve().parent.parent / "libworker_test.so"
+
+    if not (dir_path / "spider_scheduler").exists():
+        raise FileNotFoundError("Scheduler executable not found")
+    if not (dir_path / "spider_worker").exists():
+        raise FileNotFoundError("Worker executable not found")
+    if not test_lib_path.exists():
+        raise FileNotFoundError("Test library not found")
+
     scheduler_cmds = [
         str(dir_path / "spider_scheduler"),
         "--port",
         str(scheduler_port),
         "--storage_url",
         storage_url,
     ]
-    scheduler_process = subprocess.Popen(scheduler_cmds)
+    try:
+        scheduler_process = subprocess.Popen(
+            scheduler_cmds,
+            stderr=subprocess.PIPE,
+            stdout=subprocess.PIPE
+        )
+    except subprocess.SubprocessError as e:
+        raise RuntimeError(f"Failed to start scheduler: {e}")
+
+    # Wait for scheduler to be ready
+    time.sleep(1)
+    if scheduler_process.poll() is not None:
+        raise RuntimeError("Scheduler failed to start")
+
     worker_cmds = [
         str(dir_path / "spider_worker"),
         "--storage_url",
         storage_url,
         "--libs",
-        "tests/libworker_test.so",
+        str(test_lib_path),
     ]
-    worker_process = subprocess.Popen(worker_cmds)
+    try:
+        worker_process = subprocess.Popen(
+            worker_cmds,
+            stderr=subprocess.PIPE,
+            stdout=subprocess.PIPE
+        )
+    except subprocess.SubprocessError as e:
+        scheduler_process.kill()
+        raise RuntimeError(f"Failed to start worker: {e}")
+
+    # Wait for worker to be ready
+    time.sleep(1)
+    if worker_process.poll() is not None:
+        scheduler_process.kill()
+        raise RuntimeError("Worker failed to start")
+
     return scheduler_process, worker_process
Actionable comments posted: 0
🧹 Nitpick comments (3)
tests/integration/client.py (3)
59-64
: Consider using a Pythonic approach for dependency checks
The current iteration works as expected; however, you could reduce iteration overhead by leveraging Python's built-in functions. For instance, using any() to determine whether the task is a child in a dependency would simplify the code and enhance readability and performance.
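A minimal sketch of that simplification (assuming dependencies is an iterable of (parent_id, child_id) pairs, which is an assumption about the graph representation in client.py):

def is_child_task(task_id, dependencies) -> bool:
    # A task is a child if any dependency edge lists it as the second element.
    return any(child_id == task_id for _parent_id, child_id in dependencies)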
69-74
: Evaluate the fixture's concurrency behaviour
If multiple tests run in parallel, sharing a single database connection could lead to race conditions or locked resources. Consider creating a separate connection per test or marking tests that use this fixture as serial to avoid collisions.
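One way to avoid shared state would be a function-scoped fixture that opens its own connection per test, roughly as sketched below (it assumes a mysql-connector-python client and env-var credentials, which may not match the actual client.py):

import os
import pytest
import mysql.connector

@pytest.fixture
def storage_connection():
    # Each test gets its own connection, closed on teardown, so parallel tests do not collide.
    conn = mysql.connector.connect(
        host=os.environ.get("SPIDER_DB_HOST", "localhost"),
        port=int(os.environ.get("SPIDER_DB_PORT", "3306")),
        user=os.environ.get("SPIDER_DB_USER", "root"),
        password=os.environ.get("SPIDER_DB_PASSWORD", "password"),
        database="spider_test",
    )
    yield conn
    conn.close()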
76-121
: Wrap database operations in a transaction or try/except/finally block
To ensure atomic inserts and handle possible errors gracefully, consider wrapping these operations in a single transaction. If anything fails in the middle, you can roll back all changes, maintaining data integrity.
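A minimal sketch of the pattern with a DB-API style connection (the helper name and statement format are illustrative only):

def insert_job_atomically(conn, statements):
    # Run all inserts in one transaction; roll everything back if any statement fails.
    cursor = conn.cursor()
    try:
        for sql, params in statements:
            cursor.execute(sql, params)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()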
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
test-tasks.yaml (1 hunks)
tests/integration/client.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- test-tasks.yaml
🔇 Additional comments (2)
tests/integration/client.py (2)
66-66
: Avoid hard-coding credentials
Storing credentials (user/password) directly in the source code can pose a security risk. This concern was raised previously. Consider using environment variables or a secure secrets manager to enhance security.
125-141
: Validate “get_task_outputs” call results and handle empty rows
Currently, the function assumes that rows are present. If the task has no recorded outputs, ensure graceful handling by verifying that the fetched data is not empty.
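For instance, the fetch result could be checked before use, roughly as sketched below (the table and column names are assumptions about the schema, not taken from client.py):

def get_task_outputs(conn, task_id):
    cursor = conn.cursor()
    try:
        cursor.execute(
            "SELECT position, value FROM task_outputs WHERE task_id = %s ORDER BY position",
            (str(task_id),),
        )
        rows = cursor.fetchall()
    finally:
        cursor.close()
    if not rows:
        # No outputs recorded yet; return an empty list rather than failing downstream.
        return []
    return [value for _position, value in rows]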
Description
As title.
Validation performed
Summary by CodeRabbit
New Features
Bug Fixes
Chores