Skip to content
This repository has been archived by the owner on Mar 13, 2020. It is now read-only.

Commit

Permalink
Merge pull request #60 from pageuppeople-opensource/feature/SP-149-pe…
Browse files Browse the repository at this point in the history
…rsist-batch-count

SP-149 - Persist count of batches per model and per execution
  • Loading branch information
ChintanRaval authored Jul 28, 2019
2 parents bb3ee5e + d13c143 commit df984d3
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 0 deletions.
26 changes: 26 additions & 0 deletions rdl/alembic/versions/bb0c5e8d05e2_add_batches_processed_count.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
"""add batches_processed count
Revision ID: bb0c5e8d05e2
Revises: 00f2b412576b
Create Date: 2019-07-26 13:56:06.412042
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = 'bb0c5e8d05e2'
down_revision = '00f2b412576b'
branch_labels = None
depends_on = None


def upgrade():
    """Add a nullable integer ``batches_processed`` column to the tracking tables.

    The column is added to ``execution`` first and then to ``execution_model``,
    both in the ``rdl`` schema.
    """
    for table_name in ('execution', 'execution_model'):
        batches_column = sa.Column('batches_processed', sa.Integer(), nullable=True)
        op.add_column(table_name, batches_column, schema='rdl')


def downgrade():
    """Drop the ``batches_processed`` column from the tracking tables.

    Columns are removed in the reverse order of ``upgrade``: first from
    ``execution_model`` and then from ``execution``, both in the ``rdl`` schema.
    """
    for table_name in ('execution_model', 'execution'):
        op.drop_column(table_name, 'batches_processed', schema='rdl')
11 changes: 11 additions & 0 deletions rdl/data_load_tracking/DataLoadTrackerRepository.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,14 @@ def complete_execution(self, execution_id, total_number_of_models,
execution_end_time = session.query(func.now()).scalar()
total_execution_seconds = (execution_end_time - current_execution.started_on).total_seconds()
total_rows_processed = self.get_execution_rows(current_execution.execution_id)
total_batches_processed = self.get_execution_batches(current_execution.execution_id)

current_execution.models_processed = total_number_of_models
current_execution.status = status
current_execution.completed_on = execution_end_time
current_execution.execution_time_s = total_execution_seconds
current_execution.rows_processed = total_rows_processed
current_execution.batches_processed = total_batches_processed
session.commit()
self.logger.info(current_execution)
session.close()
Expand Down Expand Up @@ -85,6 +87,7 @@ def save_execution_model(self, data_load_tracker):
current_execution_model.completed_on = execution_end_time
current_execution_model.execution_time_ms = int(total_execution_seconds * 1000)

current_execution_model.batches_processed = len(data_load_tracker.batches)
current_execution_model.rows_processed = data_load_tracker.total_row_count
current_execution_model.status = data_load_tracker.status
current_execution_model.is_full_refresh = data_load_tracker.is_full_refresh
Expand All @@ -104,6 +107,14 @@ def get_execution_rows(self, execution_id):
session.close()
return results

def get_execution_batches(self, execution_id):
    """Return the total ``batches_processed`` across the models of one execution.

    Opens a session from ``self.session_maker``, sums
    ``ExecutionModelEntity.batches_processed`` over the rows whose
    ``execution_id`` matches, and returns that scalar.

    :param execution_id: identifier of the execution whose model rows are summed.
    :return: the summed batch count. SQL ``SUM`` yields NULL when no row
        matches (or every value is NULL), in which case this returns ``None``.
    """
    session = self.session_maker()
    try:
        return session.query(func.sum(ExecutionModelEntity.batches_processed))\
            .filter(ExecutionModelEntity.execution_id == execution_id)\
            .scalar()
    finally:
        # Always release the session, even when the query raises, so a failed
        # lookup does not leak the session.
        session.close()

def get_full_refresh_since(self, timestamp):
session = self.session_maker()
results = session.query(ExecutionModelEntity.model_name)\
Expand Down
1 change: 1 addition & 0 deletions rdl/entities/execution_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class ExecutionEntity(Base):
completed_on = Column(DateTime(timezone=True), nullable=True)
execution_time_s = Column(BigInteger, nullable=True)
rows_processed = Column(BigInteger, nullable=True)
batches_processed = Column(Integer, nullable=True)
models_processed = Column(Integer, nullable=True)

def __str__(self):
Expand Down
1 change: 1 addition & 0 deletions rdl/entities/execution_model_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class ExecutionModelEntity(Base):
completed_on = Column(DateTime(timezone=True), nullable=True)
execution_time_ms = Column(BigInteger, nullable=True)
rows_processed = Column(BigInteger, nullable=True)
batches_processed = Column(Integer, nullable=True)
model_checksum = Column(String(100), nullable=False)
failure_reason = Column(String(1000), nullable=True)

Expand Down

0 comments on commit df984d3

Please sign in to comment.