From 609ffd56d7b07a6a2d2aef30b25c89b843ca91a9 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Wed, 13 Sep 2023 13:10:16 -0400 Subject: [PATCH 1/5] start interface to begin discussion --- .../dagster-pipes/dagster_pipes/__init__.py | 76 ++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster-pipes/dagster_pipes/__init__.py b/python_modules/dagster-pipes/dagster_pipes/__init__.py index 379c8176a691d..74846841f1d6a 100644 --- a/python_modules/dagster-pipes/dagster_pipes/__init__.py +++ b/python_modules/dagster-pipes/dagster_pipes/__init__.py @@ -623,6 +623,80 @@ def make_channel( # ######################## +class IContext(ABC): + """Base class for asset context implemented by AssetExecutionContext and ExtContext.""" + + @property + @abstractmethod + def is_asset_step(self) -> bool: + """TODO.""" + + @property + @abstractmethod + def asset_key(self) -> str: + """TODO.""" + + @property + @abstractmethod + def asset_keys(self) -> Sequence[str]: + """TODO.""" + + @property + @abstractmethod + def provenance(self) -> Optional[ExtDataProvenance]: + """TODO.""" + + @property + @abstractmethod + def provenance_by_asset_key(self) -> Mapping[str, Optional[ExtDataProvenance]]: + """TODO.""" + + @property + @abstractmethod + def code_version(self) -> Optional[str]: + """TODO.""" + + @property + @abstractmethod + def code_version_by_asset_key(self) -> Mapping[str, Optional[str]]: + """TODO.""" + + @property + @abstractmethod + def is_partition_step(self) -> bool: + """TODO.""" + + @property + @abstractmethod + def partition_key(self) -> str: + """TODO.""" + + @property + @abstractmethod + def partition_key_range(self) -> Optional["ExtPartitionKeyRange"]: + """TODO.""" + + @property + @abstractmethod + def partition_time_window(self) -> Optional["ExtTimeWindow"]: + """TODO.""" + + @property + @abstractmethod + def run_id(self) -> str: + """TODO.""" + + @property + @abstractmethod + def job_name(self) -> Optional[str]: + """TODO.""" + + @property + @abstractmethod + def retry_number(self) -> int: + """TODO.""" + + def init_dagster_ext( *, context_loader: Optional[ExtContextLoader] = None, @@ -650,7 +724,7 @@ def init_dagster_ext( return context -class ExtContext: +class ExtContext(IContext): _instance: ClassVar[Optional["ExtContext"]] = None @classmethod From 2dcb056deed2c635861755e272a50c3f7c51d77d Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 21 Sep 2023 17:05:14 -0400 Subject: [PATCH 2/5] revert --- python_modules/dagster/setup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/setup.py b/python_modules/dagster/setup.py index 173179b373667..36e16b031f193 100644 --- a/python_modules/dagster/setup.py +++ b/python_modules/dagster/setup.py @@ -28,9 +28,9 @@ def get_version() -> str: # grpcio 1.44.0 is the min version compatible with both protobuf 3 and 4 GRPC_VERSION_FLOOR = "1.44.0" -ver = get_version() -# dont pin dev installs to avoid pip dep resolver issues -pin = "" if ver == "1!0+dev" else f"=={ver}" +# ver = get_version() +# # dont pin dev installs to avoid pip dep resolver issues +# pin = "" if ver == "1!0+dev" else f"=={ver}" setup( name="dagster", From 58b41a5ef6815ebf1c2ea8f0e45d9afadaf6958c Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 08:57:50 -0400 Subject: [PATCH 3/5] pr comments, cleanup --- .../dagster-pipes/dagster_pipes/__init__.py | 66 ++++++++----------- 1 file changed, 28 insertions(+), 38 deletions(-) diff --git a/python_modules/dagster-pipes/dagster_pipes/__init__.py b/python_modules/dagster-pipes/dagster_pipes/__init__.py index 74846841f1d6a..fc0cf9fbc2755 100644 --- a/python_modules/dagster-pipes/dagster_pipes/__init__.py +++ b/python_modules/dagster-pipes/dagster_pipes/__init__.py @@ -628,73 +628,63 @@ class IContext(ABC): @property @abstractmethod - def is_asset_step(self) -> bool: - """TODO.""" - - @property - @abstractmethod - def asset_key(self) -> str: - """TODO.""" - - @property - @abstractmethod - def asset_keys(self) -> Sequence[str]: - """TODO.""" - - @property - @abstractmethod - def provenance(self) -> Optional[ExtDataProvenance]: - """TODO.""" + def asset_key(self): + """The AssetKey for the asset being materialized. If no asset is being materialized, errors. If + multiple assets are being materialized (as in a @multi_asset), errors. + """ @property @abstractmethod - def provenance_by_asset_key(self) -> Mapping[str, Optional[ExtDataProvenance]]: - """TODO.""" + def asset_keys(self): + """The AssetKeys for the asset being materialized. If no asset is being materialized, errors.""" @property @abstractmethod - def code_version(self) -> Optional[str]: - """TODO.""" + def provenance(self): + """The data provenance for the asset being materialized. If no asset is being materialized, errors. If + multiple assets are being materialized (as in a @multi_asset), errors. + """ @property @abstractmethod - def code_version_by_asset_key(self) -> Mapping[str, Optional[str]]: - """TODO.""" + def provenance_by_asset_key(self): + """A dictionary of data provenance for the assets being materialized, keyed by asset key. + If no asset is being materialized, errors. + """ @property @abstractmethod - def is_partition_step(self) -> bool: - """TODO.""" + def code_version(self): + """The code version for the asset being materialized. If no asset is being materialized, errors. If + multiple assets are being materialized (as in a @multi_asset), errors. + """ @property @abstractmethod - def partition_key(self) -> str: - """TODO.""" + def code_version_by_asset_key(self): + """A dictionary of code versions for the assets being materialized, keyed by asset key. + If no asset is being materialized, errors. + """ @property @abstractmethod - def partition_key_range(self) -> Optional["ExtPartitionKeyRange"]: - """TODO.""" - - @property - @abstractmethod - def partition_time_window(self) -> Optional["ExtTimeWindow"]: - """TODO.""" + def is_partitioned(self) -> bool: + """True if the current execution is partitioned.""" @property @abstractmethod def run_id(self) -> str: - """TODO.""" + """The run id of the current execution.""" @property @abstractmethod def job_name(self) -> Optional[str]: - """TODO.""" + """The name of the job that is executing.""" @property @abstractmethod def retry_number(self) -> int: - """TODO.""" + """The number of retries of this execution.""" def init_dagster_ext( @@ -808,7 +798,7 @@ def code_version_by_asset_key(self) -> Mapping[str, Optional[str]]: return code_version_by_asset_key @property - def is_partition_step(self) -> bool: + def is_partitioned(self) -> bool: return self._data["partition_key_range"] is not None @property From 30236e266670d6749cd852f488d4f21cf570235b Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 10:17:40 -0400 Subject: [PATCH 4/5] test fix --- .../dagster-pipes/dagster_pipes_tests/test_context.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py b/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py index c3dcf16d4c2e7..3f846e73083ff 100644 --- a/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py +++ b/python_modules/dagster-pipes/dagster_pipes_tests/test_context.py @@ -132,7 +132,7 @@ def test_multi_asset_context(): def test_no_partition_context(): context = _make_external_execution_context() - assert not context.is_partition_step + assert not context.is_partitioned _assert_undefined(context, "partition_key") _assert_undefined(context, "partition_key_range") _assert_undefined(context, "partition_time_window") @@ -147,7 +147,7 @@ def test_single_partition_context(): partition_time_window=None, ) - assert context.is_partition_step + assert context.is_partitioned assert context.partition_key == "foo" assert context.partition_key_range == partition_key_range assert context.partition_time_window is None @@ -163,7 +163,7 @@ def test_multiple_partition_context(): partition_time_window=time_window, ) - assert context.is_partition_step + assert context.is_partitioned _assert_undefined(context, "partition_key") assert context.partition_key_range == partition_key_range assert context.partition_time_window == time_window From 51620ffaa64a7d0d6a627a612dfe06aa90532c56 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 22 Sep 2023 10:24:42 -0400 Subject: [PATCH 5/5] fix conflict --- python_modules/dagster/setup.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/setup.py b/python_modules/dagster/setup.py index 36e16b031f193..173179b373667 100644 --- a/python_modules/dagster/setup.py +++ b/python_modules/dagster/setup.py @@ -28,9 +28,9 @@ def get_version() -> str: # grpcio 1.44.0 is the min version compatible with both protobuf 3 and 4 GRPC_VERSION_FLOOR = "1.44.0" -# ver = get_version() -# # dont pin dev installs to avoid pip dep resolver issues -# pin = "" if ver == "1!0+dev" else f"=={ver}" +ver = get_version() +# dont pin dev installs to avoid pip dep resolver issues +pin = "" if ver == "1!0+dev" else f"=={ver}" setup( name="dagster",