From 95d3ff882cce58665ecee17faa5e2dbc3d80fedd Mon Sep 17 00:00:00 2001 From: Chintan Raval Date: Mon, 29 Jul 2019 15:41:21 +1000 Subject: [PATCH 1/4] SP-149 - when logging execution and execution-model, ensure that all available details are logged. --- .../DataLoadTrackerRepository.py | 2 +- rdl/entities/execution_entity.py | 45 +++++++++++++------ rdl/entities/execution_model_entity.py | 38 ++++++++++++---- 3 files changed, 62 insertions(+), 23 deletions(-) diff --git a/rdl/data_load_tracking/DataLoadTrackerRepository.py b/rdl/data_load_tracking/DataLoadTrackerRepository.py index b1415ce..5cb5d65 100644 --- a/rdl/data_load_tracking/DataLoadTrackerRepository.py +++ b/rdl/data_load_tracking/DataLoadTrackerRepository.py @@ -53,7 +53,7 @@ def complete_execution(self, execution_id, total_number_of_models, current_execution.rows_processed = total_rows_processed current_execution.batches_processed = total_batches_processed session.commit() - self.logger.info(current_execution) + self.logger.info(f'Completed {current_execution}') session.close() return total_rows_processed diff --git a/rdl/entities/execution_entity.py b/rdl/entities/execution_entity.py index 5a1da35..2889931 100644 --- a/rdl/entities/execution_entity.py +++ b/rdl/entities/execution_entity.py @@ -23,16 +23,35 @@ class ExecutionEntity(Base): models_processed = Column(Integer, nullable=True) def __str__(self): - if self.status == Constants.ExecutionStatus.STARTED: - return f"Started Execution ID: {self.execution_id} at {self.started_on}" - - total_execution_seconds = self.execution_time_s - execution_hours = total_execution_seconds // 3600 - execution_minutes = (total_execution_seconds // 60) % 60 - execution_seconds = total_execution_seconds % 60 - - return f"Completed Execution ID: {self.execution_id}" \ - f"; Models Processed: {self.models_processed:,}" \ - f"; Rows Processed: {self.rows_processed:,}" \ - f"; Execution Time: {execution_hours}h {execution_minutes}m {execution_seconds}s" \ - f"; Average rows processed per second: {(self.rows_processed//max(total_execution_seconds, 1)):,}." + execution_time_str = None + rows_per_second = None + + if self.execution_time_s: + total_execution_seconds = self.execution_time_s + + execution_hours = total_execution_seconds // 3600 + execution_minutes = (total_execution_seconds // 60) % 60 + execution_seconds = total_execution_seconds % 60 + execution_time_str = f'{execution_hours}h {execution_minutes}m {execution_seconds}s' + + if self.rows_processed: + rows_per_second = (self.rows_processed//max(total_execution_seconds, 1)) + + return 'Execution ID: {exec_id}; ' \ + 'Status: {status}; ' \ + 'Started on: {started}; ' \ + 'Completed on: {completed}; ' \ + 'Execution time: {exec_time}; ' \ + 'Models processed: {models}; ' \ + 'Batches processed: {batches};' \ + 'Rows processed: {rows}; ' \ + 'Average rows processed per second: {rows_per_second};'.format( + exec_id=self.execution_id, + status=self.status, + started=self.started_on.isoformat(), + completed=self.completed_on.isoformat() if self.completed_on else '', + exec_time=execution_time_str if self.execution_time_str else '', + models=f'{self.models_processed:,}' if self.models_processed else '', + batches=f'{self.batches_processed:,}' if self.batches_processed else '', + rows=f'{self.rows_processed:,}' if self.rows_processed else '', + rows_per_second=f'{rows_per_second:,}' if rows_per_second else '') diff --git a/rdl/entities/execution_model_entity.py b/rdl/entities/execution_model_entity.py index 3ff6c6b..b50ebfd 100644 --- a/rdl/entities/execution_model_entity.py +++ b/rdl/entities/execution_model_entity.py @@ -36,12 +36,32 @@ class ExecutionModelEntity(Base): failure_reason = Column(String(1000), nullable=True) def __str__(self): - load_type = 'FULL' if self.is_full_refresh else f"INCREMENTAL from " \ - f"version '{self.last_sync_version}' " \ - f"to '{self.sync_version}'" - - execution_tims_s = max(self.execution_time_ms // 1000, 1) - rows_per_second = self.rows_processed / execution_tims_s - return f"Rows: {self.rows_processed}, " \ - f"Load type: {load_type}, " \ - f"Total Execution Time: {execution_tims_s}s @ {rows_per_second:.2f} rows per second " + load_type = f'FULL ({self.full_refresh_reason})' if self.is_full_refresh else \ + f"INCREMENTAL from version '{self.last_sync_version}' to '{self.sync_version}'" + execution_time_s = None + rows_per_second = None + + if self.execution_time_ms: + execution_time_s = max(self.execution_time_ms // 1000, 1) + + if self.rows_processed: + rows_per_second = self.rows_processed / execution_time_s + + return 'Model: {model}; ' \ + 'Load type: {load_type}; ' \ + 'Status: {status}; ' \ + 'Started on: {started}; ' \ + 'Completed on: {completed}; ' \ + 'Execution time: {exec_time}; ' \ + 'Batches processed: {batches}; ' \ + 'Rows processed: {rows}; ' \ + 'Average rows processed per second: {rows_per_second};'.format( + model=self.model_name, + load_type=load_type, + status=self.status, + started=self.started_on.isoformat(), + completed=self.completed_on.isoformat() if self.completed_on else '', + exec_time=f'{execution_time_s}s' if execution_time_s else '', + batches=f'{self.batches_processed:,}' if self.batches_processed else '', + rows=f'{self.rows_processed:,}' if self.rows_processed else '', + rows_per_second=f'{rows_per_second:,}' if rows_per_second else '') From afa7a0d68c9fd7f2a02d0a1efa8b1e9bac5524f6 Mon Sep 17 00:00:00 2001 From: Chintan Raval Date: Mon, 29 Jul 2019 15:47:48 +1000 Subject: [PATCH 2/4] SP-149 - restrict rows/second values to 2 decimals --- rdl/entities/execution_entity.py | 4 ++-- rdl/entities/execution_model_entity.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rdl/entities/execution_entity.py b/rdl/entities/execution_entity.py index 2889931..960214d 100644 --- a/rdl/entities/execution_entity.py +++ b/rdl/entities/execution_entity.py @@ -50,8 +50,8 @@ def __str__(self): status=self.status, started=self.started_on.isoformat(), completed=self.completed_on.isoformat() if self.completed_on else '', - exec_time=execution_time_str if self.execution_time_str else '', + exec_time=execution_time_str if execution_time_str else '', models=f'{self.models_processed:,}' if self.models_processed else '', batches=f'{self.batches_processed:,}' if self.batches_processed else '', rows=f'{self.rows_processed:,}' if self.rows_processed else '', - rows_per_second=f'{rows_per_second:,}' if rows_per_second else '') + rows_per_second=f'{rows_per_second:,.2f}' if rows_per_second else '') diff --git a/rdl/entities/execution_model_entity.py b/rdl/entities/execution_model_entity.py index b50ebfd..bf9dd86 100644 --- a/rdl/entities/execution_model_entity.py +++ b/rdl/entities/execution_model_entity.py @@ -64,4 +64,4 @@ def __str__(self): exec_time=f'{execution_time_s}s' if execution_time_s else '', batches=f'{self.batches_processed:,}' if self.batches_processed else '', rows=f'{self.rows_processed:,}' if self.rows_processed else '', - rows_per_second=f'{rows_per_second:,}' if rows_per_second else '') + rows_per_second=f'{rows_per_second:,.2f}' if rows_per_second else '') From afa3f50cf7e17df1ba8d24ebe9248d19a2e3c9d7 Mon Sep 17 00:00:00 2001 From: Chintan Raval Date: Mon, 29 Jul 2019 16:03:46 +1000 Subject: [PATCH 3/4] SP-149 - bug fix - log at least 1 second as execution time per model/execution --- rdl/data_load_tracking/DataLoadTrackerRepository.py | 4 ++-- rdl/entities/execution_entity.py | 12 ++++++------ rdl/entities/execution_model_entity.py | 10 +++++----- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/rdl/data_load_tracking/DataLoadTrackerRepository.py b/rdl/data_load_tracking/DataLoadTrackerRepository.py index 5cb5d65..6fa679c 100644 --- a/rdl/data_load_tracking/DataLoadTrackerRepository.py +++ b/rdl/data_load_tracking/DataLoadTrackerRepository.py @@ -42,7 +42,7 @@ def complete_execution(self, execution_id, total_number_of_models, .one() execution_end_time = session.query(func.now()).scalar() - total_execution_seconds = (execution_end_time - current_execution.started_on).total_seconds() + total_execution_seconds = max((execution_end_time - current_execution.started_on).total_seconds(), 1) total_rows_processed = self.get_execution_rows(current_execution.execution_id) total_batches_processed = self.get_execution_batches(current_execution.execution_id) @@ -82,7 +82,7 @@ def save_execution_model(self, data_load_tracker): .one() execution_end_time = session.query(func.now()).scalar() - total_execution_seconds = (execution_end_time - current_execution_model.started_on).total_seconds() + total_execution_seconds = max((execution_end_time - current_execution_model.started_on).total_seconds(), 1) current_execution_model.completed_on = execution_end_time current_execution_model.execution_time_ms = int(total_execution_seconds * 1000) diff --git a/rdl/entities/execution_entity.py b/rdl/entities/execution_entity.py index 960214d..32e15ee 100644 --- a/rdl/entities/execution_entity.py +++ b/rdl/entities/execution_entity.py @@ -49,9 +49,9 @@ def __str__(self): exec_id=self.execution_id, status=self.status, started=self.started_on.isoformat(), - completed=self.completed_on.isoformat() if self.completed_on else '', - exec_time=execution_time_str if execution_time_str else '', - models=f'{self.models_processed:,}' if self.models_processed else '', - batches=f'{self.batches_processed:,}' if self.batches_processed else '', - rows=f'{self.rows_processed:,}' if self.rows_processed else '', - rows_per_second=f'{rows_per_second:,.2f}' if rows_per_second else '') + completed=self.completed_on.isoformat() if self.completed_on else 'n/a', + exec_time=execution_time_str if execution_time_str else 'n/a', + models=f'{self.models_processed:,}' if self.models_processed else 'n/a', + batches=f'{self.batches_processed:,}' if self.batches_processed else 'n/a', + rows=f'{self.rows_processed:,}' if self.rows_processed else 'n/a', + rows_per_second=f'{rows_per_second:,.2f}' if rows_per_second else 'n/a') diff --git a/rdl/entities/execution_model_entity.py b/rdl/entities/execution_model_entity.py index bf9dd86..c139f7f 100644 --- a/rdl/entities/execution_model_entity.py +++ b/rdl/entities/execution_model_entity.py @@ -60,8 +60,8 @@ def __str__(self): load_type=load_type, status=self.status, started=self.started_on.isoformat(), - completed=self.completed_on.isoformat() if self.completed_on else '', - exec_time=f'{execution_time_s}s' if execution_time_s else '', - batches=f'{self.batches_processed:,}' if self.batches_processed else '', - rows=f'{self.rows_processed:,}' if self.rows_processed else '', - rows_per_second=f'{rows_per_second:,.2f}' if rows_per_second else '') + completed=self.completed_on.isoformat() if self.completed_on else 'n/a', + exec_time=f'{execution_time_s}s' if execution_time_s else 'n/a', + batches=f'{self.batches_processed:,}' if self.batches_processed else 'n/a', + rows=f'{self.rows_processed:,}' if self.rows_processed else 'n/a', + rows_per_second=f'{rows_per_second:,.2f}' if rows_per_second else 'n/a') From 1bb9990dbbf49530518eb9275246b829385557ca Mon Sep 17 00:00:00 2001 From: Chintan Raval Date: Tue, 30 Jul 2019 09:16:54 +1000 Subject: [PATCH 4/4] SP-149 - info log DataSource to be used for execution --- rdl/data_sources/DataSourceFactory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rdl/data_sources/DataSourceFactory.py b/rdl/data_sources/DataSourceFactory.py index 001e007..b82888e 100644 --- a/rdl/data_sources/DataSourceFactory.py +++ b/rdl/data_sources/DataSourceFactory.py @@ -11,7 +11,7 @@ def __init__(self, logger=None): def create_source(self, connection_string): for source in self.sources: if source.can_handle_connection_string(connection_string): - self.logger.debug(f"Found handler '{source}' for connection string.") + self.logger.info(f"Found handler '{source}' for given connection string.") return source(connection_string) raise RuntimeError('There are no data sources that can handle this connection string')