-
Notifications
You must be signed in to change notification settings - Fork 58
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Boefjes combined schedulers integration #4015
base: poc/mula/combined-schedulers
Are you sure you want to change the base?
Boefjes combined schedulers integration #4015
Conversation
# TODO: check | ||
if not response: | ||
logger.debug("Queue %s empty", queue_type.value) | ||
time.sleep(10) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should these be time.sleep(self.settings.poll_interval)
?
if not p_item: | ||
logger.debug("Queue %s empty", queue.id) | ||
continue | ||
# TODO: check |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check what kind of response the scheduler gives when the queue is empty and handle this.
def pop_item(self, scheduler_id: str) -> PaginatedTasksResponse | None: | ||
response = self._session.post(f"/schedulers/{scheduler_id}/pop?limit=1") | ||
self._verify_response(response) | ||
|
||
return TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content) | ||
|
||
def pop_items(self, scheduler_id: str, filters: dict[str, Any]) -> PaginatedTasksResponse | None: | ||
response = self._session.post(f"/schedulers/{scheduler_id}/pop", json=filters) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Important to note is that the pop
endpoint changed to return multiple Task
s, additionally it allows for more complex filtering options. I've, for now limited the response to 1 Task
to resemble how it currently works, but this can be updated accordingly to what is desired.
…-schedulers-integration
if task_queue.qsize() > self.settings.pool_size: | ||
time.sleep(self.settings.worker_heartbeat) | ||
return | ||
logger.debug("Popping from queue %s", queue_type.value) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add this back in
Changes
Warning
Please check the SOURCE PR and the associated issue(s) to get the background and full context of this PR. This PR is to be merged into that source PR.
/queues
has been refactored to the/schedulers/{scheduler_id}/pop
endpoint how the task runner is popping off tasks. We don't have to iterate over all the organizational queues and check if items are still on the queue. We now can just pop off tasks from the queue.Issue link
Closes #3961