Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduces Streaming #25

Merged
merged 2 commits into from
Jun 20, 2024
Merged

Introduces Streaming #25

merged 2 commits into from
Jun 20, 2024

Conversation

liamgriffiths
Copy link
Contributor

@liamgriffiths liamgriffiths commented Jun 18, 2024

Implements basic streaming requests and a response with stream message iterator helpers.

The request interface largely mirrors the run and async_run functions, but the response is a SubstrateStreamingResponse.

a = GenerateText(prompt="tell me about windmills", max_tokens=10)
b = GenerateText(prompt=sb.concat("is this true? ", a.future.text), max_tokens=10)

# async interface
async def amain():
    response = await substrate.async_stream(a, b)
    async for event in response.async_iter():
        print(event)
asyncio.run(amain())

# sync interface
def main():
    response = substrate.stream(a, b)
    for event in response.iter():
        print(event)

main()

The SubstrateStreamingResponse has 4 helper methods that return iterators:

  • .iter() yields ServerSentEvent objects
  • .async_iter() yields ServerSentEvent objects
  • .iter_events() yields strings formatted as Server-Sent Events
  • .async_iter_events() yields strings formatted as Server-Sent Events

If the user would like to do some processing on the message objects, using the .iter() or .async_iter() would allow them to access the message data as dictionaries.

The ServerSentEvent.data property may contain the following messages:

  • node.result: completed the result of a node (identified by node_id)
  • node.delta: when possible, incremental chunks of output (identified by node_id)
  • node.error: any error messages encountered when running a node
  • graph.result: the completed results for the entire run (same structure as the SubstrateResponse.json)

The message data are currently dictionaries, but the structure maps to the node "Out" types (including the ErrorOut type). The delta messages are structured similarly, except for array fields - which are dictionaries that contain an object field with the value array.item, an index field that corresponds to the index the chunk belongs to, and an item field when is the same structure of that array item.

If the user is interested in proxying the Server-Sent Events from the Substrate server response to another client, then using .iter_events() or .async_iter_events() provides convenience for passing this along to a response writer (such as StreamingResponse from FastAPI).

# FastAPI endpoint example 
@app.get("/qotd")
def quote_of_the_day():
    quote = Llama3Instruct8B(prompt="What's an inspirational quote of the day?")
    response = substrate.stream(quote)
    return StreamingResponse(response.iter_events(), media_type="text/event-stream")

@liamgriffiths liamgriffiths marked this pull request as ready for review June 19, 2024 16:12
@0thernet 0thernet merged commit a0c1779 into main Jun 20, 2024
3 checks passed
@0thernet 0thernet deleted the liam/streaming branch June 20, 2024 02:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants