Skip to content

Commit

Permalink
add error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal committed Dec 18, 2024
1 parent 5946558 commit 2ccd889
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 47 deletions.
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ def infos(self) -> LossyList[StructuredLogEntry]:

@dataclass
class SourceReport(Report):
event_not_produced_warn: bool = True
events_produced: int = 0
events_produced_per_sec: int = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def auto_workunit_reporter(report: "SourceReport", stream: Iterable[T]) -> Itera
report.report_workunit(wu)
yield wu

if report.events_produced == 0:
if report.event_not_produced_warn and report.events_produced == 0:
report.warning(
title="No metadata was produced by the source",
message="Please check the source configuration, filters, and permissions.",
Expand Down
54 changes: 24 additions & 30 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ class DataHubGcSourceConfig(ConfigModel):
description="Sleep between truncation monitoring.",
)

dataprocess_cleanup: Optional[DataProcessCleanupConfig] = Field(
default=None,
dataprocess_cleanup: DataProcessCleanupConfig = Field(
default_factory=DataProcessCleanupConfig,
description="Configuration for data process cleanup",
)

soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanupConfig] = Field(
default=None,
soft_deleted_entities_cleanup: SoftDeletedEntitiesCleanupConfig = Field(
default_factory=SoftDeletedEntitiesCleanupConfig,
description="Configuration for soft deleted entities cleanup",
)

execution_request_cleanup: Optional[DatahubExecutionRequestCleanupConfig] = Field(
default=None,
execution_request_cleanup: DatahubExecutionRequestCleanupConfig = Field(
default_factory=DatahubExecutionRequestCleanupConfig,
description="Configuration for execution request cleanup",
)

Expand Down Expand Up @@ -108,28 +108,22 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
self.ctx = ctx
self.config = config
self.report = DataHubGcSourceReport()
self.report.event_not_produced_warn = False
self.graph = ctx.require_graph("The DataHubGc source")
self.dataprocess_cleanup: Optional[DataProcessCleanup] = None
self.soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanup] = None
self.execution_request_cleanup: Optional[DatahubExecutionRequestCleanup] = None

if self.config.dataprocess_cleanup:
self.dataprocess_cleanup = DataProcessCleanup(
ctx, self.config.dataprocess_cleanup, self.report, self.config.dry_run
)
if self.config.soft_deleted_entities_cleanup:
self.soft_deleted_entities_cleanup = SoftDeletedEntitiesCleanup(
ctx,
self.config.soft_deleted_entities_cleanup,
self.report,
self.config.dry_run,
)
if self.config.execution_request_cleanup:
self.execution_request_cleanup = DatahubExecutionRequestCleanup(
config=self.config.execution_request_cleanup,
graph=self.graph,
report=self.report,
)
self.dataprocess_cleanup = DataProcessCleanup(
ctx, self.config.dataprocess_cleanup, self.report, self.config.dry_run
)
self.soft_deleted_entities_cleanup = SoftDeletedEntitiesCleanup(
ctx,
self.config.soft_deleted_entities_cleanup,
self.report,
self.config.dry_run,
)
self.execution_request_cleanup = DatahubExecutionRequestCleanup(
config=self.config.execution_request_cleanup,
graph=self.graph,
report=self.report,
)

@classmethod
def create(cls, config_dict, ctx):
Expand All @@ -153,19 +147,19 @@ def get_workunits_internal(
self.truncate_indices()
except Exception as e:
self.report.failure("While trying to truncate indices ", exc=e)
if self.soft_deleted_entities_cleanup:
if self.config.soft_deleted_entities_cleanup.enabled:
try:
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
except Exception as e:
self.report.failure(
"While trying to cleanup soft deleted entities ", exc=e
)
if self.execution_request_cleanup:
if self.config.execution_request_cleanup.enabled:
try:
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
if self.dataprocess_cleanup:
if self.config.dataprocess_cleanup.enabled:
try:
yield from self.dataprocess_cleanup.get_workunits_internal()
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@


class DataProcessCleanupConfig(ConfigModel):
enabled: bool = Field(
default=False, description="Whether to do data process cleanup."
)
retention_days: Optional[int] = Field(
10,
description="Number of days to retain metadata in DataHub",
Expand Down Expand Up @@ -371,17 +374,26 @@ def get_data_flows(self) -> Iterable[DataFlowEntity]:
previous_scroll_id: Optional[str] = None

while True:
result = self.ctx.graph.execute_graphql(
DATAFLOW_QUERY,
{
"query": "*",
"scrollId": scroll_id if scroll_id else None,
"batchSize": self.config.batch_size,
},
)
result = None
try:
result = self.ctx.graph.execute_graphql(
DATAFLOW_QUERY,
{
"query": "*",
"scrollId": scroll_id if scroll_id else None,
"batchSize": self.config.batch_size,
},
)
except Exception as e:
self.report.failure(
f"While trying to get dataflows with {scroll_id}", exc=e
)
break

scrollAcrossEntities = result.get("scrollAcrossEntities")
if not scrollAcrossEntities:
raise ValueError("Missing scrollAcrossEntities in response")
logger.info(f"Got {scrollAcrossEntities.get('count')} DataFlow entities")

scroll_id = scrollAcrossEntities.get("nextScrollId")
for flow in scrollAcrossEntities.get("searchResults"):
Expand All @@ -398,6 +410,8 @@ def get_data_flows(self) -> Iterable[DataFlowEntity]:
previous_scroll_id = scroll_id

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if not self.config.enabled:
return []
assert self.ctx.graph

dataFlows: Dict[str, DataFlowEntity] = {}
Expand All @@ -411,14 +425,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
deleted_jobs: int = 0

while True:
result = self.ctx.graph.execute_graphql(
DATAJOB_QUERY,
{
"query": "*",
"scrollId": scroll_id if scroll_id else None,
"batchSize": self.config.batch_size,
},
)
try:
result = self.ctx.graph.execute_graphql(
DATAJOB_QUERY,
{
"query": "*",
"scrollId": scroll_id if scroll_id else None,
"batchSize": self.config.batch_size,
},
)
except Exception as e:
self.report.failure(
f"While trying to get data jobs with {scroll_id}", exc=e
)
break
scrollAcrossEntities = result.get("scrollAcrossEntities")
if not scrollAcrossEntities:
raise ValueError("Missing scrollAcrossEntities in response")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@


class SoftDeletedEntitiesCleanupConfig(ConfigModel):
enabled: bool = Field(
default=False, description="Whether to do soft deletion cleanup."
)
retention_days: Optional[int] = Field(
10,
description="Number of days to retain metadata in DataHub",
Expand Down Expand Up @@ -156,6 +159,8 @@ def delete_soft_deleted_entity(self, urn: str) -> None:
self.delete_entity(urn)

def cleanup_soft_deleted_entities(self) -> None:
if not self.config.enabled:
return
assert self.ctx.graph
start_time = time.time()

Expand Down

0 comments on commit 2ccd889

Please sign in to comment.