Skip to content

Commit

Permalink
Merge pull request MaterializeInc#7841 from philip-stoev/mzworkflows_…
Browse files Browse the repository at this point in the history
…py-steps

mzworkflows: allow Python workflows to run Steps
  • Loading branch information
philip-stoev authored Aug 13, 2021
2 parents 775ed08 + f869eae commit e88ad32
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 28 deletions.
3 changes: 1 addition & 2 deletions misc/python/materialize/cli/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
from pathlib import Path
from typing import IO, List, Optional, Sequence, Text, Tuple

from typing_extensions import NoReturn

from materialize import errors, mzbuild, mzcompose, spawn, ui
from typing_extensions import NoReturn

announce = ui.speaker("==> ")
say = ui.speaker("")
Expand Down
37 changes: 26 additions & 11 deletions misc/python/materialize/mzcompose.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@
import pg8000 # type: ignore
import pymysql
import yaml
from typing_extensions import Literal, TypedDict

from materialize import errors, mzbuild, spawn, ui
from typing_extensions import Literal, TypedDict

T = TypeVar("T")
say = ui.speaker("C> ")
Expand Down Expand Up @@ -197,7 +196,7 @@ def __init__(self, repo: mzbuild.Repository, name: str):
compose = yaml.safe_load(f)

# Stash away sub workflows so that we can load them with the correct environment variables
self.yaml_workflows = compose.pop("mzworkflows", None)
self.yaml_workflows = compose.pop("mzworkflows", {})

# Load the mzworkflows.py file, if any.
mzworkflows_py = self.path.parent / "mzworkflows.py"
Expand Down Expand Up @@ -600,7 +599,7 @@ def run(self) -> None:
os.environ.update(self.env)

try:
self.func(self.composition)
self.func(self)
finally:
os.environ.clear()
os.environ.update(old_env)
Expand All @@ -619,18 +618,34 @@ def named(cls, name: str) -> Type["WorkflowStep"]:
raise errors.UnknownItem("step", name, list(cls._steps))

@classmethod
def register(cls, name: str) -> Callable[[Type[T]], Type[T]]:
def register(
cls, name: str
) -> Callable[[Type["WorkflowStep"]], Type["WorkflowStep"]]:
if name in cls._steps:
raise ValueError(f"Double registration of step name: {name}")

def reg(to_register: Type[T]) -> Type[T]:
if not issubclass(to_register, WorkflowStep):
raise ValueError(
f"Registered step must be a WorkflowStep: {to_register}"
)
def reg(to_register: Type["WorkflowStep"]) -> Type["WorkflowStep"]:
cls._steps[name] = to_register
to_register.name = name
return to_register # type: ignore

# Allow the step to also be called as a Workflow.step_name() classmethod
def run_step(workflow: Workflow, **kwargs: Any) -> None:
step: WorkflowStep = to_register(**kwargs)
step.run(workflow)

func_name = name.replace("-", "_")
if func_name == "run":
# Temporary workaround for the fact that `Workflow.run` already
# exists.
func_name = "run_service"
if not hasattr(Workflow, func_name):
setattr(Workflow, func_name, run_step)
else:
raise errors.Failed(
f"Unable to register method Workflow.{func_name} as one already exists."
)

return to_register

return reg

Expand Down
3 changes: 3 additions & 0 deletions test/mzworkflows_py/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ An example of python-based workflows that does not actually test anything.
If the directory contains a file named mzworkflows.py, all functions
from it that start with 'workflow_' will be made available for execution
as if they are regular mzcompose.yml workflows.

A Python workflow can call other Steps as methods to the Workflow,
as shown in the mzworkflows.py example in this directory.
12 changes: 0 additions & 12 deletions test/mzworkflows_py/mzcompose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,6 @@

version: '3.7'

mzworkflows:
mzworkflows_py:
steps:
- step: start-services
services: [materialized]

- step: wait-for-mz
service: materialized

- step: workflow
workflow: workflow_in_python

services:
materialized:
mzbuild: materialized
Expand Down
8 changes: 5 additions & 3 deletions test/mzworkflows_py/mzworkflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

import materialize.mzcompose
from materialize.mzcompose import Workflow


def workflow_in_python(composition: materialize.mzcompose.Composition):
pass
def workflow_in_python(w: Workflow):
w.start_services(services=["materialized"])
w.wait_for_mz(service="materialized")
w.kill_services(services=["materialized"])

0 comments on commit e88ad32

Please sign in to comment.