Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate trace context to triggered deployments from within parent run #16364

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0c77919
adds traceparent to labels if none is found
jeanluciano Nov 26, 2024
5a8bb4d
added logging
jeanluciano Nov 27, 2024
7f1d04b
update flow run labels unit tests
jeanluciano Nov 27, 2024
6bc104a
Add debug logging to `update_flow_run_labels`
bunchesofdonald Dec 2, 2024
eacbc3b
Merge branch 'main' into jean/cloud-734-move-update_flow_run_labels-t…
bunchesofdonald Dec 3, 2024
e039277
Use `print` instead of log to get around logger issue
bunchesofdonald Dec 3, 2024
9755216
Explicitly use a transaction when updating labels
bunchesofdonald Dec 3, 2024
c739e23
Remove debug logging from api/model methods
bunchesofdonald Dec 3, 2024
c5dc699
Merge branch 'main' into jean/cloud-734-move-update_flow_run_labels-t…
jeanluciano Dec 3, 2024
fa33d61
refactor flow run engines to use RunTelemetry
jeanluciano Dec 4, 2024
f7d8a21
Merge branch 'main' of https://github.com/PrefectHQ/prefect into jean…
jeanluciano Dec 5, 2024
e0f5911
WIP:Flow run telemetry uses RunTelemetry class
jeanluciano Dec 5, 2024
e2967de
union type fix
jeanluciano Dec 5, 2024
79bc53b
conflict fixes
jeanluciano Dec 9, 2024
aa55cfd
Merge branch 'main' into jean/cloud-740-update-flow-run-instrumentati…
jeanluciano Dec 9, 2024
71907de
engine span fix
jeanluciano Dec 10, 2024
5040810
Merge branch 'jean/cloud-740-update-flow-run-instrumentation-to-use-r…
jeanluciano Dec 10, 2024
9cc5c63
task engine updated
jeanluciano Dec 10, 2024
2404c8d
Merge branch 'main' into jean/cloud-740-update-flow-run-instrumentati…
jeanluciano Dec 10, 2024
af2af49
end_span_on_success update
jeanluciano Dec 10, 2024
1bd3296
end_span_on_success update method
jeanluciano Dec 10, 2024
7d1d714
context propagation moved to RunTelemetry
jeanluciano Dec 10, 2024
207a476
Merge branch 'main' into jean/cloud-740-update-flow-run-instrumentati…
jeanluciano Dec 10, 2024
1bd1894
typing
jeanluciano Dec 10, 2024
24c6643
Merge branch 'jean/cloud-740-update-flow-run-instrumentation-to-use-r…
jeanluciano Dec 10, 2024
22b1df9
run_deployment sends otel labels
jeanluciano Dec 12, 2024
8d707d9
Merge branch 'main' into jean/cloud-696-propagate-trace-context-to-tr…
jeanluciano Dec 12, 2024
f331c60
schema changes
jeanluciano Dec 13, 2024
ad3303f
Merge branch 'jean/cloud-696-propagate-trace-context-to-triggered-dep…
jeanluciano Dec 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions docs/v3/api-ref/rest-api/server/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -15840,6 +15840,40 @@
"title": "Idempotency Key",
"description": "An optional idempotency key. If a flow run with the same idempotency key has already been created, the existing flow run will be returned."
},
"labels": {
"anyOf": [
{
"additionalProperties": {
"anyOf": [
{
"type": "boolean"
},
{
"type": "integer"
},
{
"type": "number"
},
{
"type": "string"
}
]
},
"type": "object"
},
{
"type": "null"
}
],
"title": "Labels",
"description": "A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
"examples": [
{
"key": "value1",
"key2": 42
}
]
},
"parent_task_run_id": {
"anyOf": [
{
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/client/orchestration.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ async def create_flow_run_from_deployment(
parent_task_run_id: Optional[UUID] = None,
work_queue_name: Optional[str] = None,
job_variables: Optional[dict[str, Any]] = None,
labels: Optional[KeyValueLabelsField] = None,
) -> FlowRun:
"""
Create a flow run for a deployment.
Expand Down Expand Up @@ -651,6 +652,7 @@ async def create_flow_run_from_deployment(
idempotency_key=idempotency_key,
parent_task_run_id=parent_task_run_id,
job_variables=job_variables,
labels=labels,
)

# done separately to avoid including this field in payloads sent to older API versions
Expand Down
2 changes: 1 addition & 1 deletion src/prefect/client/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ class DeploymentFlowRunCreate(ActionBaseModel):
parent_task_run_id: Optional[UUID] = Field(None)
work_queue_name: Optional[str] = Field(None)
job_variables: Optional[dict] = Field(None)
labels: KeyValueLabelsField = Field(default_factory=dict)
labels: Optional[KeyValueLabelsField] = Field(default=None)


class SavedSearchCreate(ActionBaseModel):
Expand Down
17 changes: 17 additions & 0 deletions src/prefect/deployments/flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
from prefect.results import BaseResult, ResultRecordMetadata
from prefect.states import Pending, Scheduled
from prefect.tasks import Task
from prefect.telemetry.run_telemetry import (
LABELS_TRACEPARENT_KEY,
TRACEPARENT_KEY,
)
from prefect.types import KeyValueLabels
from prefect.utilities.asyncutils import sync_compatible
from prefect.utilities.slugify import slugify

Expand Down Expand Up @@ -156,6 +161,17 @@ async def run_deployment(
else:
parent_task_run_id = None

if flow_run_ctx and flow_run_ctx.flow_run:
traceparent = flow_run_ctx.flow_run.labels.get(LABELS_TRACEPARENT_KEY)
else:
traceparent = None

trace_labels = {}
if traceparent:
carrier: KeyValueLabels = {TRACEPARENT_KEY: traceparent}

trace_labels = {LABELS_TRACEPARENT_KEY: carrier[TRACEPARENT_KEY]}

flow_run = await client.create_flow_run_from_deployment(
deployment.id,
parameters=parameters,
Expand All @@ -166,6 +182,7 @@ async def run_deployment(
parent_task_run_id=parent_task_run_id,
work_queue_name=work_queue_name,
job_variables=job_variables,
labels=trace_labels,
)

flow_run_id = flow_run.id
Expand Down
5 changes: 5 additions & 0 deletions src/prefect/server/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,11 @@ class DeploymentFlowRunCreate(ActionBaseModel):
" has already been created, the existing flow run will be returned."
),
)
labels: Union[schemas.core.KeyValueLabels, None] = Field(
None,
description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
examples=[{"key": "value1", "key2": 42}],
)
parent_task_run_id: Optional[UUID] = Field(None)
work_queue_name: Optional[str] = Field(None)
job_variables: Optional[Dict[str, Any]] = Field(None)
Expand Down
60 changes: 60 additions & 0 deletions tests/deployment/test_flow_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
from httpx import Response

from prefect import flow
from prefect.client.schemas.responses import DeploymentResponse
from prefect.context import FlowRunContext
from prefect.deployments import run_deployment
from prefect.flow_engine import run_flow_async
from prefect.server.schemas.core import TaskRunResult
from prefect.settings import (
PREFECT_API_URL,
)
from prefect.tasks import task
from prefect.utilities.slugify import slugify
from tests.telemetry.instrumentation_tester import InstrumentationTester

if TYPE_CHECKING:
from prefect.client.orchestration import PrefectClient
Expand Down Expand Up @@ -433,3 +436,60 @@ async def foo():
)
]
}

async def test_propagates_otel_trace_to_deployment_flow_run(
self,
test_deployment: DeploymentResponse,
instrumentation: InstrumentationTester,
prefect_client: "PrefectClient",
):
"""Test that OTEL trace context gets propagated from parent flow to deployment flow run"""
deployment = test_deployment
mock_traceparent = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01"

@flow(name="child-flow")
async def child_flow() -> None:
pass

flow_id = await prefect_client.create_flow(child_flow)

deployment_id = await prefect_client.create_deployment(
name="foo-deployment", flow_id=flow_id, parameter_openapi_schema={}
)
deployment = await prefect_client.read_deployment(deployment_id)

@flow(name="parent-flow")
async def parent_flow():
# Set OTEL context in the parent flow's labels
flow_run = FlowRunContext.get().flow_run

flow_run.labels["__OTEL_TRACEPARENT"] = mock_traceparent

return await run_deployment(
f"foo/{deployment.name}",
timeout=0,
poll_interval=0,
)

parent_state = await parent_flow(return_state=True)
child_flow_run = await parent_state.result()

await run_flow_async(child_flow, child_flow_run)
# Get all spans
spans = instrumentation.get_finished_spans()

# Find parent flow span
parent_span = next(
span
for span in spans
if span.attributes.get("prefect.flow.name") == "parent-flow"
)
child_span = next(
span
for span in spans
if span.attributes.get("prefect.flow.name") == "child-flow"
)
assert child_span
assert parent_span

assert mock_traceparent == child_flow_run.labels["__OTEL_TRACEPARENT"]
7 changes: 7 additions & 0 deletions ui-v2/src/api/prefect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5434,6 +5434,13 @@ export interface components {
* @description An optional idempotency key. If a flow run with the same idempotency key has already been created, the existing flow run will be returned.
*/
idempotency_key?: string | null;
/**
* Labels
* @description A dictionary of key-value labels. Values can be strings, numbers, or booleans.
*/
labels?: {
[key: string]: boolean | number | string;
} | null;
/** Parent Task Run Id */
parent_task_run_id?: string | null;
/** Work Queue Name */
Expand Down
Loading