Merge branch 'master' into nikki/docs/asset-catalog-doc
neverett committed Jan 10, 2025
2 parents fcf6c2f + ab0fcba commit 4c102ae
Showing 52 changed files with 715 additions and 213 deletions.
8 changes: 8 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## 1.9.8 (core) / 0.25.8 (libraries)

### Bugfixes

- Fixed a bug where the `load_assets_from_x` functions began raising an error when a spec and an `AssetsDefinition` had the same key in a given module. An error is now raised in this case only if `include_specs=True`.
- [dagster-azure] Fixed a bug in 1.9.6 and 1.9.7 where the default behavior of the compute log manager switched from showing logs in the UI to showing a URL. To get the URL-only behavior, set the `show_url_only` option to `True`.
- [dagster-dbt] Fixed an issue where group names set on partitioned dbt assets created using the `@dbt_assets` decorator would be ignored.
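
The first bugfix above boils down to a simple rule: a key shared by a spec and an `AssetsDefinition` is a conflict only when specs are being loaded. A minimal sketch of that rule follows. The function names `find_conflicting_keys` and `check_module_keys` are hypothetical and this is not Dagster's implementation, only an illustration of the described behavior.

```python
from typing import Iterable, Set


def find_conflicting_keys(
    assets_def_keys: Iterable[str],
    spec_keys: Iterable[str],
    include_specs: bool,
) -> Set[str]:
    """Return keys defined by both an assets definition and a spec.

    Hypothetical helper: when specs are not loaded, an overlap is harmless,
    so no conflicts are reported.
    """
    if not include_specs:
        return set()
    return set(assets_def_keys) & set(spec_keys)


def check_module_keys(
    assets_def_keys: Iterable[str],
    spec_keys: Iterable[str],
    include_specs: bool = False,
) -> None:
    """Raise only when specs are included and a key is defined twice."""
    conflicts = find_conflicting_keys(assets_def_keys, spec_keys, include_specs)
    if conflicts:
        raise ValueError(f"Duplicate asset keys: {sorted(conflicts)}")
```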

## 1.9.7 (core) / 0.25.7 (libraries)

### New
Expand Down
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
14 changes: 7 additions & 7 deletions docs/docs-beta/docs/api/api-lifecycle.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ The lifecycle stages described below provide a clear framework for understanding

## API lifecycle stages

| Stage | Description | Lifetime |
|---------|------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------|
| Preview | API in the design phase, can change significantly, or be removed completely. Not for production use. | Until design is complete, or implementation cancelled |
| Beta | Features that is still being tested and may change. More stable than Preview, but still subject to change. | At most, two 1.x releases before it is either considered stable or returned to preview |
| Generally Available (GA)| Ready for production use, with minimal risk of breaking changes. | Supported until at least 2.0
| Superseded | This API is still available, but is no longer the best practice. A better alternative is available. | Supported until at least 2.0
| Deprecated | API is still available but will be removed in the future; avoid new usage. | Will be removed in a minor release, the DeprecationWarning will indicate the next release that will remove the API.
| Stage | Description | Lifetime |
|---------|-----------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------|
| Preview | This API may have breaking changes in patch version releases. This feature is not considered ready for production use. | Until design is complete, or implementation cancelled |
| Beta | This API may have breaking changes in minor version releases, with behavior changes in patch releases. | At most, two 1.x releases before it is either considered stable or returned to preview |
| Generally Available (GA)| Ready for production use, with minimal risk of breaking changes. | Supported until at least 2.0
| Superseded | This API is still available, but is no longer the best practice. A better alternative is available. | Supported until at least 2.0
| Deprecated | API is still available but will be removed in the future; avoid new usage. | Will be removed in a minor release, the DeprecationWarning will indicate the next release that will remove the API.
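
As an illustration of the Deprecated row, here is a minimal sketch of an API that emits a `DeprecationWarning` naming the release that will remove it. The `deprecated` decorator, the `old_api` function, and the version string `1.42` are all hypothetical and use only the standard library. They are not Dagster's own annotations.

```python
import functools
import warnings


def deprecated(breaking_version: str):
    """Hypothetical decorator: warn on each call and name the removal release."""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            warnings.warn(
                f"{fn.__name__} is deprecated and will be removed in {breaking_version}.",
                category=DeprecationWarning,
                stacklevel=2,  # attribute the warning to the caller, not the wrapper
            )
            return fn(*args, **kwargs)

        return wrapper

    return decorator


@deprecated(breaking_version="1.42")  # placeholder version, not a real release
def old_api() -> str:
    return "still works"
```

The API keeps working until it is removed, and callers learn from the warning text which release to plan for.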


## Understanding the stages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,15 +119,13 @@ def lambda_handler(event, _context):

Let's review what this code does:

{/* TODO change `PipesMappingParamsLoader` to <PyObject section="libraries" module="dagster_pipes" object="PipesMappingParamsLoader" /> */}

- Imports `PipesMappingParamsLoader` and <PyObject section="libraries" object="open_dagster_pipes" module="dagster_pipes" /> from `dagster_pipes`
- Imports <PyObject section="libraries" module="dagster_pipes" object="PipesMappingParamsLoader" /> and <PyObject section="libraries" object="open_dagster_pipes" module="dagster_pipes" /> from `dagster_pipes`

- **Defines a [Lambda function handler](https://docs.aws.amazon.com/lambda/latest/dg/python-handler.html), which is a method in function code that processes events**. This method accepts `event` and `context` arguments, which map to the `event` payload and `context` we'll define in our Dagster asset.

- **Initializes the Dagster Pipes context (<PyObject section="libraries" object="open_dagster_pipes" module="dagster_pipes" />), which yields an instance of <PyObject section="libraries" object="PipesContext" module="dagster_pipes" /> called `pipes`.**

On the orchestration side - which we'll discuss in the next section - we'll set up a Dagster asset that uses the <PyObject section="libraries" module="dagster_aws" object="pipes.PipesLambdaClient" /> to inject information needed for Pipes in an `event` payload. In this code on the AWS Lambda side, we're passing this payload to `PipesMappingParamsLoader` and using it in <PyObject section="libraries" object="open_dagster_pipes" module="dagster_pipes" />.
On the orchestration side - which we'll discuss in the next section - we'll set up a Dagster asset that uses the <PyObject section="libraries" module="dagster_aws" object="pipes.PipesLambdaClient" /> to inject information needed for Pipes in an `event` payload. In this code on the AWS Lambda side, we're passing this payload to <PyObject section="libraries" module="dagster_pipes" object="PipesMappingParamsLoader" /> and using it in <PyObject section="libraries" object="open_dagster_pipes" module="dagster_pipes" />.

We're using the default context loader (<PyObject section="libraries" object="PipesDefaultContextLoader" module="dagster_pipes" />) and message writer (<PyObject section="libraries" object="PipesDefaultMessageWriter" module="dagster_pipes" />) in this example. These objects establish communication between the orchestration and external process. On the orchestration end, these match a corresponding `PipesLambdaEventContextInjector` and `PipesLambdaLogsMessageReader`, which are instantiated inside the <PyObject section="libraries" module="dagster_aws" object="pipes.PipesLambdaClient" />.
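
To make the data flow concrete, here is a stdlib-only sketch of the idea behind a mapping-based params loader: the orchestrator places encoded bootstrap parameters in the `event` payload, and the Lambda side decodes them. The key names, the base64 + zlib + JSON encoding, and the `SketchMappingParamsLoader` class are assumptions for illustration. In real code, pass the event to `PipesMappingParamsLoader` from `dagster_pipes` instead of this stand-in.

```python
import base64
import json
import zlib
from typing import Any, Mapping

# Assumed key names; illustrative only, not a documented contract.
CONTEXT_KEY = "DAGSTER_PIPES_CONTEXT"
MESSAGES_KEY = "DAGSTER_PIPES_MESSAGES"


def encode_param(value: Any) -> str:
    """Serialize a JSON-compatible value into a compact ASCII string."""
    raw = json.dumps(value).encode("utf-8")
    return base64.b64encode(zlib.compress(raw)).decode("utf-8")


def decode_param(value: str) -> Any:
    """Invert encode_param."""
    return json.loads(zlib.decompress(base64.b64decode(value)).decode("utf-8"))


class SketchMappingParamsLoader:
    """Hypothetical stand-in that reads bootstrap params from a mapping."""

    def __init__(self, mapping: Mapping[str, str]):
        self.mapping = mapping

    def is_pipes_process(self) -> bool:
        # Params are only present when the orchestrator injected them.
        return CONTEXT_KEY in self.mapping

    def load_context_params(self) -> Any:
        return decode_param(self.mapping[CONTEXT_KEY])

    def load_messages_params(self) -> Any:
        return decode_param(self.mapping[MESSAGES_KEY])


def lambda_handler(event, _context):
    """Decode the injected params from the event payload, if any."""
    loader = SketchMappingParamsLoader(event)
    if not loader.is_pipes_process():
        return {"pipes": False}
    return {"pipes": True, "context": loader.load_context_params()}
```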

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,10 @@ Let's review what's happening in this code:

The submitted task must:

{/* TODO change `PipesDbfsLogReader` to <PyObject section="libraries" object="PipesDbfsLogReader" module="dagster_databricks" /> */}

- **Specify `dagster-pipes` as a PyPI dependency**. You can include a version pin (e.g. `dagster-pipes==1.5.4`) if desired.
- Use a `spark_python_task`.
- Specify either `new_cluster` (this is the **recommended approach**) or `existing_cluster_id`. The `new_cluster` field is used in this example.
- If `new_cluster` is set, then setting `new_cluster.cluster_log_conf.dbfs` enables the <PyObject section="libraries" object="PipesDatabricksClient" module="dagster_databricks" /> to automatically set up `PipesDbfsLogReader` objects for `stdout` and `stderr` of the driver process. These will periodically forward the `stdout` and `stderr` logs written by Databricks back to Dagster. **Note**: Because Databricks only updates these log files every five minutes, that is the maximum frequency at which Dagster can forward the logs.
- If `new_cluster` is set, then setting `new_cluster.cluster_log_conf.dbfs` enables the <PyObject section="libraries" object="PipesDatabricksClient" module="dagster_databricks" /> to automatically set up <PyObject section="libraries" object="PipesDbfsLogReader" module="dagster_databricks" /> objects for `stdout` and `stderr` of the driver process. These will periodically forward the `stdout` and `stderr` logs written by Databricks back to Dagster. **Note**: Because Databricks only updates these log files every five minutes, that is the maximum frequency at which Dagster can forward the logs.
- If `existing_cluster_id` is set, <PyObject section="libraries" object="PipesDatabricksClient" module="dagster_databricks" /> won't be able to forward `stdout` and `stderr` driver logs to Dagster. Using an existing cluster **requires passing an instance of <PyObject section="libraries" object="PipesCliArgsParamsLoader" module="dagster_pipes" /> to <PyObject section="libraries" object="open_dagster_pipes" module="dagster_pipes" />** in the Python script which is executed on Databricks. This is because setting environment variables is only possible when creating a new cluster, so we have to use the alternative method of passing Pipes parameters as command-line arguments.

- **Defines an `extras` dictionary containing some arbitrary data (`some_parameter`).** This is where you can put various data, e.g. from the Dagster run config, that you want to be available in Databricks. Anything added here must be JSON-serializable.
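
The requirements listed above for the submitted task can be sketched as a plain dictionary plus a small checker. The field names follow the Databricks Jobs task format, but the concrete values (paths, Spark version, node type) are placeholders, and `check_pipes_task` and `forwards_driver_logs` are hypothetical helpers. They are not part of `dagster_databricks`, and reading "either ... or" as "exactly one" is an assumption.

```python
from typing import Any, Dict, List


def check_pipes_task(task: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the checks passed."""
    problems = []

    # dagster-pipes must be a PyPI dependency (a version pin is allowed).
    packages = [lib.get("pypi", {}).get("package", "") for lib in task.get("libraries", [])]
    if not any(p == "dagster-pipes" or p.startswith("dagster-pipes==") for p in packages):
        problems.append("missing dagster-pipes PyPI dependency")

    if "spark_python_task" not in task:
        problems.append("task must be a spark_python_task")

    # Assumption: exactly one of the two cluster fields should be present.
    if ("new_cluster" in task) == ("existing_cluster_id" in task):
        problems.append("specify exactly one of new_cluster or existing_cluster_id")

    return problems


def forwards_driver_logs(task: Dict[str, Any]) -> bool:
    """True when new_cluster.cluster_log_conf.dbfs is set."""
    return bool(task.get("new_cluster", {}).get("cluster_log_conf", {}).get("dbfs"))


# Placeholder values throughout; adjust for a real workspace.
task = {
    "task_key": "some-key",
    "libraries": [{"pypi": {"package": "dagster-pipes"}}],
    "spark_python_task": {"python_file": "dbfs:/my_python_script.py"},
    "new_cluster": {
        "spark_version": "12.2.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 0,
        "cluster_log_conf": {"dbfs": {"destination": "dbfs:/cluster-logs-dir"}},
    },
}
```

With `existing_cluster_id` instead of `new_cluster`, `forwards_driver_logs` returns `False`, which mirrors the limitation described above.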
Expand Down
Binary file modified docs/next/public/objects.inv
Binary file not shown.
10 changes: 9 additions & 1 deletion docs/sphinx/sections/api/apidocs/definitions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,12 @@ Definitions

.. autofunction:: create_repository_using_definitions_args

.. autofunction:: load_definitions_from_module
.. autofunction:: load_definitions_from_current_module

.. autofunction:: load_definitions_from_module

.. autofunction:: load_definitions_from_modules

.. autofunction:: load_definitions_from_package_module

.. autofunction:: load_definitions_from_package_name
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ Pipes

.. autoclass:: PipesDbfsMessageReader

.. autoclass:: PipesDbfsLogReader

Other
=====

Expand Down
2 changes: 2 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-pipes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ Params loaders load the bootstrap payload from some globally accessible key-valu

.. autoclass:: PipesCliArgsParamsLoader

.. autoclass:: PipesMappingParamsLoader

----

Message writers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class AzureBlobComputeLogManager(BaseModel):
localDir: Optional[StringSource] = None
prefix: Optional[StringSource] = None
uploadInterval: Optional[int] = None
showUrlOnly: Optional[bool] = None


class GCSComputeLogManager(BaseModel):
Expand Down
2 changes: 2 additions & 0 deletions helm/dagster/schema/schema_tests/test_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ def test_azure_blob_compute_log_manager(template: HelmTemplate):
localDir=local_dir,
prefix=prefix,
uploadInterval=upload_interval,
showUrlOnly=True,
)
),
)
Expand All @@ -734,6 +735,7 @@ def test_azure_blob_compute_log_manager(template: HelmTemplate):
"prefix": prefix,
"upload_interval": upload_interval,
"access_key_or_sas_token": access_key_or_sas_token,
"show_url_only": True,
}

# Test all config fields in configurable class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ config:
{{- if $azureBlobComputeLogManagerConfig.defaultAzureCredential }}
default_azure_credential: {{ $azureBlobComputeLogManagerConfig.defaultAzureCredential | toYaml | nindent 4 }}
{{- end }}

{{- if $azureBlobComputeLogManagerConfig.showUrlOnly }}
show_url_only: {{ $azureBlobComputeLogManagerConfig.showUrlOnly }}
{{- end }}

{{- end }}

{{- define "dagsterYaml.computeLogManager.gcs" }}
Expand Down
12 changes: 12 additions & 0 deletions helm/dagster/values.schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions helm/dagster/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ computeLogManager:
# localDir: ~
# prefix: ~
# uploadInterval: ~
# showUrlOnly: ~
## Uncomment this configuration if the GCSComputeLogManager is selected
# gcsComputeLogManager:
# bucket: ~
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ deps =
-e ../../../python_modules/dagster-pipes
-e ../../../python_modules/dagster-graphql
-e ../../../python_modules/libraries/dagster-gcp
-e ../../../python_modules/libraries/dagster-pandas
-e .
allowlist_externals =
/bin/bash
Expand Down

Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ interface RefreshLocationsIfNeededParams
React.SetStateAction<Record<string, WorkspaceLocationNodeFragment | PythonErrorFragment>>
>;
previousLocationVersionsRef: React.MutableRefObject<Record<string, string>>;
fetchingStatusesRef: React.MutableRefObject<Record<string, Promise<any>>>;
}

interface HandleDeletedLocationsParams
Expand All @@ -113,6 +114,9 @@ interface FetchLocationDataParams
setLocationEntryData: React.Dispatch<
React.SetStateAction<Record<string, WorkspaceLocationNodeFragment | PythonErrorFragment>>
>;
locationStatuses: Record<string, LocationStatusEntryFragment>;
fetchingStatusesRef: React.MutableRefObject<Record<string, Promise<any> | undefined>>;
previousLocationVersionsRef: React.MutableRefObject<Record<string, string>>;
}

const UNLOADED_CACHED_DATA = {};
Expand All @@ -129,6 +133,7 @@ export const WorkspaceProvider = ({children}: {children: React.ReactNode}) => {
);

const previousLocationVersionsRef = useRef<Record<string, string>>({});
const fetchingStatusesRef = useRef<Record<string, Promise<any>>>({});

const {locationStatuses, loading: loadingLocationStatuses} =
useCodeLocationStatuses(localCacheIdPrefix);
Expand Down Expand Up @@ -162,6 +167,7 @@ export const WorkspaceProvider = ({children}: {children: React.ReactNode}) => {
getData,
setLocationEntryData,
previousLocationVersionsRef,
fetchingStatusesRef,
};
refreshLocationsIfNeeded(params);
}, [locationStatuses, locationEntryData, client, localCacheIdPrefix, getData, loadingCachedData]);
Expand Down Expand Up @@ -197,6 +203,9 @@ export const WorkspaceProvider = ({children}: {children: React.ReactNode}) => {
localCacheIdPrefix,
getData,
setLocationEntryData,
locationStatuses,
fetchingStatusesRef,
previousLocationVersionsRef,
};
return await fetchLocationData(params);
});
Expand Down Expand Up @@ -268,6 +277,7 @@ async function refreshLocationsIfNeeded(params: RefreshLocationsIfNeededParams):
getData,
setLocationEntryData,
previousLocationVersionsRef,
fetchingStatusesRef,
} = params;

const locationsToFetch = identifyStaleLocations(
Expand All @@ -281,18 +291,19 @@ async function refreshLocationsIfNeeded(params: RefreshLocationsIfNeededParams):
}

await Promise.all(
locationsToFetch.map((location) =>
fetchLocationData({
locationsToFetch.map(async (location) => {
await fetchLocationData({
name: location.name,
client,
localCacheIdPrefix,
getData,
setLocationEntryData,
}),
),
locationStatuses,
previousLocationVersionsRef,
fetchingStatusesRef,
});
}),
);

previousLocationVersionsRef.current = mapLocationVersions(locationStatuses);
}

/**
Expand Down Expand Up @@ -339,29 +350,53 @@ function handleDeletedLocations(params: HandleDeletedLocationsParams): void {
async function fetchLocationData(
params: FetchLocationDataParams,
): Promise<LocationWorkspaceQuery | undefined> {
const {name, client, localCacheIdPrefix, getData, setLocationEntryData} = params;
try {
const {data} = await getData<LocationWorkspaceQuery, LocationWorkspaceQueryVariables>({
client,
query: LOCATION_WORKSPACE_QUERY,
key: `${localCacheIdPrefix}${locationWorkspaceKey(name)}`,
version: LocationWorkspaceQueryVersion,
variables: {name},
bypassCache: true,
});
const {
name,
client,
localCacheIdPrefix,
getData,
setLocationEntryData,
locationStatuses,
fetchingStatusesRef,
previousLocationVersionsRef,
} = params;

const entry = data?.workspaceLocationEntryOrError;
if (entry) {
setLocationEntryData((prevData) => ({
...prevData,
[name]: entry,
}));
}
return data;
} catch (error) {
console.error(`Error fetching location data for ${name}:`, error);
const localKey = name + '@' + (locationStatuses[name]?.versionKey ?? '');
if (fetchingStatusesRef.current[localKey]) {
return fetchingStatusesRef.current[localKey];
}
return undefined;
fetchingStatusesRef.current[localKey] = new Promise(async (res) => {
try {
const {data} = await getData<LocationWorkspaceQuery, LocationWorkspaceQueryVariables>({
client,
query: LOCATION_WORKSPACE_QUERY,
key: `${localCacheIdPrefix}${locationWorkspaceKey(name)}`,
version: LocationWorkspaceQueryVersion,
variables: {name},
bypassCache: true,
});

const entry = data?.workspaceLocationEntryOrError;
if (entry) {
if (entry.__typename === 'WorkspaceLocationEntry') {
previousLocationVersionsRef.current[name] = entry.versionKey;
}
setLocationEntryData((prevData) => ({
...prevData,
[name]: entry,
}));
}
res(data);
} catch (error) {
console.error(`Error fetching location data for ${name}:`, error);
}
res(undefined);
});
const result = await fetchingStatusesRef.current[localKey];
requestAnimationFrame(() => {
delete fetchingStatusesRef.current[localKey];
});
return result;
}

/**
Expand Down Expand Up @@ -422,6 +457,7 @@ function identifyStaleLocations(
const entry = locationEntryData[statusEntry.name];
const locationEntry = entry?.__typename === 'WorkspaceLocationEntry' ? entry : null;
const dataVersion = locationEntry?.versionKey || '';

return currentVersion !== prevVersion || currentVersion !== dataVersion;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,23 @@ export const buildCodeLocationsStatusQuery = (

export const buildWorkspaceMocks = (
entries: WorkspaceLocationEntry[],
options: Partial<Omit<MockedResponse, 'result' | 'query' | 'variables' | 'data'>> = {},
options: Partial<Omit<MockedResponse, 'result' | 'query' | 'variables' | 'data'>> & {
cascadingUpdates?: boolean;
} = {},
) => {
return [
buildCodeLocationsStatusQuery(
entries.map((entry) =>
buildWorkspaceLocationStatusEntry({
...entry,
updateTimestamp: entry.updatedTimestamp,
versionKey: '' + entry.updatedTimestamp,
__typename: 'WorkspaceLocationStatusEntry',
}),
),
options,
),
...entries.map((entry) =>
...entries.map((entry, index) =>
buildQueryMock<LocationWorkspaceQuery, LocationWorkspaceQueryVariables>({
query: LOCATION_WORKSPACE_QUERY,
variables: {
Expand All @@ -56,6 +59,11 @@ export const buildWorkspaceMocks = (
workspaceLocationEntryOrError: entry,
},
...options,
...(options.cascadingUpdates
? {
delay: 100 * (1 + index),
}
: {}),
}),
),
];
Expand Down