From e1d2339b98898d6873123bc31a41cb0e2f3b69e4 Mon Sep 17 00:00:00 2001 From: Andrei Fajardo <92402603+nerdai@users.noreply.github.com> Date: Sun, 21 Jul 2024 00:40:51 -0400 Subject: [PATCH] Add KafkaMessageQueue (#148) * Bump certifi from 2024.6.2 to 2024.7.4 (#108) * Bump certifi from 2024.6.2 to 2024.7.4 (#108) * kafka dep aiokafka * start KafkaMessageQueue and impl create topic * todo note * register consumer * return start consuming callble * bug * clean up delete topics * functional * add argparse for convenient testing * start unit tests * add some unit tests * mock producer.start * start examples * wip * clean up * lint * update readme * docker compose * wip * wip * working docker example * nit * update pyproject * docs * nit in README * cr * cr --------- Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- examples/kafka/README.md | 183 ++++++++++++ .../kafka/pig-latin-translation/.gitignore | 11 + .../kafka/pig-latin-translation/Dockerfile | 49 ++++ .../kafka/pig-latin-translation/README.md | 1 + .../pig-latin-translation/docker-compose.yml | 142 ++++++++++ .../kafka/pig-latin-translation/logging.ini | 18 ++ .../pig_latin_translation/__init__.py | 0 .../additional_services/__init__.py | 0 .../additional_services/human_consumer.py | 46 +++ .../additional_services/task_result.py | 89 ++++++ .../agent_services/__init__.py | 0 .../correct_first_character_agent.py | 106 +++++++ .../agent_services/decorators.py | 69 +++++ .../agent_services/remove_ay_agent.py | 115 ++++++++ .../core_services/__init__.py | 0 .../core_services/control_plane.py | 71 +++++ .../core_services/message_queue.py | 10 + .../pig_latin_translation/local_launcher.py | 26 ++ .../pig_latin_translation/utils.py | 9 + .../pig-latin-translation/pyproject.toml | 17 ++ .../scripts/simulation.py | 46 +++ .../pig-latin-translation/template.env.docker | 12 + .../pig-latin-translation/template.env.local | 12 + .../local_launcher_human_single.py | 62 ++++ 
llama_agents/launchers/local.py | 1 + llama_agents/message_queues/apache_kafka.py | 266 ++++++++++++++++++ poetry.lock | 77 ++++- pyproject.toml | 5 + tests/message_queues/test_apache_kafka.py | 57 ++++ 29 files changed, 1497 insertions(+), 3 deletions(-) create mode 100644 examples/kafka/README.md create mode 100644 examples/kafka/pig-latin-translation/.gitignore create mode 100644 examples/kafka/pig-latin-translation/Dockerfile create mode 100644 examples/kafka/pig-latin-translation/README.md create mode 100644 examples/kafka/pig-latin-translation/docker-compose.yml create mode 100644 examples/kafka/pig-latin-translation/logging.ini create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/__init__.py create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/additional_services/__init__.py create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/additional_services/human_consumer.py create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/additional_services/task_result.py create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/__init__.py create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/correct_first_character_agent.py create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/decorators.py create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/remove_ay_agent.py create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/core_services/__init__.py create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/core_services/control_plane.py create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/core_services/message_queue.py create mode 100644 examples/kafka/pig-latin-translation/pig_latin_translation/local_launcher.py create mode 100644 
examples/kafka/pig-latin-translation/pig_latin_translation/utils.py create mode 100644 examples/kafka/pig-latin-translation/pyproject.toml create mode 100644 examples/kafka/pig-latin-translation/scripts/simulation.py create mode 100644 examples/kafka/pig-latin-translation/template.env.docker create mode 100644 examples/kafka/pig-latin-translation/template.env.local create mode 100644 examples/kafka/simple-scripts/local_launcher_human_single.py create mode 100644 llama_agents/message_queues/apache_kafka.py create mode 100644 tests/message_queues/test_apache_kafka.py diff --git a/examples/kafka/README.md b/examples/kafka/README.md new file mode 100644 index 00000000..a1170b42 --- /dev/null +++ b/examples/kafka/README.md @@ -0,0 +1,183 @@ +# Using Apache Kafka as the MessageQueue + +The examples contained in this subdirectory make use of the Apache Kafka integration +within `llama-agents`. + +## Installation + +Simply install `llama-agents` with the `kafka` extra: + +```sh +# using pip install +pip install llama-agents[kafka] + +# using poetry +poetry add llama-agents -E "kafka" +``` + +A virtual environment with this installation of `llama-agents` is what will +be needed to run the example scripts in `simple-scripts/`. + +## Usage Pattern + +```python +from llama_agents.message_queues.apache_kafka import KafkaMessageQueue + +message_queue = KafkaMessageQueue( + url=... +) # if no url is supplied the default localhost:9092 is used +``` + +## Examples + +### Simple Scripts + +A couple of scripts using `LocalLauncher` and a `LocalServer` with +`KafkaMessageQueue` (rather than `SimpleMessageQueue`) are included in this +subdirectory. + +Before running any of these scripts we first need to have a Kafka cluster running. +For a quick setup, we recommend using the official Kafka community [docker image](https://hub.docker.com/r/apache/kafka): + +```sh +docker run -p 9092:9092 apache/kafka:3.7.1 +``` + +With our Kafka broker running, we can now run our example scripts. 
+ +```sh +# using LocalLauncher +python ./simple-scripts/local_launcher_human_single.py +``` + +The script above will build a simple multi-agent app, connect it to the Kafka +message queue, and subsequently send the specified task. + +### Example App: Pig-Latin Translator + +In this example, we build a multi-agent app for translating simple Pig-Latin, +where when given a sentence, all words in the sentence are modified with the +following two steps: + +1. the first letter is moved to the end of the word +2. the suffix "ay" is added to the end of the word + +For example: "hey how is it going" becomes "eyhay owhay siay tiay oinggay" in simple +Pig-Latin. + +The multi-agent system translates simple Pig-Latin text by reversing the +previously mentioned two steps. It does so by using two agents that work in +sequence: `remove_ay_agent` and `correct_first_character_agent`. + +This multi-agent system also features a `TaskResultService` that is a consumer +of messages containing the final `TaskResult` data. That is, when a task is +completed, the control plane sends a message containing the results of the task +to a consumer that subscribes to the message type "human". This is precisely +what `TaskResultService` is, and it consumes these messages by appending the +results to a `task_results.jsonl` file that is stored in a `task_results` folder +that gets created in the directory from which the service was launched. + +#### Launching Without Docker + +As with running our example simple scripts above, we need to stand up our +Kafka node manually: + +```sh +docker run -p 9092:9092 apache/kafka:3.7.1 +``` + +Next, in order to launch this multi-agent system, we first need to set the +required environment variables. To do that, fill in the provided +`template.env.local` file found in the `pig-latin-translation/` folder. After filling +in the file, rename it to `.env.local` (i.e., remove "template" from the name) +and then run the commands that follow. 
+ +```sh +# set environment variables +cd pig-latin-translation +set -a && source .env.local + +# activate the project virtual env +poetry shell && poetry install +``` + +Finally, to launch the example multi-agent app: + +```sh +python pig_latin_translation/local_launcher.py +``` + +Once launched, we can send tasks to our multi-agent system using the +`LlamaAgentsClient` (note: the code below introduces a static delay to handle +the asynchronous call, for quick testing purposes only): + +```python +from llama_agents import LlamaAgentsClient +import time + +client = LlamaAgentsClient("http://0.0.0.0:8001") +task_id = client.create_task("lamaindexlay siay hetay estbay") +time.sleep(10) +task_result = client.get_task_result(task_id) +print(task_result.result) +``` + +#### Launching With Docker + +_Prerequisites_: Must have docker installed. (See +[here](https://docs.docker.com/get-docker/) for how to install Docker Desktop +which comes with `docker-compose`.) + +**NOTE:** In this example, we don't need to run the Kafka server manually. So you +can go ahead and shut down the Kafka docker container that we had running in +the previous launch if you haven't yet done so. The Kafka server is bundled within +the multi-agent deployment defined in the `docker-compose.yml` file. + +To Launch with Docker, this example makes use of `docker-compose` that will take +care of launching the individual services (and building a default bridge network +so that the services/containers can communicate with one another by name). + +Before building the docker image and launching the services (as with the case +for launching without Docker), we first need to set the required environment +variables. Fill in the values in the `template.env.docker` file and after doing so rename +the file to `.env.docker`. Note there are some variables there that we recommend +not modifying as they are used in the service definitions established in +`docker-compose.yml`. 
+ +This example is provided without a `poetry.lock` file as recommended in the +[poetry documentation for library developers](https://python-poetry.org/docs/basic-usage/#as-a-library-developer). +Before running docker-compose the first time, we must create the `poetry.lock` +file. If you are coming from the previous section where we Launched Without +Docker, then you have obtained the lock file after running `poetry install`. +If not, then use the command below: + +```sh +# while in pig-latin-translation/ +poetry install +``` + +To launch the services we now use the `docker-compose` command line tool. + +```sh +docker-compose up --build +``` + +This command will start the servers in sequence: first the Kafka service, +then the control plane, followed by the agent services and the human consumer +service. This sequencing is required since the later services must register +to the message queue and control plane (which need to be up and running before +the later services are able to do so). + +Once all the services are up and running, we can send tasks to our multi-agent +system: + +```python +from llama_agents import LlamaAgentsClient +import time + +client = LlamaAgentsClient("http://0.0.0.0:8001") +task_id = client.create_task("lamaindexlay siay hetay estbay") +time.sleep(10) +task_result = client.get_task_result(task_id) +print(task_result.result) +``` diff --git a/examples/kafka/pig-latin-translation/.gitignore b/examples/kafka/pig-latin-translation/.gitignore new file mode 100644 index 00000000..fb89d726 --- /dev/null +++ b/examples/kafka/pig-latin-translation/.gitignore @@ -0,0 +1,11 @@ +.env.* +.env +poetry.lock +index.html.* +index.html +task_results +.ipynb_checkpoints/ +secrets.yaml +Dockerfile.local +docker-compose.local.yml +pyproject.local.toml diff --git a/examples/kafka/pig-latin-translation/Dockerfile b/examples/kafka/pig-latin-translation/Dockerfile new file mode 100644 index 00000000..a65573e9 --- /dev/null +++ 
b/examples/kafka/pig-latin-translation/Dockerfile @@ -0,0 +1,49 @@ +FROM --platform=linux/amd64 python:3.10-slim as builder + +WORKDIR /app + +ENV POETRY_VERSION=1.7.1 + +# Install libraries for necessary python package builds +RUN apt-get update && apt-get --no-install-recommends install build-essential python3-dev libpq-dev -y && \ + pip install --no-cache-dir --upgrade pip && \ + pip install --no-cache-dir --upgrade poetry==${POETRY_VERSION} + +# Install ssh +RUN apt-get -yq update && apt-get -yqq install ssh + +# Configure Poetry +ENV POETRY_CACHE_DIR=/tmp/poetry_cache +ENV POETRY_NO_INTERACTION=1 +ENV POETRY_VIRTUALENVS_IN_PROJECT=true +ENV POETRY_VIRTUALENVS_CREATE=true + +# Install dependencies +COPY ./poetry.lock ./pyproject.toml ./ + +RUN mkdir -p -m 0600 ~/.ssh && ssh-keyscan github.com >> ~/.ssh/known_hosts +RUN --mount=type=secret,id=id_ed25519,dst=/root/.ssh/id_ed25519 poetry install --no-cache --no-root -vvv + +RUN poetry install --no-cache --no-root + +FROM --platform=linux/amd64 python:3.10-slim as runtime + +# Install wget for healthcheck +RUN apt-get update && apt-get install -y wget + +RUN apt-get update -y && \ + apt-get install --no-install-recommends libpq5 -y && \ + rm -rf /var/lib/apt/lists/* # Install libpq for psycopg2 + +RUN groupadd -r appuser && useradd --no-create-home -g appuser -r appuser +USER appuser + +WORKDIR /app + +ENV VIRTUAL_ENV=/app/.venv +COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV} +ENV PATH="${VIRTUAL_ENV}/bin:${PATH}" + +# Copy source code +COPY ./logging.ini ./logging.ini +COPY ./pig_latin_translation ./pig_latin_translation diff --git a/examples/kafka/pig-latin-translation/README.md b/examples/kafka/pig-latin-translation/README.md new file mode 100644 index 00000000..96d769fa --- /dev/null +++ b/examples/kafka/pig-latin-translation/README.md @@ -0,0 +1 @@ +# Pig-Latin Translation diff --git a/examples/kafka/pig-latin-translation/docker-compose.yml b/examples/kafka/pig-latin-translation/docker-compose.yml new file 
mode 100644 index 00000000..50894809 --- /dev/null +++ b/examples/kafka/pig-latin-translation/docker-compose.yml @@ -0,0 +1,142 @@ +version: "3" +services: + kafka: + image: apache/kafka:3.7.1 + hostname: kafka + container_name: kafka + ports: + - "9092:9092" + env_file: + - .env.docker + environment: + KAFKA_NODE_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT_HOST://localhost:9092,PLAINTEXT://kafka:19092" + KAFKA_PROCESS_ROLES: "broker,controller" + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka:29093" + KAFKA_LISTENERS: "CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092" + KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT" + KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs" + healthcheck: + test: nc -z localhost 9092 || exit -1 + start_period: 15s + interval: 30s + timeout: 10s + retries: 5 + control_plane: + image: pig_latin_translation:latest + command: sh -c "python -m pig_latin_translation.core_services.control_plane" + env_file: + - .env.docker + ports: + - "8001:8001" + volumes: + - ./pig_latin_translation:/app/pig_latin_translation # load local code change to container without the need of rebuild + - ./logging.ini:/app/logging.ini + depends_on: + kafka: + condition: service_healthy + platform: linux/amd64 + build: + context: . 
+ dockerfile: ./Dockerfile + secrets: + - id_ed25519 + healthcheck: + test: wget --no-verbose --tries=1 http://0.0.0.0:8001/ || exit 1 + interval: 30s + retries: 5 + start_period: 20s + timeout: 10s + ay_agent: + image: pig_latin_translation:latest + command: sh -c "python -m pig_latin_translation.agent_services.remove_ay_agent" + env_file: + - ./.env.docker + ports: + - "8002:8002" + volumes: + - ./pig_latin_translation:/app/pig_latin_translation # load local code change to container without the need of rebuild + - ./logging.ini:/app/logging.ini + platform: linux/amd64 + depends_on: + kafka: + condition: service_healthy + control_plane: + condition: service_healthy + build: + context: . + dockerfile: ./Dockerfile + secrets: + - id_ed25519 + healthcheck: + test: wget --no-verbose --tries=1 http://0.0.0.0:8002/ || exit 1 + interval: 30s + retries: 5 + start_period: 20s + timeout: 10s + first_char_agent: + image: pig_latin_translation:latest + command: sh -c "python -m pig_latin_translation.agent_services.correct_first_character_agent" + env_file: + - ./.env.docker + ports: + - "8003:8003" + volumes: + - ./pig_latin_translation:/app/pig_latin_translation # load local code change to container without the need of rebuild + - ./logging.ini:/app/logging.ini + depends_on: + kafka: + condition: service_healthy + control_plane: + condition: service_healthy + platform: linux/amd64 + build: + context: . 
+ dockerfile: ./Dockerfile + secrets: + - id_ed25519 + healthcheck: + test: wget --no-verbose --tries=1 http://0.0.0.0:8003/ || exit 1 + interval: 30s + retries: 5 + start_period: 20s + timeout: 10s + human_consumer: + image: pig_latin_translation:latest + command: sh -c "python -m pig_latin_translation.additional_services.human_consumer" + env_file: + - ./.env.docker + ports: + - "8004:8004" + volumes: + - ./pig_latin_translation:/app/pig_latin_translation # load local code change to container without the need of rebuild + - ./logging.ini:/app/logging.ini + - ./task_results:/app/task_results + platform: linux/amd64 + depends_on: + kafka: + condition: service_healthy + control_plane: + condition: service_healthy + build: + context: . + dockerfile: ./Dockerfile + secrets: + - id_ed25519 + healthcheck: + test: wget --no-verbose --tries=1 http://0.0.0.0:8004/ || exit 1 + interval: 30s + retries: 5 + start_period: 20s + timeout: 10s +volumes: + kafka: +secrets: + id_ed25519: + file: ~/.ssh/id_ed25519 diff --git a/examples/kafka/pig-latin-translation/logging.ini b/examples/kafka/pig-latin-translation/logging.ini new file mode 100644 index 00000000..07bad50a --- /dev/null +++ b/examples/kafka/pig-latin-translation/logging.ini @@ -0,0 +1,18 @@ +[loggers] +keys=root + +[handlers] +keys=consoleHandler + +[formatters] +keys=sampleFormatter + +[logger_root] +handlers=consoleHandler + +[handler_consoleHandler] +class=StreamHandler +formatter=sampleFormatter + +[formatter_sampleFormatter] +format=%(asctime)s - %(levelname)s - %(message)s diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/__init__.py b/examples/kafka/pig-latin-translation/pig_latin_translation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/additional_services/__init__.py b/examples/kafka/pig-latin-translation/pig_latin_translation/additional_services/__init__.py new file mode 100644 index 00000000..e69de29b 
diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/additional_services/human_consumer.py b/examples/kafka/pig-latin-translation/pig_latin_translation/additional_services/human_consumer.py new file mode 100644 index 00000000..4fe656cb --- /dev/null +++ b/examples/kafka/pig-latin-translation/pig_latin_translation/additional_services/human_consumer.py @@ -0,0 +1,46 @@ +import asyncio +import uvicorn +from llama_agents.message_queues.apache_kafka import KafkaMessageQueue +from pig_latin_translation.additional_services.task_result import TaskResultService +from pig_latin_translation.utils import load_from_env + +message_queue_host = load_from_env("KAFKA_HOST") +message_queue_port = load_from_env("KAFKA_PORT") +human_consumer_host = load_from_env("HUMAN_CONSUMER_HOST") +human_consumer_port = load_from_env("HUMAN_CONSUMER_PORT") +localhost = load_from_env("LOCALHOST") + + +# create our multi-agent framework components +message_queue = KafkaMessageQueue.from_url_params( + host=message_queue_host, + port=int(message_queue_port) if message_queue_port else None, +) + +human_consumer_server = TaskResultService( + message_queue=message_queue, + host=human_consumer_host, + port=int(human_consumer_port) if human_consumer_port else None, + name="human", +) + +app = human_consumer_server._app + + +# launch +async def launch() -> None: + # register to message queue and start consuming + start_consuming_callable = await human_consumer_server.register_to_message_queue() + _ = asyncio.create_task(start_consuming_callable()) + + cfg = uvicorn.Config( + human_consumer_server._app, + host=localhost, + port=human_consumer_server.port, + ) + server = uvicorn.Server(cfg) + await server.serve() + + +if __name__ == "__main__": + asyncio.run(launch()) diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/additional_services/task_result.py b/examples/kafka/pig-latin-translation/pig_latin_translation/additional_services/task_result.py new file mode 100644 
index 00000000..81a226b6 --- /dev/null +++ b/examples/kafka/pig-latin-translation/pig_latin_translation/additional_services/task_result.py @@ -0,0 +1,89 @@ +import json +from pathlib import Path +from typing import Dict, Optional +from llama_agents import ( + CallableMessageConsumer, + QueueMessage, +) +from fastapi import FastAPI +from llama_agents.message_queues.base import BaseMessageQueue +from llama_agents.message_consumers.base import ( + BaseMessageQueueConsumer, + StartConsumingCallable, +) +from llama_agents.message_consumers.remote import RemoteMessageConsumer +from logging import getLogger + +logger = getLogger(__name__) + + +class TaskResultService: + """TaskResultService. + + This is a consumer service for the Task Result of the multi-agent system. + When a task is completed, the control plane sends a message containing the + TaskResult to a consumer of message type "human". + + This TaskResultService accepts those messages and appends the TaskResult + JSON object to the `task_results.jsonl` that is stored in the folder + `task_results` that gets created in the directory from which the service + is launched. 
+ """ + + def __init__( + self, + message_queue: BaseMessageQueue, + name: str = "human", + host: str = "127.0.0.1", + port: Optional[int] = 8000, + ) -> None: + self.name = name + self.host = host + self.port = port + + self._message_queue = message_queue + + # app + self._app = FastAPI() + self._app.add_api_route( + "/", self.home, methods=["GET"], tags=["Human Consumer"] + ) + self._app.add_api_route( + "/process_message", + self.process_message, + methods=["POST"], + tags=["Human Consumer"], + ) + + @property + def message_queue(self) -> BaseMessageQueue: + return self._message_queue + + def as_consumer(self, remote: bool = False) -> BaseMessageQueueConsumer: + if remote: + return RemoteMessageConsumer( + url=( + f"http://{self.host}:{self.port}/process_message" + if self.port + else f"http://{self.host}/process_message" + ), + message_type=self.name, + ) + + return CallableMessageConsumer( + message_type=self.name, + handler=self.process_message, + ) + + async def process_message(self, message: QueueMessage) -> None: + Path("task_results").mkdir(exist_ok=True) + with open("task_results/task_results.jsonl", "+a") as f: + json.dump(message.model_dump(), f) + f.write("\n") + + async def home(self) -> Dict[str, str]: + return {"message": "hello, human."} + + async def register_to_message_queue(self) -> StartConsumingCallable: + """Register to the message queue.""" + return await self.message_queue.register_consumer(self.as_consumer(remote=True)) diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/__init__.py b/examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/correct_first_character_agent.py b/examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/correct_first_character_agent.py new file mode 100644 index 00000000..4efc315e --- /dev/null +++ 
b/examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/correct_first_character_agent.py @@ -0,0 +1,106 @@ +import asyncio +import uvicorn + +from llama_agents import AgentService +from llama_agents.message_queues.apache_kafka import KafkaMessageQueue + +from llama_index.core.agent import FunctionCallingAgentWorker +from llama_index.core.tools import FunctionTool +from llama_index.llms.openai import OpenAI + +from pig_latin_translation.utils import load_from_env +from pig_latin_translation.agent_services.decorators import exponential_delay + +from logging import getLogger + +logger = getLogger(__name__) + +message_queue_host = load_from_env("KAFKA_HOST") +message_queue_port = load_from_env("KAFKA_PORT") +control_plane_host = load_from_env("CONTROL_PLANE_HOST") +control_plane_port = load_from_env("CONTROL_PLANE_PORT") +correct_first_character_agent_host = load_from_env("FIRST_CHAR_AGENT_HOST") +correct_first_character_agent_port = load_from_env("FIRST_CHAR_AGENT_PORT") +localhost = load_from_env("LOCALHOST") + + +STARTUP_RATE = 1 + + +# create an agent +@exponential_delay(STARTUP_RATE) +def sync_correct_first_character(input: str) -> str: + """Corrects the first character.""" + logger.info(f"received task input: {input}") + tokens = input.split() + res = " ".join([t[-1] + t[0:-1] for t in tokens]) + logger.info(f"Corrected first character: {res}") + return res + + +@exponential_delay(STARTUP_RATE) +async def async_correct_first_character(input: str) -> str: + """Corrects the first character.""" + logger.info(f"received task input: {input}") + tokens = input.split() + res = " ".join([t[-1] + t[0:-1] for t in tokens]) + logger.info(f"Corrected first character: {res}") + return res + + +tool = FunctionTool.from_defaults( + fn=sync_correct_first_character, async_fn=async_correct_first_character +) +worker = FunctionCallingAgentWorker.from_tools( + [tool], llm=OpenAI(), max_function_calls=1 +) +agent = worker.as_agent() + +# create agent server 
+message_queue = KafkaMessageQueue.from_url_params( + host=message_queue_host, + port=int(message_queue_port) if message_queue_port else None, +) + +agent_server = AgentService( + agent=agent, + message_queue=message_queue, + description="Brings back the last character to the correct position.", + service_name="correct_first_character_agent", + host=correct_first_character_agent_host, + port=( + int(correct_first_character_agent_port) + if correct_first_character_agent_port + else None + ), +) + +app = agent_server._app + + +# launch +async def launch() -> None: + # register to message queue + start_consuming_callable = await agent_server.register_to_message_queue() + _ = asyncio.create_task(start_consuming_callable()) + + # register to control plane + await agent_server.register_to_control_plane( + control_plane_url=( + f"http://{control_plane_host}:{control_plane_port}" + if control_plane_port + else f"http://{control_plane_host}" + ) + ) + + cfg = uvicorn.Config( + agent_server._app, + host=localhost, + port=agent_server.port, + ) + server = uvicorn.Server(cfg) + await server.serve() + + +if __name__ == "__main__": + asyncio.run(launch()) diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/decorators.py b/examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/decorators.py new file mode 100644 index 00000000..67f1d6aa --- /dev/null +++ b/examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/decorators.py @@ -0,0 +1,69 @@ +import asyncio +import time +import numpy as np +import functools +from typing import Any, Callable + +from logging import getLogger + +logger = getLogger(__name__) + + +def exponential_delay(exponential_rate: float) -> Callable: + """Wrapper for exponential tool.""" + + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> str: + # random delay + delay = np.random.exponential(exponential_rate) + 
logger.info(f"waiting for {delay} seconds") + time.sleep(delay) + return func(*args, **kwargs) + + @functools.wraps(func) + async def async_wrapper(*args: Any, **kwargs: Any) -> str: + # random delay + delay = np.random.exponential(exponential_rate) + logger.info(f"waiting for {delay} seconds") + # await asyncio.sleep(delay) + return await func(*args, **kwargs) + + return async_wrapper if asyncio.iscoroutinefunction(func) else wrapper + + return decorator + + +async def main() -> None: + @exponential_delay(2) + async def get_the_secret_fact() -> str: + """Returns the secret fact.""" + return "The secret fact is: A baby llama is called a 'Cria'." + + @exponential_delay(1) + async def async_correct_first_character(input: str) -> str: + """Corrects the first character.""" + tokens = input.split() + return " ".join([t[-1] + t[0:-1] for t in tokens]) + + @exponential_delay(0.5) + async def async_remove_ay_suffix(input: str) -> str: + """Removes 'ay' suffix from each token in the input_sentence. 
+ + Params: + input_sentence (str): The input sentence i.e., sequence of words + """ + logger.info(f"received task input: {input}") + tokens = input.split() + res = " ".join([t[:-2] for t in tokens]) + logger.info(f"Removed 'ay' suffix: {res}") + return res + + output = await async_remove_ay_suffix(input="eyhay ouyay") + + print(output) + print(async_remove_ay_suffix.__doc__) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/remove_ay_agent.py b/examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/remove_ay_agent.py new file mode 100644 index 00000000..e8395ea5 --- /dev/null +++ b/examples/kafka/pig-latin-translation/pig_latin_translation/agent_services/remove_ay_agent.py @@ -0,0 +1,115 @@ +import asyncio +import uvicorn + +from llama_agents import AgentService +from llama_agents.message_queues.apache_kafka import KafkaMessageQueue + +from llama_index.core.agent import FunctionCallingAgentWorker +from llama_index.core.tools import FunctionTool +from llama_index.llms.openai import OpenAI + +from pig_latin_translation.utils import load_from_env +from pig_latin_translation.agent_services.decorators import exponential_delay + +from logging import getLogger + +logger = getLogger(__name__) + +message_queue_host = load_from_env("KAFKA_HOST") +message_queue_port = load_from_env("KAFKA_PORT") +control_plane_host = load_from_env("CONTROL_PLANE_HOST") +control_plane_port = load_from_env("CONTROL_PLANE_PORT") +remove_ay_agent_host = load_from_env("AY_AGENT_HOST") +remove_ay_agent_port = load_from_env("AY_AGENT_PORT") +localhost = load_from_env("LOCALHOST") + + +STARTUP_RATE = 3 +SYSTEM_PROMPT = """Pass the entire sentence to the remove 'ay' suffix tool. +The tool will remove 'ay' from every word in the sentence. +Do not send tokens one at a time to the tool! +Do not call the tool more than once! 
+""" + + +# create an agent +@exponential_delay(STARTUP_RATE) +def sync_remove_ay_suffix(input_sentence: str) -> str: + """Removes 'ay' suffix from each token in the input_sentence. + + Params: + input_sentence (str): The input sentence i.e., sequence of words + """ + logger.info(f"received task input: {input_sentence}") + tokens = input_sentence.split() + res = " ".join([t[:-2] for t in tokens]) + logger.info(f"Removed 'ay' suffix: {res}") + return res + + +@exponential_delay(STARTUP_RATE) +async def async_remove_ay_suffix(input_sentence: str) -> str: + """Removes 'ay' suffix from each token in the input_sentence. + + Params: + input_sentence (str): The input sentence i.e., sequence of words + """ + logger.info(f"received task input: {input_sentence}") + tokens = input_sentence.split() + res = " ".join([t[:-2] for t in tokens]) + logger.info(f"Removed 'ay' suffix: {res}") + return res + + +tool = FunctionTool.from_defaults( + fn=sync_remove_ay_suffix, async_fn=async_remove_ay_suffix +) +worker = FunctionCallingAgentWorker.from_tools( + [tool], llm=OpenAI(), system_prompt=SYSTEM_PROMPT, max_function_calls=1 +) +agent = worker.as_agent() + +# create agent server +message_queue = KafkaMessageQueue.from_url_params( + host=message_queue_host, + port=int(message_queue_port) if message_queue_port else None, +) + +agent_server = AgentService( + agent=agent, + message_queue=message_queue, + description="Removes the 'ay' suffix from each token from a provided input_sentence.", + service_name="remove_ay_agent", + host=remove_ay_agent_host, + port=int(remove_ay_agent_port) if remove_ay_agent_port else None, +) + +app = agent_server._app + + +# launch +async def launch() -> None: + # register to message queue + start_consuming_callable = await agent_server.register_to_message_queue() + _ = asyncio.create_task(start_consuming_callable()) + + # register to control plane + await agent_server.register_to_control_plane( + control_plane_url=( + 
f"http://{control_plane_host}:{control_plane_port}" + if control_plane_port + else f"http://{control_plane_host}" + ) + ) + + cfg = uvicorn.Config( + agent_server._app, + host=localhost, + port=agent_server.port, + ) + server = uvicorn.Server(cfg) + await server.serve() + + +if __name__ == "__main__": + asyncio.run(launch()) diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/core_services/__init__.py b/examples/kafka/pig-latin-translation/pig_latin_translation/core_services/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/core_services/control_plane.py b/examples/kafka/pig-latin-translation/pig_latin_translation/core_services/control_plane.py new file mode 100644 index 00000000..b748c558 --- /dev/null +++ b/examples/kafka/pig-latin-translation/pig_latin_translation/core_services/control_plane.py @@ -0,0 +1,71 @@ +import asyncio +import uvicorn + +from llama_agents import ControlPlaneServer, PipelineOrchestrator, ServiceComponent +from llama_agents.message_queues.apache_kafka import KafkaMessageQueue +from llama_index.core.query_pipeline import QueryPipeline + +from pig_latin_translation.utils import load_from_env +from pig_latin_translation.agent_services.remove_ay_agent import ( + agent_server as remove_ay_agent_server, +) +from pig_latin_translation.agent_services.correct_first_character_agent import ( + agent_server as correct_first_character_agent_server, +) + +message_queue_host = load_from_env("KAFKA_HOST") +message_queue_port = load_from_env("KAFKA_PORT") +control_plane_host = load_from_env("CONTROL_PLANE_HOST") +control_plane_port = load_from_env("CONTROL_PLANE_PORT") +localhost = load_from_env("LOCALHOST") + + +# setup message queue +message_queue = KafkaMessageQueue.from_url_params( + host=message_queue_host, + port=int(message_queue_port) if message_queue_port else None, +) + +# setup control plane +remove_ay_agent_component = 
ServiceComponent.from_service_definition( + remove_ay_agent_server +) +correct_first_character_agent_component = ServiceComponent.from_service_definition( + correct_first_character_agent_server +) + +pipeline = QueryPipeline( + chain=[ + remove_ay_agent_component, + correct_first_character_agent_component, + ] +) + +pipeline_orchestrator = PipelineOrchestrator(pipeline) + +control_plane = ControlPlaneServer( + message_queue=message_queue, + orchestrator=pipeline_orchestrator, + host=control_plane_host, + port=int(control_plane_port) if control_plane_port else None, +) +app = control_plane.app + + +# launch +async def launch() -> None: + # register to message queue and start consuming + start_consuming_callable = await control_plane.register_to_message_queue() + _ = asyncio.create_task(start_consuming_callable()) + + cfg = uvicorn.Config( + control_plane.app, + host=localhost, + port=control_plane.port, + ) + server = uvicorn.Server(cfg) + await server.serve() + + +if __name__ == "__main__": + asyncio.run(launch()) diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/core_services/message_queue.py b/examples/kafka/pig-latin-translation/pig_latin_translation/core_services/message_queue.py new file mode 100644 index 00000000..c634ccb1 --- /dev/null +++ b/examples/kafka/pig-latin-translation/pig_latin_translation/core_services/message_queue.py @@ -0,0 +1,10 @@ +from llama_agents.message_queues.apache_kafka import KafkaMessageQueue +from pig_latin_translation.utils import load_from_env + +message_queue_host = load_from_env("KAFKA_HOST") +message_queue_port = load_from_env("KAFKA_PORT") + +message_queue = KafkaMessageQueue.from_url_params( + host=message_queue_host, + port=int(message_queue_port) if message_queue_port else None, +) diff --git a/examples/kafka/pig-latin-translation/pig_latin_translation/local_launcher.py b/examples/kafka/pig-latin-translation/pig_latin_translation/local_launcher.py new file mode 100644 index 00000000..4abb6e9d --- 
def load_from_env(var: str) -> str:
    """Return the value of the environment variable ``var``.

    Args:
        var (str): Name of the environment variable to read.

    Returns:
        str: The value stored in ``os.environ`` for ``var``.

    Raises:
        ValueError: If ``var`` is not present in the environment.
    """
    try:
        return os.environ[var]
    except KeyError as err:
        # Chain explicitly so the traceback reports the failed lookup as the
        # cause rather than as an unrelated error raised during handling.
        raise ValueError(f"Missing env var '{var}'.") from err
def pig_latin(text: str) -> str:
    """Translate ``text`` into the simplified pig latin used by the simulation.

    The text is lower-cased and split on whitespace; every word has its first
    character moved to the end and "ay" appended. Words are re-joined with
    single spaces.
    """
    return " ".join(f"{word[1:]}{word[0]}ay" for word in text.lower().split())
+ ) + task = pig_latin(response.text) + print(f"text: {response.text}, task: {task}") + client.create_task(task) + + +async def main() -> None: + client = LlamaAgentsClient("http://0.0.0.0:8001") + llm = OpenAI("gpt-4o", temperature=1) + try: + while True: + interarrival_time = np.random.exponential(3) + await asyncio.sleep(interarrival_time) + await send_new_task(client, llm) + except KeyboardInterrupt: + print("Shutting down.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/kafka/pig-latin-translation/template.env.docker b/examples/kafka/pig-latin-translation/template.env.docker new file mode 100644 index 00000000..6cfe44cd --- /dev/null +++ b/examples/kafka/pig-latin-translation/template.env.docker @@ -0,0 +1,12 @@ +KAFKA_HOST="kafka" # don't modify +KAFKA_PORT=19092 +CONTROL_PLANE_HOST="control_plane" # don't modify +CONTROL_PLANE_PORT=8001 +AY_AGENT_HOST="ay_agent" # don't modify +AY_AGENT_PORT=8002 +FIRST_CHAR_AGENT_HOST= "first_char_agent" # don't modify +FIRST_CHAR_AGENT_PORT=8003 +HUMAN_CONSUMER_HOST="human_consumer" # don't modify +HUMAN_CONSUMER_PORT=8004 +LOCALHOST="0.0.0.0" +OPENAI_API_KEY= diff --git a/examples/kafka/pig-latin-translation/template.env.local b/examples/kafka/pig-latin-translation/template.env.local new file mode 100644 index 00000000..3b08bb47 --- /dev/null +++ b/examples/kafka/pig-latin-translation/template.env.local @@ -0,0 +1,12 @@ +KAFKA_HOST="localhost" # don't modify +KAFKA_PORT=9092 +CONTROL_PLANE_HOST="0.0.0.0" # don't modify +CONTROL_PLANE_PORT=8001 +AY_AGENT_HOST="0.0.0.0" # don't modify +AY_AGENT_PORT=8002 +FIRST_CHAR_AGENT_HOST="0.0.0.0" # don't modify +FIRST_CHAR_AGENT_PORT=8003 +HUMAN_CONSUMER_HOST="0.0.0.0" # don't modify +HUMAN_CONSUMER_PORT=8004 +LOCALHOST="0.0.0.0" +OPENAI_API_KEY= diff --git a/examples/kafka/simple-scripts/local_launcher_human_single.py b/examples/kafka/simple-scripts/local_launcher_human_single.py new file mode 100644 index 00000000..ed217023 --- /dev/null +++ 
b/examples/kafka/simple-scripts/local_launcher_human_single.py @@ -0,0 +1,62 @@ +from llama_agents import ( + AgentService, + HumanService, + ControlPlaneServer, + PipelineOrchestrator, + ServiceComponent, + LocalLauncher, +) +from llama_agents.message_queues.apache_kafka import KafkaMessageQueue + +from llama_index.core.agent import FunctionCallingAgentWorker +from llama_index.core.tools import FunctionTool +from llama_index.core.query_pipeline import RouterComponent, QueryPipeline +from llama_index.llms.openai import OpenAI +from llama_index.core.selectors import PydanticSingleSelector + + +# create an agent +def get_the_secret_fact() -> str: + """Returns the secret fact.""" + return "The secret fact is: A baby llama is called a 'Cria'." + + +tool = FunctionTool.from_defaults(fn=get_the_secret_fact) + +# create our multi-agent framework components +message_queue = KafkaMessageQueue() + +worker = FunctionCallingAgentWorker.from_tools([tool], llm=OpenAI()) +agent = worker.as_agent() +agent_service = AgentService( + agent=agent, + message_queue=message_queue, + description="Useful for getting the secret fact.", + service_name="secret_fact_agent", +) +agent_component = ServiceComponent.from_service_definition(agent_service) + +human_service = HumanService( + message_queue=message_queue, description="Answers queries about math." 
+) +human_component = ServiceComponent.from_service_definition(human_service) + +pipeline = QueryPipeline( + chain=[ + RouterComponent( + selector=PydanticSingleSelector.from_defaults(llm=OpenAI()), + choices=[agent_service.description, human_service.description], + components=[agent_component, human_component], + ) + ] +) + +pipeline_orchestrator = PipelineOrchestrator(pipeline) + +control_plane = ControlPlaneServer(message_queue, pipeline_orchestrator) + +# launch it +launcher = LocalLauncher([agent_service, human_service], control_plane, message_queue) +result = launcher.launch_single("What is 1 + 2 + 3 + 4 + 5?") + +print(f"Result: {result}") diff --git a/llama_agents/launchers/local.py b/llama_agents/launchers/local.py index c570d443..076fce97 100644 --- a/llama_agents/launchers/local.py +++ b/llama_agents/launchers/local.py @@ -200,6 +200,7 @@ async def alaunch_single(self, initial_task: str) -> str: # shutdown tasks for task in bg_tasks + start_consuming_tasks: task.cancel() + await asyncio.sleep(0.1) # clean up registered services for service in self.services: diff --git a/llama_agents/message_queues/apache_kafka.py b/llama_agents/message_queues/apache_kafka.py new file mode 100644 index 00000000..fe55ece9 --- /dev/null +++ b/llama_agents/message_queues/apache_kafka.py @@ -0,0 +1,266 @@ +"""Apache Kafka Message Queue.""" + +import asyncio +import json +from logging import getLogger +from typing import Any, Callable, Coroutine, Dict, List, Optional +from llama_agents import CallableMessageConsumer, QueueMessage +from llama_agents.message_queues.base import ( + BaseMessageQueue, +) +from llama_agents.message_consumers.base import ( + BaseMessageQueueConsumer, +) + +import logging + +logger = getLogger(__name__) +logger.setLevel(logging.INFO) + + +DEFAULT_URL = "localhost:9092" +DEFAULT_TOPIC_PARTITIONS = 10 +DEFAULT_TOPIC_REPLICATION_FACTOR = 1 +DEFAULT_GROUP_ID = "default_group" # single group for competing consumers + + +class 
class KafkaMessageQueue(BaseMessageQueue):
    """Apache Kafka integration with aiokafka.

    This class implements a traditional message broker using Apache Kafka.
        - Topics are created with N partitions
        - Consumers are registered to a single group to implement a competing
        consumer scheme where only one consumer subscribed to a topic gets the
        message
            - Default round-robin assignment is used

    Attributes:
        url (str): The broker url string to connect to the Kafka server

    Examples:
        ```python
        from llama_agents.message_queues.apache_kafka import KafkaMessageQueue

        message_queue = KafkaMessageQueue()  # uses the default url
        ```
    """

    url: str = DEFAULT_URL

    def __init__(
        self,
        url: str = DEFAULT_URL,
    ) -> None:
        """Create a queue bound to the broker at ``url``."""
        super().__init__(url=url)

    @classmethod
    def from_url_params(
        cls,
        host: str,
        port: Optional[int] = None,
    ) -> "KafkaMessageQueue":
        """Convenience constructor from url params.

        Args:
            host (str): host for the Kafka server
            port (Optional[int], optional): port for the Kafka server.
                Defaults to None, in which case the url is just ``host``.

        Returns:
            KafkaMessageQueue: An Apache Kafka MessageQueue integration.
        """
        url = f"{host}:{port}" if port else f"{host}"
        return cls(url=url)

    def _create_new_topic(
        self,
        topic_name: str,
        num_partitions: Optional[int] = None,
        replication_factor: Optional[int] = None,
        **kwargs: Dict[str, Any],
    ) -> None:
        """Create a new topic.

        Use kafka-python-ng instead of aio-kafka as latter has issues with
        resolving api_version with broker.

        TODO: convert to aiokafka once this it is resolved there.

        Args:
            topic_name (str): Name of the topic to create.
            num_partitions (Optional[int]): Partition count; falls back to
                DEFAULT_TOPIC_PARTITIONS when not given.
            replication_factor (Optional[int]): Replication factor; falls back
                to DEFAULT_TOPIC_REPLICATION_FACTOR when not given.
            **kwargs: Forwarded to ``kafka.admin.NewTopic``.

        Raises:
            ImportError: If kafka-python-ng is not installed.
        """
        try:
            from kafka.admin import KafkaAdminClient, NewTopic
            from kafka.errors import TopicAlreadyExistsError
        except ImportError:
            raise ImportError(
                "kafka-python-ng is not installed. "
                "Please install it using `pip install kafka-python-ng`."
            )

        admin_client = KafkaAdminClient(bootstrap_servers=self.url)
        try:
            topic = NewTopic(
                name=topic_name,
                num_partitions=num_partitions or DEFAULT_TOPIC_PARTITIONS,
                replication_factor=replication_factor
                or DEFAULT_TOPIC_REPLICATION_FACTOR,
                **kwargs,
            )
            admin_client.create_topics(new_topics=[topic])
            logger.info(f"New topic {topic_name} created.")
        except TopicAlreadyExistsError:
            # An existing topic is not an error: registration is idempotent.
            logger.info(f"Topic {topic_name} already exists.")
        finally:
            # Release the broker connection held by the admin client.
            admin_client.close()

    async def _publish(self, message: QueueMessage) -> Any:
        """Publish message to the queue.

        The message is JSON-encoded from ``message.model_dump()`` and sent to
        the topic named ``message.type``. A producer is started and stopped
        for every call.

        Raises:
            ImportError: If aiokafka is not installed.
        """
        try:
            from aiokafka import AIOKafkaProducer
        except ImportError:
            raise ImportError(
                "aiokafka is not installed. "
                "Please install it using `pip install aiokafka`."
            )

        producer = AIOKafkaProducer(bootstrap_servers=self.url)
        await producer.start()
        try:
            message_body = json.dumps(message.model_dump()).encode("utf-8")
            await producer.send_and_wait(message.type, message_body)
            logger.info(f"published message {message.id_}")
        finally:
            await producer.stop()

    async def cleanup_local(
        self, message_types: List[str], *args: Any, **kwargs: Dict[str, Any]
    ) -> None:
        """Cleanup for local runs.

        Deletes the shared consumer group and every topic in ``message_types``
        that currently exists on the broker.

        Use kafka-python-ng instead of aio-kafka as latter has issues with
        resolving api_version with broker when using admin client.

        TODO: convert to aiokafka once this it is resolved there.

        Raises:
            ImportError: If kafka-python-ng is not installed.
        """
        try:
            from kafka.admin import KafkaAdminClient
        except ImportError:
            raise ImportError(
                "kafka-python-ng is not installed. "
                "Please install it using `pip install kafka-python-ng`."
            )

        admin_client = KafkaAdminClient(bootstrap_servers=self.url)
        try:
            active_topics = admin_client.list_topics()
            topics_to_delete = [el for el in message_types if el in active_topics]
            # The admin API takes a collection of group ids; a bare string
            # would be iterated character by character.
            admin_client.delete_consumer_groups([DEFAULT_GROUP_ID])
            if topics_to_delete:
                admin_client.delete_topics(topics_to_delete)
        finally:
            admin_client.close()

    async def deregister_consumer(self, consumer: BaseMessageQueueConsumer) -> Any:
        """Deregister a consumer.

        Currently a no-op.
        """
        pass

    async def launch_local(self) -> asyncio.Task:
        """Launch the message queue locally, in-process.

        Launches a dummy task.
        """
        return asyncio.create_task(self.processing_loop())

    async def launch_server(self) -> None:
        """Launch server.

        Currently a no-op.
        """
        pass

    async def processing_loop(self) -> None:
        """Do nothing; this is the body of the dummy task from launch_local."""
        pass

    async def register_consumer(
        self, consumer: BaseMessageQueueConsumer
    ) -> Callable[..., Coroutine[Any, Any, None]]:
        """Register a new consumer.

        Creates the topic named after ``consumer.message_type``, starts an
        aiokafka consumer for it in the shared consumer group, and returns a
        coroutine function that runs the consume loop when awaited.

        Raises:
            ImportError: If aiokafka (or kafka-python-ng, needed for topic
                creation) is not installed.
        """
        try:
            from aiokafka import AIOKafkaConsumer
        except ImportError:
            raise ImportError(
                "aiokafka is not installed. "
                "Please install it using `pip install aiokafka`."
            )

        # register topic
        self._create_new_topic(consumer.message_type)
        kafka_consumer = AIOKafkaConsumer(
            consumer.message_type,
            bootstrap_servers=self.url,
            group_id=DEFAULT_GROUP_ID,
            auto_offset_reset="earliest",
        )
        await kafka_consumer.start()

        logger.info(
            f"Registered consumer {consumer.id_}: {consumer.message_type}",
        )

        async def start_consuming_callable() -> None:
            """StartConsumingCallable."""
            try:
                async for msg in kafka_consumer:
                    decoded_message = json.loads(msg.value.decode("utf-8"))
                    queue_message = QueueMessage.model_validate(decoded_message)
                    await consumer.process_message(queue_message)
            finally:
                # Stop the consumer in its own shielded task so the shutdown
                # can finish even if this coroutine is being cancelled.
                stop_task = asyncio.create_task(kafka_consumer.stop())
                stop_task.add_done_callback(
                    lambda _: logger.info(
                        f"stopped kafka consumer {consumer.id_}: {consumer.message_type}"
                    )
                )
                await asyncio.shield(stop_task)

        return start_consuming_callable
+ async def clean_up() -> None: + mq = KafkaMessageQueue() + await mq.cleanup_local(["test"]) + + if args.produce: + asyncio.run(produce()) + + if args.consume: + asyncio.run(consume()) + + if args.clean_up: + asyncio.run(clean_up()) diff --git a/poetry.lock b/poetry.lock index 572a9059..aafdb40d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "aio-pika" @@ -111,6 +111,58 @@ yarl = ">=1.0,<2.0" [package.extras] speedups = ["Brotli", "aiodns", "brotlicffi"] +[[package]] +name = "aiokafka" +version = "0.11.0" +description = "Kafka integration with asyncio" +optional = false +python-versions = ">=3.8" +files = [ + {file = "aiokafka-0.11.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:926f93fb6a39891fd4364494432b479c0602f9cac708778d4a262a2c2e20d3b4"}, + {file = "aiokafka-0.11.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:38e1917e706c1158d5e1f612d1fc1b40f706dc46c534e73ab4de8ae2868a31be"}, + {file = "aiokafka-0.11.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:516e1d68d9a377860b2e17453580afe304605bc71894f684d3e7b6618f6f939f"}, + {file = "aiokafka-0.11.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:acfd0a5d0aec762ba73eeab73b23edce14f315793f063b6a4b223b6f79e36bb8"}, + {file = "aiokafka-0.11.0-cp310-cp310-win32.whl", hash = "sha256:0d80590c4ef0ba546a299cee22ea27c3360c14241ec43a8e6904653f7b22d328"}, + {file = "aiokafka-0.11.0-cp310-cp310-win_amd64.whl", hash = "sha256:1d519bf9875ac867fb19d55de3750833b1eb6379a08de29a68618e24e6a49fc0"}, + {file = "aiokafka-0.11.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:0e957b42ae959365efbb45c9b5de38032c573608553c3670ad8695cc210abec9"}, + {file = 
"aiokafka-0.11.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:224db2447f6c1024198d8342e7099198f90401e2fa29c0762afbc51eadf5c490"}, + {file = "aiokafka-0.11.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ef3e7c8a923e502caa4d24041f2be778fd7f9ee4587bf0bcb4f74cac05122fa"}, + {file = "aiokafka-0.11.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:59f4b935589ebb244620afad8bf3320e3bc86879a8b1c692ad06bd324f6c6127"}, + {file = "aiokafka-0.11.0-cp311-cp311-win32.whl", hash = "sha256:560839ae6bc13e71025d71e94df36980f5c6e36a64916439e598b6457267a37f"}, + {file = "aiokafka-0.11.0-cp311-cp311-win_amd64.whl", hash = "sha256:1f8ae91f0373830e4664376157fe61b611ca7e573d8a559b151aef5bf53df46c"}, + {file = "aiokafka-0.11.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:4e0cc080a7f4c659ee4e1baa1c32adedcccb105a52156d4909f357d76fac0dc1"}, + {file = "aiokafka-0.11.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:55a07a39d82c595223a17015ea738d152544cee979d3d6d822707a082465621c"}, + {file = "aiokafka-0.11.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3711fa64ee8640dcd4cb640f1030f9439d02e85acd57010d09053017092d8cc2"}, + {file = "aiokafka-0.11.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:818a6f8e44b02113b9e795bee6029c8a4e525ab38f29d7adb0201f3fec74c808"}, + {file = "aiokafka-0.11.0-cp312-cp312-win32.whl", hash = "sha256:8ba981956243767b37c929845c398fda2a2e35a4034d218badbe2b62e6f98f96"}, + {file = "aiokafka-0.11.0-cp312-cp312-win_amd64.whl", hash = "sha256:9a478a14fd23fd1ffe9c7a21238d818b5f5e0626f7f06146b687f3699298391b"}, + {file = "aiokafka-0.11.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:0973a245b8b9daf8ef6814253a80a700f1f54d2da7d88f6fe479f46e0fd83053"}, + {file = 
"aiokafka-0.11.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ee0c61a2dcabbe4474ff237d708f9bd663dd2317e03a9cb7239a212c9ee05b12"}, + {file = "aiokafka-0.11.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:230170ce2e8a0eb852e2e8b78b08ce2e29b77dfe2c51bd56f5ab4be0f332a63b"}, + {file = "aiokafka-0.11.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eac78a009b713e28b5b4c4daae9d062acbf2b7980e5734467643a810134583b5"}, + {file = "aiokafka-0.11.0-cp38-cp38-win32.whl", hash = "sha256:73584be8ba7906e3f33ca0f08f6af21a9ae31b86c6b635b93db3b1e6f452657b"}, + {file = "aiokafka-0.11.0-cp38-cp38-win_amd64.whl", hash = "sha256:d724b6fc484e453b373052813e4e543fc028a22c3fbda10e13b6829740000b8a"}, + {file = "aiokafka-0.11.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:419dd28c8ed6e926061bdc60929af08a6b52f1721e1179d9d21cc72ae28fd6f6"}, + {file = "aiokafka-0.11.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f1c85f66eb3564c5e74d8e4c25df4ac1fd94f1a6f6e66f005aafa6f791bde215"}, + {file = "aiokafka-0.11.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eaafe134de57b184f3c030e1a11051590caff7953c8bf58048eefd8d828e39d7"}, + {file = "aiokafka-0.11.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:807f699cf916369b1a512e4f2eaec714398c202d8803328ef8711967d99a56ce"}, + {file = "aiokafka-0.11.0-cp39-cp39-win32.whl", hash = "sha256:d59fc7aec088c9ffc02d37e61591f053459bd11912cf04c70ac4f7e60405667d"}, + {file = "aiokafka-0.11.0-cp39-cp39-win_amd64.whl", hash = "sha256:702aec15b63bad5e4476294bcb1cb177559149fce3e59335794f004c279cbd6a"}, + {file = "aiokafka-0.11.0.tar.gz", hash = "sha256:f2def07fe1720c4fe37c0309e355afa9ff4a28e0aabfe847be0692461ac69352"}, +] + +[package.dependencies] +async-timeout = "*" +packaging = "*" +typing-extensions = ">=4.10.0" + +[package.extras] +all = 
["cramjam (>=2.8.0)", "gssapi"] +gssapi = ["gssapi"] +lz4 = ["cramjam (>=2.8.0)"] +snappy = ["cramjam"] +zstd = ["cramjam"] + [[package]] name = "aiormq" version = "6.8.0" @@ -780,6 +832,24 @@ files = [ {file = "joblib-1.4.2.tar.gz", hash = "sha256:2382c5816b2636fbd20a09e0f4e9dad4736765fdfb7dca582943b9c1366b3f0e"}, ] +[[package]] +name = "kafka-python-ng" +version = "2.2.2" +description = "Pure Python client for Apache Kafka" +optional = false +python-versions = ">=3.8" +files = [ + {file = "kafka-python-ng-2.2.2.tar.gz", hash = "sha256:87ad3a766e2c0bec71d9b99bdd9e9c5cda62d96cfda61a8ca16510484d6ad7d4"}, + {file = "kafka_python_ng-2.2.2-py2.py3-none-any.whl", hash = "sha256:3fab1a03133fade1b6fd5367ff726d980e59031c4aaca9bf02c516840a4f8406"}, +] + +[package.extras] +boto = ["botocore"] +crc32c = ["crc32c"] +lz4 = ["lz4"] +snappy = ["python-snappy"] +zstd = ["zstandard"] + [[package]] name = "linkify-it-py" version = "2.0.3" @@ -1244,8 +1314,8 @@ files = [ [package.dependencies] numpy = [ {version = ">=1.20.3", markers = "python_version < \"3.10\""}, - {version = ">=1.21.0", markers = "python_version >= \"3.10\" and python_version < \"3.11\""}, {version = ">=1.23.2", markers = "python_version >= \"3.11\""}, + {version = ">=1.21.0", markers = "python_version >= \"3.10\" and python_version < \"3.11\""}, ] python-dateutil = ">=2.8.2" pytz = ">=2020.1" @@ -2369,10 +2439,11 @@ idna = ">=2.0" multidict = ">=4.0" [extras] +kafka = ["aiokafka", "kafka-python-ng"] rabbitmq = ["aio-pika"] redis = ["redis"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "c124f8aebf627760c96a1b494e7dff8194e162075be9c390fe7f17f667474c10" +content-hash = "e14f5d38a2e250f9b8106c10c8188bad80217f63de095e95f8b41feab11b7222" diff --git a/pyproject.toml b/pyproject.toml index 82f3b770..59854371 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,8 +24,11 @@ aio-pika = {version = "^9.4.2", optional = true} redis = {version = "^5.0.7", optional = true} uvicorn = 
"^0.30.1" pytest-mock = "^3.14.0" +aiokafka = {version = "^0.11.0", optional = true} +kafka-python-ng = {version = "^2.2.2", optional = true} [tool.poetry.extras] +kafka = ["aiokafka", "kafka-python-ng"] rabbitmq = ["aio-pika"] redis = ["redis"] @@ -37,6 +40,8 @@ aio-pika = "^9.4.2" redis = "^5.0.7" pytest-cov = "^5.0.0" coverage = "^7.6.0" +aiokafka = "^0.11.0" +kafka-python-ng = "^2.2.2" [tool.poetry.scripts] llama-agents = 'llama_agents.cli.command_line:main' diff --git a/tests/message_queues/test_apache_kafka.py b/tests/message_queues/test_apache_kafka.py new file mode 100644 index 00000000..4b619c70 --- /dev/null +++ b/tests/message_queues/test_apache_kafka.py @@ -0,0 +1,57 @@ +import json +import pytest +from unittest.mock import patch, AsyncMock +from llama_agents import QueueMessage +from llama_agents.message_queues.apache_kafka import KafkaMessageQueue + + +try: + import aiokafka +except (ModuleNotFoundError, ImportError): + aiokafka = None + + +def test_init() -> None: + # arrange/act + mq = KafkaMessageQueue(url="0.0.0.0:5555") + + # assert + assert mq.url == "0.0.0.0:5555" + + +def test_from_url_params() -> None: + # arrange + host = "mock-host" + port = 8080 + + # act + mq = KafkaMessageQueue.from_url_params(host=host, port=port) + + # assert + assert mq.url == f"{host}:{port}" + + +@pytest.mark.asyncio() +@pytest.mark.skipif(aiokafka is None, reason="aiokafka not installed") +async def test_publish() -> None: + from aiokafka import AIOKafkaProducer + + # Arrange + mq = KafkaMessageQueue() + + # message types + queue_message = QueueMessage(publisher_id="test", id_="1") + message_body = json.dumps(queue_message.model_dump()).encode("utf-8") + + with patch.object(AIOKafkaProducer, "start", new_callable=AsyncMock) as mock_start: + with patch.object( + AIOKafkaProducer, "send_and_wait", new_callable=AsyncMock + ) as mock_send_and_wait: + # Act + _ = await mq._publish(queue_message) + + # Assert + mock_start.assert_awaited_once() + 
mock_send_and_wait.assert_awaited_once_with( + queue_message.type, message_body + )