Implement event streaming from workflows (#260)

logan-markewich authored Sep 24, 2024
1 parent 411c646 commit e5a8284
Showing 27 changed files with 1,058 additions and 271 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/e2e_test.yml
@@ -0,0 +1,36 @@
name: E2E Testing

on:
push:
branches:
- main
pull_request:

env:
POETRY_VERSION: "1.6.1"

jobs:
test:
runs-on: ubuntu-latest
strategy:
# You can use PyPy versions in python-version.
# For example, pypy-2.7 and pypy-3.8
matrix:
python-version: ["3.11", "3.12"]
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- name: Set up python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install deps
shell: bash
run: pip install -e .
- name: Run All E2E Tests
env:
CI: true
shell: bash
working-directory: e2e_tests
run: sh run_all_e2e_tests.sh
2 changes: 1 addition & 1 deletion .github/workflows/publish_release.yml
@@ -9,7 +9,7 @@ on:

env:
POETRY_VERSION: "1.8.3"
PYTHON_VERSION: "3.9"
PYTHON_VERSION: "3.11"

jobs:
build-n-publish:
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -49,7 +49,7 @@ repos:
--ignore-missing-imports,
--python-version=3.11,
]
exclude: examples/
exclude: ^(examples/|e2e_tests/)
- repo: https://github.com/adamchainz/blacken-docs
rev: 1.16.0
hooks:
40 changes: 38 additions & 2 deletions README.md
@@ -86,16 +86,33 @@ from llama_deploy import (
ControlPlaneConfig,
SimpleMessageQueueConfig,
)
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step
from llama_index.core.workflow import (
Context,
Event,
Workflow,
StartEvent,
StopEvent,
step,
)


class ProgressEvent(Event):
progress: str


# create a dummy workflow
class MyWorkflow(Workflow):
@step()
async def run_step(self, ev: StartEvent) -> StopEvent:
async def run_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
# Your workflow logic here
arg1 = str(ev.get("arg1", ""))
result = arg1 + "_result"

# stream events as steps run
ctx.write_event_to_stream(
ProgressEvent(progress="I am doing something!")
)

return StopEvent(result=result)


@@ -133,6 +150,25 @@ print(result)
# prints 'hello_world_result'
```

If you want to see the event stream as well, you can do:

```python
# create a session
session = client.create_session()

# kick off run
task_id = session.run_nowait("streaming_workflow", arg1="hello_world")

# stream events -- this will yield a dict representing each event
for event in session.get_task_result_stream(task_id):
print(event)

# get final result
result = session.get_task_result(task_id)
print(result)
# prints 'hello_world_result'
```
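
Each item in the stream is a plain dict representing one event. Assuming the event's fields are exposed as top-level keys in that dict (the exact serialization isn't shown in this diff), a minimal way to react to the `ProgressEvent` defined above looks like this:

```python
# react to streamed events; the "progress" key is assumed from ProgressEvent's field
for event in session.get_task_result_stream(task_id):
    if "progress" in event:
        print("Workflow progress:", event["progress"])
    else:
        print("Other event:", event)
```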

### Deploying Nested Workflows

Every `Workflow` is capable of injecting and running nested workflows. For example
93 changes: 82 additions & 11 deletions docs/docs/module_guides/workflow/deployment.md
@@ -1,11 +1,15 @@
# 🦙 `llama_deploy` 🤖

[`llama_deploy`](https://github.com/run-llama/llama_deploy) (formerly `llama-agents`) is an async-first framework for deploying, scaling, and productionizing agentic multi-service systems based on [workflows from `llama_index`](https://docs.llamaindex.ai/en/stable/understanding/workflows/). With `llama_deploy`, you can build any number of workflows in `llama_index` and then bring them into `llama_deploy` for deployment.
`llama_deploy` (formerly `llama-agents`) is an async-first framework for deploying, scaling, and productionizing agentic multi-service systems based on [workflows from `llama_index`](https://docs.llamaindex.ai/en/stable/understanding/workflows/). With `llama_deploy`, you can build any number of workflows in `llama_index` and then bring them into `llama_deploy` for deployment.

In `llama_deploy`, each workflow is seen as a `service`, endlessly processing incoming tasks. Each workflow pulls and publishes messages to and from a `message queue`.

At the top of a `llama_deploy` system is the `control plane`. The control plane handles ongoing tasks, manages state, keeps track of which services are in the network, and also decides which service should handle the next step of a task using an `orchestrator`. The default `orchestrator` is purely programmatic, handling failures, retries, and state-passing.

The overall system layout is pictured below.

![A basic system in llama_deploy](./system_diagram.png)
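
For reference, the control plane and message queue are launched together with the `deploy_core` helper; a minimal sketch (the same pattern used by the e2e test launcher added later in this commit) looks like this:

```python
from llama_deploy import deploy_core, ControlPlaneConfig, SimpleMessageQueueConfig


async def main():
    # start the control plane and an in-process message queue with default settings
    await deploy_core(
        ControlPlaneConfig(),
        SimpleMessageQueueConfig(),
    )


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```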

## Why `llama_deploy`?

1. **Seamless Deployment**: It bridges the gap between development and production, allowing you to deploy `llama_index` workflows with minimal changes to your code.
@@ -63,7 +67,7 @@ async def main():
)


if name == "main":
if __name__ == "__main__":
import asyncio

asyncio.run(main())
@@ -76,23 +80,39 @@ This will set up the basic infrastructure for your `llama_deploy` system. You ca
To deploy a workflow as a service, you can use the `deploy_workflow` function:

```python
python
from llama_deploy import (
deploy_workflow,
WorkflowServiceConfig,
ControlPlaneConfig,
SimpleMessageQueueConfig,
)
from llama_index.core.workflow import Workflow, StartEvent, StopEvent, step
from llama_index.core.workflow import (
Context,
Event,
Workflow,
StartEvent,
StopEvent,
step,
)


class ProgressEvent(Event):
progress: str


# create a dummy workflow
class MyWorkflow(Workflow):
@step()
async def run_step(self, ev: StartEvent) -> StopEvent:
async def run_step(self, ctx: Context, ev: StartEvent) -> StopEvent:
# Your workflow logic here
arg1 = str(ev.get("arg1", ""))
result = arg1 + "_result"

# stream events as steps run
ctx.write_event_to_stream(
ProgressEvent(progress="I am doing something!")
)

return StopEvent(result=result)


@@ -106,7 +126,7 @@ async def main():
)


if name == "main":
if __name__ == "__main__":
import asyncio

asyncio.run(main())
@@ -119,7 +139,7 @@ This will deploy your workflow as a service within the `llama_deploy` system, an
Once deployed, you can interact with your deployment using a client.

```python
from llama_deploy import LlamaDeployClient
from llama_deploy import LlamaDeployClient, ControlPlaneConfig

# points to deployed control plane
client = LlamaDeployClient(ControlPlaneConfig())
@@ -130,6 +150,25 @@ print(result)
# prints 'hello_world_result'
```

If you want to see the event stream as well, you can do:

```python
# create a session
session = client.create_session()

# kick off run
task_id = session.run_nowait("streaming_workflow", arg1="hello_world")

# stream events -- this will yield a dict representing each event
for event in session.get_task_result_stream(task_id):
print(event)

# get final result
result = session.get_task_result(task_id)
print(result)
# prints 'hello_world_result'
```

### Deploying Nested Workflows

Every `Workflow` is capable of injecting and running nested workflows. For example
@@ -207,7 +246,7 @@ async def main():
await asyncio.gather(inner_task, outer_task)


if name == "main":
if __name__ == "__main__":
import asyncio

asyncio.run(main())
@@ -460,6 +499,7 @@ async_client = AsyncLlamaDeployClient(ControlPlaneConfig())

- `client.deregister_service(service_name)`: Deregisters a service from the control plane.
Example:

```python
client.deregister_service("my_workflow")
```
@@ -502,14 +542,45 @@

- `session.get_task_result(task_id)`: Gets the result of a task in the session if it has one.
Example:

```python
result = session.get_task_result("task_123")
if result:
print(result.result)
```

## Examples
### Message Queue Integrations

In addition to `SimpleMessageQueue`, we provide integrations for various
message queue providers, such as RabbitMQ, Redis, etc. The general usage pattern
for any of these message queues is the same as for `SimpleMessageQueue`;
however, the appropriate extra needs to be installed alongside `llama-deploy`.

We have many examples showing how to use various message queues, as well as different ways to scaffold your project for deployment with Docker and Kubernetes.
For example, for `RabbitMQMessageQueue`, we need to install the "rabbitmq" extra:

You can find all examples in the [examples folder of the `llama_deploy` repository](https://github.com/run-llama/llama_deploy/tree/main/examples).
```sh
# using pip install
pip install llama-agents[rabbitmq]

# using poetry
poetry add llama-agents -E "rabbitmq"
```

Using the `RabbitMQMessageQueue` is then done as follows:

```python
from llama_agents.message_queue.rabbitmq import (
RabbitMQMessageQueueConfig,
RabbitMQMessageQueue,
)

message_queue_config = (
RabbitMQMessageQueueConfig()
) # loads params from environment vars
message_queue = RabbitMQMessageQueue(**message_queue_config)
```

<!-- prettier-ignore-start -->
> [!NOTE]
> `RabbitMQMessageQueueConfig` can load its params from environment variables.
<!-- prettier-ignore-end -->
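
To tie this into the rest of the system, a sketch of swapping RabbitMQ in for `SimpleMessageQueue` when deploying the core services might look like the following (this assumes `deploy_core` accepts the RabbitMQ config in place of `SimpleMessageQueueConfig`, which is not shown in this diff):

```python
from llama_agents.message_queue.rabbitmq import RabbitMQMessageQueueConfig
from llama_deploy import deploy_core, ControlPlaneConfig


async def main():
    # assumption: deploy_core takes any message queue config, so the RabbitMQ
    # config (loaded from environment variables) replaces SimpleMessageQueueConfig
    await deploy_core(
        ControlPlaneConfig(),
        RabbitMQMessageQueueConfig(),
    )


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())
```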
9 changes: 9 additions & 0 deletions e2e_tests/README.md
@@ -0,0 +1,9 @@
# E2E Tests

E2E tests are run by the `E2E Testing` workflow in `.github/workflows/e2e_test.yml`.

They are written so that a `run.sh` script can be executed to run some number of tests locally.

Each folder in this directory represents a set of tests for a simple scenario.

When new folders are added, they will be executed automatically in the CI/CD pipeline, assuming their `run.sh` script runs successfully.
14 changes: 14 additions & 0 deletions e2e_tests/basic_streaming/launch_core.py
@@ -0,0 +1,14 @@
from llama_deploy import deploy_core, ControlPlaneConfig, SimpleMessageQueueConfig


async def main():
await deploy_core(
ControlPlaneConfig(),
SimpleMessageQueueConfig(),
)


if __name__ == "__main__":
import asyncio

asyncio.run(main())