From 91190015b05f1206bb2918deb13d8b3c4c36a96a Mon Sep 17 00:00:00 2001
From: Sultan Iman
Date: Wed, 13 Mar 2024 19:30:52 +0100
Subject: [PATCH] Initial implementation of runner using reflection and code generation

---
 dlt/cli/run_command.py           | 136 +++-------------------
 dlt/common/cli/__init__.py       |   0
 dlt/common/cli/runner.py         | 193 +++++++++++++++++++++++++++++++
 dlt/common/reflection/utils.py   |   7 +-
 dlt/reflection/script_visitor.py |   5 +-
 5 files changed, 216 insertions(+), 125 deletions(-)
 create mode 100644 dlt/common/cli/__init__.py
 create mode 100644 dlt/common/cli/runner.py

diff --git a/dlt/cli/run_command.py b/dlt/cli/run_command.py
index b3a165810c..3a61e1263e 100644
--- a/dlt/cli/run_command.py
+++ b/dlt/cli/run_command.py
@@ -1,24 +1,13 @@
-from itertools import chain
-import os
-import inspect as it
 import typing as t
-from collections import defaultdict
-from importlib import util as im
-from types import ModuleType
-
 
 import dlt
 from dlt.cli import echo as fmt
 from dlt.cli.utils import track_command
+from dlt.common.cli.runner import RunnerInventory, DltRunnerEnvironment
 from dlt.sources import DltResource, DltSource
 from typing_extensions import TypedDict
 
-original_run = dlt.Pipeline.run
-def noop(*args, **kwards):
-    pass
-
-dlt.Pipeline.run = noop
 
 
 class PipelineMember(TypedDict):
     name: str
@@ -39,114 +28,17 @@ def run_pipeline_command(
     args: t.Optional[str] = None,
 ):
     pick_first_pipeline_and_source = not pipeline and not source
-    pipeline_module = load_module(module)
-
-    pipeline_members = extract_dlt_members(pipeline_module)
-    errors = validate_mvp_pipeline(pipeline_members)
-    if errors:
-        for err in errors:
-            fmt.echo(err)
+    inventory = RunnerInventory(
+        module,
+        pipeline_name=pipeline,
+        source_name=source,
+        args=args,
+        run_first_pipeline_with_source=pick_first_pipeline_and_source,
+    )
+
+    try:
+        dlt_environment = DltRunnerEnvironment(inventory=inventory)
+        dlt_environment.run()
+    except RuntimeError as ex:
+        fmt.echo(str(ex))
         return -1
-
-    run_options = prepare_run_arguments(args)
-    dlt.Pipeline.run = original_run
-    if pick_first_pipeline_and_source:
-        fmt.echo(
-            "Neiter of pipeline name or source were specified, "
-            "we will pick first pipeline and a source to run"
-        )
-
-        pipeline_instance = pipeline_members["pipelines"][0]["instance"]
-        if resources := pipeline_members["resources"]:
-            data_source = resources[0]["instance"]
-        else:
-            data_source = pipeline_members["sources"][0]["instance"]
-
-        pipeline_instance.run(data_source(), **run_options)
-    else:
-        pipeline_instance = get_pipeline_by_name(pipeline, pipeline_members)
-        resource_instance = get_resource_by_name(source, pipeline_members)
-        pipeline_instance.run(resource_instance(), **run_options)
-
-
-def load_module(module_path: str) -> ModuleType:
-    if not os.path.exists(module_path):
-        fmt.echo(f'Module or file "{module_path}" does not exist')
-        return -1
-
-    module_name = module_path[:]
-    if module_name.endswith(".py"):
-        module_name = module_path[:-3]
-
-    spec = im.spec_from_file_location(module_name, module_path)
-    module = spec.loader.load_module(module_name)
-
-    return module
-
-
-def extract_dlt_members(module: ModuleType) -> DltInventory:
-    variables: DltInventory = defaultdict(list)
-    for name, value in it.getmembers(module):
-        # We would like to skip private variables and other modules
-        if not it.ismodule(value) and not name.startswith("_"):
-            if isinstance(value, dlt.Pipeline):
-                variables["pipelines"].append(
-                    {
-                        "name": value.pipeline_name,
-                        "instance": value,
-                    }
-                )
-
-            if isinstance(value, DltSource):
-                variables["sources"].append(
-                    {
-                        "name": value.name,
-                        "instance": value,
-                    }
-                )
-
-            if isinstance(value, DltResource):
-                variables["resources"].append(
-                    {
-                        "name": value.name,
-                        "instance": value,
-                    }
-                )
-
-    return variables
-
-
-def validate_mvp_pipeline(pipeline_memebers: DltInventory) -> t.List[str]:
-    errors = []
-    if not pipeline_memebers.get("pipelines"):
-        errors.append("Could not find any pipeline in the given module")
-
-    if not pipeline_memebers.get("resources") and not pipeline_memebers.get("sources"):
-        errors.append("Could not find any source or resource in the given module")
-
-    return errors
-
-
-def prepare_run_arguments(arglist: t.List[str]) -> t.Dict[str, str]:
-    run_options = {}
-    for arg in arglist:
-        arg_name, value = arg.split("=")
-        run_options[arg_name] = value
-
-    return run_options
-
-
-def get_pipeline_by_name(pipeline_name: str, members: DltInventory) -> t.Optional[dlt.Pipeline]:
-    for pipeline_item in members["pipelines"]:
-        if pipeline_item["name"] == pipeline_name:
-            return pipeline_item["instace"]
-
-    return None
-
-
-def get_resource_by_name(
-    resource_name: str, members: DltInventory
-) -> t.Optional[t.Union[DltResource, DltSource]]:
-    for source_item in chain(members["resources"], members["sources"]):
-        if source_item["name"] == resource_name:
-            return source_item["instace"]
diff --git a/dlt/common/cli/__init__.py b/dlt/common/cli/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/dlt/common/cli/runner.py b/dlt/common/cli/runner.py
new file mode 100644
index 0000000000..4c88a4bd59
--- /dev/null
+++ b/dlt/common/cli/runner.py
@@ -0,0 +1,193 @@
+import ast
+from collections import defaultdict
+import inspect
+import os
+import typing as t
+
+from dataclasses import dataclass
+from importlib import machinery as im
+from importlib import util as iu
+from types import ModuleType
+
+import dlt
+
+from dlt.cli import echo as fmt
+from dlt.common.configuration.providers.environ import EnvironProvider
+from dlt.common.configuration.providers.toml import ConfigTomlProvider, SecretsTomlProvider
+from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
+from dlt.sources import DltResource, DltSource
+from dlt.cli.utils import parse_init_script
+from dlt.common.reflection.utils import evaluate_node_literal
+from dlt.reflection import names as rn
+from dlt.reflection.script_visitor import PipelineScriptVisitor
+
+
+@dataclass
+class RunnerInventory:
+    """Contains the parameters passed to the run command.
+    It also provides two convenience properties to load the script contents
+    and to derive the module name from the provided `script_path`.
+    """
+
+    script_path: str
+    pipeline_name: t.Optional[str] = None
+    source_name: t.Optional[str] = None
+    run_first_pipeline_with_source: t.Optional[bool] = False
+    args: t.Optional[t.List[str]] = None
+
+    @property
+    def script_contents(self) -> str:
+        """Loads script contents"""
+        with open(self.script_path) as fp:
+            return fp.read()
+
+    @property
+    def module_name(self) -> str:
+        """Strips the path and extension and returns the file name as the module name"""
+        module_name = self.script_path.split(os.sep)[-1]
+        if module_name.endswith(".py"):
+            module_name = module_name[:-3]
+
+        return module_name
+
+    @property
+    def run_arguments(self) -> t.Dict[str, str]:
+        run_options = {}
+        for arg in self.args or []:
+            arg_name, value = arg.split("=", 1)
+            run_options[arg_name] = value
+
+        return run_options
+
+
+class PipelineRunner:
+    def __init__(self, inventory: RunnerInventory, visitor: PipelineScriptVisitor) -> None:
+        self.inventory = inventory
+        self.visitor = visitor
+        self.raise_if_not_runnable()
+        self.pipeline_source = self.strip_pipeline_runs()
+        self.module = self.load_module()
+        self.run_options = self.inventory.run_arguments
+        self.workdir = os.path.dirname(os.path.abspath(self.inventory.script_path))
+
+    def run(self) -> None:
+        config_path = f"{self.workdir}/.dlt"
+        ConfigProvidersContext.initial_providers = [
+            EnvironProvider(),
+            SecretsTomlProvider(project_dir=config_path, add_global_config=False),
+            ConfigTomlProvider(project_dir=config_path, add_global_config=False),
+        ]
+
+        pick_first = not self.inventory.pipeline_name and not self.inventory.source_name
+        if pick_first:
+            fmt.echo(
+                "Neither a pipeline name nor a source was specified, "
+                "we will pick the first pipeline and source to run"
+            )
+
+        pipeline_name = self.inventory.pipeline_name
+        resource_name = self.inventory.source_name
+        pipelines = list(self.pipelines.values())
+        resources = list(self.visitor.known_sources_resources.keys())
+        if pick_first:
+            resource = getattr(self.module, resources[0])
+            pipeline_instance = pipelines[0]
+        else:
+            resource = getattr(self.module, resource_name)
+            pipeline_instance = self.pipelines[pipeline_name]
+            setattr(self.module, f"pipeline_{pipeline_name}", pipeline_instance)
+
+        pipeline_instance.run(resource(), **self.run_options)
+
+    @property
+    def run_nodes(self) -> t.List[ast.AST]:
+        """Extracts pipeline.run call nodes"""
+        pipeline_runs = self.visitor.known_calls_with_nodes.get(rn.RUN) or []
+        run_nodes = [run_node for _args, run_node in pipeline_runs]
+        return run_nodes
+
+    @property
+    def sources(self) -> t.List[str]:
+        """Returns source and resource names"""
+        return list(self.visitor.known_sources_resources.keys())
+
+    @property
+    def pipelines(self) -> t.Dict[str, dlt.Pipeline]:
+        pipeline_arguments: t.List[inspect.BoundArguments] = (
+            self.visitor.known_calls.get("pipeline") or []
+        )
+        pipeline_items = defaultdict(dict)
+        for item in pipeline_arguments:
+            pipeline_options = {}
+            for arg_name, bound_value in item.arguments.items():
+                if bound_value is not None:
+                    if arg_name == "kwargs":
+                        pipeline_options.update(bound_value)
+                        continue
+
+                    if hasattr(bound_value, "value"):
+                        value = bound_value.value
+                    else:
+                        value = bound_value
+                    pipeline_options[arg_name] = value
+
+            pipeline = dlt.pipeline(**pipeline_options)
+            pipeline.working_dir = os.path.dirname(os.path.abspath(self.inventory.script_path))
+            pipeline_items[pipeline_options["pipeline_name"]] = pipeline
+
+        return pipeline_items
+
+    def strip_pipeline_runs(self) -> str:
+        """Removes all pipeline.run nodes and returns the patched source code"""
+        # Copy original source
+        script_lines: t.List[str] = self.visitor.source_lines[:]
+        stub = '""" run method replaced """'
+
+        def restore_indent(line: str) -> str:
+            """Returns only the leading indentation of `line`"""
+            for index, ch in enumerate(line):
+                if ch != " ":
+                    return line[:index]
+            return line
+
+        for node in self.run_nodes:
+            # if it is a one-liner
+            if node.lineno == node.end_lineno:
+                script_lines[node.lineno - 1] = None
+            else:
+                line_of_code = script_lines[node.lineno - 1]
+                script_lines[node.lineno - 1] = restore_indent(line_of_code) + stub
+                start = node.lineno + 1
+                while start <= node.end_lineno:
+                    script_lines[start - 1] = None
+                    start += 1
+
+        result = [line.rstrip() for line in script_lines if line]
+        return "\n".join(result)
+
+    def load_module(self) -> ModuleType:
+        spec = im.ModuleSpec(name=self.inventory.module_name, loader=None)
+        module = iu.module_from_spec(spec)
+        exec(self.pipeline_source, module.__dict__)
+        return module
+
+    def raise_if_not_runnable(self) -> None:
+        if not self.visitor.known_calls.get("pipeline"):
+            raise RuntimeError("Could not find any pipeline in the given module")
+
+        if not self.visitor.known_sources_resources:
+            raise RuntimeError("Could not find any source or resource in the given module")
+
+
+class DltRunnerEnvironment:
+    def __init__(self, inventory: RunnerInventory) -> None:
+        self.inventory = inventory
+        visitor: PipelineScriptVisitor = parse_init_script(
+            "run",
+            self.inventory.script_contents,
+            self.inventory.module_name,
+        )
+        self.runner = PipelineRunner(inventory, visitor)
+
+    def run(self) -> None:
+        self.runner.run()
diff --git a/dlt/common/reflection/utils.py b/dlt/common/reflection/utils.py
index 9bd3cb6775..3ce8ab8ba6 100644
--- a/dlt/common/reflection/utils.py
+++ b/dlt/common/reflection/utils.py
@@ -78,8 +78,11 @@ def creates_func_def_name_node(func_def: ast.FunctionDef, source_lines: Sequence
 def rewrite_python_script(
     source_script_lines: List[str], transformed_nodes: List[Tuple[ast.AST, ast.AST]]
 ) -> List[str]:
-    """Replaces all the nodes present in `transformed_nodes` in the `script_lines`. The `transformed_nodes` is a tuple where the first element
-    is must be a node with full location information created out of `script_lines`"""
+    """Replaces all the nodes present in `transformed_nodes` in the `script_lines`.
+
+    The `transformed_nodes` is a tuple where the first element must be a node with
+    full location information created out of `script_lines`
+    """
     script_lines: List[str] = []
     last_line = -1
     last_offset = -1
diff --git a/dlt/reflection/script_visitor.py b/dlt/reflection/script_visitor.py
index 52b19fe031..90129395b1 100644
--- a/dlt/reflection/script_visitor.py
+++ b/dlt/reflection/script_visitor.py
@@ -2,7 +2,8 @@ import ast
 import astunparse
 from ast import NodeVisitor
-from typing import Any, Dict, List
+from collections import defaultdict
+from typing import Any, Dict, List, Tuple
 
 from dlt.common.reflection.utils import find_outer_func_def
 
@@ -19,6 +20,7 @@ def __init__(self, source: str):
         # self.source_aliases: Dict[str, str] = {}
         self.is_destination_imported: bool = False
         self.known_calls: Dict[str, List[inspect.BoundArguments]] = {}
+        self.known_calls_with_nodes: Dict[str, List[Tuple[inspect.BoundArguments, ast.AST]]] = defaultdict(list)
         self.known_sources: Dict[str, ast.FunctionDef] = {}
         self.known_source_calls: Dict[str, List[ast.Call]] = {}
         self.known_resources: Dict[str, ast.FunctionDef] = {}
@@ -104,6 +106,7 @@ def visit_Call(self, node: ast.Call) -> Any:
                     # print(f"ALIAS: {alias_name} of {self.func_aliases.get(alias_name)} with {bound_args}")
                     fun_calls = self.known_calls.setdefault(fn, [])
                     fun_calls.append(bound_args)
+                    self.known_calls_with_nodes[fn].append((bound_args, node))
                 except TypeError:
                     # skip the signature
                     pass
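
A minimal usage sketch of the runner API introduced above, mirroring what run_pipeline_command does; the script path, pipeline name, source name and run arguments are hypothetical placeholders, not values taken from the repository.

    # Hypothetical example: drive the new runner directly from Python.
    # Only RunnerInventory and DltRunnerEnvironment come from this patch;
    # every concrete value below is a made-up placeholder.
    from dlt.cli import echo as fmt
    from dlt.common.cli.runner import RunnerInventory, DltRunnerEnvironment

    inventory = RunnerInventory(
        "pipelines/chess_pipeline.py",      # hypothetical path to a pipeline script
        pipeline_name="chess_pipeline",     # name of a dlt.pipeline(...) defined in the script
        source_name="chess_source",         # name of a source/resource attribute in the script
        args=["write_disposition=append"],  # forwarded as keyword arguments to pipeline.run()
    )

    try:
        # Parses the script, strips existing pipeline.run() calls and re-runs
        # the selected pipeline with the selected source.
        DltRunnerEnvironment(inventory=inventory).run()
    except RuntimeError as ex:
        # Raised when no pipeline or no source/resource is found in the script
        fmt.echo(str(ex))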