Skip to content

Commit

Permalink
feat: make the kafka topic configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
masci committed Nov 7, 2024
1 parent 34b9299 commit b65ee1b
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 103 deletions.
Empty file.
Empty file.
25 changes: 25 additions & 0 deletions e2e_tests/message_queues/message_queue_kafka/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Single-node Kafka broker running with combined broker+controller roles,
# used by the message queue end-to-end tests.
services:
  kafka:
    image: apache/kafka:3.7.1
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
      # PLAINTEXT_HOST is what clients on the host machine connect to (localhost:9092).
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092"
      KAFKA_PROCESS_ROLES: "broker,controller"
      KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093"
      KAFKA_LISTENERS: "CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092"
      KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
      KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
      # Replication settings are pinned to 1 because there is only one broker.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"
    healthcheck:
      # Healthy once the host-facing listener accepts TCP connections.
      # Exit status 1 is Docker's "unhealthy" code; negative values are not
      # valid shell exit statuses.
      test: nc -z localhost 9092 || exit 1
      start_period: 15s
      interval: 30s
      timeout: 10s
      retries: 5
50 changes: 50 additions & 0 deletions e2e_tests/message_queues/message_queue_kafka/test_message_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import asyncio
import subprocess
from pathlib import Path

import pytest

from llama_deploy.message_consumers.callable import CallableMessageConsumer
from llama_deploy.message_queues import KafkaMessageQueue, KafkaMessageQueueConfig
from llama_deploy.messages import QueueMessage


@pytest.fixture
def kafka_service():
    """Run the Kafka broker from the sibling docker-compose file around a test.

    Raises:
        subprocess.CalledProcessError: if ``docker-compose up`` exits non-zero.
    """
    cwd = Path(__file__).resolve().parent
    # ``check=True`` fails the fixture right away when the broker cannot be
    # started, instead of letting the test run against a missing service.
    subprocess.run(["docker-compose", "up", "-d", "--wait"], cwd=cwd, check=True)
    yield
    # ``run`` waits for the teardown command to finish, so the process is
    # reaped and the containers are gone before the fixture returns.
    subprocess.run(["docker-compose", "down"], cwd=cwd, check=False)


@pytest.fixture
def mq(kafka_service):
    """Build a Kafka message queue configured for the ``test`` topic.

    Requesting ``kafka_service`` makes pytest set that fixture up first.
    """
    queue_config = KafkaMessageQueueConfig(topic_name="test")
    return KafkaMessageQueue(queue_config)


@pytest.mark.asyncio
async def test_roundtrip(mq):
    """Publish one message and check the registered consumer receives it."""
    received_messages = []

    # register a consumer
    def message_handler(message: QueueMessage) -> None:
        received_messages.append(message)

    test_consumer = CallableMessageConsumer(
        message_type="test", handler=message_handler
    )
    start_consuming_callable = await mq.register_consumer(test_consumer)

    # produce a message
    test_message = QueueMessage(type="test", data={"message": "this is a test"})
    await mq.publish(test_message)

    consume_task = asyncio.create_task(start_consuming_callable())
    try:
        # give the consumer time to fetch and dispatch the message
        await asyncio.sleep(0.5)
        # at this point the message should have arrived
    finally:
        # Cancel AND await the task: a bare cancel() only requests
        # cancellation, leaving the task pending when the test returns.
        # ``return_exceptions=True`` turns the resulting CancelledError into
        # a plain result instead of re-raising it here.
        consume_task.cancel()
        await asyncio.gather(consume_task, return_exceptions=True)

    assert len(received_messages) == 1
    assert test_message in received_messages
16 changes: 8 additions & 8 deletions llama_deploy/deploy/deploy.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import asyncio
import httpx
import signal
import sys

from llama_deploy.message_queues.simple import SimpleRemoteClientMessageQueue
from pydantic_settings import BaseSettings
from typing import Any, Callable, List, Optional

import httpx
from llama_index.core.workflow import Workflow
from pydantic_settings import BaseSettings

from llama_deploy.control_plane.server import ControlPlaneConfig, ControlPlaneServer
from llama_deploy.deploy.network_workflow import NetworkServiceManager
from llama_deploy.message_queues import (
AWSMessageQueue,
AWSMessageQueueConfig,
BaseMessageQueue,
KafkaMessageQueue,
KafkaMessageQueueConfig,
Expand All @@ -20,14 +21,13 @@
RedisMessageQueueConfig,
SimpleMessageQueue,
SimpleMessageQueueConfig,
AWSMessageQueue,
AWSMessageQueueConfig,
)
from llama_deploy.message_queues.simple import SimpleRemoteClientMessageQueue
from llama_deploy.orchestrators.simple import (
SimpleOrchestrator,
SimpleOrchestratorConfig,
)
from llama_deploy.services.workflow import WorkflowServiceConfig, WorkflowService
from llama_deploy.services.workflow import WorkflowService, WorkflowServiceConfig

DEFAULT_TIMEOUT = 120.0

Expand Down Expand Up @@ -67,7 +67,7 @@ def _get_message_queue_client(config: BaseSettings) -> BaseMessageQueue:
elif isinstance(config, AWSMessageQueueConfig):
return AWSMessageQueue(**config.model_dump())
elif isinstance(config, KafkaMessageQueueConfig):
return KafkaMessageQueue(
return KafkaMessageQueue( # type: ignore
**config.model_dump(),
)
elif isinstance(config, RabbitMQMessageQueueConfig):
Expand Down
5 changes: 3 additions & 2 deletions llama_deploy/message_queues/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
KafkaMessageQueue,
KafkaMessageQueueConfig,
)
from llama_deploy.message_queues.base import BaseMessageQueue
from llama_deploy.message_queues.aws import AWSMessageQueue, AWSMessageQueueConfig
from llama_deploy.message_queues.base import AbstractMessageQueue, BaseMessageQueue
from llama_deploy.message_queues.rabbitmq import (
RabbitMQMessageQueue,
RabbitMQMessageQueueConfig,
Expand All @@ -13,9 +14,9 @@
SimpleMessageQueueConfig,
SimpleRemoteClientMessageQueue,
)
from llama_deploy.message_queues.aws import AWSMessageQueue, AWSMessageQueueConfig

__all__ = [
"AbstractMessageQueue",
"BaseMessageQueue",
"KafkaMessageQueue",
"KafkaMessageQueueConfig",
Expand Down
102 changes: 24 additions & 78 deletions llama_deploy/message_queues/apache_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,25 @@

import asyncio
import json
import logging
from logging import getLogger
from typing import Any, Callable, Coroutine, Dict, List, Optional, Literal
from typing import Any, Callable, Coroutine, Dict, List, Literal

from pydantic import BaseModel, model_validator, Field
from pydantic import BaseModel, Field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict

from llama_deploy.message_consumers.callable import CallableMessageConsumer
from llama_deploy.message_queues.base import BaseMessageQueue
from llama_deploy.message_consumers.base import BaseMessageQueueConsumer
from llama_deploy.message_queues.base import AbstractMessageQueue
from llama_deploy.messages.base import QueueMessage

import logging

logger = getLogger(__name__)
logger.setLevel(logging.INFO)


DEFAULT_URL = "localhost:9092"
DEFAULT_TOPIC_PARTITIONS = 10
DEFAULT_TOPIC_REPLICATION_FACTOR = 1
DEFAULT_TOPIC_NAME = "control_plane"
DEFAULT_GROUP_ID = "default_group" # single group for competing consumers


Expand All @@ -32,8 +31,9 @@ class KafkaMessageQueueConfig(BaseSettings):

type: Literal["kafka"] = Field(default="kafka", exclude=True)
url: str = DEFAULT_URL
host: Optional[str] = None
port: Optional[int] = None
topic_name: str = Field(default=DEFAULT_TOPIC_NAME)
host: str | None = None
port: int | None = None

@model_validator(mode="after")
def update_url(self) -> "KafkaMessageQueueConfig":
Expand All @@ -42,7 +42,7 @@ def update_url(self) -> "KafkaMessageQueueConfig":
return self


class KafkaMessageQueue(BaseMessageQueue):
class KafkaMessageQueue(AbstractMessageQueue):
"""Apache Kafka integration with aiokafka.
This class implements a traditional message broker using Apache Kafka.
Expand All @@ -63,20 +63,16 @@ class KafkaMessageQueue(BaseMessageQueue):
```
"""

url: str = DEFAULT_URL

def __init__(
self,
url: str = DEFAULT_URL,
**kwargs: Any,
self, config: KafkaMessageQueueConfig = KafkaMessageQueueConfig()
) -> None:
super().__init__(url=url)
self._config = config

@classmethod
def from_url_params(
cls,
host: str,
port: Optional[int] = None,
port: int | None = None,
) -> "KafkaMessageQueue":
"""Convenience constructor from url params.
Expand All @@ -88,13 +84,13 @@ def from_url_params(
KafkaMessageQueue: An Apache Kafka MessageQueue integration.
"""
url = f"{host}:{port}" if port else f"{host}"
return cls(url=url)
return cls(KafkaMessageQueueConfig(url=url))

def _create_new_topic(
self,
topic_name: str,
num_partitions: Optional[int] = None,
replication_factor: Optional[int] = None,
num_partitions: int | None = None,
replication_factor: int | None = None,
**kwargs: Dict[str, Any],
) -> None:
"""Create a new topic.
Expand All @@ -113,7 +109,7 @@ def _create_new_topic(
"Please install it using `pip install kafka-python-ng`."
)

admin_client = KafkaAdminClient(bootstrap_servers=self.url)
admin_client = KafkaAdminClient(bootstrap_servers=self._config.url)
try:
topic = NewTopic(
name=topic_name,
Expand All @@ -138,7 +134,7 @@ async def _publish(self, message: QueueMessage) -> Any:
"Please install it using `pip install aiokafka`."
)

producer = AIOKafkaProducer(bootstrap_servers=self.url)
producer = AIOKafkaProducer(bootstrap_servers=self._config.url)
await producer.start()
try:
message_body = json.dumps(message.model_dump()).encode("utf-8")
Expand All @@ -165,7 +161,7 @@ async def cleanup_local(
"Please install it using `pip install aiokafka`."
)

admin_client = KafkaAdminClient(bootstrap_servers=self.url)
admin_client = KafkaAdminClient(bootstrap_servers=self._config.url)
active_topics = admin_client.list_topics()
topics_to_delete = [el for el in message_types if el in active_topics]
admin_client.delete_consumer_groups(DEFAULT_GROUP_ID)
Expand Down Expand Up @@ -203,17 +199,17 @@ async def register_consumer(
)

# register topic
self._create_new_topic(consumer.message_type)
self._create_new_topic(self._config.topic_name)
kafka_consumer = AIOKafkaConsumer(
consumer.message_type,
bootstrap_servers=self.url,
self._config.topic_name,
bootstrap_servers=self._config.url,
group_id=DEFAULT_GROUP_ID,
auto_offset_reset="earliest",
)
await kafka_consumer.start()

logger.info(
f"Registered consumer {consumer.id_}: {consumer.message_type}",
f"Registered consumer {consumer.id_}: {consumer.message_type} on topic {self._config.topic_name}",
)

async def start_consuming_callable() -> None:
Expand All @@ -227,62 +223,12 @@ async def start_consuming_callable() -> None:
stop_task = asyncio.create_task(kafka_consumer.stop())
stop_task.add_done_callback(
lambda _: logger.info(
f"stopped kafka consumer {consumer.id_}: {consumer.message_type}"
f"stopped kafka consumer {consumer.id_}: {consumer.message_type} on topic {self._config.topic_name}"
)
)
await asyncio.shield(stop_task)

return start_consuming_callable

def as_config(self) -> BaseModel:
return KafkaMessageQueueConfig(url=self.url)


if __name__ == "__main__":
# for testing
import argparse
import sys

logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))

parser = argparse.ArgumentParser()
parser.add_argument("--produce", action="store_true", default=False)
parser.add_argument("--consume", action="store_true", default=False)
parser.add_argument("--clean-up", action="store_true", default=False)

args = parser.parse_args()

async def consume() -> None:
mq = KafkaMessageQueue()

# register a sample consumer
def message_handler(message: QueueMessage) -> None:
print(f"MESSAGE: {message}")

test_consumer = CallableMessageConsumer(
message_type="test", handler=message_handler
)

start_consuming_callable = await mq.register_consumer(test_consumer)
await start_consuming_callable()

async def produce() -> None:
mq = KafkaMessageQueue()
mq._create_new_topic(topic_name="test")

test_message = QueueMessage(type="test", data={"message": "this is a test"})
await mq.publish(test_message)

async def clean_up() -> None:
mq = KafkaMessageQueue()
await mq.cleanup_local(["test"])

if args.produce:
asyncio.run(produce())

if args.consume:
asyncio.run(consume())

if args.clean_up:
asyncio.run(clean_up())
return KafkaMessageQueueConfig(url=self._config.url)
Loading

0 comments on commit b65ee1b

Please sign in to comment.