Docs? How to run tasks in parallel and mark tasks that were done to be incomplete and/or loop executions #240

Open
MarkEdmondson1234 opened this issue Jul 18, 2024 · 0 comments
Labels
enhancement Enhance an existing feature

MarkEdmondson1234 commented Jul 18, 2024

Enhancement Description

I've looked through the docs, but it's not clear to me whether it's possible to assign tasks to run in parallel, or whether it's an anti-pattern to mark completed tasks as incomplete again to get the behaviour I'm looking for. I want a self-improvement loop in which agents complete the same tasks on each iteration, each time with a new follow-up question derived from the conversation so far.

Use Case

I have this flow:

import controlflow as cf
from typing import Annotated, List, Optional
from pydantic import BaseModel, Field

from agents import (
    code_executor,
    web_researcher,
    database_researcher,
    compere
)

class FastThoughts(BaseModel):
    initial_response: str
    can_answer: bool

@cf.flow()
def fast_n_slow_flow(question, vector_name="control_flow", chat_history=None):

    agents = ["code_executor", "web_researcher", "database_researcher"]
    agents_obj = [code_executor, web_researcher, database_researcher]

    task_master = cf.Task(
        """
Frame the question from the user in language that all your co-assistants can understand and ask which agents on your team will be able to answer it.
They should reply with their initial reactions, but will start a slower, more considered response that should come later.
""",
        agents=[compere],
        result_type=str, 
    )
    task_master.run()
    print(task_master.result)

    slow_thought_agents = []
    fast_answers = []  # one FastThoughts result per agent

    for agent in agents_obj:
        fast_thoughts = cf.Task(
            "Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
            agents=[agent],
            context=dict(user_question=task_master),
            result_type=FastThoughts
        )
        fast_answer = fast_thoughts.run()
        print(fast_answer)
        fast_answers.append(fast_answer)

        if fast_answer.can_answer:
            print(f"Adding {agent.name} to slow thoughts")
            slow_thought_agents.append(agent)


    who_speaks_next = cf.Task(
        "From the conversation so far and the assistants' recent reactions, decide who should speak next",
        # pass every agent's reaction, not only the last task created in the loop
        context=dict(assistant_reactions=fast_answers),
        agents=[compere],
        result_type=agents
    )
    who_speaks_next.run()
    print(who_speaks_next.result) # only result available if you do not supply a result_type


    slow_thoughts = []
    for agent in slow_thought_agents:
        try:
            slow_thought = cf.Task(
                "You indicated you could contribute more to the conversation with your skills - please now answer the question fully using all tools at your disposal.",
                agents=[agent]
            )
        except ValueError:
            print(f"{agent.name} could not help with a slow thought")
            continue

        slow_thought.run()
        print(slow_thought.result)
        slow_thoughts.append(slow_thought)

    summary = cf.Task(
        """
        Compile all the answers so far and summarise the conversation so far.  Highlight what each team member has brought to the discussion.
        Consider the initial question and the answers so far, reframe a new question that will help tease out any unexplored areas.
        """,
        agents = [compere],
        context=dict(fast_thoughts=fast_thoughts, slow_thoughts=slow_thoughts, who_speaks_next=who_speaks_next)
    )
    summary.run()
    print(summary.result)

    shall_we_end = cf.Task(
        "If you believe the conversation has reached its natural conclusion, mark this task complete, otherwise keep the conversation going",
        agents=[compere],
        context=dict(summary=summary),
        result_type=bool)
    
    shall_we_end.run()
    print(shall_we_end.result)

    if shall_we_end.result:
        return slow_thoughts, summary.result

It works, but I'd like it to loop back and use the follow-up question the compere adds at the last step, and it's not clear to me how to do this. I'd also like to avoid the for loops, or make them async or parallel somehow, since the iterations don't depend on each other.
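
A minimal sketch of the loop-back idea, written without ControlFlow so it runs on its own: wrap one pass of the flow in a function that returns the summary, the follow-up question, and whether to stop, then drive it from a plain Python loop. `run_round` and `RoundResult` are hypothetical stand-ins for the body of the flow above, not ControlFlow APIs, and the `max_rounds` cap is an assumption added so the loop cannot run forever.

```python
from dataclasses import dataclass


@dataclass
class RoundResult:
    summary: str
    follow_up_question: str
    finished: bool


def run_round(question: str, round_number: int) -> RoundResult:
    """Hypothetical stand-in for one pass through the flow body.

    In the real flow this would build and run the fast, slow and summary
    tasks. Here it fakes a conversation that ends on the third round.
    """
    return RoundResult(
        summary=f"round {round_number}: discussed {question!r}",
        follow_up_question=f"follow-up to {question}",
        finished=round_number >= 3,
    )


def conversation_loop(question: str, max_rounds: int = 5) -> list[RoundResult]:
    """Feed each round's follow-up question into the next round."""
    history = []
    for round_number in range(1, max_rounds + 1):
        result = run_round(question, round_number)
        history.append(result)
        if result.finished:
            break
        # the compere's reframed question becomes the next round's input
        question = result.follow_up_question
    return history


if __name__ == "__main__":
    for round_result in conversation_loop("What drives churn?"):
        print(round_result.summary)
```

Returning the follow-up question as data, rather than leaving it inside the summary text, is what lets the outer loop pick it up without parsing prose.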

Proposed Implementation

Maybe this is already possible :) but here is how I'd ideally like it to work.

I'd prefer instead of this for loop:

    for agent in agents_obj:
        fast_thoughts = cf.Task(
            "Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
            agents=[agent],
            context=dict(user_question=task_master),
            result_type=FastThoughts
        )
        fast_answer = fast_thoughts.run()
        print(fast_answer)

        if fast_answer.can_answer:
            print(f"Adding {agent.name} to slow thoughts")
            slow_thought_agents.append(agent)

I'd just pass in all the agents and get the results back as a list:

# proposed behaviour: this task is run once per agent, in parallel
fast_thoughts = cf.Task(
    "Indicate your reaction to the recent chat history without going in to too much depth, including if you think you can make a good next contribution that will help drive the conversation towards the goal of answering the question posed by the user.",
    agents=[code_executor, web_researcher, database_researcher],
    context=dict(user_question=task_master),
    result_type=[FastThoughts]  # a list of the output, one per passed-in agent
)

fast_answers = fast_thoughts.run()

# assumes the answers come back in the same order as agents_obj
for agent, answer in zip(agents_obj, fast_answers):
    if answer.can_answer:
        print(f"Adding {agent.name} to slow thoughts")
        slow_thought_agents.append(agent)
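
Until something like the above exists, one possible workaround is to fan the independent per-agent calls out over a thread pool. The sketch below uses only the standard library; `ask_fast_thoughts` is a hypothetical stand-in for "build and run one agent's task", and the `FastThoughts` dataclass here mimics the Pydantic model from the flow. Whether ControlFlow tasks can safely be run from several threads inside one flow is an assumption that would need checking.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass


@dataclass
class FastThoughts:
    initial_response: str
    can_answer: bool


def ask_fast_thoughts(agent_name: str) -> FastThoughts:
    """Hypothetical stand-in for building and running one agent's task."""
    return FastThoughts(
        initial_response=f"{agent_name} reacting",
        can_answer=agent_name != "code_executor",
    )


def fan_out(agent_names: list[str]) -> dict[str, FastThoughts]:
    """Run one independent call per agent concurrently, keeping input order."""
    with ThreadPoolExecutor(max_workers=max(1, len(agent_names))) as pool:
        # pool.map yields results in the same order as agent_names
        answers = list(pool.map(ask_fast_thoughts, agent_names))
    return dict(zip(agent_names, answers))


if __name__ == "__main__":
    answers = fan_out(["code_executor", "web_researcher", "database_researcher"])
    slow_thought_agents = [name for name, a in answers.items() if a.can_answer]
    print(slow_thought_agents)
```

Threads suit this case because each call would spend most of its time waiting on an LLM response rather than using the CPU.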

I'm also unclear on how to mark tasks as incomplete, or whether that is an anti-pattern, but something like:

fast_thoughts.set_incomplete()

or perhaps create a new task with subtasks for the next loop?
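
The "new task per loop" option can be sketched as a small factory that builds fresh task objects for every round, so completed tasks never need resetting and stay available as history. `StubTask` is a hypothetical stand-in with a `status` field and a `run()` method; it is not a ControlFlow class, and `set_incomplete()` above remains a proposed method rather than a known one.

```python
from dataclasses import dataclass


@dataclass
class StubTask:
    """Hypothetical stand-in for a task object; not a ControlFlow class."""
    objective: str
    agent_name: str
    status: str = "pending"

    def run(self) -> str:
        self.status = "complete"
        return f"{self.agent_name} answered: {self.objective}"


def make_round_tasks(question: str, agent_names: list[str]) -> list[StubTask]:
    """Build brand-new tasks for one round instead of resetting old ones."""
    return [StubTask(objective=question, agent_name=name) for name in agent_names]


def run_rounds(questions: list[str], agent_names: list[str]) -> list[list[StubTask]]:
    """Run one fresh batch of tasks per question and keep every batch."""
    rounds = []
    for question in questions:
        tasks = make_round_tasks(question, agent_names)
        for task in tasks:
            task.run()
        rounds.append(tasks)  # completed tasks stay intact as history
    return rounds


if __name__ == "__main__":
    rounds = run_rounds(
        ["original question", "follow-up question"],
        ["web_researcher", "database_researcher"],
    )
    for number, tasks in enumerate(rounds, start=1):
        print(number, [(t.agent_name, t.status) for t in tasks])
```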

@MarkEdmondson1234 MarkEdmondson1234 added the enhancement Enhance an existing feature label Jul 18, 2024
@MarkEdmondson1234 MarkEdmondson1234 changed the title Docs? How to run tasks in parrellal and mark tasks that were done to be incomplete and/or loop executions Docs? How to run tasks in parallel and mark tasks that were done to be incomplete and/or loop executions Jul 18, 2024