Implement RemotePregel class and PregelProtocol #2034

Closed
wants to merge 25 commits into from

Conversation

andrewnguonly
Contributor

@andrewnguonly andrewnguonly commented Oct 7, 2024

Summary

This PR is a follow-up/reimplementation of this PR.

To Do

  • Add tests
  • Fix mypy errors

def with_config(
    self, config: Optional[RunnableConfig] = None, **kwargs: Any
) -> Self:
    return self.copy({})
Collaborator

this seems a bit strange

Contributor Author

@andrewnguonly andrewnguonly Oct 8, 2024

I wasn't sure what to do here because there's no concept of "cloning" a remote Pregel instance (i.e. creating a new Assistant which is a clone of the current one).

To "clone" or "copy" an Assistant, we have to...

  1. Call GET /assistants/<graph_id>.
  2. Update the configs (via merge).
  3. Call POST /assistants to create a new assistant, the clone/copy.
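
A minimal sketch of those three steps, assuming an in-memory stand-in for the assistants API. `FakeAssistantsClient`, `clone_assistant`, and the response field names are hypothetical and only illustrate the flow; they are not the SDK's interface.

```python
from copy import deepcopy
from typing import Any


class FakeAssistantsClient:
    """In-memory stand-in for an assistants API (hypothetical, illustration only)."""

    def __init__(self) -> None:
        self._store: dict[str, dict[str, Any]] = {}
        self._next_id = 1

    def get(self, assistant_id: str) -> dict[str, Any]:
        # Stands in for the GET call in step 1.
        return deepcopy(self._store[assistant_id])

    def create(self, graph_id: str, config: dict[str, Any]) -> dict[str, Any]:
        # Stands in for the POST call in step 3.
        assistant = {
            "assistant_id": f"assistant-{self._next_id}",
            "graph_id": graph_id,
            "config": deepcopy(config),
        }
        self._next_id += 1
        self._store[assistant["assistant_id"]] = assistant
        return deepcopy(assistant)


def clone_assistant(
    client: FakeAssistantsClient, assistant_id: str, overrides: dict[str, Any]
) -> dict[str, Any]:
    # Step 1: fetch the existing assistant.
    original = client.get(assistant_id)
    # Step 2: merge configs; top-level keys are overridden,
    # while "configurable" is merged key by key.
    merged = {**original["config"], **overrides}
    merged["configurable"] = {
        **original["config"].get("configurable", {}),
        **overrides.get("configurable", {}),
    }
    # Step 3: create the new assistant, which is the clone.
    return client.create(original["graph_id"], merged)
```

The clone costs two network round trips in a real deployment, which is the overhead the replies below try to avoid.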

Contributor

I don't think you need to call any endpoints, just save the config in the instance and then use it on every other method?

Contributor Author

> I don't think you need to call any endpoints, just save the config in the instance and then use it on every other method?

Using aupdate_state() as an example: what do we do with the config passed to the method and the self.config stored on the instance? Merge them?

async def aupdate_state(
    self,
    config: RunnableConfig,
    values: Optional[Union[dict[str, Any], Any]],
    as_node: Optional[str] = None,
) -> RunnableConfig:

    # what do we do with self.config and config?
    self.config

    response: dict = await self.client.threads.update_state(  # type: ignore
        thread_id=config["configurable"]["thread_id"],
        values=values,  # type: ignore
        as_node=as_node,
        checkpoint=self._get_checkpoint(config),
    )
    return self._get_config(response["checkpoint"])

Contributor

Yes, exactly: merge them. That's exactly what the real Pregel class does.
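
A rough sketch of that suggestion, assuming configs are plain dicts: `with_config` stores a merged config on a shallow copy, and every other method merges the stored config with the one passed in. `merge_config_dicts`, `RemoteGraphSketch`, and `resolve_config` are hypothetical names, and the merge rule here is a simplified stand-in rather than the library's own logic.

```python
import copy
from typing import Any, Optional

Config = dict[str, Any]


def merge_config_dicts(*configs: Optional[Config]) -> Config:
    """Merge left to right. Later values win; nested dicts under
    "configurable" and "metadata" are merged key by key."""
    merged: Config = {}
    for config in configs:
        if not config:
            continue
        for key, value in config.items():
            if key in ("configurable", "metadata") and isinstance(value, dict):
                merged[key] = {**merged.get(key, {}), **value}
            else:
                merged[key] = value
    return merged


class RemoteGraphSketch:
    def __init__(self, graph_id: str, config: Optional[Config] = None) -> None:
        self.graph_id = graph_id
        self.config = config

    def with_config(
        self, config: Optional[Config] = None, **kwargs: Any
    ) -> "RemoteGraphSketch":
        # No endpoint is called: the copy only carries a new default config.
        clone = copy.copy(self)
        clone.config = merge_config_dicts(self.config, config, kwargs)
        return clone

    def resolve_config(self, config: Optional[Config] = None) -> Config:
        # Every method would call this first, so per-call values
        # override the instance defaults.
        return merge_config_dicts(self.config, config)
```

Under this sketch, a method such as `aupdate_state()` would read `thread_id` from `self.resolve_config(config)` instead of from `config` directly.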

@andrewnguonly andrewnguonly requested a review from nfcampos October 8, 2024 14:17
PregelTask(
    id=task["id"],
    name=task["name"],
    path=tuple(),
Contributor

why empty tuple?

Contributor Author

I was mapping the fields from ThreadTask to PregelTask, but there's no path field in ThreadTask.

class ThreadTask(TypedDict):
    """Represents a task within a thread."""

    id: str
    name: str
    error: Optional[str]
    interrupts: list[dict]
    checkpoint: Optional[Checkpoint]
    state: Optional["ThreadState"]


class PregelTask(NamedTuple):
    id: str
    name: str
    path: tuple[Union[str, int], ...]
    error: Optional[Exception] = None
    interrupts: tuple[Interrupt, ...] = ()
    state: Union[None, RunnableConfig, "StateSnapshot"] = None

What's supposed to be in path or how do we get the path?

Contributor

Ah, because it's an internal thing I removed from the API. I guess now that comes back to bite me.
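
For reference, the field mapping discussed in this thread might look like the sketch below. `PregelTaskSketch` and `to_pregel_task` are hypothetical stand-ins that mirror the two definitions quoted above; wrapping the error string in a plain `Exception` is an assumption, and `path` stays empty because the API response does not carry it.

```python
from typing import Any, NamedTuple, Optional, Union


class PregelTaskSketch(NamedTuple):
    """Local stand-in mirroring the PregelTask fields quoted above."""

    id: str
    name: str
    path: tuple[Union[str, int], ...]
    error: Optional[Exception] = None
    interrupts: tuple[Any, ...] = ()
    state: Any = None


def to_pregel_task(task: dict[str, Any]) -> PregelTaskSketch:
    """Map a ThreadTask-shaped dict onto the PregelTask-shaped tuple."""
    error = task.get("error")
    return PregelTaskSketch(
        id=task["id"],
        name=task["name"],
        # ThreadTask has no path field, so there is nothing to copy here.
        path=(),
        # Assumption: the API reports errors as strings; wrap them generically.
        error=Exception(error) if error else None,
        # The API returns a list; the tuple type expects a tuple.
        interrupts=tuple(task.get("interrupts") or ()),
        state=task.get("state"),
    )
```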

node_id = node["id"]
nodes[node_id] = DrawableNode(
    id=node_id,
    name=node.get("name", node_id),
Contributor

why do we pass this node_id default here?

Contributor Author

I just needed to pass some default value. I can set the default to an empty string. name in DrawableNode is not Optional.

    graph_id: str,
    config: Optional[RunnableConfig] = None,
):
    self.client = client
Contributor

Is there some way we can accept a single client and build the other one from it? Or at least make only one of them required?

Contributor

Most people will be using only sync or only async
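
One way to make only one client required is sketched below, assuming both are keyword-only and optional and that each method checks for the client it needs. `RemoteGraphClients` and the two `_require_*` helpers are hypothetical names for illustration; building one client from the other is left out because it depends on what the client objects expose.

```python
from typing import Any, Optional


class RemoteGraphClients:
    """Holds an async client, a sync client, or both."""

    def __init__(
        self,
        graph_id: str,
        *,
        client: Optional[Any] = None,
        sync_client: Optional[Any] = None,
    ) -> None:
        # Most callers use only one flavor, so require at least one, not both.
        if client is None and sync_client is None:
            raise ValueError("Provide at least one of `client` or `sync_client`.")
        self.graph_id = graph_id
        self.client = client
        self.sync_client = sync_client

    def _require_async_client(self) -> Any:
        # Called at the top of every async method.
        if self.client is None:
            raise ValueError("Async methods need `client` to be set.")
        return self.client

    def _require_sync_client(self) -> Any:
        # Called at the top of every sync method.
        if self.sync_client is None:
            raise ValueError("Sync methods need `sync_client` to be set.")
        return self.sync_client
```

Checking at call time keeps construction cheap and reports a clear error only when a caller mixes sync and async usage.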

andrewnguonly added a commit that referenced this pull request Oct 11, 2024
### Summary
Redo of [this PR](#2034) (branched from clean branch).
@andrewnguonly andrewnguonly deleted the an/04oct/remote-pregel branch October 11, 2024 16:24