Skip to content
This repository has been archived by the owner on Mar 13, 2020. It is now read-only.

Commit

Permalink
Merge pull request #61 from pageuppeople-opensource/feature/SP-149-lo…
Browse files Browse the repository at this point in the history
…g-all-exec-and-model-details

SP-149 - Log all available details when logging execution and execution-model
  • Loading branch information
ChintanRaval authored Jul 30, 2019
2 parents df984d3 + 1bb9990 commit 13d6f02
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 26 deletions.
6 changes: 3 additions & 3 deletions rdl/data_load_tracking/DataLoadTrackerRepository.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def complete_execution(self, execution_id, total_number_of_models,
.one()

execution_end_time = session.query(func.now()).scalar()
total_execution_seconds = (execution_end_time - current_execution.started_on).total_seconds()
total_execution_seconds = max((execution_end_time - current_execution.started_on).total_seconds(), 1)
total_rows_processed = self.get_execution_rows(current_execution.execution_id)
total_batches_processed = self.get_execution_batches(current_execution.execution_id)

Expand All @@ -53,7 +53,7 @@ def complete_execution(self, execution_id, total_number_of_models,
current_execution.rows_processed = total_rows_processed
current_execution.batches_processed = total_batches_processed
session.commit()
self.logger.info(current_execution)
self.logger.info(f'Completed {current_execution}')
session.close()
return total_rows_processed

Expand Down Expand Up @@ -82,7 +82,7 @@ def save_execution_model(self, data_load_tracker):
.one()

execution_end_time = session.query(func.now()).scalar()
total_execution_seconds = (execution_end_time - current_execution_model.started_on).total_seconds()
total_execution_seconds = max((execution_end_time - current_execution_model.started_on).total_seconds(), 1)

current_execution_model.completed_on = execution_end_time
current_execution_model.execution_time_ms = int(total_execution_seconds * 1000)
Expand Down
2 changes: 1 addition & 1 deletion rdl/data_sources/DataSourceFactory.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def __init__(self, logger=None):
def create_source(self, connection_string):
for source in self.sources:
if source.can_handle_connection_string(connection_string):
self.logger.debug(f"Found handler '{source}' for connection string.")
self.logger.info(f"Found handler '{source}' for given connection string.")
return source(connection_string)

raise RuntimeError('There are no data sources that can handle this connection string')
Expand Down
45 changes: 32 additions & 13 deletions rdl/entities/execution_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,35 @@ class ExecutionEntity(Base):
models_processed = Column(Integer, nullable=True)

def __str__(self):
if self.status == Constants.ExecutionStatus.STARTED:
return f"Started Execution ID: {self.execution_id} at {self.started_on}"

total_execution_seconds = self.execution_time_s
execution_hours = total_execution_seconds // 3600
execution_minutes = (total_execution_seconds // 60) % 60
execution_seconds = total_execution_seconds % 60

return f"Completed Execution ID: {self.execution_id}" \
f"; Models Processed: {self.models_processed:,}" \
f"; Rows Processed: {self.rows_processed:,}" \
f"; Execution Time: {execution_hours}h {execution_minutes}m {execution_seconds}s" \
f"; Average rows processed per second: {(self.rows_processed//max(total_execution_seconds, 1)):,}."
execution_time_str = None
rows_per_second = None

if self.execution_time_s:
total_execution_seconds = self.execution_time_s

execution_hours = total_execution_seconds // 3600
execution_minutes = (total_execution_seconds // 60) % 60
execution_seconds = total_execution_seconds % 60
execution_time_str = f'{execution_hours}h {execution_minutes}m {execution_seconds}s'

if self.rows_processed:
rows_per_second = (self.rows_processed//max(total_execution_seconds, 1))

return 'Execution ID: {exec_id}; ' \
'Status: {status}; ' \
'Started on: {started}; ' \
'Completed on: {completed}; ' \
'Execution time: {exec_time}; ' \
'Models processed: {models}; ' \
'Batches processed: {batches};' \
'Rows processed: {rows}; ' \
'Average rows processed per second: {rows_per_second};'.format(
exec_id=self.execution_id,
status=self.status,
started=self.started_on.isoformat(),
completed=self.completed_on.isoformat() if self.completed_on else 'n/a',
exec_time=execution_time_str if execution_time_str else 'n/a',
models=f'{self.models_processed:,}' if self.models_processed else 'n/a',
batches=f'{self.batches_processed:,}' if self.batches_processed else 'n/a',
rows=f'{self.rows_processed:,}' if self.rows_processed else 'n/a',
rows_per_second=f'{rows_per_second:,.2f}' if rows_per_second else 'n/a')
38 changes: 29 additions & 9 deletions rdl/entities/execution_model_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,32 @@ class ExecutionModelEntity(Base):
failure_reason = Column(String(1000), nullable=True)

def __str__(self):
load_type = 'FULL' if self.is_full_refresh else f"INCREMENTAL from " \
f"version '{self.last_sync_version}' " \
f"to '{self.sync_version}'"

execution_tims_s = max(self.execution_time_ms // 1000, 1)
rows_per_second = self.rows_processed / execution_tims_s
return f"Rows: {self.rows_processed}, " \
f"Load type: {load_type}, " \
f"Total Execution Time: {execution_tims_s}s @ {rows_per_second:.2f} rows per second "
load_type = f'FULL ({self.full_refresh_reason})' if self.is_full_refresh else \
f"INCREMENTAL from version '{self.last_sync_version}' to '{self.sync_version}'"
execution_time_s = None
rows_per_second = None

if self.execution_time_ms:
execution_time_s = max(self.execution_time_ms // 1000, 1)

if self.rows_processed:
rows_per_second = self.rows_processed / execution_time_s

return 'Model: {model}; ' \
'Load type: {load_type}; ' \
'Status: {status}; ' \
'Started on: {started}; ' \
'Completed on: {completed}; ' \
'Execution time: {exec_time}; ' \
'Batches processed: {batches}; ' \
'Rows processed: {rows}; ' \
'Average rows processed per second: {rows_per_second};'.format(
model=self.model_name,
load_type=load_type,
status=self.status,
started=self.started_on.isoformat(),
completed=self.completed_on.isoformat() if self.completed_on else 'n/a',
exec_time=f'{execution_time_s}s' if execution_time_s else 'n/a',
batches=f'{self.batches_processed:,}' if self.batches_processed else 'n/a',
rows=f'{self.rows_processed:,}' if self.rows_processed else 'n/a',
rows_per_second=f'{rows_per_second:,.2f}' if rows_per_second else 'n/a')

0 comments on commit 13d6f02

Please sign in to comment.