Skip to content

Commit

Permalink
cover callback_filter in Flow.update_metadata in combo with dynamic=True
Browse files Browse the repository at this point in the history
updates test_flow_update_metadata_dynamic
  • Loading branch information
janosh committed Nov 18, 2024
1 parent 1993511 commit 9f16d51
Showing 1 changed file with 63 additions and 0 deletions.
63 changes: 63 additions & 0 deletions tests/core/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,69 @@ def create_nested_flow(maker):
assert "nested_dynamic" in outer_flow[0].metadata
assert outer_flow[0].metadata["nested_dynamic"] == "nested_value"

# Test callback filter with dynamic updates
@job
def create_dynamic_flow(maker):
nested_job = maker.make()
nested_job.name = "dynamic_nested_job" # Set specific name for testing
return Response(replace=Flow([nested_job]))

maker = TestMaker()
initial_job = create_dynamic_flow(maker)
flow = Flow([initial_job])

# Update metadata only for jobs named "dynamic_nested_job"
flow.update_metadata(
{"dynamic_filtered": "filtered_value"},
callback_filter=lambda x: isinstance(x, Job) and x.name == "dynamic_nested_job",
dynamic=True,
)

run_locally(flow, store=memory_jobstore)

# Original job shouldn't have the metadata
assert "dynamic_filtered" not in flow[0].metadata

# Get the replacement flow and check its job
replacement_flow = flow[0].run(memory_jobstore).replace
assert "dynamic_filtered" in replacement_flow[0].metadata
assert replacement_flow[0].metadata["dynamic_filtered"] == "filtered_value"

# Verify callback_filter was stored and propagated
assert any(
"callback_filter" in update and "dynamic_filtered" in update["update"]
for update in replacement_flow[0].metadata_updates
)

# Test callback filter with nested dynamic updates
nested_initial_job = create_dynamic_flow(maker)
outer_flow = Flow([nested_initial_job])

# Update metadata only for flows containing jobs with specific names
outer_flow.update_metadata(
{"nested_dynamic_filtered": "nested_filtered_value"},
callback_filter=lambda x: (
isinstance(x, Flow) and any(j.name == "dynamic_nested_job" for j in x)
),
dynamic=True,
)

run_locally(outer_flow, store=memory_jobstore)

# Check that the callback filter worked correctly
replacement_flow = outer_flow[0].run(memory_jobstore).replace
assert "nested_dynamic_filtered" in replacement_flow.metadata
assert (
replacement_flow.metadata["nested_dynamic_filtered"] == "nested_filtered_value"
)
assert "nested_dynamic_filtered" not in replacement_flow[0].metadata

# Verify callback_filter was stored and propagated correctly
assert any(
"callback_filter" in update and "nested_dynamic_filtered" in update["update"]
for update in replacement_flow.metadata_updates
)


def test_flow_metadata_serialization():
import json
Expand Down

0 comments on commit 9f16d51

Please sign in to comment.