Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/gc): misc fixes in gc source #12226

Merged
merged 2 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
SoftDeletedEntitiesCleanupConfig,
SoftDeletedEntitiesReport,
)
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport

Check warning on line 37 in metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py#L37

Added line #L37 was not covered by tests

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -86,6 +87,7 @@
DataProcessCleanupReport,
SoftDeletedEntitiesReport,
DatahubExecutionRequestCleanupReport,
IngestionStageReport,
):
expired_tokens_revoked: int = 0

Expand Down Expand Up @@ -139,31 +141,40 @@
) -> Iterable[MetadataWorkUnit]:
if self.config.cleanup_expired_tokens:
try:
self.report.report_ingestion_stage_start("Expired Token Cleanup")

Check warning on line 144 in metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py#L144

Added line #L144 was not covered by tests
self.revoke_expired_tokens()
except Exception as e:
self.report.failure("While trying to cleanup expired token ", exc=e)
if self.config.truncate_indices:
try:
self.report.report_ingestion_stage_start("Truncate Indices")

Check warning on line 150 in metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py#L150

Added line #L150 was not covered by tests
self.truncate_indices()
except Exception as e:
self.report.failure("While trying to truncate indices ", exc=e)
if self.config.soft_deleted_entities_cleanup.enabled:
try:
self.report.report_ingestion_stage_start(

Check warning on line 156 in metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py#L156

Added line #L156 was not covered by tests
"Soft Deleted Entities Cleanup"
)
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
except Exception as e:
self.report.failure(
"While trying to cleanup soft deleted entities ", exc=e
)
if self.config.execution_request_cleanup.enabled:
try:
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)
if self.config.dataprocess_cleanup.enabled:
try:
self.report.report_ingestion_stage_start("Data Process Cleanup")

Check warning on line 166 in metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py#L166

Added line #L166 was not covered by tests
yield from self.dataprocess_cleanup.get_workunits_internal()
except Exception as e:
self.report.failure("While trying to cleanup data process ", exc=e)
if self.config.execution_request_cleanup.enabled:
try:
self.report.report_ingestion_stage_start("Execution request Cleanup")
self.execution_request_cleanup.run()
except Exception as e:
self.report.failure("While trying to cleanup execution request ", exc=e)

Check warning on line 175 in metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py#L170-L175

Added lines #L170 - L175 were not covered by tests
# Otherwise last stage's duration does not get calculated.
self.report.report_ingestion_stage_start("End")

Check warning on line 177 in metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py#L177

Added line #L177 was not covered by tests
yield from []

def truncate_indices(self) -> None:
Expand Down Expand Up @@ -281,6 +292,8 @@
list_access_tokens = expired_tokens_res.get("listAccessTokens", {})
tokens = list_access_tokens.get("tokens", [])
total = list_access_tokens.get("total", 0)
if tokens == []:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't total be set to 0 if no tokens are returned ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be. But seems like we might have a bug in the backend.

break

Check warning on line 296 in metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py#L295-L296

Added lines #L295 - L296 were not covered by tests
for token in tokens:
self.report.expired_tokens_revoked += 1
token_id = token["id"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime

Check warning on line 1 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L1

Added line #L1 was not covered by tests
import logging
import time
from typing import Any, Dict, Iterator, Optional
Expand Down Expand Up @@ -42,16 +43,28 @@
description="Global switch for this cleanup task",
)

runtime_limit_seconds: int = Field(

Check warning on line 46 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L46

Added line #L46 was not covered by tests
default=3600,
description="Maximum runtime in seconds for the cleanup task",
)

max_read_errors: int = Field(

Check warning on line 51 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L51

Added line #L51 was not covered by tests
default=10,
description="Maximum number of read errors before aborting",
)

def keep_history_max_milliseconds(self):
return self.keep_history_max_days * 24 * 3600 * 1000


class DatahubExecutionRequestCleanupReport(SourceReport):
execution_request_cleanup_records_read: int = 0
execution_request_cleanup_records_preserved: int = 0
execution_request_cleanup_records_deleted: int = 0
execution_request_cleanup_read_errors: int = 0
execution_request_cleanup_delete_errors: int = 0
ergc_records_read: int = 0
ergc_records_preserved: int = 0
ergc_records_deleted: int = 0
ergc_read_errors: int = 0
ergc_delete_errors: int = 0
ergc_start_time: Optional[datetime.datetime] = None
ergc_end_time: Optional[datetime.datetime] = None

Check warning on line 67 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L61-L67

Added lines #L61 - L67 were not covered by tests


class CleanupRecord(BaseModel):
Expand Down Expand Up @@ -124,6 +137,13 @@
params.update(overrides)

while True:
if self._reached_runtime_limit():
break
if self.report.ergc_read_errors >= self.config.max_read_errors:
self.report.failure(

Check warning on line 143 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L140-L143

Added lines #L140 - L143 were not covered by tests
f"ergc({self.instance_id}): too many read errors, aborting."
)
break

Check warning on line 146 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L146

Added line #L146 was not covered by tests
try:
url = f"{self.graph.config.server}/openapi/v2/entity/{DATAHUB_EXECUTION_REQUEST_ENTITY_NAME}"
response = self.graph._session.get(url, headers=headers, params=params)
Expand All @@ -141,7 +161,7 @@
logger.error(
f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}"
)
self.report.execution_request_cleanup_read_errors += 1
self.report.ergc_read_errors += 1

Check warning on line 164 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L164

Added line #L164 was not covered by tests

def _scroll_garbage_records(self):
state: Dict[str, Dict] = {}
Expand All @@ -150,7 +170,7 @@
running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000

for entry in self._scroll_execution_requests():
self.report.execution_request_cleanup_records_read += 1
self.report.ergc_records_read += 1

Check warning on line 173 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L173

Added line #L173 was not covered by tests
key = entry.ingestion_source

# Always delete corrupted records
Expand All @@ -171,15 +191,15 @@

# Do not delete if number of requests is below minimum
if state[key]["count"] < self.config.keep_history_min_count:
self.report.execution_request_cleanup_records_preserved += 1
self.report.ergc_records_preserved += 1

Check warning on line 194 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L194

Added line #L194 was not covered by tests
continue

# Do not delete if number of requests do not exceed allowed maximum,
# or the cutoff date.
if (state[key]["count"] < self.config.keep_history_max_count) and (
entry.requested_at > state[key]["cutoffTimestamp"]
):
self.report.execution_request_cleanup_records_preserved += 1
self.report.ergc_records_preserved += 1

Check warning on line 202 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L202

Added line #L202 was not covered by tests
continue

# Do not delete if status is RUNNING or PENDING and created within last month. If the record is >month old and it did not
Expand All @@ -188,7 +208,7 @@
"RUNNING",
"PENDING",
]:
self.report.execution_request_cleanup_records_preserved += 1
self.report.ergc_records_preserved += 1

Check warning on line 211 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L211

Added line #L211 was not covered by tests
continue

# Otherwise delete current record
Expand All @@ -200,7 +220,7 @@
f"record timestamp: {entry.requested_at}."
)
)
self.report.execution_request_cleanup_records_deleted += 1
self.report.ergc_records_deleted += 1

Check warning on line 223 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L223

Added line #L223 was not covered by tests
yield entry

def _delete_entry(self, entry: CleanupRecord) -> None:
Expand All @@ -210,17 +230,31 @@
)
self.graph.delete_entity(entry.urn, True)
except Exception as e:
self.report.execution_request_cleanup_delete_errors += 1
self.report.ergc_delete_errors += 1

Check warning on line 233 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L233

Added line #L233 was not covered by tests
logger.error(
f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
)

def _reached_runtime_limit(self) -> bool:
if (

Check warning on line 239 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L238-L239

Added lines #L238 - L239 were not covered by tests
self.config.runtime_limit_seconds
and self.report.ergc_start_time
and (
datetime.datetime.now() - self.report.ergc_start_time
>= datetime.timedelta(seconds=self.config.runtime_limit_seconds)
)
):
logger.info(f"ergc({self.instance_id}): max runtime reached.")
return True
return False

Check warning on line 249 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L247-L249

Added lines #L247 - L249 were not covered by tests

def run(self) -> None:
if not self.config.enabled:
logger.info(
f"ergc({self.instance_id}): ExecutionRequest cleaner is disabled."
)
return
self.report.ergc_start_time = datetime.datetime.now()

Check warning on line 257 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L257

Added line #L257 was not covered by tests

logger.info(
(
Expand All @@ -232,8 +266,11 @@
)

for entry in self._scroll_garbage_records():
if self._reached_runtime_limit():
break

Check warning on line 270 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L269-L270

Added lines #L269 - L270 were not covered by tests
self._delete_entry(entry)

self.report.ergc_end_time = datetime.datetime.now()

Check warning on line 273 in metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py#L273

Added line #L273 was not covered by tests
logger.info(
f"ergc({self.instance_id}): Finished cleanup of ExecutionRequest records."
)
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ def report_ingestion_stage_start(self, stage: str) -> None:
self._timer = PerfTimer()

self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}"
logger.info(f"Stage started: {self.ingestion_stage}")
self._timer.start()
Loading