Skip to content

Commit

Permalink
Add test pipeline script with immediate pipeline.run
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Mar 25, 2024
1 parent 2de0e1f commit c8d5e1e
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 52 deletions.
8 changes: 2 additions & 6 deletions tests/cli/cases/cli_runner/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,5 @@ def null_sink(_items, _table) -> None:
destination="duckdb",
)

# load_info = squares_pipeline.run(quads_resource())
# load_info_2 = squares_pipeline.run(squares_resource)

# print(load_info)
load_info = quads_pipeline.run(quads_resource())
print(load_info)
if __name__ == "__main__":
load_info = squares_pipeline.run(squares_resource(), schema=dlt.Schema("bobo-schema"))
21 changes: 21 additions & 0 deletions tests/cli/cases/cli_runner/pipeline_with_immediate_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import dlt


@dlt.resource
def numbers_resource():
for idx in range(100):
yield {"id": idx, "num": idx}


@dlt.destination(loader_file_format="parquet")
def null_sink(_items, _table) -> None:
pass


numbers_pipeline = dlt.pipeline(
pipeline_name="numbers_pipeline",
destination=null_sink,
)

load_info = numbers_pipeline.run()
print(load_info)
58 changes: 12 additions & 46 deletions tests/cli/test_run_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@

from unittest import mock

import dlt
import pytest

from dlt.cli import run_command

from dlt.cli.utils import parse_init_script
from dlt.common.cli.runner.source_patcher import SourcePatcher
from tests.utils import TESTS_ROOT

RUNNER_PIPELINES = TESTS_ROOT / "cli/cases/cli_runner"
TEST_PIPELINE = RUNNER_PIPELINES / "pipeline.py"
TEST_PIPELINE_WITH_IMMEDIATE_RUN = RUNNER_PIPELINES / "pipeline_with_immediate_run.py"
TEST_PIPELINE_CONTENTS = open(RUNNER_PIPELINES / "pipeline.py").read().strip()


Expand All @@ -36,7 +34,9 @@ def test_run_command_requires_working_directory_same_as_pipeline_working_directo
)

output = buf.getvalue()
assert "Current working directory is different from the pipeline script" in output
assert (
"Current working directory is different from the pipeline script" in output
)
assert "If needed please change your current directory to" in output


Expand Down Expand Up @@ -66,54 +66,20 @@ def test_run_command_fails_with_relevant_error_if_pipeline_resource_or_source_no
"Source or resouce with name: resource_404 has not been found in pipeline script."
in output
)
assert "You can choose one of: quads_resource_instance, squares_resource_instance" in output
assert (
"You can choose one of: quads_resource_instance, squares_resource_instance"
in output
)


def test_run_command_allows_selection_of_pipeline_source_or_resource():
with mock.patch("dlt.common.cli.runner.inquirer.Inquirer.ask", return_value=0) as mocked_ask:
with mock.patch(
"dlt.common.cli.runner.inquirer.Inquirer.ask", return_value=0
) as mocked_ask:
run_command.run_pipeline_command(
str(TEST_PIPELINE),
None,
None,
["write_disposition=append", "loader_file_format=jsonl"],
["write_disposition=append", "loader_file_format=parquet"],
)
assert mocked_ask.call_count == 2


expected_patched_code = """
import dlt
@dlt.resource
def quads_resource():
for idx in range(10):
yield {"id": idx, "quad": idx**4}
@dlt.resource
def squares_resource():
for idx in range(10):
yield {"id": idx, "square": idx * idx}
quads_resource_instance = quads_resource()
squares_resource_instance = squares_resource()
quads_pipeline = dlt.pipeline(pipeline_name="numbers_quadruples_pipeline", destination="duckdb")
squares_pipeline = dlt.pipeline(pipeline_name="numbers_pipeline", destination="duckdb")
""".strip()


def test_pipeline_code_patcher():
visitor = parse_init_script(
"run",
TEST_PIPELINE_CONTENTS,
"pipeline",
)
patcher = SourcePatcher(visitor)
assert patcher.patch().strip() == expected_patched_code

0 comments on commit c8d5e1e

Please sign in to comment.