diff --git a/tests/core/test_flow.py b/tests/core/test_flow.py index ab9955fc..e53af5e8 100644 --- a/tests/core/test_flow.py +++ b/tests/core/test_flow.py @@ -1082,6 +1082,69 @@ def create_nested_flow(maker): assert "nested_dynamic" in outer_flow[0].metadata assert outer_flow[0].metadata["nested_dynamic"] == "nested_value" + # Test callback filter with dynamic updates + @job + def create_dynamic_flow(maker): + nested_job = maker.make() + nested_job.name = "dynamic_nested_job" # Set specific name for testing + return Response(replace=Flow([nested_job])) + + maker = TestMaker() + initial_job = create_dynamic_flow(maker) + flow = Flow([initial_job]) + + # Update metadata only for jobs named "dynamic_nested_job" + flow.update_metadata( + {"dynamic_filtered": "filtered_value"}, + callback_filter=lambda x: isinstance(x, Job) and x.name == "dynamic_nested_job", + dynamic=True, + ) + + run_locally(flow, store=memory_jobstore) + + # Original job shouldn't have the metadata + assert "dynamic_filtered" not in flow[0].metadata + + # Get the replacement flow and check its job + replacement_flow = flow[0].run(memory_jobstore).replace + assert "dynamic_filtered" in replacement_flow[0].metadata + assert replacement_flow[0].metadata["dynamic_filtered"] == "filtered_value" + + # Verify callback_filter was stored and propagated + assert any( + "callback_filter" in update and "dynamic_filtered" in update["update"] + for update in replacement_flow[0].metadata_updates + ) + + # Test callback filter with nested dynamic updates + nested_initial_job = create_dynamic_flow(maker) + outer_flow = Flow([nested_initial_job]) + + # Update metadata only for flows containing jobs with specific names + outer_flow.update_metadata( + {"nested_dynamic_filtered": "nested_filtered_value"}, + callback_filter=lambda x: ( + isinstance(x, Flow) and any(j.name == "dynamic_nested_job" for j in x) + ), + dynamic=True, + ) + + run_locally(outer_flow, store=memory_jobstore) + + # Check that the callback filter worked correctly + replacement_flow = outer_flow[0].run(memory_jobstore).replace + assert "nested_dynamic_filtered" in replacement_flow.metadata + assert ( + replacement_flow.metadata["nested_dynamic_filtered"] == "nested_filtered_value" + ) + assert "nested_dynamic_filtered" not in replacement_flow[0].metadata + + # Verify callback_filter was stored and propagated correctly + assert any( + "callback_filter" in update and "nested_dynamic_filtered" in update["update"] + for update in replacement_flow.metadata_updates + ) + def test_flow_metadata_serialization(): import json