Add use case repository
PedramNavid committed Aug 7, 2024
1 parent 7b3730d commit eb066f9
Showing 15 changed files with 731 additions and 114 deletions.
19 changes: 19 additions & 0 deletions examples/use_case_repository/Makefile
@@ -0,0 +1,19 @@
.PHONY: install lint fix test webserver

install:
	pip install --upgrade uv
	uv pip install -e .[dev]

lint:
	ruff check .
	ruff format --check .

fix:
	ruff check --fix .
	ruff format .

test:
	pytest

webserver:
	python -m webserver.main
36 changes: 33 additions & 3 deletions examples/use_case_repository/README.md
@@ -1,7 +1,37 @@
## Use Case Repository

These use cases are read by the dagster.io website to populate the list of available use cases.
This repository contains a collection of use cases that demonstrate various applications and implementations using Dagster.

Use cases are fetched via an automated step during the build of the dagster.io website from the master branch of this repository.
### Purpose

Both the markdown and python files are used on our website. Please update both when making changes. Only the python files are tested.
The use cases in this repository serve two main purposes:

1. They are used to populate the list of available use cases on the Dagster.io website.
2. They provide practical examples for developers and data engineers working with Dagster.

### Integration with Dagster.io

The use cases are automatically fetched from the master branch of this repository during the build process of the Dagster.io website, so the website always displays the most up-to-date examples. The code that fetches the use cases and updates the website lives in `dagster-website/scripts/fetchUseCases.js`.

Because the script reads from the master branch, your changes must be pushed to master before they appear on the website.

### File Structure

Each use case consists of two main components:

1. A Markdown (.md) file: Contains the descriptive content and documentation, along with code snippets.
2. A Python (.py) file: Contains the actual implementation code as a single file.

Both files are utilized on the Dagster.io website. However, only the Python files are subject to automated testing.
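
For example, the Pipes CLI use case in this commit is laid out roughly like this (a layout sketch for illustration; the `.py` filename is inferred from the naming pattern):

```
use_case_repository/
├── pipes_cli_command.md   # frontmatter, prose, and code snippets for the website
└── pipes_cli_command.py   # runnable implementation, covered by pytest
```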

### Important Note

When updating a use case, please make sure to modify both the Markdown and Python files to maintain consistency between the documentation and the implementation.

### Local Preview

To preview your changes locally before committing, you can start a local webserver by running the following command in your terminal:

```bash
make webserver
```
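
This runs `python -m webserver.main` (see the Makefile above) and serves the rendered use cases locally.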
19 changes: 14 additions & 5 deletions examples/use_case_repository/setup.py
@@ -5,10 +5,19 @@
    packages=find_packages(exclude=["use_case_repository_tests"]),
    install_requires=[
        "dagster",
        "dagster-cloud",
        "boto3",
        "pandas",
        "matplotlib",
        "dagster-embedded-elt",
        "dagster-pipes",
        "python-frontmatter",
        "pymdown-extensions",
        "markdown",
        "flask",
        "sling",
    ],
    extras_require={"dev": ["dagster-webserver", "pytest"]},
    extras_require={
        "dev": [
            "dagster-webserver",
            "pytest",
            "ruff",
        ]
    },
)
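
Several of these dependencies (`flask`, `markdown`, `python-frontmatter`, `pymdown-extensions`) support the local preview webserver invoked by `make webserver`. The `webserver` module itself is not part of this diff, so the following is only a minimal sketch of what such a preview server could look like; all module, path, and route names are assumptions:

```python
# Hypothetical sketch of the preview server behind `make webserver`; the real
# webserver/main.py is not shown in this diff.
from pathlib import Path

import frontmatter  # provided by the python-frontmatter dependency
import markdown
from flask import Flask, abort

app = Flask(__name__)
USE_CASE_DIR = Path(__file__).parent / "use_case_repository"


@app.route("/<name>")
def render_use_case(name: str):
    md_path = USE_CASE_DIR / f"{name}.md"
    if not md_path.exists():
        abort(404)
    post = frontmatter.load(md_path)  # splits the YAML header from the body
    body = markdown.markdown(post.content, extensions=["pymdownx.superfences"])
    return f"<h1>{post['title']}</h1>{body}"


if __name__ == "__main__":
    app.run(port=3000, debug=True)
```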
Empty file.
@@ -0,0 +1,2 @@
#!/bin/bash
echo "Hello from CLI"
104 changes: 104 additions & 0 deletions examples/use_case_repository/use_case_repository/pipes_cli_command.md
@@ -0,0 +1,104 @@
---
title: "Using Dagster Pipes Subprocess to Run a CLI Command"
description: "This use case demonstrates how to use Dagster Pipes to run a CLI command within a Dagster asset. The objective is to execute non-Python workloads and integrate their outputs into Dagster's data pipeline."
tags: ["dagster pipes", "subprocess", "CLI"]
---
## Running CLI Commands with Dagster Pipes

### Overview

This guide demonstrates how to use Dagster Pipes to run a CLI command within a Dagster asset. This is useful for integrating non-Python workloads, such as Bash scripts or other command-line tools, into your Dagster data pipeline.

### Prerequisites

- Dagster and Dagster UI (`dagster-webserver`) installed. Refer to the [Installation guide](https://docs.dagster.io/getting-started/install) for more info.
- An existing CLI command or script that you want to run.

### What You’ll Learn

You will learn how to:
- Define a Dagster asset that invokes a CLI command.
- Use Dagster Pipes to manage subprocess execution.
- Capture and utilize the output of the CLI command within Dagster.

### Steps to Implement With Dagster

1. **Step 1: Define the CLI Command Script**
- Create a script that contains the CLI command you want to run. For example, create a file named `external_script.sh` with the following content:
```bash
#!/bin/bash
echo "Hello from CLI"
```

2. **Step 2: Define the Dagster Asset**
- Define a Dagster asset that uses `PipesSubprocessClient` to run the CLI command. Include any necessary environment variables or additional parameters.
```python
import shutil

from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset


@asset
def cli_command_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
    cmd = [shutil.which("bash"), "external_script.sh"]
    return pipes_subprocess_client.run(
        command=cmd,
        context=context,
        env={"MY_ENV_VAR": "example_value"},
    ).get_materialize_result()


defs = Definitions(
    assets=[cli_command_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
```

3. **Step 3: Configure and Run the Asset**
- Ensure the script is executable and run the Dagster asset to see the output.
```bash
chmod +x external_script.sh
dagster dev -f path_to_your_dagster_file.py
```

### Expected Outcomes

By following these steps, you will have a Dagster asset that successfully runs a CLI command and logs its output. This allows you to integrate non-Python workloads into your Dagster data pipeline.
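
If the external workload is itself a Python script, it can also report logs and asset metadata back to Dagster through the `dagster-pipes` package (already listed in `setup.py`). A small sketch of such an external script; the filename and metadata values are invented for illustration:

```python
# external_script.py -- hypothetical Python replacement for external_script.sh,
# launched by the PipesSubprocessClient asset above.
from dagster_pipes import open_dagster_pipes

with open_dagster_pipes() as pipes:
    pipes.log.info("Hello from the external process")
    # Attach structured metadata to the materialization reported back to Dagster
    pipes.report_asset_materialization(metadata={"rows_processed": 42})
```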

### Troubleshooting

- **Permission Denied**: Ensure the script file has executable permissions using `chmod +x`.
- **Command Not Found**: Verify the command is available in the system's PATH or provide the full path to the command.

### Additional Resources

- [Dagster Pipes Documentation](https://docs.dagster.io/guides/dagster-pipes)
- [Dagster Installation Guide](https://docs.dagster.io/getting-started/install)

### Next Steps

Explore more advanced use cases with Dagster Pipes, such as integrating with other command-line tools or handling more complex workflows.
@@ -0,0 +1,21 @@
import shutil

from dagster import AssetExecutionContext, Definitions, PipesSubprocessClient, asset


@asset
def cli_command_asset(
    context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
    cmd = [shutil.which("bash"), "external_script.sh"]
    return pipes_subprocess_client.run(
        command=cmd,
        context=context,
        env={"MY_ENV_VAR": "example_value"},
    ).get_materialize_result()


defs = Definitions(
    assets=[cli_command_asset],
    resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
@@ -0,0 +1,119 @@
---
title: "Ingesting Data from S3 to Snowflake with Dagster and Sling"
description: "This use case demonstrates how to ingest data from Amazon S3 into Snowflake using Dagster and the Sling integration from dagster-embedded-elt. The objective is to automate the data ingestion process for efficient data management and analysis."
tags: ["snowflake", "s3", "dagster", "sling", "data ingestion"]
---
## Ingesting Data from S3 to Snowflake with Dagster and Sling

### Overview

This guide provides a step-by-step approach to ingesting data from Amazon S3 into Snowflake using Dagster and the Sling integration from dagster-embedded-elt. The main objective is to automate the data ingestion process, making it efficient and reliable for data management and analysis.

### Prerequisites

Before you begin, ensure you have the following:

- A Snowflake account with the necessary permissions.
- An Amazon S3 bucket with data to ingest.
- AWS credentials with access to the S3 bucket.
- Python installed on your system.
- Dagster and dagster-embedded-elt installed in your Python environment.

### What You’ll Learn

By following this guide, you will learn how to:

- Set up connections to S3 and Snowflake using Sling.
- Define and configure assets in Dagster to automate the data ingestion process.
- Execute the data ingestion pipeline.

### Steps to Implement With Dagster

1. **Step 1: Install Required Packages**
- Ensure you have the necessary Python packages installed.
- Install Dagster and dagster-embedded-elt using pip.

```bash
pip install dagster dagster-embedded-elt
```

2. **Step 2: Define Sling Connections**
- Define the connections to S3 and Snowflake using SlingConnectionResource.
- Use environment variables to securely manage sensitive information.

```python
from dagster import EnvVar
from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource
s3_connection = SlingConnectionResource(
name="MY_S3",
type="s3",
bucket="your-s3-bucket",
access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)
snowflake_connection = SlingConnectionResource(
name="MY_SNOWFLAKE",
type="snowflake",
host="your-snowflake-host",
user="your-snowflake-user",
database="your-snowflake-database",
password=EnvVar("SNOWFLAKE_PASSWORD"),
role="your-snowflake-role",
)
sling_resource = SlingResource(connections=[s3_connection, snowflake_connection])
```
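
Using `EnvVar` rather than `os.getenv` defers reading the variable until the resource is initialized at runtime, which keeps the secret value out of the stored definitions.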

3. **Step 3: Define the Data Ingestion Asset**
- Use the `@sling_assets` decorator to define an asset that runs the Sling replication job.
- Configure the replication settings to specify the source and target.

```python
from dagster import asset, Definitions
from dagster_embedded_elt.sling import sling_assets
replication_config = {
"SOURCE": "MY_S3",
"TARGET": "MY_SNOWFLAKE",
"defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
"streams": {
"s3://your-s3-bucket/your-file.csv": {
"object": "your_snowflake_schema.your_table",
"primary_key": "id",
},
},
}
@sling_assets(replication_config=replication_config)
def ingest_s3_to_snowflake(context, sling: SlingResource):
yield from sling.replicate(context=context)
defs = Definitions(assets=[ingest_s3_to_snowflake], resources={"sling": sling_resource})
```

### Expected Outcomes

After implementing this use case, you should have an automated pipeline that ingests data from Amazon S3 into Snowflake. The data will be available in Snowflake for further processing and analysis.
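
Downstream assets can then build on the ingested tables. One way to express the dependency is to pass the Sling assets definition to `deps`, which depends on all of the asset keys it produces. A sketch, with the report logic left as a placeholder:

```python
from dagster import asset


# Hypothetical downstream asset that runs after every table produced by the
# Sling replication above has been materialized.
@asset(deps=[ingest_s3_to_snowflake])
def daily_report():
    # Query the freshly loaded Snowflake table and build a report here.
    ...
```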

### Troubleshooting

- **Connection Issues**: Ensure that your AWS and Snowflake credentials are correctly set in the environment variables.
- **Data Format Issues**: Verify that the data format in S3 matches the expected format in Snowflake.
- **Permissions**: Ensure that the Snowflake user has the necessary permissions to write to the target schema and table.

### Additional Resources

- [Dagster Documentation](https://docs.dagster.io/)
- [Embedded ELT Documentation](https://docs.dagster.io/integrations/embedded-elt)
- [Sling Documentation](https://docs.slingdata.io/)
- [Snowflake Documentation](https://docs.snowflake.com/)

### Next Steps

- Explore more advanced configurations and transformations using Sling.
- Integrate additional data sources and targets to expand your data pipeline.
- Implement monitoring, alerting, and scheduling for your data ingestion pipeline (see the scheduling sketch below).

By following these steps, you can efficiently automate the process of ingesting data from S3 to Snowflake using Dagster and Sling.
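
For instance, the ingestion assets can be put on a cron schedule with core Dagster scheduling APIs. A sketch; the job name, asset selection, and cron string are placeholders:

```python
from dagster import AssetSelection, ScheduleDefinition, define_asset_job

# Materialize every asset produced by the Sling replication once a day at 06:00
ingest_job = define_asset_job(
    "ingest_s3_to_snowflake_job", selection=AssetSelection.all()
)
ingest_schedule = ScheduleDefinition(job=ingest_job, cron_schedule="0 6 * * *")
```

The job and schedule would then be registered on the `Definitions` object alongside the assets and the `sling` resource.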
@@ -0,0 +1,47 @@
# pyright: reportCallIssue=none
# pyright: reportOptionalMemberAccess=none

from dagster import Definitions, EnvVar
from dagster_embedded_elt.sling import SlingConnectionResource, SlingResource, sling_assets

# Step 2: Define Sling Connections
s3_connection = SlingConnectionResource(
    name="MY_S3",
    type="s3",
    bucket="your-s3-bucket",
    access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
    secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
)

snowflake_connection = SlingConnectionResource(
    name="MY_SNOWFLAKE",
    type="snowflake",
    host="your-snowflake-host",
    user="your-snowflake-user",
    database="your-snowflake-database",
    password=EnvVar("SNOWFLAKE_PASSWORD"),
    role="your-snowflake-role",
)

sling_resource = SlingResource(connections=[s3_connection, snowflake_connection])

# Step 3: Define the Data Ingestion Asset
replication_config = {
    "SOURCE": "MY_S3",
    "TARGET": "MY_SNOWFLAKE",
    "defaults": {"mode": "full-refresh", "object": "{stream_schema}_{stream_table}"},
    "streams": {
        "s3://your-s3-bucket/your-file.csv": {
            "object": "your_snowflake_schema.your_table",
            "primary_key": "id",
        },
    },
}


@sling_assets(replication_config=replication_config)
def ingest_s3_to_snowflake(context, sling: SlingResource):
    yield from sling.replicate(context=context)


defs = Definitions(assets=[ingest_s3_to_snowflake], resources={"sling": sling_resource})
Empty file.
@@ -1,2 +1,6 @@
def test_use_cases():
    assert True
from use_case_repository.snowflake_to_s3_embedded_elt import ingest_s3_to_snowflake, sling_resource


def test_snowflake_to_s3_embedded_elt():
    assert ingest_s3_to_snowflake
    assert sling_resource