Skip to content

Commit

Permalink
add resource telemetry to pipes client resources (#17111)
Browse files Browse the repository at this point in the history
## Summary & Motivation
This PR adds telemetry tracking to the following resources:
- PipesSubprocessClient
- PipesK8sClient
- PipesDockerClient
- PipesDatabricksClient

These resources are implemented in the
`ResourceParam[TheBarePythonObject]` pattern such as
[here](https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_core/pipes/subprocess.py#L113),
which goes through the hardcoded_resource path in core. So this PR
threads the info through hardcoded_resource into external resource data,
in order to pass in the correct `dagster_maintained` value and the
correct original resource type. I'm not entirely sure if this is a
reasonable implementation to check in. Open to more reasonable paths!

## How I Tested These Changes
Locally, the following definitions:
```python
defs = Definitions(
    assets=[my_asset],
    resources={
        "pipes_subprocess_client": PipesSubprocessClient(),
        "pipes_k8s_client": PipesK8sClient(),
        "pipes_docker_client": PipesDockerClient(),
        "pipes_databricks_client": PipesDatabricksClient(client=MagicMock()),
    },
)
```
This writes the following telemetry:
```python
{
  'action': 'update_repo_stats',
  'client_time': '2023-10-09 22:28:46.098683',
  'event_id': '18b2fdc3-8b80-407b-879e-010f8b05b8f5',
  'elapsed_time': '',
  'instance_id': '9ed76fef-a574-4c90-9356-05e0edaa0345',
  'metadata': {
    'dagster_resources': [
      {
        'module_name': 'dagster_databricks',
        'class_name': '_PipesDatabricksClient'
      },
      {
        'module_name': 'dagster_docker',
        'class_name': '_PipesDockerClient'
      },
      {
        'module_name': 'dagster_k8s',
        'class_name': '_PipesK8sClient'
      },
      {
        'module_name': 'dagster',
        'class_name': '_PipesSubprocess'
      }
    ],
    'has_custom_resources': 'False',
    'num_pipelines_in_repo': '1',
    'num_schedules_in_repo': '0',
    'num_sensors_in_repo': '0',
    'num_assets_in_repo': '1',
    'num_source_assets_in_repo': '0',
    'num_partitioned_assets_in_repo': '0',
    'num_dynamic_partitioned_assets_in_repo': '0',
    'num_multi_partitioned_assets_in_repo': '0',
    'num_assets_with_freshness_policies_in_repo': '0',
    'num_assets_with_eager_auto_materialize_policies_in_repo': '0',
    'num_assets_with_lazy_auto_materialize_policies_in_repo': '0',
    'num_assets_with_single_run_backfill_policies_in_repo': '0',
    'num_assets_with_multi_run_backfill_policies_in_repo': '0',
    'num_observable_source_assets_in_repo': '0',
    'num_dbt_assets_in_repo': '0',
    'num_assets_with_code_versions_in_repo': '0',
    'num_asset_reconciliation_sensors_in_repo': '0',
    'num_asset_checks': '1',
    'source': 'dagit',
    'pipeline_name_hash': '',
    'repo_hash': 'f17e9128abe12b4ff329425c469a7c5abc06bace32a2237848bc3a71cf9ef808',
    'location_name_hash': '04af437226cf0d37c15f17e4082824fe53867e3ba8ffc35f46ad78a33cda2160'
  },
  'python_version': '3.9.16',
  'dagster_version': '1!0+dev',
  'os_desc': 'macOS-13.5-arm64-arm-64bit',
  'os_platform': 'Darwin',
  'run_storage_id': '',
  'is_known_ci_env': 'False'
}
```
  • Loading branch information
yuhan authored and dpeng817 committed Oct 11, 2023
1 parent 0c702a3 commit 7367aed
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def __init__(

# this attribute will be updated by the @dagster_maintained_resource and @dagster_maintained_io_manager decorators
self._dagster_maintained = False
self._hardcoded_resource_type = None

@staticmethod
def dagster_internal_init(
Expand Down Expand Up @@ -182,7 +183,15 @@ def hardcoded_resource(value: Any, description: Optional[str] = None) -> "Resour
Returns:
[ResourceDefinition]: A hardcoded resource.
"""
return ResourceDefinition(resource_fn=lambda _init_context: value, description=description)
resource_def = ResourceDefinition(
resource_fn=lambda _init_context: value, description=description
)
# Make sure telemetry info gets passed in to hardcoded resources
if hasattr(value, "_is_dagster_maintained"):
resource_def._dagster_maintained = value._is_dagster_maintained() # noqa: SLF001
resource_def._hardcoded_resource_type = type(value) # noqa: SLF001

return resource_def

@public
@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1803,8 +1803,13 @@ def external_resource_data_from_def(
# use the resource function name as the resource type if it's a function resource
# (ie direct instantiation of ResourceDefinition or IOManagerDefinition)
if type(resource_type_def) in (ResourceDefinition, IOManagerDefinition):
module_name = check.not_none(inspect.getmodule(resource_type_def.resource_fn)).__name__
resource_type = f"{module_name}.{resource_type_def.resource_fn.__name__}"
original_resource_fn = (
resource_type_def._hardcoded_resource_type # noqa: SLF001
if resource_type_def._hardcoded_resource_type # noqa: SLF001
else resource_type_def.resource_fn
)
module_name = check.not_none(inspect.getmodule(original_resource_fn)).__name__
resource_type = f"{module_name}.{original_resource_fn.__name__}"
# if it's a Pythonic resource, get the underlying Pythonic class name
elif isinstance(
resource_type_def,
Expand Down
4 changes: 4 additions & 0 deletions python_modules/dagster/dagster/_core/pipes/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ def __init__(
or PipesTempFileMessageReader()
)

# Telemetry marker: always reports this pipes client as Dagster-maintained.
# `hardcoded_resource` probes for this method with `hasattr` and, when present,
# copies its result onto the resulting ResourceDefinition's `_dagster_maintained`.
@classmethod
def _is_dagster_maintained(cls) -> bool:
return True

@public
def run(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
DynamicPartitionsDefinition,
FreshnessPolicy,
MultiPartitionsDefinition,
PipesSubprocessClient,
SourceAsset,
StaticPartitionsDefinition,
asset,
Expand Down Expand Up @@ -510,6 +511,24 @@ def asset1(): ...
assert stats["has_custom_resources"] == "True"


def test_get_stats_from_external_repo_pipes_client():
    """Check the telemetry stats of a repo whose only resource is a ``PipesSubprocessClient``.

    The stats must list exactly one Dagster resource (module ``dagster``,
    class ``_PipesSubprocess``) and report ``has_custom_resources`` as ``"False"``.
    """
    pipes_defs = Definitions(
        resources={"pipes_subprocess_client": PipesSubprocessClient()},
    )
    repository_data = external_repository_data_from_def(pipes_defs.get_repository_def())
    repo = ExternalRepository(
        repository_data,
        repository_handle=MagicMock(spec=RepositoryHandle),
    )

    stats = get_stats_from_external_repo(repo)

    expected_resources = [{"module_name": "dagster", "class_name": "_PipesSubprocess"}]
    assert stats["dagster_resources"] == expected_resources
    assert stats["has_custom_resources"] == "False"


def test_get_stats_from_external_repo_delayed_resource_configuration():
class MyResource(ConfigurableResource):
foo: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ def __init__(
PipesMessageReader,
) or PipesDbfsMessageReader(client=self.client)

# Telemetry marker: always reports this pipes client as Dagster-maintained.
# `hardcoded_resource` probes for this method with `hasattr` and, when present,
# copies its result onto the resulting ResourceDefinition's `_dagster_maintained`.
@classmethod
def _is_dagster_maintained(cls) -> bool:
return True

def run(
self,
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ def __init__(
or PipesDockerLogsMessageReader()
)

# Telemetry marker: always reports this pipes client as Dagster-maintained.
# `hardcoded_resource` probes for this method with `hasattr` and, when present,
# copies its result onto the resulting ResourceDefinition's `_dagster_maintained`.
@classmethod
def _is_dagster_maintained(cls) -> bool:
return True

def run(
self,
*,
Expand Down
4 changes: 4 additions & 0 deletions python_modules/libraries/dagster-k8s/dagster_k8s/pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ def __init__(
or PipesK8sPodLogsMessageReader()
)

# Telemetry marker: always reports this pipes client as Dagster-maintained.
# `hardcoded_resource` probes for this method with `hasattr` and, when present,
# copies its result onto the resulting ResourceDefinition's `_dagster_maintained`.
@classmethod
def _is_dagster_maintained(cls) -> bool:
return True

def run(
self,
*,
Expand Down

0 comments on commit 7367aed

Please sign in to comment.