The prefect-datahub collection makes it easy to leverage the capabilities of the DataHub emitter in your flows, with support for ingesting metadata about flows, tasks, and workspaces into DataHub via the GMS REST API.
In order to use the prefect-datahub collection, you'll first need to deploy a new instance of DataHub. You can find instructions for deploying open source DataHub locally by navigating to the apps page. A successful local deployment will create the DataHub GMS service running at http://localhost:8080.
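Before saving any configuration, it can help to confirm that GMS is actually reachable. The snippet below is a minimal sketch assuming a local quickstart deployment via the DataHub CLI (the acryl-datahub package) and the default GMS port:

```bash
# Deploy DataHub locally with the DataHub CLI quickstart (assumes Docker is running).
pip install acryl-datahub
datahub docker quickstart

# Sanity-check that the GMS service is up; this should return a JSON config payload.
curl http://localhost:8080/config
```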
Saving the configuration to the Prefect block document store is a one-time activity. While saving, you can provide the configurations below; default values are used for any configuration you leave unset.
| Config | Type | Default | Description |
|---|---|---|---|
| datahub_rest_url | str | http://localhost:8080 | DataHub GMS REST URL |
| env | str | PROD | The environment that all assets produced by this orchestrator belong to. For more detail and possible values refer here. |
| platform_instance | str | None | The instance of the platform that all assets produced by this recipe belong to. For more detail please refer here. |
```python
from prefect_datahub import DatahubEmitter

DatahubEmitter(
    datahub_rest_url="http://localhost:8080",
    env="PROD",
    platform_instance="local_prefect"
).save("BLOCK-NAME-PLACEHOLDER")
```
Congrats! You can now load the saved block to use your credentials in your Python code:
```python
from prefect_datahub import DatahubEmitter

DatahubEmitter.load("BLOCK-NAME-PLACEHOLDER")
```
!!! info "Registering blocks"
Register blocks in this module to
[view and edit them](https://docs.prefect.io/ui/blocks/)
on Prefect Cloud:
```bash
prefect block register -m prefect_datahub
```
After installing prefect-datahub and saving the configuration, you can easily use it within your flows to help you emit metadata, as shown below!
```python
from datahub_provider.entities import Dataset
from prefect import flow, task

from prefect_datahub import DatahubEmitter

datahub_emitter = DatahubEmitter.load("MY_BLOCK_NAME")

@task(name="Transform", description="Transform the data")
def transform(data):
    data = data.split(" ")
    # Record dataset lineage for this task on the emitter.
    datahub_emitter.add_task(
        inputs=[Dataset("snowflake", "mydb.schema.tableA")],
        outputs=[Dataset("snowflake", "mydb.schema.tableC")],
    )
    return data

@flow(name="ETL flow", description="Extract transform load flow")
def etl():
    data = transform("This is data")
    # Emit the collected flow and task metadata to DataHub.
    datahub_emitter.emit_flow()

etl()
```
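Once the flow runs, the emitted flow and task metadata should be visible in the DataHub UI; with a default local quickstart deployment, the UI is typically served at http://localhost:9002.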
For more tips on how to use tasks and flows in a Collection, check out Using Collections!
Install prefect-datahub with pip:

```bash
pip install prefect-datahub
```
Requires an installation of Python 3.7+.
We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.
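For example, a minimal setup using Python's built-in venv module (one of several equivalent options; the environment name here is arbitrary) might look like this:

```bash
# Create and activate an isolated environment, then install the collection.
python -m venv prefect-datahub-env
source prefect-datahub-env/bin/activate
pip install prefect-datahub
```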
These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.
If you encounter any bugs while using prefect-datahub, feel free to open an issue in the prefect-datahub repository.

If you have any questions or issues while using prefect-datahub, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to star or watch prefect-datahub for updates too!

If you'd like to help contribute to fix an issue or add a feature to prefect-datahub, please propose changes through a pull request from a fork of the repository.
Here are the steps:
- Fork the repository
- Clone the forked repository
- Install the repository and its dependencies:
  ```bash
  pip install -e ".[dev]"
  ```
- Make desired changes
- Add tests
- Insert an entry to CHANGELOG.md
- Install `pre-commit` to perform quality checks prior to commit:
  ```bash
  pre-commit install
  ```
- `git commit`, `git push`, and create a pull request