Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refact: make topic configurable for SolaceMessageQueue #424

Merged
merged 1 commit into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion llama_deploy/apiserver/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ def _load_message_queue_client(
self._simple_message_queue_server = SimpleMessageQueueServer(cfg)
return SimpleMessageQueue(cfg) # type: ignore
elif cfg.type == "solace":
return SolaceMessageQueue(**cfg.model_dump())
return SolaceMessageQueue(cfg) # type: ignore
else:
msg = f"Unsupported message queue: {cfg.type}"
raise ValueError(msg)
Expand Down
4 changes: 1 addition & 3 deletions llama_deploy/deploy/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ def _get_message_queue_client(config: BaseSettings) -> BaseMessageQueue:
elif isinstance(config, RedisMessageQueueConfig):
return RedisMessageQueue(config) # type: ignore
elif isinstance(config, SolaceMessageQueueConfig):
return SolaceMessageQueue(
**config.model_dump(),
)
return SolaceMessageQueue(config) # type: ignore
else:
raise ValueError(f"Invalid message queue config: {config}")

Expand Down
82 changes: 41 additions & 41 deletions llama_deploy/message_queues/solace.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
BaseMessageQueueConsumer,
StartConsumingCallable,
)
from llama_deploy.message_queues.base import BaseMessageQueue
from llama_deploy.message_queues.base import AbstractMessageQueue
from llama_deploy.messages.base import QueueMessage

if TYPE_CHECKING:
from solace.messaging.connections.connectable import Connectable
from solace.messaging.messaging_service import MessagingService
from solace.messaging.publisher.persistent_message_publisher import (
PersistentMessagePublisher,
PublishReceipt,
Expand All @@ -39,10 +38,10 @@
SOLACE_INSTALLED = True

try:
from solace.messaging.messaging_service import MessagingService
from solace.messaging.publisher.persistent_message_publisher import (
MessagePublishReceiptListener,
PersistentMessagePublisher,
PublishReceipt,
)
from solace.messaging.receiver.message_receiver import MessageHandler
from solace.messaging.receiver.persistent_message_receiver import (
Expand All @@ -55,12 +54,12 @@ class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
def __init__(self, callback: Any = None) -> None:
self.callback = callback

def on_publish_receipt(self, publish_receipt: "PublishReceipt") -> None:
def on_publish_receipt(self, publish_receipt: PublishReceipt) -> None:
if publish_receipt.user_context:
logger.info(
f"\tUser context received: {publish_receipt.user_context.get_custom_message}"
f"\tUser context received: {publish_receipt.user_context.get_custom_message}" # type:ignore
)
callback = publish_receipt.user_context.get("callback")
callback = publish_receipt.user_context.get("callback") # type:ignore
callback(publish_receipt.user_context)

class MessageHandlerImpl(MessageHandler):
Expand All @@ -69,15 +68,15 @@ class MessageHandlerImpl(MessageHandler):
def __init__(
self,
consumer: BaseMessageQueueConsumer,
receiver: PersistentMessageReceiver = None,
receiver: PersistentMessageReceiver | None = None,
) -> None:
self._consumer = consumer
self._receiver = receiver

def on_message(self, message: "InboundMessage") -> None:
try:
topic = message.get_destination_name()
payload_as_string = message.get_payload_as_string()
payload_as_string = message.get_payload_as_string() or ""
correlation_id = message.get_correlation_id()

message_details = {
Expand Down Expand Up @@ -151,16 +150,10 @@ def get_properties(self) -> dict:
return broker_properties


class SolaceMessageQueue(BaseMessageQueue):
class SolaceMessageQueue(AbstractMessageQueue):
"""Solace PubSub+ Message Queue."""

messaging_service: "MessagingService" = None
publisher: "PersistentMessagePublisher" = None
persistent_receiver: "PersistentMessageReceiver" = None
broker_properties: dict = {}
is_queue_temporary: bool = True

def __init__(self, **kwargs: Any) -> None:
def __init__(self, config: SolaceMessageQueueConfig | None) -> None:
"""Initialize the Solace message queue."""
if not SOLACE_INSTALLED:
raise ValueError(
Expand All @@ -170,19 +163,21 @@ def __init__(self, **kwargs: Any) -> None:
from solace.messaging.config.retry_strategy import RetryStrategy
from solace.messaging.messaging_service import MessagingService

super().__init__()

config = SolaceMessageQueueConfig(**kwargs)
self.broker_properties = config.get_properties()
self.messaging_service = (
self._publisher: "PersistentMessagePublisher | None" = None
self._persistent_receiver: "PersistentMessageReceiver | None" = None
self._config = config or SolaceMessageQueueConfig()
self._broker_properties = self._config.get_properties()
self._messaging_service = (
MessagingService.builder()
.from_properties(self.broker_properties)
.from_properties(self._broker_properties)
.with_reconnection_retry_strategy(
RetryStrategy.parametrized_retry(20, 3000)
)
.build()
)
self.is_queue_temporary = bool(self.broker_properties.get("IS_QUEUE_TEMPORARY"))
self._is_queue_temporary = bool(
self._broker_properties.get("IS_QUEUE_TEMPORARY")
)
logger.info("Solace Messaging Service created")

def __del__(self) -> None:
Expand All @@ -202,14 +197,14 @@ async def _establish_connection(self) -> "Connectable":

try:
logger.info("Establishing connection to Solace server")
connect = self.messaging_service.connect()
connect = self._messaging_service.connect()

# Create a publisher
self.publisher = self.messaging_service.create_persistent_message_publisher_builder().build()
self.publisher.start()
self._publisher = self._messaging_service.create_persistent_message_publisher_builder().build()
self._publisher.start() # type:ignore

publish_receipt_listener = MessagePublishReceiptListenerImpl()
self.publisher.set_message_publish_receipt_listener(
self._publisher.set_message_publish_receipt_listener( # type:ignore
publish_receipt_listener
)

Expand All @@ -236,7 +231,7 @@ async def _publish(self, message: QueueMessage, topic: str) -> None:
message_body = json.dumps(message.model_dump())

try:
self.publisher.publish(
self._publisher.publish( # type:ignore
message=message_body,
destination=destination,
)
Expand All @@ -249,14 +244,14 @@ async def _publish(self, message: QueueMessage, topic: str) -> None:
def disconnect(self) -> None:
"""Disconnect from the Solace server."""
try:
self.messaging_service.disconnect()
self._messaging_service.disconnect()
logger.info("Disconnected from Solace server")
except Exception as exception:
logger.debug("Error disconnecting: %s", exception)

def is_connected(self) -> bool:
"""Check if the Solace server is connected."""
return self.messaging_service.is_connected
return self._messaging_service.is_connected

def bind_to_queue(self, subscriptions: list = []) -> None:
"""Bind to a queue and subscribe to topics."""
Expand All @@ -277,26 +272,26 @@ def bind_to_queue(self, subscriptions: list = []) -> None:
return
queue_name = QUEUE_TEMPLATE.substitute(iteration=subscriptions[0])

if self.is_queue_temporary:
if self._is_queue_temporary:
queue = Queue.non_durable_exclusive_queue(queue_name)
else:
queue = Queue.durable_exclusive_queue(queue_name)

try:
# Build a receiver and bind it to the queue
self.persistent_receiver = (
self.messaging_service.create_persistent_message_receiver_builder()
self._persistent_receiver = (
self._messaging_service.create_persistent_message_receiver_builder()
.with_missing_resources_creation_strategy(
MissingResourcesCreationStrategy.CREATE_ON_START
)
.build(queue)
.build(queue) # type:ignore
)
self.persistent_receiver.start()
self._persistent_receiver.start() # type:ignore

logger.debug(
"Persistent receiver started... Bound to Queue [%s] (Temporary: %s)",
queue.get_name(),
self.is_queue_temporary,
self._is_queue_temporary,
)

# Handle API exception
Expand All @@ -310,7 +305,7 @@ def bind_to_queue(self, subscriptions: list = []) -> None:
# If subscriptions are provided, add them to the receiver
if subscriptions:
for subscription in subscriptions:
self.persistent_receiver.add_subscription(subscription)
self._persistent_receiver.add_subscription(subscription) # type:ignore
logger.info("Subscribed to topic: %s", subscription)

return
Expand All @@ -319,6 +314,9 @@ async def register_consumer(
self, consumer: BaseMessageQueueConsumer, topic: str | None = None
) -> StartConsumingCallable:
"""Register a new consumer."""
if topic is None:
raise ValueError("Topic must be a valid string")

try:
from solace.messaging.errors.pubsubplus_client_error import (
IllegalStateError,
Expand All @@ -339,8 +337,10 @@ async def register_consumer(

self.bind_to_queue(subscriptions=subscriptions)
logger.info(f"Consumer registered to: {consumer_subscription}")
self.persistent_receiver.receive_async(
MessageHandlerImpl(consumer=consumer, receiver=self.persistent_receiver)
self._persistent_receiver.receive_async( # type:ignore
MessageHandlerImpl(
consumer=consumer, receiver=self._persistent_receiver
)
)

async def start_consuming_callable() -> None:
Expand All @@ -365,15 +365,15 @@ async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> None:

try:
for topic in topics:
self.persistent_receiver.remove_subscription(topic)
self._persistent_receiver.remove_subscription(topic) # type:ignore

logger.info(f"Consumer deregistered from: {consumer_subscription}")
time.sleep(MAX_SLEEP)
except Exception as e:
logger.error(f"Failed to deregister consumer: {e}")
raise
finally:
self.persistent_receiver.terminate()
self._persistent_receiver.terminate() # type:ignore

async def processing_loop(self) -> None:
"""A loop for getting messages from queues and sending to consumer."""
Expand Down
10 changes: 5 additions & 5 deletions tests/message_queues/test_solace.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ def solace_queue(
mock_messaging_service.create_persistent_message_publisher_builder.return_value.build.return_value = mock_publisher
mock_messaging_service.create_persistent_message_receiver_builder.return_value.with_missing_resources_creation_strategy.return_value.build.return_value = mock_receiver

queue = SolaceMessageQueue(type="solace")
queue.messaging_service = mock_messaging_service
queue.publisher = mock_publisher
queue.persistent_receiver = mock_receiver
queue = SolaceMessageQueue(SolaceMessageQueueConfig(type="solace"))
queue._messaging_service = mock_messaging_service
queue._publisher = mock_publisher
queue._persistent_receiver = mock_receiver
yield queue


Expand All @@ -140,7 +140,7 @@ async def test_establish_connection(
assert connect_result is not None
mock_messaging_service.connect.assert_called_once()
mock_publisher.start.assert_called_once()
assert solace_queue.publisher.set_message_publish_receipt_listener.called
assert solace_queue._publisher.set_message_publish_receipt_listener.called # type: ignore


@pytest.mark.asyncio
Expand Down
Loading