diff --git a/.github/workflows/dagster-plugin.yml b/.github/workflows/dagster-plugin.yml index 37b6c93ec841ab..910c3cfa53312c 100644 --- a/.github/workflows/dagster-plugin.yml +++ b/.github/workflows/dagster-plugin.yml @@ -31,9 +31,9 @@ jobs: DATAHUB_TELEMETRY_ENABLED: false strategy: matrix: - python-version: ["3.8", "3.10"] + python-version: ["3.9", "3.10"] include: - - python-version: "3.8" + - python-version: "3.9" extraPythonRequirement: "dagster>=1.3.3" - python-version: "3.10" extraPythonRequirement: "dagster>=1.3.3" diff --git a/metadata-ingestion-modules/dagster-plugin/setup.py b/metadata-ingestion-modules/dagster-plugin/setup.py index 660dbb2981c516..0e0685cb378c1b 100644 --- a/metadata-ingestion-modules/dagster-plugin/setup.py +++ b/metadata-ingestion-modules/dagster-plugin/setup.py @@ -123,7 +123,7 @@ def get_long_description(): ], # Package info. zip_safe=False, - python_requires=">=3.8", + python_requires=">=3.9", package_dir={"": "src"}, packages=setuptools.find_namespace_packages(where="./src"), entry_points=entry_points, diff --git a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py index f6b0629b7ca7b9..bccdb4ac7922a5 100644 --- a/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py +++ b/metadata-ingestion-modules/dagster-plugin/src/datahub_dagster_plugin/sensors/datahub_sensors.py @@ -28,10 +28,15 @@ from dagster._core.definitions.multi_asset_sensor_definition import ( AssetMaterializationFunctionReturn, ) -from dagster._core.definitions.sensor_definition import ( - DefaultSensorStatus, - RawSensorEvaluationFunctionReturn, -) +from dagster._core.definitions.sensor_definition import DefaultSensorStatus + +# This SensorReturnTypesUnion is from Dagster 1.9.1+ and is not available in older versions +# of Dagster. We need to import it conditionally to avoid breaking compatibility with older +try: + from dagster._core.definitions.sensor_definition import SensorReturnTypesUnion +except ImportError: + from dagster._core.definitions.sensor_definition import RawSensorEvaluationFunctionReturn as SensorReturnTypesUnion # type: ignore + from dagster._core.definitions.target import ExecutableDefinition from dagster._core.definitions.unresolved_asset_job_definition import ( UnresolvedAssetJobDefinition, @@ -689,9 +694,7 @@ def _emit_asset_metadata( return SkipReason("Asset metadata processed") - def _emit_metadata( - self, context: RunStatusSensorContext - ) -> RawSensorEvaluationFunctionReturn: + def _emit_metadata(self, context: RunStatusSensorContext) -> SensorReturnTypesUnion: """ Function to emit metadata for datahub rest. """