feat(ingest/dagster): Dagster source (#10071)
Co-authored-by: shubhamjagtap639 <[email protected]>
treff7es and shubhamjagtap639 authored Mar 25, 2024
1 parent 9de15a2 commit 7e5610f
Showing 31 changed files with 2,357 additions and 56 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/build-and-test.yml
@@ -84,6 +84,8 @@ jobs:
-x :metadata-io:test \
-x :metadata-ingestion-modules:airflow-plugin:build \
-x :metadata-ingestion-modules:airflow-plugin:check \
-x :metadata-ingestion-modules:dagster-plugin:build \
-x :metadata-ingestion-modules:dagster-plugin:check \
-x :datahub-frontend:build \
-x :datahub-web-react:build \
--parallel
85 changes: 85 additions & 0 deletions .github/workflows/dagster-plugin.yml
@@ -0,0 +1,85 @@
name: Dagster Plugin
on:
push:
branches:
- master
paths:
- ".github/workflows/dagster-plugin.yml"
- "metadata-ingestion-modules/dagster-plugin/**"
- "metadata-ingestion/**"
- "metadata-models/**"
pull_request:
branches:
- master
paths:
- ".github/**"
- "metadata-ingestion-modules/dagster-plugin/**"
- "metadata-ingestion/**"
- "metadata-models/**"
release:
types: [published]

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
dagster-plugin:
runs-on: ubuntu-latest
env:
SPARK_VERSION: 3.0.3
DATAHUB_TELEMETRY_ENABLED: false
strategy:
matrix:
python-version: ["3.8", "3.10"]
include:
- python-version: "3.8"
extraPythonRequirement: "dagster>=1.3.3"
- python-version: "3.10"
extraPythonRequirement: "dagster>=1.3.3"
fail-fast: false
steps:
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
distribution: "zulu"
java-version: 17
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: "pip"
- name: Install dependencies
run: ./metadata-ingestion/scripts/install_deps.sh
- name: Install dagster package and test (extras ${{ matrix.extraPythonRequirement }})
run: ./gradlew -Pextra_pip_requirements='${{ matrix.extraPythonRequirement }}' :metadata-ingestion-modules:dagster-plugin:lint :metadata-ingestion-modules:dagster-plugin:testQuick
- name: pip freeze show list installed
if: always()
run: source metadata-ingestion-modules/dagster-plugin/venv/bin/activate && pip freeze
- uses: actions/upload-artifact@v3
if: ${{ always() && matrix.python-version == '3.10' && matrix.extraPythonRequirement == 'dagster>=1.3.3' }}
with:
name: Test Results (dagster Plugin ${{ matrix.python-version}})
path: |
**/build/reports/tests/test/**
**/build/test-results/test/**
**/junit.*.xml
- name: Upload coverage to Codecov
if: always()
uses: codecov/codecov-action@v3
with:
token: ${{ secrets.CODECOV_TOKEN }}
directory: .
fail_ci_if_error: false
flags: dagster-${{ matrix.python-version }}-${{ matrix.extraPythonRequirement }}
name: pytest-dagster
verbose: true

event-file:
runs-on: ubuntu-latest
steps:
- name: Upload
uses: actions/upload-artifact@v3
with:
name: Event File
path: ${{ github.event_path }}
2 changes: 1 addition & 1 deletion .github/workflows/test-results.yml
@@ -2,7 +2,7 @@ name: Test Results

on:
workflow_run:
workflows: ["build & test", "metadata ingestion", "Airflow Plugin"]
workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Dagster Plugin"]
types:
- completed

10 changes: 9 additions & 1 deletion docs-website/build.gradle
@@ -79,11 +79,19 @@ task yarnInstall(type: YarnTask) {
)
outputs.dir('node_modules')
}
task airflowPluginBuild(dependsOn: [':metadata-ingestion-modules:airflow-plugin:buildWheel']) {
}

// The Dagster plugin build and the Airflow plugin build can't run at the same time; otherwise CI fails with
// fatal: Unable to create '/home/runner/work/datahub/datahub/.git/index.lock': File exists.
task dagsterPluginBuild(dependsOn: [':metadata-ingestion-modules:dagster-plugin:buildWheel', airflowPluginBuild]) {
}

task yarnGenerate(type: YarnTask, dependsOn: [yarnInstall,
generateGraphQLSchema, generateJsonSchema,
':metadata-ingestion:modelDocGen', ':metadata-ingestion:docGen',
':metadata-ingestion:buildWheel', ':metadata-ingestion-modules:airflow-plugin:buildWheel'] ) {
':metadata-ingestion:buildWheel',
airflowPluginBuild, dagsterPluginBuild] ) {
inputs.files(projectMdFiles)
outputs.cacheIf { true }
args = ['run', 'generate']
1 change: 1 addition & 0 deletions docs-website/generateDocsDir.ts
@@ -572,6 +572,7 @@ function copy_python_wheels(): void {
const wheel_dirs = [
"../metadata-ingestion/dist",
"../metadata-ingestion-modules/airflow-plugin/dist",
"../metadata-ingestion-modules/dagster-plugin/dist",
];

const wheel_output_directory = path.join(STATIC_DIRECTORY, "wheels");
6 changes: 6 additions & 0 deletions docs-website/sidebars.js
@@ -310,6 +310,11 @@ module.exports = {
id: "docs/lineage/airflow",
label: "Airflow",
},
{
type: "doc",
id: "docs/lineage/dagster",
label: "Dagster",
},
//"docker/airflow/local_airflow",
"metadata-integration/java/spark-lineage/README",
"metadata-ingestion/integration_docs/great-expectations",
@@ -766,6 +771,7 @@ module.exports = {
// "metadata-integration/java/spark-lineage-beta/README.md
// "metadata-integration/java/openlineage-converter/README"
//"metadata-ingestion-modules/airflow-plugin/README"
//"metadata-ingestion-modules/dagster-plugin/README"
// "metadata-ingestion/schedule_docs/datahub", // we can delete this
// TODO: change the titles of these, removing the "What is..." portion from the sidebar"
// "docs/what/entity",
89 changes: 89 additions & 0 deletions docs/lineage/dagster.md
@@ -0,0 +1,89 @@
# Dagster Integration
DataHub supports integration with Dagster, covering:

- Dagster pipeline metadata
- Job and op run information
- Lineage information, when present

## Using DataHub's Dagster Sensor

Dagster sensors let you trigger actions in response to state changes. The sensor provided by DataHub emits metadata after every Dagster pipeline run and captures both successful and failed runs. For more details about Dagster sensors, see [Sensors](https://docs.dagster.io/concepts/partitions-schedules-sensors/sensors).

### Prerequisites

1. Create a new Dagster project. See <https://docs.dagster.io/getting-started/create-new-project>.
2. There are two ways to define Dagster definitions before starting the Dagster UI: the [Definitions](https://docs.dagster.io/_apidocs/definitions#dagster.Definitions) class (recommended) or [Repositories](https://docs.dagster.io/concepts/repositories-workspaces/repositories#repositories).
3. Newly created Dagster projects use the `Definitions` class by default.

### Setup

1. Install the required dependency:

```shell
pip install acryl_datahub_dagster_plugin
```

2. Import the sensor definition provided by the DataHub Dagster plugin and add it to your Dagster definitions (or Dagster repository) before starting the Dagster UI, as shown below. A sketch of a comparable setup also appears after this list.
**Using Definitions class:**

```python
{{ inline /metadata-ingestion-modules/dagster-plugin/examples/basic_setup.py }}
```

3. The sensor provided by the DataHub Dagster plugin uses the configuration options below. You can set them via environment variables; if unset, the defaults apply.

**Configuration options:**

| Configuration option            | Default value | Description |
|---------------------------------|---------------|-------------|
| `datahub_client_config`         |               | The DataHub client config. |
| `dagster_url`                   |               | The URL of your Dagster webserver. |
| `capture_asset_materialization` | `True`        | Whether to capture asset keys as Datasets on `AssetMaterialization` events. |
| `capture_input_output`          | `True`        | Whether to capture and try to parse inputs and outputs from `HANDLED_OUTPUT` and `LOADED_INPUT` events. Currently only [PathMetadataValue](https://github.com/dagster-io/dagster/blob/7e08c05dcecef9fd07f887c7846bd1c9a90e7d84/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py#L655) metadata is supported. (EXPERIMENTAL) |
| `platform_instance`             |               | Optional. The instance of the platform that all assets produced by this recipe belong to. |
| `asset_lineage_extractor`       |               | A callback for implementing your own logic to capture asset lineage information; see the example below. |

4. Once the Dagster UI is up, enable the provided sensor: open the Overview tab, then the Sensors tab, and use the toggle next to the sensor to turn it on.

5. The sensor is now ready to emit metadata after every Dagster pipeline run.
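
For orientation, here is a minimal sketch of a `Definitions`-based setup. It assumes the plugin exposes `make_datahub_sensor` and `DatahubDagsterSourceConfig` as in the bundled `basic_setup.py` example; treat the exact names and module paths as illustrative rather than authoritative.

```python
from dagster import Definitions, asset

from datahub.ingestion.graph.client import DatahubClientConfig

# Assumed module path, following the plugin's bundled basic_setup.py example.
from datahub_dagster_plugin.sensors.datahub_sensor import (
    DatahubDagsterSourceConfig,
    make_datahub_sensor,
)


@asset
def example_asset() -> int:
    """A trivial asset so the definitions have something to run."""
    return 42


# Point the sensor at your DataHub GMS and your Dagster webserver.
config = DatahubDagsterSourceConfig(
    datahub_client_config=DatahubClientConfig(server="http://localhost:8080"),
    dagster_url="http://localhost:3000",
)

datahub_sensor = make_datahub_sensor(config=config)

defs = Definitions(assets=[example_asset], sensors=[datahub_sensor])
```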

### How to validate installation

1. In the Dagster UI, go to Overview -> Sensors and confirm that you can see the `datahub_sensor`.
2. Run a Dagster job. In the Dagster daemon logs, you should see DataHub-related log messages like:

```
datahub_sensor - Emitting metadata...
```

## Dagster Ins and Out

You can provide inputs and outputs to both assets and ops explicitly, using dictionaries of `Ins` and `Out` objects that correspond to the decorated function's arguments, and you can attach metadata while doing so.
To create upstream and downstream dataset dependencies for assets and ops, supply `ins` and `out` dictionaries with that metadata, as in the sketch below. For reference, see the sample jobs built from assets ([`assets_job.py`](../../metadata-ingestion-modules/dagster-plugin/examples/assets_job.py)) or ops ([`ops_job.py`](../../metadata-ingestion-modules/dagster-plugin/examples/ops_job.py)).
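
The sketch below illustrates the pattern under stated assumptions: the `datahub.inputs` and `datahub.outputs` metadata keys carrying dataset URNs follow the example jobs linked above, the platform and table names are placeholders, and `make_dataset_urn` is the standard URN helper from the core `datahub` package.

```python
from dagster import In, Out, job, op

from datahub.emitter.mce_builder import make_dataset_urn

# Placeholder dataset; in a real job this would be a table your ops read/write.
TABLE_B_URN = make_dataset_urn(platform="snowflake", name="db.schema.table_b")


@op(
    out={
        "result": Out(
            # Assumed metadata key, following the plugin's example jobs:
            # dataset URNs listed here become downstream lineage of this op.
            metadata={"datahub.outputs": [TABLE_B_URN]}
        )
    }
)
def extract() -> list:
    return [1, 2, 3]


@op(
    ins={
        "data": In(
            # Dataset URNs listed here become upstream lineage of this op.
            metadata={"datahub.inputs": [TABLE_B_URN]}
        )
    }
)
def transform(data: list) -> list:
    return [x * 2 for x in data]


@job
def etl():
    transform(extract())
```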

## Define your custom logic to capture asset lineage information
You can define your own logic to capture asset lineage information.

The extractor returns a dictionary keyed by op name; each `DatasetLineage` value pairs two collections of asset URNs, one for the assets upstream of the op (its inputs) and one for those downstream of it (its outputs).

```python
from typing import Dict

from dagster import RunStatusSensorContext

from datahub.ingestion.graph.client import DataHubGraph
from datahub_dagster_plugin.client.dagster_generator import (
    DagsterGenerator,
    DatasetLineage,
)


def asset_lineage_extractor(
    context: RunStatusSensorContext,
    dagster_generator: DagsterGenerator,
    graph: DataHubGraph,
) -> Dict[str, DatasetLineage]:
    dataset_lineage: Dict[str, DatasetLineage] = {}

    # Walk the run's events and assets via `context` and record one
    # DatasetLineage entry per op here.
    return dataset_lineage
```
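
You then pass this function through the `asset_lineage_extractor` configuration option listed in the table above, so the sensor invokes it on every run.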

[See example job here](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion-modules/dagster-plugin/examples/advanced_ops_jobs.py).

## Debugging

### Connection error for the DataHub REST URL

If you get `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, your DataHub GMS service is not up.
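
As a quick connectivity check, you can probe GMS with the same client the plugin uses. This is a sketch assuming a default local deployment with GMS on port 8080:

```python
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

# Assumes the default local deployment; raises if GMS is unreachable.
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
graph.test_connection()
```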
143 changes: 143 additions & 0 deletions metadata-ingestion-modules/dagster-plugin/.gitignore
@@ -0,0 +1,143 @@
.envrc
src/datahub_dagster_plugin/__init__.py.bak
.vscode/
output
pvenv36/
bq_credentials.json
/tmp
*.bak

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# Generated classes
src/datahub/metadata/
wheels/
junit.quick.xml
4 changes: 4 additions & 0 deletions metadata-ingestion-modules/dagster-plugin/README.md
@@ -0,0 +1,4 @@
# DataHub Dagster Plugin

See the DataHub Dagster docs for details.
