Skip to content

Commit

Permalink
Lopp add more frequency (#44)
Browse files Browse the repository at this point in the history
* add another project with more frequent job runs and example of dynamicout

* update workflow

* updated failed workflow

* another attempt

* update name and frequency

* adjust flakiness and retry policies
  • Loading branch information
slopp authored Oct 18, 2023
1 parent 1113dd1 commit e27ec16
Show file tree
Hide file tree
Showing 16 changed files with 783 additions and 0 deletions.
16 changes: 16 additions & 0 deletions .github/workflows/deploy-dagster-cloud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,22 @@ jobs:
with:
command: "ci set-build-output --location-name=basics --image-tag=$IMAGE_TAG-basics"

# Build 'batch enrichment' code location
- name: Build and upload Docker image for batch enrichment
if: steps.prerun.outputs.result != 'skip'
uses: docker/build-push-action@v4
with:
context: ./hooli_batch_enrichment
push: true
tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-batch-enrichment

- name: Update build session with image tag for batch enrichment
id: ci-set-build-output-batch-enrichment
if: steps.prerun.outputs.result != 'skip'
uses: dagster-io/dagster-cloud-action/actions/utils/[email protected]
with:
command: "ci set-build-output --location-name=batch_enrichment --image-tag=$IMAGE_TAG-batch-enrichment"

# Deploy
- name: Deploy to Dagster Cloud
id: ci-deploy
Expand Down
6 changes: 6 additions & 0 deletions dagster_cloud.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,10 @@ locations:
build:
directory: ./hooli_basics
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod
- location_name: batch_enrichment
code_source:
package_name: dagster_batch_enrichment
build:
directory: ./hooli_batch_enrichment
registry: 764506304434.dkr.ecr.us-west-2.amazonaws.com/hooli-data-science-prod

7 changes: 7 additions & 0 deletions hooli_batch_enrichment/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM python:3.10-slim

WORKDIR /opt/dagster/app

ADD . .

RUN pip install -e .
50 changes: 50 additions & 0 deletions hooli_batch_enrichment/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@

This example Dagster project shows a simple pipeline that pulls raw data from a warehouse and then uses an API to enrich that raw data.

![](enrichment_pipeline.png)

The enrichment is done in parallel batches, constructed using a Dagster graph:

![](enrichment_parallelization.png)


```python
@op
def split_rows(context, raw_data, config):
"""Split a data frame into batches, batch size controlled by config"""
...

@op
def process_chunk(context, chunk, api):
"""Process rows in each chunk by calling the enrichment API"""
...

@op
def concat_chunk_list(chunks):
"""Merge the processed chunks back together"""
...


@graph_asset
def enriched_data(raw_data):
"""Full enrichment process"""
chunks = split_rows(raw_data)
chunks_mapped = chunks.map(process_chunk)
enriched_chunks = chunks_mapped.collect()
return concat_chunk_list(enriched_chunks)
```

The number of batches to process is a function of the raw data size and the configurable batch size. The number of batches run in parallel is configured via Dagster's [multi-process executor](https://docs.dagster.io/_apidocs/execution#dagster.multiprocess_executor).

For example, a job with smaller batches and fewer parallel runs:

![](enrichment_launchpad.png)

![](enrichment_less_parallel.png)

To get started:

```bash
pip install -e ".[dev]"
dagster dev
```
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from dagster_batch_enrichment.definitions import defs as defs
35 changes: 35 additions & 0 deletions hooli_batch_enrichment/dagster_batch_enrichment/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import random

import numpy as np
import pandas as pd
import requests
import responses
from dagster import ConfigurableResource


class EnrichmentAPI(ConfigurableResource):
"""Represents a mock data enrichment API"""

@responses.activate
def get_order_details(_, order_id):
x = random.randint(0,500)
if x <= 1:
raise Exception("API time out")

responses.get(
# fake endpoint
"http://api.jaffleshop.co/v1/order_details",
# adds an order center
json=pd.DataFrame(
{
"order_id": [order_id],
"order_center": [
random.choices(["scranton", "albany", "new york"], k=1)
],
}
).to_json(),
)

return requests.get(
"http://api.jaffleshop.co/v1/order_details", params={"order_id": order_id}
)
85 changes: 85 additions & 0 deletions hooli_batch_enrichment/dagster_batch_enrichment/assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from dagster import asset, OpExecutionContext ,MetadataValue, DynamicOut, Config, op, DynamicOutput, Out, graph_asset, RetryPolicy
from dagster_batch_enrichment.warehouse import MyWarehouse
from dagster_batch_enrichment.api import EnrichmentAPI
import numpy as np
from pydantic import Field
import pandas as pd
import json


@asset
def raw_data(context: OpExecutionContext, warehouse: MyWarehouse):
""" Placeholder for querying a real data source"""
orders_to_process = warehouse.get_raw_data()

# add any logging
context.log.info(f"Received {len(orders_to_process)} orders to process")

# associate metadata with the raw data asset materialization
context.add_output_metadata(metadata={
"preview": MetadataValue.md(orders_to_process.head(3).to_markdown()),
"nrows": len(orders_to_process)
})

return orders_to_process


# The enriched_data asset is constructed from a graph of operations
# that splits the raw data into batches and calls an enrichment API
# for each batch
# The batch size is configurable with a default of 50 records per batch
# The batches are processed in parallel threads
class ParallelizationConfig(Config):
number_records_per_batch: int = Field(50, description="Number of records to use per batch")

@op(out=DynamicOut())
def split_rows(context: OpExecutionContext, raw_data, config: ParallelizationConfig):
"""
Split a data frame into batches
"""
n_chunks = np.ceil(len(raw_data) / config.number_records_per_batch)
chunks = np.array_split(raw_data, n_chunks)
r = 0
for c in chunks:
r = r + 1
yield DynamicOutput(c, mapping_key=str(r))


@op(
retry_policy=RetryPolicy(max_retries=2)
)
def process_chunk(context: OpExecutionContext, chunk, api: EnrichmentAPI) -> pd.DataFrame:
"""
Process rows in each chunk by calling the enrichment API
within a chunk processing is sequential
but it could be parallelized with regular python techniques
"""
chunk["order_center"] = chunk.apply(
lambda row: get_order_details(row["order_id"], api), axis=1
)
return chunk


def get_order_details(order_id, api):
"""Given an order id call the enrichment API to get an order center"""
response = api.get_order_details(order_id)
response_data = json.loads(response.json())
return response_data["order_center"]


@op
def concat_chunk_list(chunks) -> pd.DataFrame:
"""Merge the processed chunks back together"""
return pd.concat(chunks)


@graph_asset
def enriched_data(raw_data) -> pd.DataFrame:
"""Full enrichment process"""
chunks = split_rows(raw_data)
chunks_mapped = chunks.map(process_chunk)
enriched_chunks = chunks_mapped.collect()
return concat_chunk_list(enriched_chunks)



30 changes: 30 additions & 0 deletions hooli_batch_enrichment/dagster_batch_enrichment/definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from dagster import Definitions, define_asset_job, ScheduleDefinition, AssetSelection
from dagster_batch_enrichment.api import EnrichmentAPI
from dagster_batch_enrichment.warehouse import MyWarehouse
from dagster_batch_enrichment.assets import raw_data, enriched_data


# define a job and schedule to run the pipeline
# alternatively could use freshness policies and auto-materialization, partitions, or other ways to orient the schedule
run_assets_job = define_asset_job(
name="run_etl_pipeline",
selection=AssetSelection.all(),
tags={"dagster/max_retries": "1"}
)

run_assets_30min = ScheduleDefinition(
name="run_assets_30min",
job=run_assets_job,
cron_schedule="*/30 * * * *"
)

defs = Definitions(
assets=[raw_data, enriched_data],
schedules=[run_assets_30min],
jobs=[run_assets_job],
resources={
"api": EnrichmentAPI(),
# place holder for a real warehouse with required connection config
"warehouse": MyWarehouse(path="raw_data.csv")
}
)
9 changes: 9 additions & 0 deletions hooli_batch_enrichment/dagster_batch_enrichment/warehouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from typing import Callable
from dagster import ConfigurableResource
import duckdb

class MyWarehouse(ConfigurableResource):
path: str

def get_raw_data(self):
return duckdb.sql(f"SELECT * FROM \'{self.path}\'").df()
Binary file added hooli_batch_enrichment/enrichment_launchpad.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added hooli_batch_enrichment/enrichment_pipeline.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
6 changes: 6 additions & 0 deletions hooli_batch_enrichment/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "dagster_batch_enrichment"
Loading

0 comments on commit e27ec16

Please sign in to comment.