Skip to content

Commit

Permalink
testing
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 29, 2024
1 parent fdffac1 commit f00a840
Showing 1 changed file with 79 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,23 @@
import pytest
from dagster import (
AssetCheckResult,
AssetDep,
AssetExecutionContext,
AssetKey,
AssetOut,
DagsterInstance,
DailyPartitionsDefinition,
Definitions,
GraphDefinition,
MaterializeResult,
OpExecutionContext,
Output,
TimeWindowPartitionMapping,
asset,
asset_check,
graph_asset,
graph_multi_asset,
instance_for_test,
job,
materialize,
multi_asset,
Expand Down Expand Up @@ -437,7 +442,7 @@ def upstream(context: AssetExecutionContext):

@asset
def downstream(context: AssetExecutionContext, upstream):
mat = context.latest_materialization_for_upstream_asset("upstream")
mat = context.fetch_latest_materialization_for_upstream_asset("upstream")
assert mat is not None
assert mat.metadata["foo"].value == "bar"

Expand All @@ -452,8 +457,80 @@ def upstream():

@asset
def downstream(context: AssetExecutionContext, upstream):
mat = context.latest_materialization_for_upstream_asset("upstream")
mat = context.fetch_latest_materialization_for_upstream_asset("upstream")
assert mat is not None
assert mat.metadata["foo"].value == "bar"

materialize([upstream, downstream])


def test_upstream_metadata_with_partitions():
partitions_def = DailyPartitionsDefinition(start_date="2024-01-01")
first_materialization_partition_key = "2024-01-01"
second_materialization_partition_key = "2024-01-02"

@asset(partitions_def=partitions_def)
def upstream(context):
return MaterializeResult(metadata={"partition_key": context.partition_key})

@asset(partitions_def=partitions_def, deps=[upstream])
def downstream(context: AssetExecutionContext):
mat = context.fetch_latest_materialization_for_upstream_asset("upstream")
assert mat is not None
assert mat.metadata["partition_key"].value == context.partition_key

# upstream_asset_materialization_events has the latest AssetMaterialization and is not scoped to
# a specific partition
latest_mat = context._step_execution_context.upstream_asset_materialization_events.get( # noqa: SLF001
AssetKey.from_coercible("upstream")
)
assert latest_mat is not None
assert latest_mat.metadata["partition_key"].value != context.partition_key
assert latest_mat.metadata["partition_key"].value == second_materialization_partition_key

with instance_for_test() as instance:
materialize(
[upstream], partition_key=first_materialization_partition_key, instance=instance
)

# the metadata from this materialization will be what's in upstream_asset_materialization_events
materialize(
[upstream], partition_key=second_materialization_partition_key, instance=instance
)

# asserts that fetch_latest_materialization_for_upstream_asset gets the AssetMaterialization
# scoped to the correct partition, not just the latest AssetMaterialization
materialize(
[downstream], partition_key=first_materialization_partition_key, instance=instance
)


def test_upstream_metadata_with_partition_mapping():
partitions_def = DailyPartitionsDefinition(start_date="2024-01-01")

@asset(partitions_def=partitions_def)
def upstream(context):
return MaterializeResult(metadata={"partition_key": context.partition_key})

@asset(
partitions_def=partitions_def,
deps=[
AssetDep(asset=upstream, partition_mapping=TimeWindowPartitionMapping(start_offset=-2))
],
)
def downstream(context: AssetExecutionContext):
with pytest.raises(DagsterInvariantViolationError):
context.fetch_latest_materialization_for_upstream_asset("upstream")

mat = context.fetch_latest_materialization_for_upstream_asset(
"upstream", partition_key="2024-01-01"
)
assert mat is not None
assert mat.metadata["partition_key"].value == "2024-01-01"

with instance_for_test() as instance:
materialize([upstream], partition_key="2024-01-01", instance=instance)
materialize([upstream], partition_key="2024-01-02", instance=instance)
materialize([upstream], partition_key="2024-01-03", instance=instance)

materialize([downstream], partition_key="2024-01-03", instance=instance)

0 comments on commit f00a840

Please sign in to comment.