
Commit

Merge branch 'master' into parallelize-smoke-test
chakru-r authored Dec 26, 2024
2 parents c519f1f + 16698da commit c4037e1
Showing 3 changed files with 68 additions and 17 deletions.
metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py (23 changes: 18 additions & 5 deletions)
@@ -34,6 +34,7 @@
     SoftDeletedEntitiesCleanupConfig,
     SoftDeletedEntitiesReport,
 )
+from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
 
 logger = logging.getLogger(__name__)
 
@@ -86,6 +87,7 @@ class DataHubGcSourceReport(
     DataProcessCleanupReport,
     SoftDeletedEntitiesReport,
     DatahubExecutionRequestCleanupReport,
+    IngestionStageReport,
 ):
     expired_tokens_revoked: int = 0
 
@@ -139,31 +141,40 @@ def get_workunits_internal(
     ) -> Iterable[MetadataWorkUnit]:
         if self.config.cleanup_expired_tokens:
             try:
+                self.report.report_ingestion_stage_start("Expired Token Cleanup")
                 self.revoke_expired_tokens()
             except Exception as e:
                 self.report.failure("While trying to cleanup expired token ", exc=e)
         if self.config.truncate_indices:
             try:
+                self.report.report_ingestion_stage_start("Truncate Indices")
                 self.truncate_indices()
             except Exception as e:
                 self.report.failure("While trying to truncate indices ", exc=e)
         if self.config.soft_deleted_entities_cleanup.enabled:
             try:
+                self.report.report_ingestion_stage_start(
+                    "Soft Deleted Entities Cleanup"
+                )
                 self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
             except Exception as e:
                 self.report.failure(
                     "While trying to cleanup soft deleted entities ", exc=e
                 )
-        if self.config.execution_request_cleanup.enabled:
-            try:
-                self.execution_request_cleanup.run()
-            except Exception as e:
-                self.report.failure("While trying to cleanup execution request ", exc=e)
         if self.config.dataprocess_cleanup.enabled:
             try:
+                self.report.report_ingestion_stage_start("Data Process Cleanup")
                 yield from self.dataprocess_cleanup.get_workunits_internal()
             except Exception as e:
                 self.report.failure("While trying to cleanup data process ", exc=e)
+        if self.config.execution_request_cleanup.enabled:
+            try:
+                self.report.report_ingestion_stage_start("Execution request Cleanup")
+                self.execution_request_cleanup.run()
+            except Exception as e:
+                self.report.failure("While trying to cleanup execution request ", exc=e)
+        # Otherwise last stage's duration does not get calculated.
+        self.report.report_ingestion_stage_start("End")
         yield from []
 
     def truncate_indices(self) -> None:
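A note on the sentinel "End" stage added above: a stage's duration is only recorded when the next stage starts, so the source reports one final no-op stage to close out the last real one. A minimal sketch of that pattern (the StageReport class here is hypothetical, standing in for DataHub's IngestionStageReport):

import time
from typing import Dict, Optional

class StageReport:
    def __init__(self) -> None:
        self.current_stage: Optional[str] = None
        self.durations: Dict[str, float] = {}
        self._started_at: Optional[float] = None

    def report_stage_start(self, stage: str) -> None:
        # Starting a new stage is what closes out the previous one.
        if self.current_stage is not None and self._started_at is not None:
            self.durations[self.current_stage] = time.monotonic() - self._started_at
        self.current_stage = stage
        self._started_at = time.monotonic()

report = StageReport()
report.report_stage_start("Expired Token Cleanup")
time.sleep(0.01)
report.report_stage_start("Data Process Cleanup")
time.sleep(0.01)
report.report_stage_start("End")  # without this, the previous stage gets no duration
print(report.durations)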
@@ -281,6 +292,8 @@ def revoke_expired_tokens(self) -> None:
             list_access_tokens = expired_tokens_res.get("listAccessTokens", {})
             tokens = list_access_tokens.get("tokens", [])
             total = list_access_tokens.get("total", 0)
+            if tokens == []:
+                break
             for token in tokens:
                 self.report.expired_tokens_revoked += 1
                 token_id = token["id"]
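The new tokens == [] check guards the revocation loop: if the API reports a nonzero total but returns an empty page, the loop would otherwise re-request forever without making progress. A condensed sketch of the loop shape (fetch_page is a hypothetical stand-in for the listAccessTokens call; the real method's structure may differ):

from typing import Callable, List, Tuple

def revoke_expired(fetch_page: Callable[[], Tuple[List[dict], int]]) -> int:
    revoked = 0
    while True:
        tokens, total = fetch_page()
        # Guard: an empty page means no progress is possible; without this,
        # a nonzero `total` with no returned tokens would loop forever.
        if tokens == []:
            break
        for token in tokens:
            revoked += 1  # the real code revokes token["id"] here
        if revoked >= total:
            break
    return revoked

pages = iter([([{"id": "t1"}, {"id": "t2"}], 3), ([], 3)])
print(revoke_expired(lambda: next(pages)))  # -> 2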
metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py (61 changes: 49 additions & 12 deletions)
@@ -1,3 +1,4 @@
+import datetime
 import logging
 import time
 from typing import Any, Dict, Iterator, Optional
@@ -42,16 +43,28 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel):
         description="Global switch for this cleanup task",
     )
 
+    runtime_limit_seconds: int = Field(
+        default=3600,
+        description="Maximum runtime in seconds for the cleanup task",
+    )
+
+    max_read_errors: int = Field(
+        default=10,
+        description="Maximum number of read errors before aborting",
+    )
+
     def keep_history_max_milliseconds(self):
         return self.keep_history_max_days * 24 * 3600 * 1000
 
 
 class DatahubExecutionRequestCleanupReport(SourceReport):
-    execution_request_cleanup_records_read: int = 0
-    execution_request_cleanup_records_preserved: int = 0
-    execution_request_cleanup_records_deleted: int = 0
-    execution_request_cleanup_read_errors: int = 0
-    execution_request_cleanup_delete_errors: int = 0
+    ergc_records_read: int = 0
+    ergc_records_preserved: int = 0
+    ergc_records_deleted: int = 0
+    ergc_read_errors: int = 0
+    ergc_delete_errors: int = 0
+    ergc_start_time: Optional[datetime.datetime] = None
+    ergc_end_time: Optional[datetime.datetime] = None
 
 
 class CleanupRecord(BaseModel):
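The two new fields bound the cleanup task: runtime_limit_seconds caps total wall-clock runtime, and max_read_errors caps how many failed batch reads are tolerated before aborting. A usage sketch (the import path follows the file header above, which is inferred; the values are illustrative, not recommendations):

from datahub.ingestion.source.gc.execution_request_cleanup import (
    DatahubExecutionRequestCleanupConfig,
)

config = DatahubExecutionRequestCleanupConfig(
    enabled=True,
    runtime_limit_seconds=1800,  # stop scrolling/deleting after 30 minutes
    max_read_errors=5,           # abort after 5 failed batch reads
)

# keep_history_max_days is exposed in milliseconds for cutoff comparisons.
print(config.keep_history_max_milliseconds())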
@@ -124,6 +137,13 @@ def _scroll_execution_requests(
         params.update(overrides)
 
         while True:
+            if self._reached_runtime_limit():
+                break
+            if self.report.ergc_read_errors >= self.config.max_read_errors:
+                self.report.failure(
+                    f"ergc({self.instance_id}): too many read errors, aborting."
+                )
+                break
             try:
                 url = f"{self.graph.config.server}/openapi/v2/entity/{DATAHUB_EXECUTION_REQUEST_ENTITY_NAME}"
                 response = self.graph._session.get(url, headers=headers, params=params)
@@ -141,7 +161,7 @@
                 logger.error(
                     f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}"
                 )
-                self.report.execution_request_cleanup_read_errors += 1
+                self.report.ergc_read_errors += 1
 
     def _scroll_garbage_records(self):
         state: Dict[str, Dict] = {}
@@ -150,7 +170,7 @@ def _scroll_garbage_records(self):
         running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000
 
         for entry in self._scroll_execution_requests():
-            self.report.execution_request_cleanup_records_read += 1
+            self.report.ergc_records_read += 1
             key = entry.ingestion_source
 
             # Always delete corrupted records
@@ -171,15 +191,15 @@
 
             # Do not delete if number of requests is below minimum
             if state[key]["count"] < self.config.keep_history_min_count:
-                self.report.execution_request_cleanup_records_preserved += 1
+                self.report.ergc_records_preserved += 1
                 continue
 
             # Do not delete if number of requests do not exceed allowed maximum,
             # or the cutoff date.
             if (state[key]["count"] < self.config.keep_history_max_count) and (
                 entry.requested_at > state[key]["cutoffTimestamp"]
             ):
-                self.report.execution_request_cleanup_records_preserved += 1
+                self.report.ergc_records_preserved += 1
                 continue
 
             # Do not delete if status is RUNNING or PENDING and created within last month. If the record is >month old and it did not
@@ -188,7 +208,7 @@
                 "RUNNING",
                 "PENDING",
             ]:
-                self.report.execution_request_cleanup_records_preserved += 1
+                self.report.ergc_records_preserved += 1
                 continue
 
             # Otherwise delete current record
@@ -200,7 +220,7 @@
                     f"record timestamp: {entry.requested_at}."
                 )
             )
-            self.report.execution_request_cleanup_records_deleted += 1
+            self.report.ergc_records_deleted += 1
             yield entry
 
     def _delete_entry(self, entry: CleanupRecord) -> None:
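Taken together, the preservation checks in _scroll_garbage_records implement a per-source retention policy. A condensed decision function capturing the same rules (parameter names and the default counts are illustrative stand-ins, not the config's actual defaults):

import time

def should_preserve(
    per_source_count: int,  # records seen so far for this ingestion source
    requested_at: int,      # request timestamp, epoch milliseconds
    cutoff_timestamp: int,  # now minus keep_history_max_days, epoch milliseconds
    status: str,
    keep_min: int = 10,     # stands in for keep_history_min_count
    keep_max: int = 1000,   # stands in for keep_history_max_count
) -> bool:
    now_ms = int(time.time() * 1000)
    running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000
    # Rule 1: always keep at least keep_min records per source.
    if per_source_count < keep_min:
        return True
    # Rule 2: below the hard cap, keep anything newer than the cutoff.
    if per_source_count < keep_max and requested_at > cutoff_timestamp:
        return True
    # Rule 3: keep RUNNING/PENDING requests created within the last month.
    if status in ("RUNNING", "PENDING") and requested_at > running_guard_timeout:
        return True
    return False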
@@ -210,17 +230,31 @@ def _delete_entry(self, entry: CleanupRecord) -> None:
             )
             self.graph.delete_entity(entry.urn, True)
         except Exception as e:
-            self.report.execution_request_cleanup_delete_errors += 1
+            self.report.ergc_delete_errors += 1
             logger.error(
                 f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
             )
 
+    def _reached_runtime_limit(self) -> bool:
+        if (
+            self.config.runtime_limit_seconds
+            and self.report.ergc_start_time
+            and (
+                datetime.datetime.now() - self.report.ergc_start_time
+                >= datetime.timedelta(seconds=self.config.runtime_limit_seconds)
+            )
+        ):
+            logger.info(f"ergc({self.instance_id}): max runtime reached.")
+            return True
+        return False
+
     def run(self) -> None:
         if not self.config.enabled:
             logger.info(
                 f"ergc({self.instance_id}): ExecutionRequest cleaner is disabled."
             )
             return
+        self.report.ergc_start_time = datetime.datetime.now()
 
         logger.info(
             (
@@ -232,8 +266,11 @@ def run(self) -> None:
         )
 
         for entry in self._scroll_garbage_records():
+            if self._reached_runtime_limit():
+                break
             self._delete_entry(entry)
 
+        self.report.ergc_end_time = datetime.datetime.now()
         logger.info(
             f"ergc({self.instance_id}): Finished cleanup of ExecutionRequest records."
        )
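The runtime limit is a soft deadline checked at two points: before each batch fetch in _scroll_execution_requests and before each delete in run, so the task stops at a clean boundary rather than mid-request. The core check, reduced to a standalone sketch:

import datetime

RUNTIME_LIMIT_SECONDS = 3600  # mirrors the config default
start_time = datetime.datetime.now()

def reached_runtime_limit() -> bool:
    # Soft deadline: compare elapsed wall-clock time against the limit.
    elapsed = datetime.datetime.now() - start_time
    return elapsed >= datetime.timedelta(seconds=RUNTIME_LIMIT_SECONDS)

for batch in range(100):
    if reached_runtime_limit():
        break  # leftover records are picked up by the next scheduled run
    ...  # stand-in for fetching and deleting one batch

Because the check runs between units of work, a single slow batch can overshoot the limit; the bound is approximate by design.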
metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py (1 change: 1 addition & 0 deletions)
@@ -42,4 +42,5 @@ def report_ingestion_stage_start(self, stage: str) -> None:
             self._timer = PerfTimer()
 
         self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}"
+        logger.info(f"Stage started: {self.ingestion_stage}")
         self._timer.start()

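The single added line in ingestion_stage.py logs each stage transition, so the timestamp embedded in ingestion_stage becomes visible in the run log as soon as the stage begins. Roughly what it emits (a sketch; the logging configuration and the exact output line are assumptions):

import logging
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("datahub.ingestion.source_report.ingestion_stage")

stage = "Expired Token Cleanup"
ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}"
logger.info(f"Stage started: {ingestion_stage}")
# e.g. INFO:...ingestion_stage:Stage started: Expired Token Cleanup at 2024-12-26 ...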