Skip to content

Commit

Permalink
Merge pull request #359 from datayoga-io/358-jobs-should-be-scaffolded-as-dyyamldyyml
Browse files Browse the repository at this point in the history

Changed jobs to *.dy.yaml
  • Loading branch information
spicy-sauce authored Mar 13, 2024
2 parents 453750a + 9755eb7 commit c702877
Show file tree
Hide file tree
Showing 36 changed files with 677 additions and 669 deletions.
1 change: 1 addition & 0 deletions .github/workflows/generate-docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ jobs:
run: |
git config user.name github-actions
git config user.email [email protected]
git pull
git add .
git diff --cached --exit-code || git commit -m "update autogenerated docs"
git push
6 changes: 4 additions & 2 deletions .github/workflows/generate-jsonschema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,7 @@ jobs:
git config user.name github-actions
git config user.email [email protected]
git add .
git diff --cached --exit-code || git commit -m "update json schemas"
git push --force
if ! git diff --cached --exit-code; then
git commit -m "update json schemas"
git push
fi
2 changes: 1 addition & 1 deletion cli/src/datayoga/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def run(
sys.exit(1)

# validate the job
job_file = path.join(directory, "jobs", job_name.replace(".", os.sep) + ".yaml")
job_file = path.join(directory, "jobs", job_name.replace(".", os.sep) + ".dy.yaml")
try:
job_settings = utils.read_yaml(job_file)
logger.debug(f"job_settings: {job_settings}")
Expand Down
4 changes: 2 additions & 2 deletions core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ This demonstrates how to transform data using a DataYoga job.

### Create a Job

Use this `example.yaml`:
Use this `example.dy.yaml`:

```yaml
steps:
Expand Down Expand Up @@ -65,7 +65,7 @@ from datayoga_core.job import Job
from datayoga_core.result import Result, Status
from datayoga_core.utils import read_yaml
job_settings = read_yaml("example.yaml")
job_settings = read_yaml("example.dy.yaml")
job = dy.compile(job_settings)
assert job.transform([{"fname": "jane", "lname": "smith", "country_code": 1, "country_name": "usa", "credit_card": "1234-5678-0000-9999", "gender": "F"}]).processed == [
Expand Down
37 changes: 17 additions & 20 deletions core/src/datayoga_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,29 @@
logger = logging.getLogger("dy")


def compile(
job_settings: Dict[str, Any],
whitelisted_blocks: Optional[List[str]] = None) -> Job:
"""
Compiles a job in YAML
def compile(job_settings: Dict[str, Any], whitelisted_blocks: Optional[List[str]] = None) -> Job:
"""Compiles a job in YAML.
Args:
job_settings (Dict[str, Any]): Job settings
job_settings (Dict[str, Any]): Job settings.
whitelisted_blocks: (Optional[List[str]], optional): Whitelisted blocks. Defaults to None.
Returns:
Job: Compiled job
Job: Compiled job.
"""
logger.debug("Compiling job")
return Job.compile(job_settings, whitelisted_blocks)


def validate(job_settings: Dict[str, Any], whitelisted_blocks: Optional[List[str]] = None):
"""
Validates a job in YAML
"""Validates a job in YAML.
Args:
job_settings (Dict[str, Any]): Job settings
job_settings (Dict[str, Any]): Job settings.
whitelisted_blocks: (Optional[List[str]], optional): Whitelisted blocks. Defaults to None.
Raises:
ValueError: When the job is invalid
ValueError: When the job is invalid.
"""
logger.debug("Validating job")
try:
Expand All @@ -49,21 +45,22 @@ def validate(job_settings: Dict[str, Any], whitelisted_blocks: Optional[List[str
raise ValueError(e)


def transform(job_settings: Dict[str, Any],
data: List[Dict[str, Any]],
context: Optional[Context] = None,
whitelisted_blocks: Optional[List[str]] = None) -> JobResult:
"""
Transforms data against a certain job
def transform(
job_settings: Dict[str, Any],
data: List[Dict[str, Any]],
context: Optional[Context] = None,
whitelisted_blocks: Optional[List[str]] = None
) -> JobResult:
"""Transforms data against a certain job.
Args:
job_settings (Dict[str, Any]): Job settings
data (List[Dict[str, Any]]): Data to transform
job_settings (Dict[str, Any]): Job settings.
data (List[Dict[str, Any]]): Data to transform.
context (Optional[Context]): Context. Defaults to None.
whitelisted_blocks: (Optional[List[str]]): Whitelisted blocks. Defaults to None.
Returns:
JobResult: Job result
JobResult: Job result.
"""
job = compile(job_settings, whitelisted_blocks)
job.init(context)
Expand Down
4 changes: 2 additions & 2 deletions core/src/datayoga_core/resources/scaffold/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
├── connections.yaml
└── jobs
└── sample
└── hello.yaml
└── hello.dy.yaml
```

- `.gitignore`: For convenience, this is used to ignore the data folder.
- `data`: Folder to store data input files or output. This folder can be located anywhere as long as the runner has access to it.
- `connections.yaml`: Contains definitions of source and target connectors and other general settings.
- `jobs`: Source job YAMLs. These can be nested and referenced as modules using a dot notation. e.g. `jobs/sample/hello.yaml` is referenced as `sample.hello` when running the job.
- `jobs`: Source job YAMLs. These can be nested and referenced as modules using a dot notation. e.g. `jobs/sample/hello.dy.yaml` is referenced as `sample.hello` when running the job.

## Run a Job

Expand Down
12 changes: 6 additions & 6 deletions docs/creating-jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ nav_order: 5

# Creating Jobs

Jobs are created by creating yaml files in the `jobs` folder. Each job is composed of several `steps` that activate `blocks`. A `block` defines the business logic of an action. For example, a `block` can write to a Kafka stream, can read from a cloud API, can transform structure, or enrich a message with external data. A `steps` activates the `block` with a set of parameters.
Jobs are created by creating `dy.yaml` files in the `jobs` folder. Each job is composed of several `steps` that activate `blocks`. A `block` defines the business logic of an action. For example, a `block` can write to a Kafka stream, can read from a cloud API, can transform structure, or enrich a message with external data. A `step` activates the `block` with a set of parameters.

## Overview of the Job Yaml Structure
## Overview of the Job YAML Structure

Each Job must start with a block that either produces data or accepts data from external sources.
The subsequent blocks each receive the output of the previous step as an input. The data will be streamed through these blocks as data flows through the chain.
Expand All @@ -33,7 +33,7 @@ It supports both async processing, multi-threading, and multi-processing to enab
To deploy a job to the DataYoga Runner, use the DataYoga CLI.

```bash
datayoga run jobname.yaml
datayoga run jobname
```

## Tutorial - a Job that Reads from Redis and Writes to Postgres
Expand All @@ -58,7 +58,7 @@ docker run -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres

DataYoga manages connections in a special file named `connections.yaml`. Each connection is defined with a logical name and can define properties needed for the connection. Reference to environment variables, interpolation, and secrets is available.

Add the connections to Redis and Postgres above to the connections.yaml:
Add the connections to Redis and Postgres above to the `connections.yaml`:

```bash
cat << EOF > connections.yaml
Expand All @@ -79,7 +79,7 @@ EOF
### Create the Job

```bash
cat << EOF > redis_to_pg.yaml
cat << EOF > redis_to_pg.dy.yaml
steps:
- uses: redis.read_stream
with:
Expand Down Expand Up @@ -114,5 +114,5 @@ EOF
### Run the Job in the DataYoga Runner

```bash
datayoga run redis_to_pg.yaml
datayoga run redis_to_pg
```
4 changes: 2 additions & 2 deletions docs/directory-structure.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ The `datayoga init` command produces the following directory structure:
├── connections.yaml
└── jobs
└── sample
└── hello.yaml
└── hello.dy.yaml
```

- `.gitignore`: For convenience, this is used to ignore the data folder.
- `data`: Folder to store data input files or output. This folder can be located anywhere as long as the runner has access to it.
- `connections.yaml`: Contains definitions of source and target connectors and other general settings.
- `jobs`: Source job YAMLs. These can be nested and referenced as modules using a dot notation. e.g. `jobs/sample/hello.yaml` is referenced as `sample.hello` when running the job.
- `jobs`: Source job YAMLs. These can be nested and referenced as modules using a dot notation. e.g. `jobs/sample/hello.dy.yaml` is referenced as `sample.hello` when running the job.
2 changes: 1 addition & 1 deletion docs/library.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ import datayoga_core as dy
from datayoga_core.job import Job
from datayoga_core.utils import read_yaml
job_settings = read_yaml("example.yaml")
job_settings = read_yaml("example.dy.yaml")
job = dy.compile(job_settings)
assert job.transform({"fname": "jane", "lname": "smith", "country_code": 1, "country_name": "usa", "credit_card": "1234-5678-0000-9999", "gender": "F"})[0] == {"first_name": "jane", "last_name": "smith", "country": "1 - USA", "full_name": "jane smith", "greeting": "Hello Ms. jane smith"}
Expand Down
File renamed without changes.
27 changes: 27 additions & 0 deletions examples/test_csv_producer.dy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
input:
uses: files.read_csv
with:
file: examples/test.csv
steps:
- uses: add_field
with:
fields:
- field: full_name
language: jmespath
expression: concat([fname, ' ' , lname])
- uses: map
with:
expression:
{
first_name: fname,
last_name: lname,
country: country_code || ' - ' || UPPER(country_name),
full_name: full_name,
greeting: "'Hello ' || CASE WHEN gender = 'F' THEN 'Ms.' WHEN gender = 'M' THEN 'Mr.' ELSE 'N/A' END || ' ' || full_name"
}
language: sql
- uses: relational.write
with:
connection: hr
schema: hr
table: emp
25 changes: 0 additions & 25 deletions examples/test_csv_producer.yaml

This file was deleted.

28 changes: 28 additions & 0 deletions examples/test_redis_producer.dy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
input:
uses: redis.read_stream
with:
connection: cache
stream_name: emp
steps:
- uses: add_field
with:
fields:
- field: full_name
language: jmespath
expression: concat([fname, ' ' , lname])
- uses: map
with:
expression:
{
first_name: fname,
last_name: lname,
country: country_code || ' - ' || UPPER(country_name),
full_name: full_name,
greeting: "'Hello ' || CASE WHEN gender = 'F' THEN 'Ms.' WHEN gender = 'M' THEN 'Mr.' ELSE 'N/A' END || ' ' || full_name"
}
language: sql
- uses: relational.write
with:
connection: hr
schema: hr
table: emp
26 changes: 0 additions & 26 deletions examples/test_redis_producer.yaml

This file was deleted.

10 changes: 7 additions & 3 deletions integration-tests/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def execute_program(command: str, background: bool = False) -> Optional[Popen]:

def wait_program(process: Popen, sig: Optional[int] = signal.SIGTERM, ignore_errors: bool = False):
"""Waits for a child program to finish and logs its output.
Sends a signal to the process if it set
Sends a signal to the process if it is set.
Args:
process (Popen): Process to kill.
Expand All @@ -57,8 +57,12 @@ def wait_program(process: Popen, sig: Optional[int] = signal.SIGTERM, ignore_err
raise ValueError("command failed")


def run_job(job_name: str, piped_from: Optional[str] = None, piped_to: Optional[str] = None,
background: bool = False) -> Optional[Popen]:
def run_job(
job_name: str,
piped_from: Optional[str] = None,
piped_to: Optional[str] = None,
background: bool = False
) -> Optional[Popen]:
"""Runs a job using the `datayoga` command-line tool.
Args:
Expand Down
Loading

0 comments on commit c702877

Please sign in to comment.