From 7367aed07a2cf0cd905d0ed6e492d8482b6fae04 Mon Sep 17 00:00:00 2001 From: Yuhan Luo <4531914+yuhan@users.noreply.github.com> Date: Wed, 11 Oct 2023 09:34:15 -0700 Subject: [PATCH] add resource telemetry to pipes client resources (#17111) ## Summary & Motivation This PR adds telemetry tracking to the following resourcers: - PipesSubprocessClient - PipesK8sClient - PipesDockerClient - PipesDatabricksClient These resources are implemented in the `ResourceParam[TheBarePythonObject]` pattern such as [here](https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/pipes/subprocess.py#L113), which goes through the hardcoded_resource path in core. So this PR threads the info through hardcoded_resource into external resource data, in order to pass in the correct `dagster_maintained` value and the correct original resource type. I'm entirely not sure if this is a reasonable implementation to check in. Open to more reasonable paths! ## How I Tested These Changes in local: ```python defs = Definitions( assets=[my_asset], resources={ "pipes_subprocess_client": PipesSubprocessClient(), "pipes_k8s_client": PipesK8sClient(), "pipes_docker_client": PipesDockerClient(), "pipes_databricks_client": PipesDatabricksClient(client=MagicMock()), }, ) ``` writes the following telemetry: ```bash { 'action': 'update_repo_stats', 'client_time': '2023-10-09 22:28:46.098683', 'event_id': '18b2fdc3-8b80-407b-879e-010f8b05b8f5', 'elapsed_time': '', 'instance_id': '9ed76fef-a574-4c90-9356-05e0edaa0345', 'metadata': { 'dagster_resources': [ { 'module_name': 'dagster_databricks', 'class_name': '_PipesDatabricksClient' }, { 'module_name': 'dagster_docker', 'class_name': '_PipesDockerClient' }, { 'module_name': 'dagster_k8s', 'class_name': '_PipesK8sClient' }, { 'module_name': 'dagster', 'class_name': '_PipesSubprocess' } ], 'has_custom_resources': 'False', 'num_pipelines_in_repo': '1', 'num_schedules_in_repo': '0', 'num_sensors_in_repo': '0', 'num_assets_in_repo': '1', 'num_source_assets_in_repo': '0', 'num_partitioned_assets_in_repo': '0', 'num_dynamic_partitioned_assets_in_repo': '0', 'num_multi_partitioned_assets_in_repo': '0', 'num_assets_with_freshness_policies_in_repo': '0', 'num_assets_with_eager_auto_materialize_policies_in_repo': '0', 'num_assets_with_lazy_auto_materialize_policies_in_repo': '0', 'num_assets_with_single_run_backfill_policies_in_repo': '0', 'num_assets_with_multi_run_backfill_policies_in_repo': '0', 'num_observable_source_assets_in_repo': '0', 'num_dbt_assets_in_repo': '0', 'num_assets_with_code_versions_in_repo': '0', 'num_asset_reconciliation_sensors_in_repo': '0', 'num_asset_checks': '1', 'source': 'dagit', 'pipeline_name_hash': '', 'repo_hash': 'f17e9128abe12b4ff329425c469a7c5abc06bace32a2237848bc3a71cf9ef808', 'location_name_hash': '04af437226cf0d37c15f17e4082824fe53867e3ba8ffc35f46ad78a33cda2160' }, 'python_version': '3.9.16', 'dagster_version': '1!0+dev', 'os_desc': 'macOS-13.5-arm64-arm-64bit', 'os_platform': 'Darwin', 'run_storage_id': '', 'is_known_ci_env': 'False' } ``` --- .../_core/definitions/resource_definition.py | 11 ++++++++++- .../host_representation/external_data.py | 9 +++++++-- .../dagster/dagster/_core/pipes/subprocess.py | 4 ++++ .../cli_tests/command_tests/test_telemetry.py | 19 +++++++++++++++++++ .../dagster_databricks/pipes.py | 4 ++++ .../dagster-docker/dagster_docker/pipes.py | 4 ++++ .../dagster-k8s/dagster_k8s/pipes.py | 4 ++++ 7 files changed, 52 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/resource_definition.py b/python_modules/dagster/dagster/_core/definitions/resource_definition.py index 4e400630a1ce8..94f48314c9ab0 100644 --- a/python_modules/dagster/dagster/_core/definitions/resource_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/resource_definition.py @@ -107,6 +107,7 @@ def __init__( # this attribute will be updated by the @dagster_maintained_resource and @dagster_maintained_io_manager decorators self._dagster_maintained = False + self._hardcoded_resource_type = None @staticmethod def dagster_internal_init( @@ -182,7 +183,15 @@ def hardcoded_resource(value: Any, description: Optional[str] = None) -> "Resour Returns: [ResourceDefinition]: A hardcoded resource. """ - return ResourceDefinition(resource_fn=lambda _init_context: value, description=description) + resource_def = ResourceDefinition( + resource_fn=lambda _init_context: value, description=description + ) + # Make sure telemetry info gets passed in to hardcoded resources + if hasattr(value, "_is_dagster_maintained"): + resource_def._dagster_maintained = value._is_dagster_maintained() # noqa: SLF001 + resource_def._hardcoded_resource_type = type(value) # noqa: SLF001 + + return resource_def @public @staticmethod diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index 9c7a5bcfaced0..ddc2aed0f5cc4 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1803,8 +1803,13 @@ def external_resource_data_from_def( # use the resource function name as the resource type if it's a function resource # (ie direct instantiation of ResourceDefinition or IOManagerDefinition) if type(resource_type_def) in (ResourceDefinition, IOManagerDefinition): - module_name = check.not_none(inspect.getmodule(resource_type_def.resource_fn)).__name__ - resource_type = f"{module_name}.{resource_type_def.resource_fn.__name__}" + original_resource_fn = ( + resource_type_def._hardcoded_resource_type # noqa: SLF001 + if resource_type_def._hardcoded_resource_type # noqa: SLF001 + else resource_type_def.resource_fn + ) + module_name = check.not_none(inspect.getmodule(original_resource_fn)).__name__ + resource_type = f"{module_name}.{original_resource_fn.__name__}" # if it's a Pythonic resource, get the underlying Pythonic class name elif isinstance( resource_type_def, diff --git a/python_modules/dagster/dagster/_core/pipes/subprocess.py b/python_modules/dagster/dagster/_core/pipes/subprocess.py index b65e91aeb0ffa..bd56eede481f7 100644 --- a/python_modules/dagster/dagster/_core/pipes/subprocess.py +++ b/python_modules/dagster/dagster/_core/pipes/subprocess.py @@ -64,6 +64,10 @@ def __init__( or PipesTempFileMessageReader() ) + @classmethod + def _is_dagster_maintained(cls) -> bool: + return True + @public def run( self, diff --git a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_telemetry.py b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_telemetry.py index 56c095844627d..33ee2599b1471 100644 --- a/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_telemetry.py +++ b/python_modules/dagster/dagster_tests/cli_tests/command_tests/test_telemetry.py @@ -14,6 +14,7 @@ DynamicPartitionsDefinition, FreshnessPolicy, MultiPartitionsDefinition, + PipesSubprocessClient, SourceAsset, StaticPartitionsDefinition, asset, @@ -510,6 +511,24 @@ def asset1(): ... assert stats["has_custom_resources"] == "True" +def test_get_stats_from_external_repo_pipes_client(): + external_repo = ExternalRepository( + external_repository_data_from_def( + Definitions( + resources={ + "pipes_subprocess_client": PipesSubprocessClient(), + }, + ).get_repository_def() + ), + repository_handle=MagicMock(spec=RepositoryHandle), + ) + stats = get_stats_from_external_repo(external_repo) + assert stats["dagster_resources"] == [ + {"module_name": "dagster", "class_name": "_PipesSubprocess"} + ] + assert stats["has_custom_resources"] == "False" + + def test_get_stats_from_external_repo_delayed_resource_configuration(): class MyResource(ConfigurableResource): foo: str diff --git a/python_modules/libraries/dagster-databricks/dagster_databricks/pipes.py b/python_modules/libraries/dagster-databricks/dagster_databricks/pipes.py index 2720fe796e56c..39ecd08d28316 100644 --- a/python_modules/libraries/dagster-databricks/dagster_databricks/pipes.py +++ b/python_modules/libraries/dagster-databricks/dagster_databricks/pipes.py @@ -71,6 +71,10 @@ def __init__( PipesMessageReader, ) or PipesDbfsMessageReader(client=self.client) + @classmethod + def _is_dagster_maintained(cls) -> bool: + return True + def run( self, *, diff --git a/python_modules/libraries/dagster-docker/dagster_docker/pipes.py b/python_modules/libraries/dagster-docker/dagster_docker/pipes.py index 76a17d4dfa90e..694daf1842ad8 100644 --- a/python_modules/libraries/dagster-docker/dagster_docker/pipes.py +++ b/python_modules/libraries/dagster-docker/dagster_docker/pipes.py @@ -102,6 +102,10 @@ def __init__( or PipesDockerLogsMessageReader() ) + @classmethod + def _is_dagster_maintained(cls) -> bool: + return True + def run( self, *, diff --git a/python_modules/libraries/dagster-k8s/dagster_k8s/pipes.py b/python_modules/libraries/dagster-k8s/dagster_k8s/pipes.py index debf2dbccac2d..ec9779078d198 100644 --- a/python_modules/libraries/dagster-k8s/dagster_k8s/pipes.py +++ b/python_modules/libraries/dagster-k8s/dagster_k8s/pipes.py @@ -124,6 +124,10 @@ def __init__( or PipesK8sPodLogsMessageReader() ) + @classmethod + def _is_dagster_maintained(cls) -> bool: + return True + def run( self, *,