diff --git a/rdl/alembic/versions/bb0c5e8d05e2_add_batches_processed_count.py b/rdl/alembic/versions/bb0c5e8d05e2_add_batches_processed_count.py new file mode 100644 index 0000000..4245ad2 --- /dev/null +++ b/rdl/alembic/versions/bb0c5e8d05e2_add_batches_processed_count.py @@ -0,0 +1,26 @@ +"""add batches_processed count + +Revision ID: bb0c5e8d05e2 +Revises: 00f2b412576b +Create Date: 2019-07-26 13:56:06.412042 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'bb0c5e8d05e2' +down_revision = '00f2b412576b' +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column('execution', sa.Column('batches_processed', sa.Integer(), nullable=True), schema='rdl') + op.add_column('execution_model', sa.Column('batches_processed', sa.Integer(), nullable=True), schema='rdl') + + +def downgrade(): + op.drop_column('execution_model', 'batches_processed', schema='rdl') + op.drop_column('execution', 'batches_processed', schema='rdl') diff --git a/rdl/data_load_tracking/DataLoadTrackerRepository.py b/rdl/data_load_tracking/DataLoadTrackerRepository.py index e77a6a5..b1415ce 100644 --- a/rdl/data_load_tracking/DataLoadTrackerRepository.py +++ b/rdl/data_load_tracking/DataLoadTrackerRepository.py @@ -44,12 +44,14 @@ def complete_execution(self, execution_id, total_number_of_models, execution_end_time = session.query(func.now()).scalar() total_execution_seconds = (execution_end_time - current_execution.started_on).total_seconds() total_rows_processed = self.get_execution_rows(current_execution.execution_id) + total_batches_processed = self.get_execution_batches(current_execution.execution_id) current_execution.models_processed = total_number_of_models current_execution.status = status current_execution.completed_on = execution_end_time current_execution.execution_time_s = total_execution_seconds current_execution.rows_processed = total_rows_processed + current_execution.batches_processed = total_batches_processed session.commit() self.logger.info(current_execution) session.close() @@ -85,6 +87,7 @@ def save_execution_model(self, data_load_tracker): current_execution_model.completed_on = execution_end_time current_execution_model.execution_time_ms = int(total_execution_seconds * 1000) + current_execution_model.batches_processed = len(data_load_tracker.batches) current_execution_model.rows_processed = data_load_tracker.total_row_count current_execution_model.status = data_load_tracker.status current_execution_model.is_full_refresh = data_load_tracker.is_full_refresh @@ -104,6 +107,14 @@ def get_execution_rows(self, execution_id): session.close() return results + def get_execution_batches(self, execution_id): + session = self.session_maker() + results = session.query(func.sum(ExecutionModelEntity.batches_processed))\ + .filter(ExecutionModelEntity.execution_id == execution_id)\ + .scalar() + session.close() + return results + def get_full_refresh_since(self, timestamp): session = self.session_maker() results = session.query(ExecutionModelEntity.model_name)\ diff --git a/rdl/entities/execution_entity.py b/rdl/entities/execution_entity.py index 2a02400..5a1da35 100644 --- a/rdl/entities/execution_entity.py +++ b/rdl/entities/execution_entity.py @@ -19,6 +19,7 @@ class ExecutionEntity(Base): completed_on = Column(DateTime(timezone=True), nullable=True) execution_time_s = Column(BigInteger, nullable=True) rows_processed = Column(BigInteger, nullable=True) + batches_processed = Column(Integer, nullable=True) models_processed = Column(Integer, nullable=True) def __str__(self): diff --git a/rdl/entities/execution_model_entity.py b/rdl/entities/execution_model_entity.py index 47a673a..3ff6c6b 100644 --- a/rdl/entities/execution_model_entity.py +++ b/rdl/entities/execution_model_entity.py @@ -31,6 +31,7 @@ class ExecutionModelEntity(Base): completed_on = Column(DateTime(timezone=True), nullable=True) execution_time_ms = Column(BigInteger, nullable=True) rows_processed = Column(BigInteger, nullable=True) + batches_processed = Column(Integer, nullable=True) model_checksum = Column(String(100), nullable=False) failure_reason = Column(String(1000), nullable=True)