Skip to content

Commit

Permalink
Add experimental support for emitting lineage events (#16242)
Browse files Browse the repository at this point in the history
  • Loading branch information
abrookins authored Dec 12, 2024
1 parent f4f5963 commit f57b48c
Show file tree
Hide file tree
Showing 14 changed files with 702 additions and 19 deletions.
6 changes: 4 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ src/prefect/server/ui_build/*

# API artifacts


# MacOS
.DS_Store

Expand All @@ -76,4 +75,7 @@ libcairo.2.dylib

# setuptools-scm generated files
src/integrations/*/**/_version.py
*.log
*.log

# Pyright type analysis report
prefect-analysis.json
12 changes: 12 additions & 0 deletions docs/v3/develop/settings-ref.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,18 @@ Enables sending telemetry to Prefect Cloud.
**Supported environment variables**:
`PREFECT_EXPERIMENTS_TELEMETRY_ENABLED`

### `lineage_events_enabled`
If `True`, enables emitting lineage events. Set to `False` to disable lineage event emission.

**Type**: `boolean`

**Default**: `False`

**TOML dotted key path**: `experiments.lineage_events_enabled`

**Supported environment variables**:
`PREFECT_EXPERIMENTS_LINEAGE_EVENTS_ENABLED`

---
## FlowsSettings
Settings for controlling flow behavior
Expand Down
9 changes: 9 additions & 0 deletions schemas/settings.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,15 @@
],
"title": "Telemetry Enabled",
"type": "boolean"
},
"lineage_events_enabled": {
"default": false,
"description": "If `True`, enables emitting lineage events. Set to `False` to disable lineage event emission.",
"supported_environment_variables": [
"PREFECT_EXPERIMENTS_LINEAGE_EVENTS_ENABLED"
],
"title": "Lineage Events Enabled",
"type": "boolean"
}
},
"title": "ExperimentsSettings",
Expand Down
Empty file.
181 changes: 181 additions & 0 deletions src/prefect/_experimental/lineage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Sequence, Union

from prefect.events.related import related_resources_from_run_context
from prefect.events.schemas.events import RelatedResource, Resource
from prefect.events.utilities import emit_event
from prefect.settings import get_current_settings

if TYPE_CHECKING:
from prefect.results import ResultStore

UpstreamResources = Sequence[Union[RelatedResource, dict[str, str]]]
DownstreamResources = Sequence[Union[Resource, dict[str, str]]]

# Map block types to their URI schemes
STORAGE_URI_SCHEMES = {
"local-file-system": "file://{path}",
"s3-bucket": "s3://{storage.bucket_name}/{path}",
"gcs-bucket": "gs://{storage.bucket}/{path}",
"azure-blob-storage": "azure-blob://{storage.container_name}/{path}",
}


def get_result_resource_uri(
store: "ResultStore",
key: str,
) -> Optional[str]:
"""
Generate a URI for a result based on its storage backend.
Args:
store: A `ResultStore` instance.
key: The key of the result to generate a URI for.
"""
storage = store.result_storage
if storage is None:
return

path = store._resolved_key_path(key)

block_type = storage.get_block_type_slug()
if block_type and block_type in STORAGE_URI_SCHEMES:
return STORAGE_URI_SCHEMES[block_type].format(storage=storage, path=path)

# Generic fallback
return f"prefect://{block_type}/{path}"


async def emit_lineage_event(
event_name: str,
upstream_resources: Optional[UpstreamResources] = None,
downstream_resources: Optional[DownstreamResources] = None,
direction_of_run_from_event: Literal["upstream", "downstream"] = "downstream",
) -> None:
"""Emit lineage events showing relationships between resources.
Args:
event_name: The name of the event to emit
upstream_resources: Optional list of RelatedResources that were upstream of
the event
downstream_resources: Optional list of Resources that were downstream
of the event
direction_of_run_from_event: The direction of the current run from
the event. E.g., if we're in a flow run and
`direction_of_run_from_event` is "downstream", then the flow run is
considered downstream of the resource's event.
"""
from prefect.client.orchestration import get_client # Avoid a circular import

if not get_current_settings().experiments.lineage_events_enabled:
return

upstream_resources = list(upstream_resources) if upstream_resources else []
downstream_resources = list(downstream_resources) if downstream_resources else []

async with get_client() as client:
related_resources = await related_resources_from_run_context(client)

# NOTE: We handle adding run-related resources to the event here instead of in
# the EventsWorker because not all run-related resources are upstream from
# every lineage event (they might be downstream). The EventsWorker only adds
# related resources to the "related" field in the event, which, for
# lineage-related events, tracks upstream resources only. For downstream
# resources, we need to emit an event for each downstream resource.
if direction_of_run_from_event == "downstream":
downstream_resources.extend(related_resources)
else:
upstream_resources.extend(related_resources)

# Emit an event for each downstream resource. This is necessary because
# our event schema allows one primary resource and many related resources,
# and for the purposes of lineage, related resources can only represent
# upstream resources.
for resource in downstream_resources:
# Downstream lineage resources need to have the
# prefect.resource.lineage-group label. All upstram resources from a
# downstream resource with this label will be considered lineage-related
# resources.
if "prefect.resource.lineage-group" not in resource:
resource["prefect.resource.lineage-group"] = "global"

emit_kwargs: Dict[str, Any] = {
"event": event_name,
"resource": resource,
"related": upstream_resources,
}

emit_event(**emit_kwargs)


async def emit_result_read_event(
store: "ResultStore",
result_key: str,
downstream_resources: Optional[DownstreamResources] = None,
cached: bool = False,
) -> None:
"""
Emit a lineage event showing a task or flow result was read.
Args:
store: A `ResultStore` instance.
result_key: The key of the result to generate a URI for.
downstream_resources: List of resources that were
downstream of the event's resource.
"""
if not get_current_settings().experiments.lineage_events_enabled:
return

result_resource_uri = get_result_resource_uri(store, result_key)
if result_resource_uri:
upstream_resources = [
RelatedResource(
root={
"prefect.resource.id": result_resource_uri,
"prefect.resource.role": "result",
}
)
]
event_name = "prefect.result.read"
if cached:
event_name += ".cached"

await emit_lineage_event(
event_name=event_name,
upstream_resources=upstream_resources,
downstream_resources=downstream_resources,
direction_of_run_from_event="downstream",
)


async def emit_result_write_event(
store: "ResultStore",
result_key: str,
upstream_resources: Optional[UpstreamResources] = None,
) -> None:
"""
Emit a lineage event showing a task or flow result was written.
Args:
store: A `ResultStore` instance.
result_key: The key of the result to generate a URI for.
upstream_resources: Optional list of resources that were
upstream of the event's resource.
"""
if not get_current_settings().experiments.lineage_events_enabled:
return

result_resource_uri = get_result_resource_uri(store, result_key)
if result_resource_uri:
downstream_resources = [
{
"prefect.resource.id": result_resource_uri,
"prefect.resource.role": "result",
"prefect.resource.lineage-group": "global",
}
]
await emit_lineage_event(
event_name="prefect.result.write",
upstream_resources=upstream_resources,
downstream_resources=downstream_resources,
direction_of_run_from_event="upstream",
)
2 changes: 2 additions & 0 deletions src/prefect/events/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def emit_event(
payload: Optional[Dict[str, Any]] = None,
id: Optional[UUID] = None,
follows: Optional[Event] = None,
**kwargs: Optional[Dict[str, Any]],
) -> Optional[Event]:
"""
Send an event to Prefect Cloud.
Expand Down Expand Up @@ -62,6 +63,7 @@ def emit_event(
event_kwargs: Dict[str, Any] = {
"event": event,
"resource": resource,
**kwargs,
}

if occurred is None:
Expand Down
8 changes: 8 additions & 0 deletions src/prefect/events/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ async def _handle(self, event: Event):
await self._client.emit(event)

async def attach_related_resources_from_context(self, event: Event):
if "prefect.resource.lineage-group" in event.resource:
# We attach related resources to lineage events in `emit_lineage_event`,
# instead of the worker, because not all run-related resources are
# upstream from every lineage event (they might be downstream).
# The "related" field in the event schema tracks upstream resources
# only.
return

exclude = {resource.id for resource in event.involved_resources}
event.related += await related_resources_from_run_context(
client=self._orchestration_client, exclude=exclude
Expand Down
51 changes: 34 additions & 17 deletions src/prefect/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@
from typing_extensions import ParamSpec, Self

import prefect
from prefect._experimental.lineage import (
emit_result_read_event,
emit_result_write_event,
)
from prefect._internal.compatibility import deprecated
from prefect._internal.compatibility.deprecated import deprecated_field
from prefect.blocks.core import Block
Expand Down Expand Up @@ -232,6 +236,10 @@ def _format_user_supplied_storage_key(key: str) -> str:
T = TypeVar("T")


def default_cache() -> LRUCache[str, "ResultRecord[Any]"]:
return LRUCache(maxsize=1000)


def result_storage_discriminator(x: Any) -> str:
if isinstance(x, dict):
if "block_type_slug" in x:
Expand Down Expand Up @@ -284,7 +292,7 @@ class ResultStore(BaseModel):
cache_result_in_memory: bool = Field(default=True)
serializer: Serializer = Field(default_factory=get_default_result_serializer)
storage_key_fn: Callable[[], str] = Field(default=DEFAULT_STORAGE_KEY_FN)
cache: LRUCache = Field(default_factory=lambda: LRUCache(maxsize=1000))
cache: LRUCache[str, "ResultRecord[Any]"] = Field(default_factory=default_cache)

# Deprecated fields
persist_result: Optional[bool] = Field(default=None)
Expand Down Expand Up @@ -446,8 +454,15 @@ async def aexists(self, key: str) -> bool:
"""
return await self._exists(key=key, _sync=False)

def _resolved_key_path(self, key: str) -> str:
if self.result_storage_block_id is None and hasattr(
self.result_storage, "_resolve_path"
):
return str(self.result_storage._resolve_path(key))
return key

@sync_compatible
async def _read(self, key: str, holder: str) -> "ResultRecord":
async def _read(self, key: str, holder: str) -> "ResultRecord[Any]":
"""
Read a result record from storage.
Expand All @@ -465,8 +480,12 @@ async def _read(self, key: str, holder: str) -> "ResultRecord":
if self.lock_manager is not None and not self.is_lock_holder(key, holder):
await self.await_for_lock(key)

if key in self.cache:
return self.cache[key]
resolved_key_path = self._resolved_key_path(key)

if resolved_key_path in self.cache:
cached_result = self.cache[resolved_key_path]
await emit_result_read_event(self, resolved_key_path, cached=True)
return cached_result

if self.result_storage is None:
self.result_storage = await get_default_result_storage()
Expand All @@ -478,31 +497,28 @@ async def _read(self, key: str, holder: str) -> "ResultRecord":
metadata.storage_key is not None
), "Did not find storage key in metadata"
result_content = await self.result_storage.read_path(metadata.storage_key)
result_record = ResultRecord.deserialize_from_result_and_metadata(
result_record: ResultRecord[
Any
] = ResultRecord.deserialize_from_result_and_metadata(
result=result_content, metadata=metadata_content
)
await emit_result_read_event(self, resolved_key_path)
else:
content = await self.result_storage.read_path(key)
result_record = ResultRecord.deserialize(
result_record: ResultRecord[Any] = ResultRecord.deserialize(
content, backup_serializer=self.serializer
)
await emit_result_read_event(self, resolved_key_path)

if self.cache_result_in_memory:
if self.result_storage_block_id is None and hasattr(
self.result_storage, "_resolve_path"
):
cache_key = str(self.result_storage._resolve_path(key))
else:
cache_key = key

self.cache[cache_key] = result_record
self.cache[resolved_key_path] = result_record
return result_record

def read(
self,
key: str,
holder: Optional[str] = None,
) -> "ResultRecord":
) -> "ResultRecord[Any]":
"""
Read a result record from storage.
Expand All @@ -520,7 +536,7 @@ async def aread(
self,
key: str,
holder: Optional[str] = None,
) -> "ResultRecord":
) -> "ResultRecord[Any]":
"""
Read a result record from storage.
Expand Down Expand Up @@ -663,12 +679,13 @@ async def _persist_result_record(self, result_record: "ResultRecord", holder: st
base_key,
content=result_record.serialize_metadata(),
)
await emit_result_write_event(self, result_record.metadata.storage_key)
# Otherwise, write the result metadata and result together
else:
await self.result_storage.write_path(
result_record.metadata.storage_key, content=result_record.serialize()
)

await emit_result_write_event(self, result_record.metadata.storage_key)
if self.cache_result_in_memory:
self.cache[key] = result_record

Expand Down
Loading

0 comments on commit f57b48c

Please sign in to comment.