
feat: make the KV store configurable via env vars #391

Merged
merged 8 commits into from
Nov 28, 2024
69 changes: 69 additions & 0 deletions examples/redis_state_store/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Using Redis as State Store

> [!NOTE]
> This example is mostly based on the [Quick Start](../quick_start/README.md), see there for more details.

We'll be deploying a simple workflow on a local instance of Llama Deploy, using Redis as scalable storage for the
global state. This is mostly needed when more than one control plane runs concurrently.

Before starting Llama Deploy, use Docker compose to start the Redis container and run it in the background:

```
$ docker compose up -d
```

Make sure to install the package that adds Redis KV store support in the virtual environment where Llama Deploy will run:

```
$ pip install -r requirements.txt
```

This is the code defining our deployment, with comments highlighting the relevant bits:

```yaml
name: QuickStart

control-plane:
port: 8000
# Here we tell the Control Plane to use Redis
state_store_uri: redis://localhost:6379

default-service: echo_workflow

services:
echo_workflow:
name: Echo Workflow
source:
type: local
name: ./src
path: workflow:echo_workflow
```

Note how we provide a connection URI for Redis in the `state_store_uri` field of the control plane configuration.
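Under the hood, the control plane parses this URI and picks a KV store backend based on its scheme (see `parse_state_store_uri` in this PR). A quick standard-library look at what that parse yields:

```python
from urllib.parse import urlparse

# The control plane dispatches on the URI scheme to pick a KV store backend.
bits = urlparse("redis://localhost:6379")
print(bits.scheme)    # → redis
print(bits.hostname)  # → localhost
print(bits.port)      # → 6379
```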

At this point we have everything we need to run this deployment. Ideally the API server would already be running
somewhere in the cloud, but to get started let's launch an instance locally. Run the following command
from a shell:

```
$ python -m llama_deploy.apiserver
INFO: Started server process [10842]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:4501 (Press CTRL+C to quit)
```

From another shell, use the CLI, `llamactl`, to create the deployment:

```
$ llamactl deploy redis_store.yml
Deployment successful: QuickStart
```

Our workflow is now part of the `QuickStart` deployment and ready to serve requests! We can use `llamactl` to interact
with this deployment:

```
$ llamactl run --deployment QuickStart --arg message 'Hello from my shell!'
Message received: Hello from my shell!
```
12 changes: 12 additions & 0 deletions examples/redis_state_store/docker-compose.yml
@@ -0,0 +1,12 @@
services:
redis:
# Use as KV store
image: redis:latest
hostname: redis
ports:
- "6379:6379"
healthcheck:
test: redis-cli --raw incr ping
interval: 5s
timeout: 3s
retries: 5
15 changes: 15 additions & 0 deletions examples/redis_state_store/redis_store.yml
@@ -0,0 +1,15 @@
name: QuickStart

control-plane:
port: 8000
state_store_uri: redis://localhost:6379

default-service: dummy_workflow

services:
dummy_workflow:
name: Dummy Workflow
source:
type: local
name: src
path: workflow:echo_workflow
1 change: 1 addition & 0 deletions examples/redis_state_store/requirements.txt
@@ -0,0 +1 @@
llama-index-storage-kvstore-redis
24 changes: 24 additions & 0 deletions examples/redis_state_store/src/workflow.py
@@ -0,0 +1,24 @@
import asyncio

from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step


# create a dummy workflow
class EchoWorkflow(Workflow):
"""A dummy workflow with only one step sending back the input given."""

@step()
async def run_step(self, ev: StartEvent) -> StopEvent:
message = str(ev.get("message", ""))
return StopEvent(result=f"Message received: {message}")
echo_workflow = EchoWorkflow()


async def main():
print(await echo_workflow.run(message="Hello!"))


if __name__ == "__main__":
asyncio.run(main())
17 changes: 0 additions & 17 deletions llama_deploy/control_plane/base.py
Expand Up @@ -45,7 +45,6 @@ def as_consumer(self, remote: bool = False) -> BaseMessageQueueConsumer:
Returns:
BaseMessageQueueConsumer: Message queue consumer.
"""
...

@abstractmethod
async def register_service(
Expand All @@ -57,7 +56,6 @@ async def register_service(
Args:
service_def (ServiceDefinition): Definition of the service.
"""
...

@abstractmethod
async def deregister_service(self, service_name: str) -> None:
Expand All @@ -67,7 +65,6 @@ async def deregister_service(self, service_name: str) -> None:
Args:
service_name (str): Name of the service.
"""
...

@abstractmethod
async def get_service(self, service_name: str) -> ServiceDefinition:
Expand All @@ -80,7 +77,6 @@ async def get_service(self, service_name: str) -> ServiceDefinition:
Returns:
ServiceDefinition: Definition of the service.
"""
...

@abstractmethod
async def get_all_services(self) -> Dict[str, ServiceDefinition]:
Expand All @@ -90,7 +86,6 @@ async def get_all_services(self) -> Dict[str, ServiceDefinition]:
Returns:
dict: All services, mapped from service name to service definition.
"""
...

@abstractmethod
async def create_session(self) -> str:
Expand All @@ -100,7 +95,6 @@ async def create_session(self) -> str:
Returns:
str: Session ID.
"""
...

@abstractmethod
async def add_task_to_session(
Expand All @@ -116,7 +110,6 @@ async def add_task_to_session(
Returns:
str: Task ID.
"""
...

@abstractmethod
async def send_task_to_service(self, task_def: TaskDefinition) -> TaskDefinition:
Expand All @@ -129,7 +122,6 @@ async def send_task_to_service(self, task_def: TaskDefinition) -> TaskDefinition
Returns:
TaskDefinition: Task definition with updated state.
"""
...

@abstractmethod
async def handle_service_completion(
Expand All @@ -142,7 +134,6 @@ async def handle_service_completion(
Args:
task_result (TaskResult): Result of the task.
"""
...

@abstractmethod
async def get_session(self, session_id: str) -> SessionDefinition:
Expand All @@ -155,7 +146,6 @@ async def get_session(self, session_id: str) -> SessionDefinition:
Returns:
SessionDefinition: The session definition.
"""
...

@abstractmethod
async def delete_session(self, session_id: str) -> None:
Expand All @@ -165,7 +155,6 @@ async def delete_session(self, session_id: str) -> None:
Args:
session_id (str): Unique identifier of the session.
"""
...

@abstractmethod
async def get_all_sessions(self) -> Dict[str, SessionDefinition]:
Expand All @@ -175,7 +164,6 @@ async def get_all_sessions(self) -> Dict[str, SessionDefinition]:
Returns:
dict: All sessions, mapped from session ID to session definition.
"""
...

@abstractmethod
async def get_session_tasks(self, session_id: str) -> List[TaskDefinition]:
Expand All @@ -188,7 +176,6 @@ async def get_session_tasks(self, session_id: str) -> List[TaskDefinition]:
Returns:
List[TaskDefinition]: All tasks in the session.
"""
...

@abstractmethod
async def get_current_task(self, session_id: str) -> Optional[TaskDefinition]:
Expand All @@ -201,7 +188,6 @@ async def get_current_task(self, session_id: str) -> Optional[TaskDefinition]:
Returns:
Optional[TaskDefinition]: The current task, if any.
"""
...

@abstractmethod
async def get_task(self, task_id: str) -> TaskDefinition:
Expand All @@ -214,7 +200,6 @@ async def get_task(self, task_id: str) -> TaskDefinition:
Returns:
TaskDefinition: The task definition.
"""
...

@abstractmethod
async def get_message_queue_config(self) -> Dict[str, dict]:
Expand All @@ -224,14 +209,12 @@ async def get_message_queue_config(self) -> Dict[str, dict]:
Returns:
Dict[str, dict]: A dict of message queue name -> config dict
"""
...

@abstractmethod
async def launch_server(self) -> None:
"""
Launch the control plane server.
"""
...

@abstractmethod
async def register_to_message_queue(self) -> StartConsumingCallable:
Expand Down
40 changes: 36 additions & 4 deletions llama_deploy/control_plane/config.py
@@ -1,5 +1,7 @@
from typing import List
from urllib.parse import urlparse

from llama_index.core.storage.kvstore.types import BaseKVStore
from pydantic_settings import BaseSettings, SettingsConfigDict


Expand All @@ -21,10 +23,40 @@ class ControlPlaneConfig(BaseSettings):
running: bool = True
cors_origins: List[str] | None = None
topic_namespace: str = "llama_deploy"
state_store_uri: str | None = None

@property
def url(self) -> str:
if self.port:
return f"http://{self.host}:{self.port}"
else:
return f"http://{self.host}"
return f"http://{self.host}:{self.port}"


def parse_state_store_uri(uri: str) -> BaseKVStore:
bits = urlparse(uri)

if bits.scheme == "redis":
try:
from llama_index.storage.kvstore.redis import RedisKVStore # type: ignore

return RedisKVStore(redis_uri=uri)
except ImportError:
msg = (
f"key-value store {bits.scheme} is not available, please install the required "
"llama_index integration with 'pip install llama-index-storage-kvstore-redis'."
)
raise ValueError(msg)
elif bits.scheme == "mongodb+srv":
try:
from llama_index.storage.kvstore.mongodb import ( # type:ignore
MongoDBKVStore,
)

return MongoDBKVStore(uri=uri)
except ImportError:
msg = (
f"key-value store {bits.scheme} is not available, please install the required "
"llama_index integration with 'pip install llama-index-storage-kvstore-mongodb'."
)
raise ValueError(msg)
else:
msg = f"key-value store '{bits.scheme}' is not supported."
raise ValueError(msg)
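The scheme dispatch above could be generalized to a registry; here is a hedged sketch of that pattern (names hypothetical, not part of this PR), using string stand-ins for the real store classes so it runs without the optional integrations:

```python
from typing import Callable, Dict
from urllib.parse import urlparse

# Hypothetical registry mapping URI schemes to store factories; strings
# stand in for RedisKVStore / MongoDBKVStore so the sketch is self-contained.
_FACTORIES: Dict[str, Callable[[str], str]] = {
    "redis": lambda uri: f"RedisKVStore(redis_uri={uri!r})",
    "mongodb+srv": lambda uri: f"MongoDBKVStore(uri={uri!r})",
}

def resolve_store(uri: str) -> str:
    scheme = urlparse(uri).scheme
    if scheme not in _FACTORIES:
        raise ValueError(f"key-value store '{scheme}' is not supported.")
    return _FACTORIES[scheme](uri)

print(resolve_store("redis://localhost:6379"))
# → RedisKVStore(redis_uri='redis://localhost:6379')
```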