From a77f213a1a43ddfe58eaab10dc5ca9eedff7857c Mon Sep 17 00:00:00 2001 From: Mathieu Larose Date: Fri, 10 Jan 2025 09:15:30 -0500 Subject: [PATCH] [dagster-azure]: log manager metadata represented as namedtuple instead of json string --- .../test_compute_log_manager.py | 9 ++--- .../dagster-ui/packages/ui-core/client.json | 4 +- .../ui-core/src/graphql/schema.graphql | 11 +++++- .../packages/ui-core/src/graphql/types.ts | 38 ++++++++++++++++++- .../ui-core/src/runs/CapturedLogPanel.tsx | 17 +++------ .../packages/ui-core/src/runs/LogsRow.tsx | 8 +++- .../ui-core/src/runs/RunMetadataProvider.tsx | 12 ++++-- .../src/runs/types/LogsProvider.types.ts | 25 +++++++++--- .../ui-core/src/runs/types/LogsRow.types.ts | 7 +++- ...LogsScrollingTableMessageFragment.types.ts | 7 +++- .../src/runs/types/RunFragments.types.ts | 7 +++- .../runs/types/RunMetadataProvider.types.ts | 7 +++- .../dagster_graphql/implementation/events.py | 2 +- .../dagster_graphql/schema/logs/events.py | 22 ++++++++++- .../dagster/dagster/_core/events/__init__.py | 10 +++-- .../cloud_storage_compute_log_manager.py | 14 ++----- .../_core/storage/compute_log_manager.py | 34 +++++++++++++++-- .../dagster_azure/blob/compute_log_manager.py | 21 +++++----- 18 files changed, 192 insertions(+), 63 deletions(-) diff --git a/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/test_compute_log_manager.py b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/test_compute_log_manager.py index c87434fe3dc4a..72f0df45283f8 100644 --- a/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/test_compute_log_manager.py +++ b/integration_tests/test_suites/dagster-azure-live-tests/integration_tests/test_compute_log_manager.py @@ -1,4 +1,3 @@ -import json import os import subprocess from pathlib import Path @@ -66,7 +65,7 @@ def test_compute_log_manager( assert logs_captured_data.external_stderr_url.endswith(logs_captured_data.stderr_uri_or_path) assert logs_captured_data.log_manager_metadata - metadata = json.loads(logs_captured_data.log_manager_metadata) - assert metadata["type"] == "AzureBlobComputeLogManager" - assert metadata["storage_account"] == os.getenv("TEST_AZURE_STORAGE_ACCOUNT_ID") - assert metadata["container"] == os.getenv("TEST_AZURE_CONTAINER_ID") + metadata = logs_captured_data.log_manager_metadata + assert metadata.log_manager_class == "AzureBlobComputeLogManager" + assert metadata.container == os.getenv("TEST_AZURE_CONTAINER_ID") + assert metadata.storage_account == os.getenv("TEST_AZURE_STORAGE_ACCOUNT_ID") diff --git a/js_modules/dagster-ui/packages/ui-core/client.json b/js_modules/dagster-ui/packages/ui-core/client.json index b6de886010f7a..bbfa9f585a3d6 100644 --- a/js_modules/dagster-ui/packages/ui-core/client.json +++ b/js_modules/dagster-ui/packages/ui-core/client.json @@ -118,8 +118,8 @@ "CapturedLogsSubscription": "fa5e55b59e9d8632ae71a8387c54230ba71e6f57849a974225ba039808acfa93", "CapturedLogsMetadataQuery": "b59ada7585593473002a7b044f09daa85f160445cbc9a4e8ffe0b46d51875cb1", "CapturedLogsQuery": "872b617b4f33ee5f6feeba9a4c76ec986fca357695a114e3d7b63172e4600b57", - "PipelineRunLogsSubscription": "2f30c164760561ac620a80e7150088f0f5df364e89aa32648c1ecd9cd6698b8d", - "RunLogsQuery": "fc247c26adf353841ebe6e395922586491da0657d7f1f828cc4c066c1eec90d9", + "PipelineRunLogsSubscription": "4e2c6f3f97fcda52e4df12ce253bf0e9be1786261fd3bcc52dd4e7b207b5e23f", + "RunLogsQuery": "b22469d32fc7bdd49032360fdd7aa64adf128907f50ae28aa8632c06f80daef2", "QueuedRunCriteriaQuery": "da19aeed8a0a7e6f47619c6ba9efd721345481d8f08223282ea774e468400f21", "QueueDaemonStatusQuery": "aa51c596ee907346e60e2fe173bba10ae2ead067d45109225a2cd400a2278841", "PipelineEnvironmentQuery": "3b668b028997fb35b17b4d8a90a18b78dd8a70910f2c12aac63065c0584e3a10", diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index c84111d6706be..c669445539469 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -1240,13 +1240,22 @@ type LogsCapturedEvent implements MessageEvent { externalUrl: String externalStdoutUrl: String externalStderrUrl: String - logManagerMetadata: String + logManagerMetadataRaw: JSONString + logManagerMetadata: LogManagerMetadata stderrUriOrPath: String stdoutUriOrPath: String pid: Int logKey: String! } +scalar JSONString + +type LogManagerMetadata { + logManagerClass: String! + storageAccount: String + container: String +} + type AlertStartEvent implements MessageEvent & RunEvent { runId: String! message: String! diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index e53fcf036ac68..ec1b053ce7503 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -17,6 +17,7 @@ export type Scalars = { Int: {input: number; output: number}; Float: {input: number; output: number}; GenericScalar: {input: any; output: any}; + JSONString: {input: any; output: any}; RunConfigData: {input: any; output: any}; }; @@ -2397,6 +2398,13 @@ export enum LogLevel { WARNING = 'WARNING', } +export type LogManagerMetadata = { + __typename: 'LogManagerMetadata'; + container: Maybe; + logManagerClass: Scalars['String']['output']; + storageAccount: Maybe; +}; + export type LogMessageEvent = MessageEvent & { __typename: 'LogMessageEvent'; eventType: Maybe; @@ -2431,7 +2439,8 @@ export type LogsCapturedEvent = MessageEvent & { fileKey: Scalars['String']['output']; level: LogLevel; logKey: Scalars['String']['output']; - logManagerMetadata: Maybe; + logManagerMetadata: Maybe; + logManagerMetadataRaw: Maybe; message: Scalars['String']['output']; pid: Maybe; runId: Scalars['String']['output']; @@ -9759,6 +9768,25 @@ export const buildLocationStateChangeSubscription = ( }; }; +export const buildLogManagerMetadata = ( + overrides?: Partial, + _relationshipsToOmit: Set = new Set(), +): {__typename: 'LogManagerMetadata'} & LogManagerMetadata => { + const relationshipsToOmit: Set = new Set(_relationshipsToOmit); + relationshipsToOmit.add('LogManagerMetadata'); + return { + __typename: 'LogManagerMetadata', + container: + overrides && overrides.hasOwnProperty('container') ? overrides.container! : 'architecto', + logManagerClass: + overrides && overrides.hasOwnProperty('logManagerClass') + ? overrides.logManagerClass! + : 'placeat', + storageAccount: + overrides && overrides.hasOwnProperty('storageAccount') ? overrides.storageAccount! : 'quasi', + }; +}; + export const buildLogMessageEvent = ( overrides?: Partial, _relationshipsToOmit: Set = new Set(), @@ -9844,7 +9872,13 @@ export const buildLogsCapturedEvent = ( logManagerMetadata: overrides && overrides.hasOwnProperty('logManagerMetadata') ? overrides.logManagerMetadata! - : 'distinctio', + : relationshipsToOmit.has('LogManagerMetadata') + ? ({} as LogManagerMetadata) + : buildLogManagerMetadata({}, relationshipsToOmit), + logManagerMetadataRaw: + overrides && overrides.hasOwnProperty('logManagerMetadataRaw') + ? overrides.logManagerMetadataRaw! + : 'nisi', message: overrides && overrides.hasOwnProperty('message') ? overrides.message! : 'ex', pid: overrides && overrides.hasOwnProperty('pid') ? overrides.pid! : 7623, runId: overrides && overrides.hasOwnProperty('runId') ? overrides.runId! : 'modi', diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/CapturedLogPanel.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/CapturedLogPanel.tsx index 200e8dc8360e4..cd85d21d69b35 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/CapturedLogPanel.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/CapturedLogPanel.tsx @@ -45,17 +45,12 @@ export const CapturedOrExternalLogPanel = React.memo( if (logCaptureInfo?.logManagerMetadata) { const path = ioType === 'stdout' ? logCaptureInfo.stdoutUriOrPath : logCaptureInfo.stderrUriOrPath; - - try { - const metadata = JSON.parse(logCaptureInfo.logManagerMetadata); - switch (metadata.type) { - case 'AzureBlobComputeLogManager': - if (metadata.storage_account && metadata.container) { - return `az storage blob download --account-name ${metadata.storage_account} --container-name ${metadata.container} --name ${path}`; - } - } - } catch { - return undefined; + const metadata = logCaptureInfo.logManagerMetadata; + switch (metadata.logManagerClass) { + case 'AzureBlobComputeLogManager': + if (metadata.storageAccount && metadata.container) { + return `az storage blob download --account-name ${metadata.storageAccount} --container-name ${metadata.container} --name ${path}`; + } } } return undefined; diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRow.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRow.tsx index 9a3e4861e0869..e6d0f94e78596 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRow.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/LogsRow.tsx @@ -197,7 +197,13 @@ export const LOGS_ROW_STRUCTURED_FRAGMENT = gql` externalUrl externalStdoutUrl externalStderrUrl - logManagerMetadata + logManagerMetadata { + ... on LogManagerMetadata { + logManagerClass + container + storageAccount + } + } stdoutUriOrPath stderrUriOrPath } diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/RunMetadataProvider.tsx b/js_modules/dagster-ui/packages/ui-core/src/runs/RunMetadataProvider.tsx index cc07a2a8ca6d5..95138fd8345d2 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/RunMetadataProvider.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/RunMetadataProvider.tsx @@ -12,7 +12,7 @@ import { RunStepStatsQuery, RunStepStatsQueryVariables, } from './types/RunMetadataProvider.types'; -import {StepEventStatus} from '../graphql/types'; +import {LogManagerMetadata, StepEventStatus} from '../graphql/types'; import {METADATA_ENTRY_FRAGMENT} from '../metadata/MetadataEntryFragment'; export enum IStepState { @@ -73,7 +73,7 @@ export interface ILogCaptureInfo { pid?: string; externalStdoutUrl?: string; externalStderrUrl?: string; - logManagerMetadata?: string; + logManagerMetadata?: LogManagerMetadata; stdoutUriOrPath?: string; stderrUriOrPath?: string; } @@ -460,7 +460,13 @@ export const RUN_METADATA_PROVIDER_MESSAGE_FRAGMENT = gql` pid externalStdoutUrl externalStderrUrl - logManagerMetadata + logManagerMetadata { + ... on LogManagerMetadata { + logManagerClass + container + storageAccount + } + } stdoutUriOrPath stderrUriOrPath } diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts index 7ee68233e62e8..88aa289114c72 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsProvider.types.ts @@ -1654,11 +1654,16 @@ export type PipelineRunLogsSubscription = { pid: number | null; externalStdoutUrl: string | null; externalStderrUrl: string | null; - logManagerMetadata: string | null; stdoutUriOrPath: string | null; stderrUriOrPath: string | null; eventType: Types.DagsterEventType | null; externalUrl: string | null; + logManagerMetadata: { + __typename: 'LogManagerMetadata'; + logManagerClass: string; + container: string | null; + storageAccount: string | null; + } | null; } | { __typename: 'MaterializationEvent'; @@ -4999,11 +5004,16 @@ export type RunLogsSubscriptionSuccessFragment = { pid: number | null; externalStdoutUrl: string | null; externalStderrUrl: string | null; - logManagerMetadata: string | null; stdoutUriOrPath: string | null; stderrUriOrPath: string | null; eventType: Types.DagsterEventType | null; externalUrl: string | null; + logManagerMetadata: { + __typename: 'LogManagerMetadata'; + logManagerClass: string; + container: string | null; + storageAccount: string | null; + } | null; } | { __typename: 'MaterializationEvent'; @@ -8353,11 +8363,16 @@ export type RunLogsQuery = { pid: number | null; externalStdoutUrl: string | null; externalStderrUrl: string | null; - logManagerMetadata: string | null; stdoutUriOrPath: string | null; stderrUriOrPath: string | null; eventType: Types.DagsterEventType | null; externalUrl: string | null; + logManagerMetadata: { + __typename: 'LogManagerMetadata'; + logManagerClass: string; + container: string | null; + storageAccount: string | null; + } | null; } | { __typename: 'MaterializationEvent'; @@ -10093,6 +10108,6 @@ export type RunLogsQuery = { | {__typename: 'RunNotFoundError'}; }; -export const PipelineRunLogsSubscriptionVersion = '2f30c164760561ac620a80e7150088f0f5df364e89aa32648c1ecd9cd6698b8d'; +export const PipelineRunLogsSubscriptionVersion = '4e2c6f3f97fcda52e4df12ce253bf0e9be1786261fd3bcc52dd4e7b207b5e23f'; -export const RunLogsQueryVersion = 'fc247c26adf353841ebe6e395922586491da0657d7f1f828cc4c066c1eec90d9'; +export const RunLogsQueryVersion = 'b22469d32fc7bdd49032360fdd7aa64adf128907f50ae28aa8632c06f80daef2'; diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts index a965a910be967..991e4c7eaf7fa 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsRow.types.ts @@ -1484,9 +1484,14 @@ export type LogsRowStructuredFragment_LogsCapturedEvent = { externalUrl: string | null; externalStdoutUrl: string | null; externalStderrUrl: string | null; - logManagerMetadata: string | null; stdoutUriOrPath: string | null; stderrUriOrPath: string | null; + logManagerMetadata: { + __typename: 'LogManagerMetadata'; + logManagerClass: string; + container: string | null; + storageAccount: string | null; + } | null; }; export type LogsRowStructuredFragment_MaterializationEvent = { diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts index 9ba1cae4d579f..9a5423ee3c8d3 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/LogsScrollingTableMessageFragment.types.ts @@ -1484,9 +1484,14 @@ export type LogsScrollingTableMessageFragment_LogsCapturedEvent = { externalUrl: string | null; externalStdoutUrl: string | null; externalStderrUrl: string | null; - logManagerMetadata: string | null; stdoutUriOrPath: string | null; stderrUriOrPath: string | null; + logManagerMetadata: { + __typename: 'LogManagerMetadata'; + logManagerClass: string; + container: string | null; + storageAccount: string | null; + } | null; }; export type LogsScrollingTableMessageFragment_MaterializationEvent = { diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts index 845a3c848ff51..3f570e9dfbb2f 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunFragments.types.ts @@ -1532,11 +1532,16 @@ export type RunDagsterRunEventFragment_LogsCapturedEvent = { pid: number | null; externalStdoutUrl: string | null; externalStderrUrl: string | null; - logManagerMetadata: string | null; stdoutUriOrPath: string | null; stderrUriOrPath: string | null; eventType: Types.DagsterEventType | null; externalUrl: string | null; + logManagerMetadata: { + __typename: 'LogManagerMetadata'; + logManagerClass: string; + container: string | null; + storageAccount: string | null; + } | null; }; export type RunDagsterRunEventFragment_MaterializationEvent = { diff --git a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts index dfec67ff85f47..7a88016bb49ca 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/runs/types/RunMetadataProvider.types.ts @@ -207,9 +207,14 @@ export type RunMetadataProviderMessageFragment_LogsCapturedEvent = { pid: number | null; externalStdoutUrl: string | null; externalStderrUrl: string | null; - logManagerMetadata: string | null; stdoutUriOrPath: string | null; stderrUriOrPath: string | null; + logManagerMetadata: { + __typename: 'LogManagerMetadata'; + logManagerClass: string; + container: string | null; + storageAccount: string | null; + } | null; }; export type RunMetadataProviderMessageFragment_MaterializationEvent = { diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/events.py b/python_modules/dagster-graphql/dagster_graphql/implementation/events.py index b2cf8ef57e343..50ee6221085b6 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/events.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/events.py @@ -427,7 +427,7 @@ def from_dagster_event_record(event_record: EventLogEntry, pipeline_name: str) - externalUrl=data.external_url, externalStdoutUrl=data.external_stdout_url or data.external_url, externalStderrUrl=data.external_stderr_url or data.external_url, - logManagerMetadata=data.log_manager_metadata, + logManagerMetadataRaw=data.log_manager_metadata, stdoutUriOrPath=data.stdout_uri_or_path, stderrUriOrPath=data.stderr_uri_or_path, pid=dagster_event.pid, diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py b/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py index b4a860f3c7f0e..75c2a010e23e9 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/logs/events.py @@ -299,6 +299,15 @@ class Meta: name = "HookErroredEvent" +class GrapheneLogManagerMetadata(graphene.ObjectType): + class Meta: + name = "LogManagerMetadata" + + logManagerClass = graphene.String(required=True) + storageAccount = graphene.String() + container = graphene.String() + + class GrapheneLogsCapturedEvent(graphene.ObjectType): class Meta: interfaces = (GrapheneMessageEvent,) @@ -309,7 +318,8 @@ class Meta: externalUrl = graphene.String() externalStdoutUrl = graphene.String() externalStderrUrl = graphene.String() - logManagerMetadata = graphene.String() # TODO - replace with union? + logManagerMetadataRaw = graphene.JSONString() + logManagerMetadata = graphene.Field(GrapheneLogManagerMetadata) stderrUriOrPath = graphene.String() stdoutUriOrPath = graphene.String() pid = graphene.Int() @@ -317,6 +327,16 @@ class Meta: # renamed to fileKey for newer versions of the Dagster UI logKey = graphene.NonNull(graphene.String) + def resolve_logManagerMetadata(self, info): + if not self.logManagerMetadataRaw: + return None + + return GrapheneLogManagerMetadata( + logManagerClass=self.logManagerMetadataRaw.log_manager_class, + container=self.logManagerMetadataRaw.container, + storageAccount=self.logManagerMetadataRaw.storage_account, + ) + def _construct_asset_event_metadata_params(event, metadata): metadata_params = {"label": metadata.label, "description": metadata.description} diff --git a/python_modules/dagster/dagster/_core/events/__init__.py b/python_modules/dagster/dagster/_core/events/__init__.py index 518a59916ccf3..e62e9a83a0a83 100644 --- a/python_modules/dagster/dagster/_core/events/__init__.py +++ b/python_modules/dagster/dagster/_core/events/__init__.py @@ -47,7 +47,7 @@ from dagster._core.execution.plan.objects import StepFailureData, StepRetryData, StepSuccessData from dagster._core.execution.plan.outputs import StepOutputData from dagster._core.log_manager import DagsterLogManager -from dagster._core.storage.compute_log_manager import CapturedLogContext +from dagster._core.storage.compute_log_manager import CapturedLogContext, LogManagerMetadata from dagster._core.storage.dagster_run import DagsterRunStatus from dagster._serdes import NamedTupleSerializer, whitelist_for_serdes from dagster._serdes.serdes import EnumSerializer, UnpackContext, is_whitelisted_for_serdes_object @@ -1877,7 +1877,7 @@ class ComputeLogsCaptureData( ("external_url", Optional[str]), ("external_stdout_url", Optional[str]), ("external_stderr_url", Optional[str]), - ("log_manager_metadata", Optional[str]), + ("log_manager_metadata", Optional[LogManagerMetadata]), ("stdout_uri_or_path", Optional[str]), ("stderr_uri_or_path", Optional[str]), ], @@ -1890,7 +1890,7 @@ def __new__( external_url: Optional[str] = None, external_stdout_url: Optional[str] = None, external_stderr_url: Optional[str] = None, - log_manager_metadata: Optional[str] = None, + log_manager_metadata: Optional[LogManagerMetadata] = None, stdout_uri_or_path: Optional[str] = None, stderr_uri_or_path: Optional[str] = None, ): @@ -1901,7 +1901,9 @@ def __new__( external_url=check.opt_str_param(external_url, "external_url"), external_stdout_url=check.opt_str_param(external_stdout_url, "external_stdout_url"), external_stderr_url=check.opt_str_param(external_stderr_url, "external_stderr_url"), - log_manager_metadata=check.opt_str_param(log_manager_metadata, "log_manager_metadata"), + log_manager_metadata=check.opt_inst_param( + log_manager_metadata, "log_manager_metadata", LogManagerMetadata + ), stdout_uri_or_path=check.opt_str_param(stdout_uri_or_path, "stdout_uri_or_path"), stderr_uri_or_path=check.opt_str_param(stderr_uri_or_path, "stderr_uri_or_path"), ) diff --git a/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py b/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py index 26e80f1c61076..bc3359af02c33 100644 --- a/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/cloud_storage_compute_log_manager.py @@ -16,6 +16,7 @@ CapturedLogSubscription, ComputeIOType, ComputeLogManager, + LogManagerMetadata, ) from dagster._core.storage.local_compute_log_manager import ( IO_TYPE_EXTENSION, @@ -55,16 +56,9 @@ def uri_or_path_for_type(self, log_key: Sequence[str], io_type: ComputeIOType) - """Calculates a download uri given a log key and compute io type.""" return None - def get_log_manager_metadata(self) -> dict[str, str]: + def get_log_manager_metadata(self) -> Optional[LogManagerMetadata]: """Returns metadata about the log manager.""" - return {"type": self.__class__.__name__} - - def get_serialized_log_manager_metadata(self) -> Optional[str]: - """Returns serialized metadata about the log manager.""" - metadata = self.get_log_manager_metadata() - if not metadata: - return None - return json.dumps(metadata) + return None @abstractmethod def display_path_for_type(self, log_key: Sequence[str], io_type: ComputeIOType) -> str: @@ -151,7 +145,7 @@ def get_log_metadata(self, log_key: Sequence[str]) -> CapturedLogMetadata: stderr_location=self.display_path_for_type(log_key, ComputeIOType.STDERR), stdout_download_url=self.download_url_for_type(log_key, ComputeIOType.STDOUT), stderr_download_url=self.download_url_for_type(log_key, ComputeIOType.STDERR), - log_manager_metadata=self.get_serialized_log_manager_metadata(), + log_manager_metadata=self.get_log_manager_metadata() or None, stdout_uri_or_path=self.uri_or_path_for_type(log_key, ComputeIOType.STDOUT), stderr_uri_or_path=self.uri_or_path_for_type(log_key, ComputeIOType.STDERR), ) diff --git a/python_modules/dagster/dagster/_core/storage/compute_log_manager.py b/python_modules/dagster/dagster/_core/storage/compute_log_manager.py index f0595502092c1..5847ab1a37ccb 100644 --- a/python_modules/dagster/dagster/_core/storage/compute_log_manager.py +++ b/python_modules/dagster/dagster/_core/storage/compute_log_manager.py @@ -10,6 +10,7 @@ import dagster._check as check from dagster._core.captured_log_api import LogLineCursor from dagster._core.instance import MayHaveInstanceWeakref, T_DagsterInstance +from dagster._serdes import whitelist_for_serdes MAX_BYTES_CHUNK_READ: Final = 4194304 # 4 MB @@ -19,6 +20,31 @@ class ComputeIOType(Enum): STDERR = "stderr" +@whitelist_for_serdes +class LogManagerMetadata( + NamedTuple( + "_LogManagerMetadata", + [ + ("log_manager_class", str), + ("container", Optional[str]), + ("storage_account", Optional[str]), + ], + ) +): + def __new__( + cls, + log_manager_class: str, + container: Optional[str] = None, + storage_account: Optional[str] = None, + ): + return super().__new__( + cls, + log_manager_class=log_manager_class, + container=container, + storage_account=storage_account, + ) + + class CapturedLogContext( NamedTuple( "_CapturedLogContext", @@ -27,7 +53,7 @@ class CapturedLogContext( ("external_url", Optional[str]), ("external_stdout_url", Optional[str]), ("external_stderr_url", Optional[str]), - ("log_manager_metadata", Optional[str]), + ("log_manager_metadata", Optional[LogManagerMetadata]), ("stdout_uri_or_path", Optional[str]), ("stderr_uri_or_path", Optional[str]), ], @@ -44,7 +70,7 @@ def __new__( external_stdout_url: Optional[str] = None, external_stderr_url: Optional[str] = None, external_url: Optional[str] = None, - log_manager_metadata: Optional[str] = None, + log_manager_metadata: Optional[LogManagerMetadata] = None, stdout_uri_or_path: Optional[str] = None, stderr_uri_or_path: Optional[str] = None, ): @@ -99,7 +125,7 @@ class CapturedLogMetadata( ("stderr_location", Optional[str]), ("stdout_download_url", Optional[str]), ("stderr_download_url", Optional[str]), - ("log_manager_metadata", Optional[str]), + ("log_manager_metadata", Optional[LogManagerMetadata]), ("stdout_uri_or_path", Optional[str]), ("stderr_uri_or_path", Optional[str]), ], @@ -117,7 +143,7 @@ def __new__( stderr_location: Optional[str] = None, stdout_download_url: Optional[str] = None, stderr_download_url: Optional[str] = None, - log_manager_metadata: Optional[str] = None, + log_manager_metadata: Optional[LogManagerMetadata] = None, stdout_uri_or_path: Optional[str] = None, stderr_uri_or_path: Optional[str] = None, ): diff --git a/python_modules/libraries/dagster-azure/dagster_azure/blob/compute_log_manager.py b/python_modules/libraries/dagster-azure/dagster_azure/blob/compute_log_manager.py index b4d3839c5c533..ec13262e7c658 100644 --- a/python_modules/libraries/dagster-azure/dagster_azure/blob/compute_log_manager.py +++ b/python_modules/libraries/dagster-azure/dagster_azure/blob/compute_log_manager.py @@ -19,7 +19,11 @@ CloudStorageComputeLogManager, PollingComputeLogSubscriptionManager, ) -from dagster._core.storage.compute_log_manager import CapturedLogContext, ComputeIOType +from dagster._core.storage.compute_log_manager import ( + CapturedLogContext, + ComputeIOType, + LogManagerMetadata, +) from dagster._core.storage.local_compute_log_manager import ( IO_TYPE_EXTENSION, LocalComputeLogManager, @@ -279,13 +283,12 @@ def download_url_for_type(self, log_key: Sequence[str], io_type: ComputeIOType): self._download_urls[blob_key] = url return url - def get_log_manager_metadata(self) -> dict[str, str]: - metadata = super().get_log_manager_metadata() - return { - **metadata, - "storage_account": self._storage_account, - "container": self._container, - } + def get_log_manager_metadata(self) -> Optional[LogManagerMetadata]: + return LogManagerMetadata( + log_manager_class=self.__class__.__name__, + storage_account=self._storage_account, + container=self._container, + ) def uri_or_path_for_type(self, log_key: Sequence[str], io_type: ComputeIOType): if not self.is_capture_complete(log_key): @@ -308,7 +311,7 @@ def capture_logs(self, log_key: Sequence[str]) -> Iterator[CapturedLogContext]: local_context.log_key, external_stdout_url=out_url, external_stderr_url=err_url, - log_manager_metadata=self.get_serialized_log_manager_metadata(), + log_manager_metadata=self.get_log_manager_metadata(), stdout_uri_or_path=out_key, stderr_uri_or_path=err_key, )