This repository has been archived by the owner on Mar 13, 2020. It is now read-only.

Merge pull request #77 from pageuppeople-opensource/execution_model-indexing

Add indexes to execution model
elexisvenator authored Nov 25, 2019
2 parents 11a85a6 + 375b9e9 commit 2ec42bf
Showing 2 changed files with 141 additions and 102 deletions.
New file (Alembic migration, 34 lines):
@@ -0,0 +1,34 @@
"""add indexes to execution_model
Revision ID: 3834c837f5f2
Revises: bb0c5e8d05e2
Create Date: 2019-11-26 08:42:13.575198
"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '3834c837f5f2'
down_revision = 'bb0c5e8d05e2'
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_index('execution_model__index_on_execution_id_model_name', 'execution_model', ['execution_id', 'model_name'], unique=True, schema='rdl')
op.create_index('execution_model__index_on_model_name_completed_on', 'execution_model', ['model_name', 'completed_on'], unique=False, schema='rdl')
op.create_index(op.f('ix_rdl_execution_model_completed_on'), 'execution_model', ['completed_on'], unique=False, schema='rdl')
op.create_index(op.f('ix_rdl_execution_model_status'), 'execution_model', ['status'], unique=False, schema='rdl')
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_rdl_execution_model_status'), table_name='execution_model', schema='rdl')
op.drop_index(op.f('ix_rdl_execution_model_completed_on'), table_name='execution_model', schema='rdl')
op.drop_index('execution_model__index_on_model_name_completed_on', table_name='execution_model', schema='rdl')
op.drop_index('execution_model__index_on_execution_id_model_name', table_name='execution_model', schema='rdl')
# ### end Alembic commands ###
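
As a usage note, this revision can also be applied or reverted through Alembic's Python API rather than the CLI. The sketch below assumes a standard alembic.ini in the working directory, which is an assumption about the project layout rather than something shown in this commit.

# Sketch only: apply or revert this revision programmatically.
# "alembic.ini" is an assumed location for the Alembic configuration.
from alembic import command
from alembic.config import Config

config = Config("alembic.ini")
command.upgrade(config, "3834c837f5f2")      # apply the index migration
# command.downgrade(config, "bb0c5e8d05e2")  # revert to the previous revision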
209 changes: 107 additions & 102 deletions rdl/entities/execution_model_entity.py
@@ -1,102 +1,107 @@

Previous version (102 lines):
import uuid

from sqlalchemy import (
Column,
DateTime,
Integer,
String,
Boolean,
BigInteger,
ForeignKey,
)
from sqlalchemy.sql import func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.inspection import inspect

from rdl.entities import Base
from rdl.entities import ExecutionEntity
from rdl.shared import Constants


class ExecutionModelEntity(Base):
__tablename__ = "execution_model"
__table_args__ = {"schema": Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}
execution_model_id = Column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
created_on = Column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
updated_on = Column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
onupdate=func.now(),
)
execution_id = Column(
UUID(as_uuid=True),
ForeignKey(
f"{Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}."
f"{inspect(ExecutionEntity).tables[0].name}."
f"{inspect(ExecutionEntity).primary_key[0].name}"
),
nullable=False,
)
model_name = Column(String(250), nullable=False)
status = Column(
String(50),
nullable=False,
server_default=str(Constants.ExecutionModelStatus.STARTED),
)
last_sync_version = Column(BigInteger, nullable=False)
sync_version = Column(BigInteger, nullable=False)
is_full_refresh = Column(Boolean, nullable=False)
full_refresh_reason = Column(String(100), nullable=False)
started_on = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
completed_on = Column(DateTime(timezone=True), nullable=True)
execution_time_ms = Column(BigInteger, nullable=True)
rows_processed = Column(BigInteger, nullable=True)
batches_processed = Column(Integer, nullable=True)
model_checksum = Column(String(100), nullable=False)
failure_reason = Column(String(1000), nullable=True)

def __str__(self):
load_type = (
f"FULL ({self.full_refresh_reason})"
if self.is_full_refresh
else f"INCREMENTAL from version '{self.last_sync_version}' to '{self.sync_version}'"
)
execution_time_s = None
rows_per_second = None

if self.execution_time_ms:
execution_time_s = max(self.execution_time_ms // 1000, 1)

if self.rows_processed:
rows_per_second = self.rows_processed / execution_time_s

return (
"Model: {model}; "
"Load type: {load_type}; "
"Status: {status}; "
"Started on: {started}; "
"Completed on: {completed}; "
"Execution time: {exec_time}; "
"Batches processed: {batches}; "
"Rows processed: {rows}; "
"Average rows processed per second: {rows_per_second};".format(
model=self.model_name,
load_type=load_type,
status=self.status,
started=self.started_on.isoformat(),
completed=self.completed_on.isoformat() if self.completed_on else "n/a",
exec_time=f"{execution_time_s}s" if execution_time_s else "n/a",
batches=f"{self.batches_processed:,}"
if self.batches_processed
else "n/a",
rows=f"{self.rows_processed:,}" if self.rows_processed else "n/a",
rows_per_second=f"{rows_per_second:,.2f}" if rows_per_second else "n/a",
)
)

Updated version (107 lines):

import uuid

from sqlalchemy import (
Column,
DateTime,
Integer,
String,
Boolean,
BigInteger,
ForeignKey,
Index,
)
from sqlalchemy.sql import func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.inspection import inspect

from rdl.entities import Base
from rdl.entities import ExecutionEntity
from rdl.shared import Constants


class ExecutionModelEntity(Base):
__tablename__ = "execution_model"
__table_args__ = {"schema": Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}
execution_model_id = Column(
UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
)
created_on = Column(
DateTime(timezone=True), nullable=False, server_default=func.now()
)
updated_on = Column(
DateTime(timezone=True),
nullable=False,
server_default=func.now(),
onupdate=func.now(),
)
execution_id = Column(
UUID(as_uuid=True),
ForeignKey(
f"{Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}."
f"{inspect(ExecutionEntity).tables[0].name}."
f"{inspect(ExecutionEntity).primary_key[0].name}"
),
nullable=False,
)
model_name = Column(String(250), nullable=False)
status = Column(
String(50),
nullable=False,
server_default=str(Constants.ExecutionModelStatus.STARTED),
index=True
)
last_sync_version = Column(BigInteger, nullable=False)
sync_version = Column(BigInteger, nullable=False)
is_full_refresh = Column(Boolean, nullable=False)
full_refresh_reason = Column(String(100), nullable=False)
started_on = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
completed_on = Column(DateTime(timezone=True), nullable=True, index=True)
execution_time_ms = Column(BigInteger, nullable=True)
rows_processed = Column(BigInteger, nullable=True)
batches_processed = Column(Integer, nullable=True)
model_checksum = Column(String(100), nullable=False)
failure_reason = Column(String(1000), nullable=True)

index_on_execution_id_model_name = Index("execution_model__index_on_execution_id_model_name", execution_id, model_name, unique=True)
index_on_model_name_completed_on = Index("execution_model__index_on_model_name_completed_on", model_name, completed_on)

def __str__(self):
load_type = (
f"FULL ({self.full_refresh_reason})"
if self.is_full_refresh
else f"INCREMENTAL from version '{self.last_sync_version}' to '{self.sync_version}'"
)
execution_time_s = None
rows_per_second = None

if self.execution_time_ms:
execution_time_s = max(self.execution_time_ms // 1000, 1)

if self.rows_processed:
rows_per_second = self.rows_processed / execution_time_s

return (
"Model: {model}; "
"Load type: {load_type}; "
"Status: {status}; "
"Started on: {started}; "
"Completed on: {completed}; "
"Execution time: {exec_time}; "
"Batches processed: {batches}; "
"Rows processed: {rows}; "
"Average rows processed per second: {rows_per_second};".format(
model=self.model_name,
load_type=load_type,
status=self.status,
started=self.started_on.isoformat(),
completed=self.completed_on.isoformat() if self.completed_on else "n/a",
exec_time=f"{execution_time_s}s" if execution_time_s else "n/a",
batches=f"{self.batches_processed:,}"
if self.batches_processed
else "n/a",
rows=f"{self.rows_processed:,}" if self.rows_processed else "n/a",
rows_per_second=f"{rows_per_second:,.2f}" if rows_per_second else "n/a",
)
)
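
For context, a minimal sketch of the read patterns these new indexes target. It assumes ExecutionModelEntity is importable from rdl.entities like the other entities, and that session is an already-configured SQLAlchemy Session; both are assumptions, not something shown in this commit.

# Sketch only: queries shaped to match the new indexes.
from rdl.entities import ExecutionModelEntity


def find_model_execution(session, execution_id, model_name):
    # Served by the unique index on (execution_id, model_name).
    return (
        session.query(ExecutionModelEntity)
        .filter_by(execution_id=execution_id, model_name=model_name)
        .one_or_none()
    )


def latest_completed_execution(session, model_name):
    # Served by the index on (model_name, completed_on).
    return (
        session.query(ExecutionModelEntity)
        .filter(ExecutionModelEntity.model_name == model_name)
        .filter(ExecutionModelEntity.completed_on.isnot(None))
        .order_by(ExecutionModelEntity.completed_on.desc())
        .first()
    )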
