Commit
Merge pull request IntegriChain1#33 from IntegriChain1/DC-59-dag-builder
Dc 59 dag builder
norton120 authored Feb 13, 2019
2 parents 7ed370c + 7b246e6 commit 656f877
Showing 34 changed files with 687 additions and 59 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,3 +5,4 @@
build
.coverage
htmlcov/
*.swp
3 changes: 2 additions & 1 deletion config/core_project.yaml
@@ -12,4 +12,5 @@ PROD_AWS_ACCOUNT: "687531504312"
AWS_ACCOUNT: "265991248033"
AWS_REGION: us-east-1
AWS_BATCH_TEST_JOB_QUEUE: core
LOGGING_CONFIG: logging.yaml
LOGGING_CONFIG: logging.yaml

File renamed without changes.
File renamed without changes.
82 changes: 82 additions & 0 deletions core/airflow/dagbuilder/dag_builder.py
@@ -0,0 +1,82 @@
from airflow import DAG
from core.models.configuration import Pipeline
from core.helpers.session_helper import SessionHelper
from datetime import datetime, timedelta
from core.airflow.dagbuilder.task_orchestrator import TaskOrchestrator
from core.logging import LoggerMixin


class DagBuilder(LoggerMixin):

def __init__(self):
self._dags = []
self.DEFAULT_ARGS = {
"owner": "integriChain",
"depends_on_past": False,
"start_date": datetime(2000, 1, 1),
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5)
}

def do_build_dags(self)->None:
"""Integrates all the components of getting dags, setting task deps etc."""
self.logger.info('Beginning run to build all dags...')

self._pipelines = self._get_pipelines()
sets = self._create_dag_sets(self._pipelines)
for pipeline, dag in sets:
tasks = self._get_prepped_tasks(pipeline, dag)
self._dags.append(dag)
self.logger.info(f"Done with run. {len(self._dags)} dags built.")

@property
def dags(self)->list:
return self._dags

@dags.setter
def dags(self, dags)->None:
self._dags = dags

def _create_dag_sets(self, pipelines: list)-> list:
""" creates a dag for each pipeline
RETURNS a list of tuples, each containing:
- the original pipeline object
- the matching DAG for that pipeline"""
dags = []
self.logger.debug("Creating DAGs from pipelines...")
for pipe in pipelines:
self.logger.debug(f"Created DAG {pipe.name}")
dags.append((pipe, DAG(pipe.name,
default_args=self.DEFAULT_ARGS,
# airflow may no longer support start_date in default_args
start_date=self.DEFAULT_ARGS['start_date'],
schedule_interval=f'@{pipe.run_frequency}'),))
self.logger.debug("Done creating DAGs.")
return dags

def _get_pipelines(self, only_active: bool = True) -> list:
""" gets all the pipelines from the configuration session.
ARGS
- only_active: if true ignore inactive pipelines
RETURNS list of core.model.Pipeline objects
"""
self.logger.debug(
f"getting {'active' if only_active else 'all'} pipelines from config...")
session = SessionHelper().session

if only_active:
pipelines = session.query(Pipeline).filter(Pipeline.is_active)
else:
pipelines = session.query(Pipeline)
self.logger.debug(
f"Done getting pipelines, {len([x for x in pipelines])} pipelines found.")
return pipelines

def _get_prepped_tasks(self, pipeline: Pipeline, dag: DAG)-> tuple:
"""returns a tuple of tasks with deps and dag already applied."""
to = TaskOrchestrator(pipeline, dag)
to.do_orchestrate()
return to.tasks
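For context, a minimal usage sketch of the builder (assuming a dev environment, so SessionHelper hands back the mocked configuration session):

from core.airflow.dagbuilder.dag_builder import DagBuilder

builder = DagBuilder()
builder.do_build_dags()  # queries active pipelines and builds one DAG per pipeline
for dag in builder.dags:
    print(dag.dag_id, dag.schedule_interval)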
File renamed without changes.
146 changes: 146 additions & 0 deletions core/airflow/dagbuilder/task_orchestrator.py
@@ -0,0 +1,146 @@
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from core.models.configuration import Pipeline
from operator import attrgetter
from core.airflow.plugins.transform_operator import TransformOperator
from core.logging import LoggerMixin


class TaskOrchestrator(LoggerMixin):

def __init__(self, pipeline: Pipeline = None, dag: DAG = None)->None:
self._pipeline = pipeline
self._dag = dag
self._tasks = []

@property
def dag(self)->DAG:
return self._dag

@dag.setter
def dag(self, dag: DAG)->None:
self._dag = dag

@property
def tasks(self):
return self._tasks

@property
def pipeline(self):
return self._pipeline

@pipeline.setter
def pipeline(self, pipeline: Pipeline)->None:
self._pipeline = pipeline

@tasks.setter
def tasks(self, tasks: list)->None:
        error_message = 'TaskOrchestrator.tasks must not be set directly.'
        self.logger.critical(error_message)
        raise ValueError(error_message)

def do_orchestrate(self)->None:
self.logger.info("Begin orchestrating tasks...")
if not (self._pipeline and self._dag):
            except_message = "TaskOrchestrator cannot run do_orchestrate without a pipeline and a dag set."
self.logger.critical(except_message)
raise ValueError(except_message)

all_pipeline_tasks = []
for state in self._pipeline.pipeline_states:
self.logger.debug(
f"Ordering transforms in state {state.pipeline_state_type.name}...")
transformations = self._order_transformations_within_group(
state.transformations)
self.logger.debug(
f"Done ordering transforms for state {state.pipeline_state_type.name}. {len(transformations)} sets of transforms created.")
all_transforms = []
for transformation_group in transformations:
converted_set = set()
for transform in transformation_group:
to = TransformOperator(transform.id)
converted_set.add(to)
all_transforms.append(
tuple([state.pipeline_state_type.name, converted_set]))
all_pipeline_tasks += all_transforms
self.logger.debug(
f"Applying dependancies to tasks for dag {self._dag.dag_id}...")
self._tasks = self._apply_deps_to_ordered_tasks(
all_pipeline_tasks, self._dag)
self.logger.info(
f"Done orchestrating tasks. {len(self._tasks)} orchestrated.")

## PRIVATE ##

def _order_transformations_within_group(self, transformations: list)->list:
""" takes a list of configuration transformations from the same pipeline state, returns them in a list of ordered sets."""
transformations = sorted(
transformations, key=attrgetter('graph_order'))
ordered_transformation_groups = [set()]
graph_cursor = 0
list_cursor = 0
for t in transformations:
if t.graph_order == graph_cursor:
ordered_transformation_groups[list_cursor].add(t)
else:
                list_cursor += 1
graph_cursor = t.graph_order
ordered_transformation_groups.append({t})
self.logger.debug(
f"added task with graph order # {t.graph_order} to group {graph_cursor}")
return ordered_transformation_groups

def _apply_deps_to_ordered_tasks(self, task_groups: list, dag: DAG)->tuple:
""" takes an ordered list of tuples. each tuple is (state_name, {set_of_operator_tasks}). Assigns deps to each set for all tasks in the previous set.
Example:
-if ordered_task_sets is:
[("raw",{task_1,task2},),("raw", {task_3,task_4},), ("ingest",{task_5},)]
this will return a tuple (raw_group_task_1, raw_group_task_2, task_1, task_2, task_3...) where task_1 and task_2 depend on upstream raw_group_1 and downstream raw_group_2, task_3 and task_4 have upstream raw_group_2 and downstream ingest_group_1 etc.
RETURNS: tuple of tasks with deps applied
"""

spacers = []

### learn more about spacers in our dag at https://github.com/IntegriChain1/core/blob/master/docs/dev_guide.md#spacers
def make_spacer(id: int, state_name: str, dag: DAG)->DummyOperator:
""" look for an existing spacer. return it. if not, make it and return that."""

spacer_format = f"{state_name}_group_step_{id}"
if len(spacers) > 0:
for spacer in spacers:
if spacer.task_id == spacer_format:
return spacer
spacer = DummyOperator(task_id=spacer_format)
spacer.dag = dag
spacer.depends_on_past = True

return spacer

# first make the spacer operators
for index, packed_task_group in enumerate(task_groups):
state_name = packed_task_group[0]
spacer = make_spacer(index, state_name, dag)
spacers.append(spacer)

self.logger.debug(f"Created {len(spacers)} spacer tasks.")
        prepared_tasks = []

# now set up/downstreams to the tasks and spacers
for index, packed_task_group in enumerate(task_groups):
task_group = packed_task_group[1]

for task in task_group:
task.dag = dag
task.depends_on_past = True
self.logger.debug(
f"Set downstream on {spacers[index].task_id} to {task.task_id}.")
spacers[index] >> task
if index < len(spacers) - 1:
self.logger.debug(
f"Set downstream on {task.task_id} to {spacers[index + 1].task_id}.")
task >> spacers[index + 1]
                prepared_tasks.append(task)

        prepared_tasks += spacers

        return tuple(prepared_tasks)
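To illustrate what _order_transformations_within_group does, here is a minimal standalone sketch; the SimpleNamespace objects are hypothetical stand-ins for configuration transformation rows, with the same graph_order values (0, 0, 1, 1, 2) as the pipeline_state 4 mocks added to configuration_mocker.py below:

from operator import attrgetter
from types import SimpleNamespace

transforms = [SimpleNamespace(id=i, graph_order=g) for i, g in enumerate([0, 0, 1, 1, 2])]

groups = [set()]
graph_cursor = 0
for t in sorted(transforms, key=attrgetter('graph_order')):
    if t.graph_order == graph_cursor:
        groups[-1].add(t)   # same graph_order: runs alongside the current group
    else:
        graph_cursor = t.graph_order
        groups.append({t})  # higher graph_order: starts the next ordered group

# groups is now [{0, 1}, {2, 3}, {4}] by id: three sets that must run in sequence,
# which _apply_deps_to_ordered_tasks then chains together with spacer tasks.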
File renamed without changes.
File renamed without changes.
9 changes: 9 additions & 0 deletions core/airflow/dags/executor.py
@@ -0,0 +1,9 @@
from core.airflow.dagbuilder.dag_builder import DagBuilder
from airflow import DAG

dag_builder = DagBuilder()

dag_builder.do_build_dags()

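# Airflow's DagBag only picks up DAG objects bound at module level of a file in the
# dags folder, so each built DAG is exposed here as a module-level global.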
for index, dag in enumerate(dag_builder.dags):
globals()[f'dag_{index}'] = dag
30 changes: 30 additions & 0 deletions core/airflow/dags/operator_tester.py
@@ -0,0 +1,30 @@
from core.constants import ENVIRONMENT
from airflow import DAG
from datetime import timedelta, datetime
from airflow.operators.dummy_operator import DummyOperator
### Import the operator you want to test here! ###

if ENVIRONMENT == "dev":

DEFAULT_ARGS = {
"owner": "integrichain",
"depends_on_past": True,
"start_date": datetime(2015, 6, 1),
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
"retries": 0,
"retry_delay": timedelta(minutes=5)
}

dag = DAG('development_dag_for_testing_operator',
default_args=DEFAULT_ARGS, schedule_interval=None)


kickoff_task = DummyOperator(task_id="task_that_does_nothing", dag=dag)

# your task with your operator goes here!
# my_task = MyOperator(task_id = <something>, dag = dag)

# put them in order
## kickoff_task >> my_task
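For example, the placeholders above could be filled in with the new TransformOperator. This is a sketch only: the id value 42 is arbitrary, and because TransformOperator only accepts an id, the dag is attached afterwards, the same way TaskOrchestrator does it:

# with the imports at the top of the file:
from core.airflow.plugins.transform_operator import TransformOperator

# inside the `if ENVIRONMENT == "dev":` block, after kickoff_task:
my_task = TransformOperator(42)
my_task.dag = dag
kickoff_task >> my_task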
12 changes: 12 additions & 0 deletions core/airflow/plugins/transform_operator.py
@@ -0,0 +1,12 @@
from airflow.operators import BaseOperator
from airflow.operators.bash_operator import BashOperator


class TransformOperator(BashOperator):
""" Placeholder for Rayne's class"""

def __init__(self, id: int)->None:
        super().__init__(bash_command='ls',
                         task_id=f'super_fancy_task_{id}')
1 change: 0 additions & 1 deletion core/contract.py
@@ -1,6 +1,5 @@
import boto3
import os
import logging
from git import Repo
from core.helpers.s3_naming_helper import S3NamingHelper as s3Name

20 changes: 18 additions & 2 deletions core/helpers/configuration_mocker.py
@@ -82,7 +82,9 @@ def _mock_pipelines(self)-> None:
pipeline_type_id=1, run_frequency='daily'),
p(id=2, name="bluth_profitability", brand_id=2,
pipeline_type_id=2, run_frequency='hourly'),
p(id=3, name="temocil_profitablility", brand_id=1, pipeline_type_id=1, run_frequency='daily')])
p(id=3, name="temocil_profitablility", brand_id=1, pipeline_type_id=1, run_frequency='daily'),
p(id=500, name="bluth_banana_regression_deprecated", brand_id=2,
pipeline_type_id=1, is_active=False, run_frequency='hourly')])
self.session.commit()
self.logger.debug('Done generating pipeline mocks.')

@@ -139,7 +141,21 @@ def _mock_transformations(self)-> None:
t(id=2, transformation_template_id=1,
pipeline_state_id=2, graph_order=0),
t(id=3, transformation_template_id=2,
pipeline_state_id=2, graph_order=1)
pipeline_state_id=2, graph_order=1),
t(id=9, transformation_template_id=2,
pipeline_state_id=2, graph_order=0),
t(id=10, transformation_template_id=2,
pipeline_state_id=2, graph_order=0),
t(id=4, transformation_template_id=1,
pipeline_state_id=4, graph_order=0),
t(id=5, transformation_template_id=1,
pipeline_state_id=4, graph_order=0),
t(id=6, transformation_template_id=1,
pipeline_state_id=4, graph_order=1),
t(id=7, transformation_template_id=1,
pipeline_state_id=4, graph_order=1),
t(id=8, transformation_template_id=1,
pipeline_state_id=4, graph_order=2)
])
self.session.commit()
self.logger.debug('Done generating transformation mocks.')
7 changes: 6 additions & 1 deletion core/helpers/project_root.py
@@ -1,3 +1,4 @@

from typing import Union, List
import os
import sys
@@ -18,7 +19,11 @@ def find_setup_file(self, f: str) -> List[Union[str, bool]]:
if os.path.isfile(os.path.abspath(os.path.normpath(path + os.path.sep + 'setup.py'))):
return path
elif path == '/':
raise Exception("Unable to determine project root: setup.py not found. Are you in Core?")
## the airflow docker container is a special bird where /dags and the project root do not share parent paths
if os.path.isfile('/usr/src/app/setup.py'):
return '/usr/src/app'
else:
raise Exception("Unable to determine project root: setup.py not found. Are you in Core?")

else:
return self.find_setup_file(os.path.split(os.path.abspath(os.path.normpath(f)))[0])
9 changes: 6 additions & 3 deletions core/helpers/session_helper.py
@@ -1,5 +1,5 @@
from core.constants import ENVIRONMENT
from core.helpers.configuration_helper import ConfigurationHelper as CMock
from core.helpers.configuration_mocker import ConfigurationMocker as CMock
from sqlalchemy.orm.session import Session

class SessionHelper:
@@ -11,11 +11,14 @@ def __init__(self):
if ENVIRONMENT == "dev":
cmock = CMock()
cmock.generate_mocks()
self._session == cmock.get_session()
if ENVIRONMENT == "prod":
self._session = cmock.get_session()
elif ENVIRONMENT == "prod":
pass

@property
def session(self)-> Session:
return self._session

@session.setter
def session(self,session)->None:
raise ValueError("session cannot be explicitly set in session_helper.")
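A minimal usage sketch of the helper in a dev environment, where the mocked configuration session is generated on instantiation:

from core.helpers.session_helper import SessionHelper
from core.models.configuration import Pipeline

session = SessionHelper().session  # mocked session when ENVIRONMENT == "dev"
active_pipelines = session.query(Pipeline).filter(Pipeline.is_active).all()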