Skip to content

Commit

Permalink
update a test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Jan 29, 2024
1 parent 2081661 commit 774532d
Showing 1 changed file with 7 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,7 @@ def upstream():
],
)
def downstream(context: AssetExecutionContext):
upstream_key = datetime.strptime(
context.asset_partition_key_for_input("upstream"), "%Y-%m-%d"
)
upstream_key = datetime.strptime(context.upstream_partition_key("upstream"), "%Y-%m-%d")

current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d")

Expand Down Expand Up @@ -653,12 +651,8 @@ def multi_asset_1():

@multi_asset(specs=[asset_3, asset_4], partitions_def=partitions_def)
def multi_asset_2(context: AssetExecutionContext):
asset_1_key = datetime.strptime(
context.asset_partition_key_for_input("asset_1"), "%Y-%m-%d"
)
asset_2_key = datetime.strptime(
context.asset_partition_key_for_input("asset_2"), "%Y-%m-%d"
)
asset_1_key = datetime.strptime(context.upstream_partition_key("asset_1"), "%Y-%m-%d")
asset_2_key = datetime.strptime(context.upstream_partition_key("asset_2"), "%Y-%m-%d")

current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d")

Expand Down Expand Up @@ -760,7 +754,7 @@ def test_self_dependent_partition_mapping_with_asset_deps():
)
def self_dependent(context: AssetExecutionContext):
upstream_key = datetime.strptime(
context.asset_partition_key_for_input("self_dependent"), "%Y-%m-%d"
context.upstream_partition_key("self_dependent"), "%Y-%m-%d"
)

current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d")
Expand All @@ -786,9 +780,7 @@ def self_dependent(context: AssetExecutionContext):

@multi_asset(specs=[asset_1], partitions_def=partitions_def)
def the_multi_asset(context: AssetExecutionContext):
asset_1_key = datetime.strptime(
context.asset_partition_key_for_input("asset_1"), "%Y-%m-%d"
)
asset_1_key = datetime.strptime(context.upstream_partition_key("asset_1"), "%Y-%m-%d")

current_partition_key = datetime.strptime(context.partition_key, "%Y-%m-%d")

Expand All @@ -810,7 +802,7 @@ def upstream():
deps=[AssetDep(upstream, partition_mapping=SpecificPartitionsPartitionMapping(["apple"]))],
)
def downstream(context: AssetExecutionContext):
assert context.asset_partition_key_for_input("upstream") == "apple"
assert context.upstream_partition_key("upstream") == "apple"
assert context.partition_key == "orange"

with instance_for_test() as instance:
Expand Down Expand Up @@ -840,7 +832,7 @@ def asset_1_multi_asset():

@multi_asset(specs=[asset_2], partitions_def=partitions_def)
def asset_2_multi_asset(context: AssetExecutionContext):
assert context.asset_partition_key_for_input("asset_1") == "apple"
assert context.upstream_partition_key("asset_1") == "apple"
assert context.partition_key == "orange"

with instance_for_test() as instance:
Expand Down

0 comments on commit 774532d

Please sign in to comment.