Skip to content

Commit

Permalink
fix(ingest/gc): add delete limit execution request (#12313)
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Jan 10, 2025
1 parent a92a107 commit a4f5ab4
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel):
)

keep_history_max_days: int = Field(
30,
90,
description="Maximum number of days to keep execution requests for, per ingestion source",
)

Expand All @@ -48,6 +48,10 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel):
description="Maximum runtime in seconds for the cleanup task",
)

limit_entities_delete: Optional[int] = Field(
10000, description="Max number of execution requests to hard delete."
)

max_read_errors: int = Field(
default=10,
description="Maximum number of read errors before aborting",
Expand All @@ -65,6 +69,8 @@ class DatahubExecutionRequestCleanupReport(SourceReport):
ergc_delete_errors: int = 0
ergc_start_time: Optional[datetime.datetime] = None
ergc_end_time: Optional[datetime.datetime] = None
ergc_delete_limit_reached: bool = False
ergc_runtime_limit_reached: bool = False


class CleanupRecord(BaseModel):
Expand All @@ -85,12 +91,20 @@ def __init__(
self.graph = graph
self.report = report
self.instance_id = int(time.time())
self.last_print_time = 0.0

if config is not None:
self.config = config
else:
self.config = DatahubExecutionRequestCleanupConfig()

def _print_report(self) -> None:
    """Log the cleanup report, rate-limited to at most one print every 2 minutes."""
    # Seconds since the last print (rounded to one decimal for readability).
    time_taken = round(time.time() - self.last_print_time, 1)
    # Print report every 2 minutes
    if time_taken > 120:
        self.last_print_time = time.time()
        logger.info(f"\n{self.report.as_string()}")

def _to_cleanup_record(self, entry: Dict) -> CleanupRecord:
input_aspect = (
entry.get("aspects", {})
Expand Down Expand Up @@ -175,6 +189,7 @@ def _scroll_garbage_records(self):
running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000

for entry in self._scroll_execution_requests():
self._print_report()
self.report.ergc_records_read += 1
key = entry.ingestion_source

Expand Down Expand Up @@ -225,15 +240,12 @@ def _scroll_garbage_records(self):
f"record timestamp: {entry.requested_at}."
)
)
self.report.ergc_records_deleted += 1
yield entry

def _delete_entry(self, entry: CleanupRecord) -> None:
try:
logger.info(
f"ergc({self.instance_id}): going to delete ExecutionRequest {entry.request_id}"
)
self.graph.delete_entity(entry.urn, True)
self.report.ergc_records_deleted += 1
except Exception as e:
self.report.ergc_delete_errors += 1
self.report.failure(
Expand All @@ -252,10 +264,23 @@ def _reached_runtime_limit(self) -> bool:
>= datetime.timedelta(seconds=self.config.runtime_limit_seconds)
)
):
self.report.ergc_runtime_limit_reached = True
logger.info(f"ergc({self.instance_id}): max runtime reached.")
return True
return False

def _reached_delete_limit(self) -> bool:
    """Return True once the configured hard-delete cap has been hit.

    A falsy ``limit_entities_delete`` (``None`` or ``0``) disables the
    check entirely. When the cap is reached, the fact is recorded on the
    report so it shows up in the run summary.
    """
    limit = self.config.limit_entities_delete
    # Guard clauses: limit disabled, or not enough deletions yet.
    if not limit:
        return False
    if self.report.ergc_records_deleted < limit:
        return False
    logger.info(
        f"ergc({self.instance_id}): max delete limit reached: {self.config.limit_entities_delete}."
    )
    self.report.ergc_delete_limit_reached = True
    return True

def run(self) -> None:
if not self.config.enabled:
logger.info(
Expand All @@ -274,7 +299,7 @@ def run(self) -> None:
)

for entry in self._scroll_garbage_records():
if self._reached_runtime_limit():
if self._reached_runtime_limit() or self._reached_delete_limit():
break
self._delete_entry(entry)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,15 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]:
def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[str]:
assert self.ctx.graph
scroll_id: Optional[str] = None

batch_size = self.config.batch_size
if entity_type == "DATA_PROCESS_INSTANCE":
# Due to a bug in DATA_PROCESS_INSTANCE querying, this is a temporary
# workaround to avoid a giant stacktrace by using a smaller batch size
# for the first call. This will be removed in a future version, after a
# server with the fix has been around for a while.
batch_size = 10

while True:
try:
result = self.ctx.graph.execute_graphql(
Expand All @@ -240,7 +249,7 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st
"types": [entity_type],
"query": "*",
"scrollId": scroll_id if scroll_id else None,
"count": self.config.batch_size,
"count": batch_size,
"orFilters": [
{
"and": [
Expand All @@ -263,6 +272,10 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st
scroll_across_entities = result.get("scrollAcrossEntities")
if not scroll_across_entities or not scroll_across_entities.get("count"):
break
if entity_type == "DATA_PROCESS_INSTANCE":
# Temporary workaround; see the note at the beginning of the function.
# Restore the configured batch size once the first call has succeeded.
batch_size = self.config.batch_size
scroll_id = scroll_across_entities.get("nextScrollId")
self.report.num_queries_found += scroll_across_entities.get("count")
for query in scroll_across_entities.get("searchResults"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,30 @@
truncate_indices: {{truncate_indices}}{{^truncate_indices}}true{{/truncate_indices}}
truncate_index_older_than_days: {{truncate_indices_retention_days}}{{^truncate_indices_retention_days}}30{{/truncate_indices_retention_days}}
dataprocess_cleanup:
enabled: {{dataprocess_cleanup.enabled}}{{^dataprocess_cleanup.enabled}}false{{/dataprocess_cleanup.enabled}}
retention_days: {{dataprocess_cleanup.retention_days}}{{^dataprocess_cleanup.retention_days}}10{{/dataprocess_cleanup.retention_days}}
delete_empty_data_jobs: {{dataprocess_cleanup.delete_empty_data_jobs}}{{^dataprocess_cleanup.delete_empty_data_jobs}}true{{/dataprocess_cleanup.delete_empty_data_jobs}}
delete_empty_data_flows: {{dataprocess_cleanup.delete_empty_data_flows}}{{^dataprocess_cleanup.delete_empty_data_flows}}true{{/dataprocess_cleanup.delete_empty_data_flows}}
delete_empty_data_jobs: {{dataprocess_cleanup.delete_empty_data_jobs}}{{^dataprocess_cleanup.delete_empty_data_jobs}}false{{/dataprocess_cleanup.delete_empty_data_jobs}}
delete_empty_data_flows: {{dataprocess_cleanup.delete_empty_data_flows}}{{^dataprocess_cleanup.delete_empty_data_flows}}false{{/dataprocess_cleanup.delete_empty_data_flows}}
hard_delete_entities: {{dataprocess_cleanup.hard_delete_entities}}{{^dataprocess_cleanup.hard_delete_entities}}false{{/dataprocess_cleanup.hard_delete_entities}}
keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}}
batch_size: {{dataprocess_cleanup.batch_size}}{{^dataprocess_cleanup.batch_size}}500{{/dataprocess_cleanup.batch_size}}
max_workers: {{dataprocess_cleanup.max_workers}}{{^dataprocess_cleanup.max_workers}}10{{/dataprocess_cleanup.max_workers}}
soft_deleted_entities_cleanup:
retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}}
enabled: {{soft_deleted_entities_cleanup.enabled}}{{^soft_deleted_entities_cleanup.enabled}}true{{/soft_deleted_entities_cleanup.enabled}}
batch_size: {{soft_deleted_entities_cleanup.batch_size}}{{^soft_deleted_entities_cleanup.batch_size}}500{{/soft_deleted_entities_cleanup.batch_size}}
max_workers: {{soft_deleted_entities_cleanup.max_workers}}{{^soft_deleted_entities_cleanup.max_workers}}10{{/soft_deleted_entities_cleanup.max_workers}}
limit_entities_delete: {{soft_deleted_entities_cleanup.limit_entities_delete}}{{^soft_deleted_entities_cleanup.limit_entities_delete}}25000{{/soft_deleted_entities_cleanup.limit_entities_delete}}
runtime_limit_seconds: {{soft_deleted_entities_cleanup.runtime_limit_seconds}}{{^soft_deleted_entities_cleanup.runtime_limit_seconds}}7200{{/soft_deleted_entities_cleanup.runtime_limit_seconds}}
execution_request_cleanup:
keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}}
keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}}
keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}}
keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}90{{/execution_request_cleanup.keep_history_max_days}}
batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}}
enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}}
enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}true{{/execution_request_cleanup.enabled}}
runtime_limit_seconds: {{execution_request_cleanup.runtime_limit_seconds}}{{^execution_request_cleanup.runtime_limit_seconds}}3600{{/execution_request_cleanup.runtime_limit_seconds}}
limit_entities_delete: {{execution_request_cleanup.limit_entities_delete}}{{^execution_request_cleanup.limit_entities_delete}}10000{{/execution_request_cleanup.limit_entities_delete}}
max_read_errors: {{execution_request_cleanup.max_read_errors}}{{^execution_request_cleanup.max_read_errors}}10{{/execution_request_cleanup.max_read_errors}}
extraArgs: {}
debugMode: false
executorId: default
Expand Down

0 comments on commit a4f5ab4

Please sign in to comment.