Skip to content

Commit

Permalink
Merge pull request #45 from PrefectHQ/cruft-update
Browse files Browse the repository at this point in the history
Cruft update + update block docstring examples
  • Loading branch information
zzstoatzz authored Dec 28, 2022
2 parents 66b2ded + 7664891 commit e310da1
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .cruft.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"template": "https://github.com/PrefectHQ/prefect-collection-template",
"commit": "61a4158a76515426f7a1736215565d486b379045",
"commit": "4c571079c74c0e69878e6a926ee4cf632e5b2a54",
"checkout": null,
"context": {
"cookiecutter": {
Expand Down
8 changes: 2 additions & 6 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Welcome to your new Prefect collection! Feel free to modify this README to fit t

## Getting Started

Now that you've bootstrapped a project, follow the steps below to get started developing your Prefect Collection!

### Python setup

Requires an installation of Python 3.7+
Expand All @@ -12,12 +14,6 @@ We recommend using a Python virtual environment manager such as pipenv, conda or

### GitHub setup

Generate a Prefect Collection project in the terminal:

```bash
cookiecutter https://github.com/PrefectHQ/prefect-collection-template
```

Create a Git respoitory for the newly generated collection and create the first commit:

```bash
Expand Down
52 changes: 43 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ Install `prefect-airbyte`
pip install prefect-airbyte
```

A list of available blocks in `prefect-airbyte` and their setup instructions can be found [here](https://PrefectHQ.github.io/prefect-airbyte/#blocks-catalog/).

### Examples
#### Create an `AirbyteServer` block and save it
```python
Expand Down Expand Up @@ -146,23 +148,55 @@ def example_export_configuration_flow(filepath: str):
if __name__ == "__main__":
example_export_configuration_flow('*://**/my_destination.gz')
```
#### Use `with_options` to customize options on any existing task or flow

## Resources
```python
from prefect import flow
from prefect_airbyte.connections import AirbyteConnection
from prefect_airbyte.flows import run_connection_sync

If you encounter and bugs while using `prefect-airbyte`, feel free to open an issue in the [prefect-airbyte](https://github.com/PrefectHQ/prefect-airbyte) repository.
custom_run_connection_sync = run_connection_sync.with_options(
name="Custom Airbyte Sync Flow",
retries=2,
retry_delay_seconds=10,
)

@flow
def some_airbyte_flow():
custom_run_connection_sync(
airbyte_connection=AirbyteConnection.load("my-airbyte-connection-block")
)

some_airbyte_flow()
```

If you have any questions or issues while using `prefect-airbyte`, you can find help in either the [Prefect Discourse forum](https://discourse.prefect.io/) or the [Prefect Slack community](https://prefect.io/slack)
For more tips on how to use tasks and flows in a Collection, check out [Using Collections](https://orion-docs.prefect.io/collections/usage/)!

Feel free to ⭐️ or watch [`prefect-airbyte`](https://github.com/PrefectHQ/prefect-airbyte) for updates too!

## Development
## Resources

If you'd like to install a version of `prefect-airbyte` for development, first clone the repository and then perform an editable install with `pip`:
If you encounter and bugs while using `prefect-airbyte`, feel free to open an issue in the [prefect-airbyte](https://github.com/PrefectHQ/prefect-airbyte) repository.

```bash
git clone https://github.com/PrefectHQ/prefect-airbyte.git
If you have any questions or issues while using `prefect-airbyte`, you can find help in either the [Prefect Discourse forum](https://discourse.prefect.io/) or the [Prefect Slack community](https://prefect.io/slack)

Feel free to star or watch [`prefect-airbyte`](https://github.com/PrefectHQ/prefect-airbyte) for updates too!

cd prefect-airbyte/
## Contribute

If you'd like to help contribute to fix an issue or add a feature to `prefect-airbyte`, please [propose changes through a pull request from a fork of the repository](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork).

### Contribution Steps:
1. [Fork the repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#forking-a-repository)
2. [Clone the forked repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#cloning-your-forked-repository)
3. Install the repository and its dependencies:
```
pip install -e ".[dev]"
```
4. Make desired changes.
5. Add tests.
6. Insert an entry to [CHANGELOG.md](https://github.com/PrefectHQ/prefect-airbyte/blob/main/CHANGELOG.md)
7. Install `pre-commit` to perform quality checks prior to commit:
```
pre-commit install
```
8. `git commit`, `git push`, and create a pull request.
81 changes: 80 additions & 1 deletion docs/gen_ref_pages.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,94 @@
"Copies README.md to index.md."
"""
Copies README.md to index.md. Also discovers all blocks and
generates a list of them in the docs under the Blocks Catalog heading.
"""

from pathlib import Path
from textwrap import dedent

import mkdocs_gen_files
from prefect.blocks.core import Block
from prefect.utilities.dispatch import get_registry_for_type
from prefect.utilities.importtools import to_qualified_name

COLLECTION_SLUG = "prefect_airbyte"


def find_module_blocks():
blocks = get_registry_for_type(Block)
collection_blocks = [
block
for block in blocks.values()
if to_qualified_name(block).startswith(COLLECTION_SLUG)
]
module_blocks = {}
for block in collection_blocks:
block_name = block.__name__
module_nesting = tuple(to_qualified_name(block).split(".")[1:-1])
if module_nesting not in module_blocks:
module_blocks[module_nesting] = []
module_blocks[module_nesting].append(block_name)
return module_blocks


def insert_blocks_catalog(generated_file):
module_blocks = find_module_blocks()
generated_file.write("## Blocks Catalog\n")
generated_file.write(
dedent(
f"""
Below is a list of Blocks available for registration in
`prefect-airbyte`.
To register blocks in this module to
[view and edit them](https://orion-docs.prefect.io/ui/blocks/)
on Prefect Cloud:
```bash
prefect block register -m {COLLECTION_SLUG}
```
"""
)
)
generated_file.write(
"Note, to use the `load` method on Blocks, you must already have a block document " # noqa
"[saved through code](https://orion-docs.prefect.io/concepts/blocks/#saving-blocks) " # noqa
"or [saved through the UI](https://orion-docs.prefect.io/ui/blocks/).\n"
)
for module_nesting, block_names in module_blocks.items():
module_path = " ".join(module_nesting)
module_title = module_path.replace("_", " ").title()
generated_file.write(f"### {module_title} Module\n")
for block_name in block_names:
generated_file.write(
f"**[{block_name}][{COLLECTION_SLUG}.{module_path}.{block_name}]**\n"
)
generated_file.write(
dedent(
f"""
To load the {block_name}:
```python
from prefect import flow
from {COLLECTION_SLUG}.{module_path} import {block_name}
@flow
def my_flow():
my_block = {block_name}.load("MY_BLOCK_NAME")
my_flow()
```
"""
)
)


readme_path = Path("README.md")
docs_index_path = Path("index.md")

with open(readme_path, "r") as readme:
with mkdocs_gen_files.open(docs_index_path, "w") as generated_file:
for line in readme:
if line.startswith("## Resources"):
insert_blocks_catalog(generated_file)
generated_file.write(line)

mkdocs_gen_files.set_edit_path(Path(docs_index_path), readme_path)
1 change: 1 addition & 0 deletions prefect_airbyte/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from . import _version

from prefect_airbyte.connections import AirbyteConnection # noqa F401
from prefect_airbyte.server import AirbyteServer # noqa F401

__version__ = _version.get_versions()["version"]
28 changes: 18 additions & 10 deletions prefect_airbyte/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ async def fetch_result(self) -> AirbyteSyncResult:


class AirbyteConnection(JobBlock):
"""`JobBlock` representing an existing Airbyte connection.
"""A block representing an existing Airbyte connection.
Attributes:
airbyte_server: `AirbyteServer` block representing the Airbyte instance
Expand All @@ -284,27 +284,35 @@ class AirbyteConnection(JobBlock):
status_updates: Whether to log job status on each poll of the Airbyte sync job.
timeout: Timeout in seconds for requests made by `httpx.AsyncClient`.
Example:
Examples:
Load an existing `AirbyteConnection` block:
```python
from prefect_airbyte import AirbyteConnection
airbyte_connection = AirbyteConnection.load("BLOCK_NAME")
```
# trigger the Airbyte connection sync
airbyte_sync = airbyte_connection.trigger()
Run an Airbyte connection sync as a flow:
```python
from prefect import flow
from prefect_airbyte import AirbyteConnection
from prefect_airbyte.flows import run_connection_sync # this is a flow
# wait for the Airbyte sync to complete
airbyte_sync.wait_for_completion()
airbyte_connection = AirbyteConnection.load("BLOCK_NAME")
# fetch the result of the Airbyte sync
airbyte_sync_result = airbyte_sync.fetch_result()
@flow
def airbyte_orchestrator():
run_connection_sync(airbyte_connection) # now it's a subflow
```
"""

_block_type_name = "Airbyte Connection"
_logo_url = "https://images.ctfassets.net/zscdif0zqppk/6gm7wsC7ANnKYQsm7oiSYz/aac1ad5e054d35d9e24af8d6ed3aed5f/59758427?h=250" # noqa: E501

airbyte_server: AirbyteServer = Field(
default=...,
description=(
"`AirbyteServer` block representing the Airbyte instance "
"AirbyteServer block representing the Airbyte instance "
"where the Airbyte connection is defined."
),
)
Expand All @@ -326,7 +334,7 @@ class AirbyteConnection(JobBlock):

timeout: int = Field(
default=5,
description="Timeout in seconds for requests made by `httpx.AsyncClient`.",
description="Timeout in seconds for requests made by httpx.AsyncClient.",
)

@sync_compatible
Expand Down
11 changes: 1 addition & 10 deletions prefect_airbyte/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,10 @@ class AirbyteServer(Block):
use_ssl: Whether to use a secure url for calls to the Airbyte API.
Example:
Create an `AirbyteServer` block for an Airbyte instance running on localhost:
```python
from prefect import flow
from prefect_airbyte.connection import trigger_sync
from prefect_airbyte.server import AirbyteServer
@flow
def airbyte_orchestration_flow():
airbyte_server = AirbyteServer()
trigger_sync(
airbyte_server=airbyte_server,
connection_id="my_connection_id",
)
server = AirbyteServer.load("BLOCK_NAME")
```
"""

Expand Down
22 changes: 22 additions & 0 deletions tests/test_block_standards.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import pytest
from prefect.blocks.core import Block
from prefect.testing.standard_test_suites import BlockStandardTestSuite
from prefect.utilities.dispatch import get_registry_for_type
from prefect.utilities.importtools import to_qualified_name


def find_module_blocks():
blocks = get_registry_for_type(Block)
module_blocks = [
block
for block in blocks.values()
if to_qualified_name(block).startswith("prefect_airbyte")
]
return module_blocks


@pytest.mark.parametrize("block", find_module_blocks())
class TestAllBlocksAdhereToStandards(BlockStandardTestSuite):
@pytest.fixture
def block(self, block):
return block

0 comments on commit e310da1

Please sign in to comment.