Skip to content

Commit

Permalink
Merge pull request #61 from Project-OMOTES/55-rework-sdk-to-utilize-s…
Browse files Browse the repository at this point in the history
…ingle-job-submission-queue-instead-of-one-per-workflow-type

55 rework sdk to utilize single job submission queue instead of one per workflow type
  • Loading branch information
lfse-slafleur authored Aug 1, 2024
2 parents d733f5b + e91cb4d commit 0aafcea
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 51 deletions.
86 changes: 67 additions & 19 deletions src/omotes_sdk/internal/common/broker_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,42 +216,29 @@ async def _send_message_to(
)
await self._exchanges[exchange_name].publish(amqp_message, routing_key=routing_key)

async def _add_queue_subscription(
async def _declare_queue(
self,
queue_name: str,
callback_on_message: Callable[[bytes], None],
queue_type: AMQPQueueType,
bind_to_routing_key: Optional[str] = None,
exchange_name: Optional[str] = None,
delete_after_messages: Optional[int] = None,
) -> None:
"""Declare an AMQP queue and subscribe to the messages.
) -> AbstractQueue:
"""Declare an AMQP queue.
:param queue_name: Name of the queue to declare.
:param callback_on_message: Callback which is called from a separate thread to process the
message body.
:param queue_type: Declare the queue using one of the known queue types.
:param bind_to_routing_key: Bind the queue to this routing key next to the default routing
key of the queue name. If none, the queue is only bound to the name of the queue.
If not none, then the exchange_name must be set as well.
:param exchange_name: Name of the exchange on which the messages will be published.
:param delete_after_messages: Delete the subscription & queue after this limit of messages
have been successfully processed.
"""
if queue_name in self._queue_subscription_consumer_by_name:
logger.error(
"Attempting to declare a subscription on %s but a "
"subscription on this queue already exists."
)
raise RuntimeError(f"Queue subscription for {queue_name} already exists.")

if bind_to_routing_key is not None and exchange_name is None:
raise RuntimeError(
f"Routing key for binding was set to {bind_to_routing_key} but no "
f"exchange name was provided."
)

logger.info("Declaring queue as %s and adding subscription to %s", queue_type, queue_name)
logger.info("Declaring queue %s as %s", queue_name, queue_type)
queue = await self._channel.declare_queue(queue_name, **queue_type.to_argument())

if exchange_name is not None:
Expand All @@ -263,6 +250,41 @@ async def _add_queue_subscription(
logger.info("Binding queue %s to routing key %s", queue_name, bind_to_routing_key)
await queue.bind(exchange=exchange, routing_key=bind_to_routing_key)

return queue

async def _declare_queue_and_add_subscription(
self,
queue_name: str,
callback_on_message: Callable[[bytes], None],
queue_type: AMQPQueueType,
bind_to_routing_key: Optional[str] = None,
exchange_name: Optional[str] = None,
delete_after_messages: Optional[int] = None,
) -> None:
"""Declare an AMQP queue and subscribe to the messages.
:param queue_name: Name of the queue to declare.
:param callback_on_message: Callback which is called from a separate thread to process the
message body.
:param queue_type: Declare the queue using one of the known queue types.
:param bind_to_routing_key: Bind the queue to this routing key next to the default routing
key of the queue name. If none, the queue is only bound to the name of the queue.
If not none, then the exchange_name must be set as well.
:param exchange_name: Name of the exchange on which the messages will be published.
:param delete_after_messages: Delete the subscription & queue after this limit of messages
have been successfully processed.
"""
if queue_name in self._queue_subscription_consumer_by_name:
logger.error(
"Attempting to declare a subscription on %s but a "
"subscription on this queue already exists."
)
raise RuntimeError(f"Queue subscription for {queue_name} already exists.")

queue = await self._declare_queue(
queue_name, queue_type, bind_to_routing_key, exchange_name
)

queue_consumer = QueueSubscriptionConsumer(
queue, delete_after_messages, callback_on_message
)
Expand Down Expand Up @@ -365,7 +387,33 @@ def declare_exchange(self, exchange_name: str) -> None:
"""
asyncio.run_coroutine_threadsafe(self._declare_exchange(exchange_name), self._loop).result()

def add_queue_subscription(
def declare_queue(
self,
queue_name: str,
queue_type: AMQPQueueType,
bind_to_routing_key: Optional[str] = None,
exchange_name: Optional[str] = None,
) -> None:
"""Declare an AMQP queue.
:param queue_name: Name of the queue to declare.
:param queue_type: Declare the queue using one of the known queue types.
:param bind_to_routing_key: Bind the queue to this routing key next to the default routing
key of the queue name. If none, the queue is only bound to the name of the queue.
If not none, then the exchange_name must be set as well.
:param exchange_name: Name of the exchange on which the messages will be published.
"""
asyncio.run_coroutine_threadsafe(
self._declare_queue(
queue_name=queue_name,
queue_type=queue_type,
bind_to_routing_key=bind_to_routing_key,
exchange_name=exchange_name,
),
self._loop,
).result()

def declare_queue_and_add_subscription(
self,
queue_name: str,
callback_on_message: Callable[[bytes], None],
Expand All @@ -387,7 +435,7 @@ def add_queue_subscription(
have been successfully processed.
"""
asyncio.run_coroutine_threadsafe(
self._add_queue_subscription(
self._declare_queue_and_add_subscription(
queue_name=queue_name,
callback_on_message=callback_on_message,
queue_type=queue_type,
Expand Down
35 changes: 21 additions & 14 deletions src/omotes_sdk/omotes_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ def start(self) -> None:
"""Start any other interfaces and request available workflows."""
self.broker_if.start()
self.broker_if.declare_exchange(OmotesQueueNames.omotes_exchange_name())
# Declare job submissions queue just in case the orchestrator isn't life yet so we do not
# lose work.
self.broker_if.declare_queue(
queue_name=OmotesQueueNames.job_submission_queue_name(),
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
)
self.connect_to_available_workflows_updates()
self.request_available_workflows()

Expand All @@ -148,14 +155,14 @@ def disconnect_from_submitted_job(self, job: Job) -> None:
:param job: Job to disconnect from.
"""
self.broker_if.remove_queue_subscription(OmotesQueueNames.job_results_queue_name(job))
self.broker_if.remove_queue_subscription(OmotesQueueNames.job_progress_queue_name(job))
self.broker_if.remove_queue_subscription(OmotesQueueNames.job_status_queue_name(job))
self.broker_if.remove_queue_subscription(OmotesQueueNames.job_results_queue_name(job.id))
self.broker_if.remove_queue_subscription(OmotesQueueNames.job_progress_queue_name(job.id))
self.broker_if.remove_queue_subscription(OmotesQueueNames.job_status_queue_name(job.id))

def _autodelete_progres_status_queues_on_result(self, job: Job) -> None:
"""Disconnect and delete the progress and status queues for some job."""
self.broker_if.remove_queue_subscription(OmotesQueueNames.job_progress_queue_name(job))
self.broker_if.remove_queue_subscription(OmotesQueueNames.job_status_queue_name(job))
self.broker_if.remove_queue_subscription(OmotesQueueNames.job_progress_queue_name(job.id))
self.broker_if.remove_queue_subscription(OmotesQueueNames.job_status_queue_name(job.id))

def connect_to_submitted_job(
self,
Expand Down Expand Up @@ -193,23 +200,23 @@ def connect_to_submitted_job(
auto_disconnect_handler,
)

self.broker_if.add_queue_subscription(
queue_name=OmotesQueueNames.job_results_queue_name(job),
self.broker_if.declare_queue_and_add_subscription(
queue_name=OmotesQueueNames.job_results_queue_name(job.id),
callback_on_message=callback_handler.callback_on_finished_wrapped,
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
delete_after_messages=1,
)
if callback_on_progress_update:
self.broker_if.add_queue_subscription(
queue_name=OmotesQueueNames.job_progress_queue_name(job),
self.broker_if.declare_queue_and_add_subscription(
queue_name=OmotesQueueNames.job_progress_queue_name(job.id),
callback_on_message=callback_handler.callback_on_progress_update_wrapped,
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
)
if callback_on_status_update:
self.broker_if.add_queue_subscription(
queue_name=OmotesQueueNames.job_status_queue_name(job),
self.broker_if.declare_queue_and_add_subscription(
queue_name=OmotesQueueNames.job_status_queue_name(job.id),
callback_on_message=callback_handler.callback_on_status_update_wrapped,
queue_type=AMQPQueueType.DURABLE,
exchange_name=OmotesQueueNames.omotes_exchange_name(),
Expand Down Expand Up @@ -275,8 +282,8 @@ def submit_job(
params_dict=convert_params_dict_to_struct(params_dict),
)
self.broker_if.send_message_to(
OmotesQueueNames.omotes_exchange_name(),
OmotesQueueNames.job_submission_queue_name(workflow_type),
exchange_name=OmotesQueueNames.omotes_exchange_name(),
routing_key=OmotesQueueNames.job_submission_queue_name(),
message=job_submission_msg.SerializeToString(),
)
logger.debug("Done submitting job %s", job.id)
Expand All @@ -302,7 +309,7 @@ def cancel_job(self, job: Job) -> None:

def connect_to_available_workflows_updates(self) -> None:
"""Connect to updates of the available workflows."""
self.broker_if.add_queue_subscription(
self.broker_if.declare_queue_and_add_subscription(
queue_name=OmotesQueueNames.available_workflows_queue_name(self.client_id),
callback_on_message=self.callback_on_update_available_workflows,
queue_type=AMQPQueueType.EXCLUSIVE,
Expand Down
34 changes: 16 additions & 18 deletions src/omotes_sdk/queue_names.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from omotes_sdk.job import Job
from omotes_sdk.workflow_type import WorkflowType
import uuid


class OmotesQueueNames:
Expand All @@ -14,40 +13,39 @@ def omotes_exchange_name() -> str:
return "omotes_exchange"

@staticmethod
def job_submission_queue_name(workflow_type: WorkflowType) -> str:
"""Generate the job submission queue name given the workflow type.
def job_submission_queue_name() -> str:
"""Generate the job submission queue name.
:param workflow_type: Workflow type.
:return: The queue name.
"""
return f"job_submissions.{workflow_type.workflow_type_name}"
return "job_submissions"

@staticmethod
def job_results_queue_name(job: Job) -> str:
"""Generate the job results queue name given the job.
def job_results_queue_name(job_uuid: uuid.UUID) -> str:
"""Generate the job results queue name given the job id.
:param job: The job.
:param job_uuid: The identifier of the job.
:return: The queue name.
"""
return f"jobs.{job.id}.result"
return f"jobs.{job_uuid}.result"

@staticmethod
def job_progress_queue_name(job: Job) -> str:
"""Generate the job progress update queue name given the job.
def job_progress_queue_name(job_uuid: uuid.UUID) -> str:
"""Generate the job progress update queue name given the job id.
:param job: The job.
:param job_uuid: The identifier of the job.
:return: The queue name.
"""
return f"jobs.{job.id}.progress"
return f"jobs.{job_uuid}.progress"

@staticmethod
def job_status_queue_name(job: Job) -> str:
"""Generate the job status update queue name given the job.
def job_status_queue_name(job_uuid: uuid.UUID) -> str:
"""Generate the job status update queue name given the job id.
:param job: The job.
:param job_uuid: The identifier of the job.
:return: The queue name.
"""
return f"jobs.{job.id}.status"
return f"jobs.{job_uuid}.status"

@staticmethod
def job_cancel_queue_name() -> str:
Expand Down

0 comments on commit 0aafcea

Please sign in to comment.