diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py index f9a00d7f00905..c1763b16f3670 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py @@ -29,7 +29,7 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel): ) keep_history_max_days: int = Field( - 30, + 90, description="Maximum number of days to keep execution requests for, per ingestion source", ) @@ -48,6 +48,10 @@ class DatahubExecutionRequestCleanupConfig(ConfigModel): description="Maximum runtime in seconds for the cleanup task", ) + limit_entities_delete: Optional[int] = Field( + 10000, description="Max number of execution requests to hard delete." + ) + max_read_errors: int = Field( default=10, description="Maximum number of read errors before aborting", @@ -65,6 +69,8 @@ class DatahubExecutionRequestCleanupReport(SourceReport): ergc_delete_errors: int = 0 ergc_start_time: Optional[datetime.datetime] = None ergc_end_time: Optional[datetime.datetime] = None + ergc_delete_limit_reached: bool = False + ergc_runtime_limit_reached: bool = False class CleanupRecord(BaseModel): @@ -85,12 +91,20 @@ def __init__( self.graph = graph self.report = report self.instance_id = int(time.time()) + self.last_print_time = 0.0 if config is not None: self.config = config else: self.config = DatahubExecutionRequestCleanupConfig() + def _print_report(self) -> None: + time_taken = round(time.time() - self.last_print_time, 1) + # Print report every 2 minutes + if time_taken > 120: + self.last_print_time = time.time() + logger.info(f"\n{self.report.as_string()}") + def _to_cleanup_record(self, entry: Dict) -> CleanupRecord: input_aspect = ( entry.get("aspects", {}) @@ -175,6 +189,7 @@ def _scroll_garbage_records(self): running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000 for entry 
in self._scroll_execution_requests(): + self._print_report() self.report.ergc_records_read += 1 key = entry.ingestion_source @@ -225,15 +240,12 @@ def _scroll_garbage_records(self): f"record timestamp: {entry.requested_at}." ) ) - self.report.ergc_records_deleted += 1 yield entry def _delete_entry(self, entry: CleanupRecord) -> None: try: - logger.info( - f"ergc({self.instance_id}): going to delete ExecutionRequest {entry.request_id}" - ) self.graph.delete_entity(entry.urn, True) + self.report.ergc_records_deleted += 1 except Exception as e: self.report.ergc_delete_errors += 1 self.report.failure( @@ -252,10 +264,23 @@ def _reached_runtime_limit(self) -> bool: >= datetime.timedelta(seconds=self.config.runtime_limit_seconds) ) ): + self.report.ergc_runtime_limit_reached = True logger.info(f"ergc({self.instance_id}): max runtime reached.") return True return False + def _reached_delete_limit(self) -> bool: + if ( + self.config.limit_entities_delete + and self.report.ergc_records_deleted >= self.config.limit_entities_delete + ): + logger.info( + f"ergc({self.instance_id}): max delete limit reached: {self.config.limit_entities_delete}." 
+ ) + self.report.ergc_delete_limit_reached = True + return True + return False + def run(self) -> None: if not self.config.enabled: logger.info( @@ -274,7 +299,7 @@ def run(self) -> None: ) for entry in self._scroll_garbage_records(): - if self._reached_runtime_limit(): + if self._reached_runtime_limit() or self._reached_delete_limit(): break self._delete_entry(entry) diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py index 0a52b7e17bf71..471eeff0224ed 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py @@ -231,6 +231,15 @@ def _process_futures(self, futures: Dict[Future, str]) -> Dict[Future, str]: def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[str]: assert self.ctx.graph scroll_id: Optional[str] = None + + batch_size = self.config.batch_size + if entity_type == "DATA_PROCESS_INSTANCE": + # Due to a bug in Data process instance querying this is a temp workaround + # to avoid a giant stacktrace by having a smaller batch size in first call + # This will be removed in a future version after server with fix has been + # around for a while + batch_size = 10 + while True: try: result = self.ctx.graph.execute_graphql( @@ -240,7 +249,7 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st "types": [entity_type], "query": "*", "scrollId": scroll_id if scroll_id else None, - "count": self.config.batch_size, + "count": batch_size, "orFilters": [ { "and": [ @@ -263,6 +272,10 @@ def _get_soft_deleted(self, graphql_query: str, entity_type: str) -> Iterable[st scroll_across_entities = result.get("scrollAcrossEntities") if not scroll_across_entities or not scroll_across_entities.get("count"): break + if entity_type == "DATA_PROCESS_INSTANCE": + # Temp workaround. 
See note at the beginning of the function + # We make the batch size = config after the call has succeeded once + batch_size = self.config.batch_size scroll_id = scroll_across_entities.get("nextScrollId") self.report.num_queries_found += scroll_across_entities.get("count") for query in scroll_across_entities.get("searchResults"): diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml index c0c5be85b16b1..8879a2f654994 100644 --- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml +++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml @@ -21,19 +21,30 @@ truncate_indices: {{truncate_indices}}{{^truncate_indices}}true{{/truncate_indices}} truncate_index_older_than_days: {{truncate_indices_retention_days}}{{^truncate_indices_retention_days}}30{{/truncate_indices_retention_days}} dataprocess_cleanup: + enabled: {{dataprocess_cleanup.enabled}}{{^dataprocess_cleanup.enabled}}false{{/dataprocess_cleanup.enabled}} retention_days: {{dataprocess_cleanup.retention_days}}{{^dataprocess_cleanup.retention_days}}10{{/dataprocess_cleanup.retention_days}} - delete_empty_data_jobs: {{dataprocess_cleanup.delete_empty_data_jobs}}{{^dataprocess_cleanup.delete_empty_data_jobs}}true{{/dataprocess_cleanup.delete_empty_data_jobs}} - delete_empty_data_flows: {{dataprocess_cleanup.delete_empty_data_flows}}{{^dataprocess_cleanup.delete_empty_data_flows}}true{{/dataprocess_cleanup.delete_empty_data_flows}} + delete_empty_data_jobs: {{dataprocess_cleanup.delete_empty_data_jobs}}{{^dataprocess_cleanup.delete_empty_data_jobs}}false{{/dataprocess_cleanup.delete_empty_data_jobs}} + delete_empty_data_flows: {{dataprocess_cleanup.delete_empty_data_flows}}{{^dataprocess_cleanup.delete_empty_data_flows}}false{{/dataprocess_cleanup.delete_empty_data_flows}} hard_delete_entities: 
{{dataprocess_cleanup.hard_delete_entities}}{{^dataprocess_cleanup.hard_delete_entities}}false{{/dataprocess_cleanup.hard_delete_entities}} keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}} + batch_size: {{dataprocess_cleanup.batch_size}}{{^dataprocess_cleanup.batch_size}}500{{/dataprocess_cleanup.batch_size}} + max_workers: {{dataprocess_cleanup.max_workers}}{{^dataprocess_cleanup.max_workers}}10{{/dataprocess_cleanup.max_workers}} soft_deleted_entities_cleanup: retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}} + enabled: {{soft_deleted_entities_cleanup.enabled}}{{^soft_deleted_entities_cleanup.enabled}}true{{/soft_deleted_entities_cleanup.enabled}} + batch_size: {{soft_deleted_entities_cleanup.batch_size}}{{^soft_deleted_entities_cleanup.batch_size}}500{{/soft_deleted_entities_cleanup.batch_size}} + max_workers: {{soft_deleted_entities_cleanup.max_workers}}{{^soft_deleted_entities_cleanup.max_workers}}10{{/soft_deleted_entities_cleanup.max_workers}} + limit_entities_delete: {{soft_deleted_entities_cleanup.limit_entities_delete}}{{^soft_deleted_entities_cleanup.limit_entities_delete}}25000{{/soft_deleted_entities_cleanup.limit_entities_delete}} + runtime_limit_seconds: {{soft_deleted_entities_cleanup.runtime_limit_seconds}}{{^soft_deleted_entities_cleanup.runtime_limit_seconds}}7200{{/soft_deleted_entities_cleanup.runtime_limit_seconds}} execution_request_cleanup: keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}} keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}} - keep_history_max_days: 
{{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}} + keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}90{{/execution_request_cleanup.keep_history_max_days}} batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}} - enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}} + enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}true{{/execution_request_cleanup.enabled}} + runtime_limit_seconds: {{execution_request_cleanup.runtime_limit_seconds}}{{^execution_request_cleanup.runtime_limit_seconds}}3600{{/execution_request_cleanup.runtime_limit_seconds}} + limit_entities_delete: {{execution_request_cleanup.limit_entities_delete}}{{^execution_request_cleanup.limit_entities_delete}}10000{{/execution_request_cleanup.limit_entities_delete}} + max_read_errors: {{execution_request_cleanup.max_read_errors}}{{^execution_request_cleanup.max_read_errors}}10{{/execution_request_cleanup.max_read_errors}} extraArgs: {} debugMode: false executorId: default