Skip to content

Commit

Permalink
Initial implementation of runner using reflection and code generation
Browse files Browse the repository at this point in the history
  • Loading branch information
sultaniman committed Mar 13, 2024
1 parent 28cd3ae commit 9119001
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 125 deletions.
136 changes: 14 additions & 122 deletions dlt/cli/run_command.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,13 @@
from itertools import chain
import os
import inspect as it
import typing as t

from collections import defaultdict
from importlib import util as im
from types import ModuleType

import dlt

from dlt.cli import echo as fmt
from dlt.cli.utils import track_command
from dlt.common.cli.runner import RunnerInventory, DltRunnerEnvironment
from dlt.sources import DltResource, DltSource
from typing_extensions import TypedDict

original_run = dlt.Pipeline.run
def noop(*args, **kwargs):
    """Accept any positional and keyword arguments and do nothing.

    Assigned to `dlt.Pipeline.run` further down in this module so that
    `run` calls have no effect while the patch is in place.
    """
    return None

dlt.Pipeline.run = noop

class PipelineMember(TypedDict):
name: str
Expand All @@ -39,114 +28,17 @@ def run_pipeline_command(
args: t.Optional[str] = None,
):
pick_first_pipeline_and_source = not pipeline and not source
pipeline_module = load_module(module)

pipeline_members = extract_dlt_members(pipeline_module)
errors = validate_mvp_pipeline(pipeline_members)
if errors:
for err in errors:
fmt.echo(err)
inventory = RunnerInventory(
module,
pipeline_name=pipeline,
source_name=source,
args=args,
run_first_pipeline_with_source=pick_first_pipeline_and_source,
)

try:
dlt_environment = DltRunnerEnvironment(inventory=inventory)
dlt_environment.run()
except RuntimeError as ex:
fmt.echo(str(ex))
return -1

run_options = prepare_run_arguments(args)
dlt.Pipeline.run = original_run
if pick_first_pipeline_and_source:
fmt.echo(
"Neiter of pipeline name or source were specified, "
"we will pick first pipeline and a source to run"
)

pipeline_instance = pipeline_members["pipelines"][0]["instance"]
if resources := pipeline_members["resources"]:
data_source = resources[0]["instance"]
else:
data_source = pipeline_members["sources"][0]["instance"]

pipeline_instance.run(data_source(), **run_options)
else:
pipeline_instance = get_pipeline_by_name(pipeline, pipeline_members)
resource_instance = get_resource_by_name(source, pipeline_members)
pipeline_instance.run(resource_instance(), **run_options)


def load_module(module_path: str) -> ModuleType:
    """Load the Python file at `module_path` and return it as a module object.

    The module name is `module_path` with a trailing ".py" removed, exactly as
    before. The module is registered in `sys.modules` under that name, which
    is what the previously used (deprecated) `loader.load_module()` did.

    Args:
        module_path: Path to the Python source file to load.

    Returns:
        The loaded module. If the path does not exist, a message is echoed and
        `-1` is returned.

    Raises:
        ImportError: If no import spec/loader can be created for the path
            (for example when the file does not have a Python extension).
    """
    # Local import: `sys` is only needed to register the loaded module.
    import sys

    if not os.path.exists(module_path):
        fmt.echo(f'Module or file "{module_path}" does not exist')
        # NOTE(review): returning -1 contradicts the declared return type; it is
        # kept only for backward compatibility with existing callers.
        return -1

    module_name = module_path[:-3] if module_path.endswith(".py") else module_path

    spec = im.spec_from_file_location(module_name, module_path)
    if spec is None or spec.loader is None:
        # spec_from_file_location returns None when it cannot pick a loader
        raise ImportError(f'Cannot load "{module_path}" as a Python module')

    module = im.module_from_spec(spec)
    # Register before executing so the module is importable while it runs
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialized module behind
        sys.modules.pop(module_name, None)
        raise

    return module


def extract_dlt_members(module: ModuleType) -> DltInventory:
    """Collect pipelines, sources and resources defined at module level.

    Every public (not underscore-prefixed), non-module member of `module` is
    inspected. Instances of `dlt.Pipeline`, `DltSource` and `DltResource` are
    appended to the "pipelines", "sources" and "resources" lists respectively
    as `{"name": ..., "instance": ...}` entries. The three checks are
    independent, so one object may be listed under more than one key.
    """
    inventory: DltInventory = defaultdict(list)

    for member_name, member in it.getmembers(module):
        # Private names and imported modules are never dlt members
        if member_name.startswith("_") or it.ismodule(member):
            continue

        if isinstance(member, dlt.Pipeline):
            entry = {"name": member.pipeline_name, "instance": member}
            inventory["pipelines"].append(entry)

        if isinstance(member, DltSource):
            entry = {"name": member.name, "instance": member}
            inventory["sources"].append(entry)

        if isinstance(member, DltResource):
            entry = {"name": member.name, "instance": member}
            inventory["resources"].append(entry)

    return inventory


def validate_mvp_pipeline(pipeline_memebers: DltInventory) -> t.List[str]:
    """Return human-readable problems that prevent the inventory from running.

    The list is empty when the inventory has at least one pipeline and at
    least one source or resource.
    """
    has_pipeline = bool(pipeline_memebers.get("pipelines"))
    has_data = bool(pipeline_memebers.get("resources") or pipeline_memebers.get("sources"))

    problems = []
    if not has_pipeline:
        problems.append("Could not find any pipeline in the given module")
    if not has_data:
        problems.append("Could not find any source or resource in the given module")
    return problems


def prepare_run_arguments(arglist: t.List[str]) -> t.Dict[str, str]:
    """Convert `name=value` strings into a keyword-argument dictionary.

    Only the first "=" separates the name from the value, so values may
    themselves contain "=". `None` or an empty list yields an empty dict.

    Args:
        arglist: Strings of the form `name=value`, or `None`.

    Returns:
        Mapping of argument name to its (string) value.

    Raises:
        ValueError: If an item does not contain "=".
    """
    run_options: t.Dict[str, str] = {}
    for arg in arglist or []:
        arg_name, separator, value = arg.partition("=")
        if not separator:
            raise ValueError(f'Invalid argument "{arg}", expected format is name=value')
        run_options[arg_name] = value

    return run_options


def get_pipeline_by_name(
    pipeline_name: str, members: "DltInventory"
) -> "t.Optional[dlt.Pipeline]":
    """Return the pipeline instance registered under `pipeline_name`.

    Args:
        pipeline_name: Name to look for among the "pipelines" entries.
        members: Inventory whose "pipelines" entries are dictionaries with
            "name" and "instance" keys.

    Returns:
        The matching pipeline instance, or `None` when no entry matches.
    """
    for pipeline_item in members.get("pipelines") or []:
        if pipeline_item["name"] == pipeline_name:
            # Entries store the object under "instance"; the previous
            # "instace" lookup raised KeyError on every match.
            return pipeline_item["instance"]

    return None


def get_resource_by_name(
    resource_name: str, members: "DltInventory"
) -> "t.Optional[t.Union[DltResource, DltSource]]":
    """Return the resource or source instance registered under `resource_name`.

    Resources are searched before sources.

    Args:
        resource_name: Name to look for.
        members: Inventory whose "resources" and "sources" entries are
            dictionaries with "name" and "instance" keys.

    Returns:
        The matching instance, or `None` when no entry matches.
    """
    candidates = chain(members.get("resources") or [], members.get("sources") or [])
    for source_item in candidates:
        if source_item["name"] == resource_name:
            # Entries store the object under "instance"; the previous
            # "instace" lookup raised KeyError on every match.
            return source_item["instance"]

    return None
Empty file added dlt/common/cli/__init__.py
Empty file.
193 changes: 193 additions & 0 deletions dlt/common/cli/runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import ast
from collections import defaultdict
import inspect
import os
import typing as t

from dataclasses import dataclass
from importlib import machinery as im
from importlib import util as iu
from types import ModuleType

import dlt

from dlt.cli import echo as fmt
from dlt.common.configuration.providers.environ import EnvironProvider
from dlt.common.configuration.providers.toml import ConfigTomlProvider, SecretsTomlProvider
from dlt.common.configuration.specs.config_providers_context import ConfigProvidersContext
from dlt.sources import DltResource, DltSource
from dlt.cli.utils import parse_init_script
from dlt.common.reflection.utils import evaluate_node_literal
from dlt.reflection import names as rn
from dlt.reflection.script_visitor import PipelineScriptVisitor


@dataclass
class RunnerInventory:
    """Parameters passed to the run command.

    Besides holding the raw parameters it offers three convenience
    properties: the script source (`script_contents`), the module name derived
    from `script_path` (`module_name`) and the parsed `name=value` arguments
    (`run_arguments`).
    """

    # Path to the pipeline script to run
    script_path: str
    # Name of the pipeline to run, if selected explicitly
    pipeline_name: t.Optional[str] = None
    # Name of the source or resource to run, if selected explicitly
    source_name: t.Optional[str] = None
    # True when neither a pipeline nor a source was specified by the user
    run_first_pipeline_with_source: t.Optional[bool] = False
    # Raw `name=value` strings forwarded to `pipeline.run`
    args: t.Optional[t.List[str]] = None

    @property
    def script_contents(self) -> str:
        """Read and return the text of the script at `script_path`."""
        # Python source files are UTF-8 by default; do not rely on the locale
        with open(self.script_path, encoding="utf-8") as fp:
            return fp.read()

    @property
    def module_name(self) -> str:
        """Return the script file name without directories and without ".py"."""
        # basename handles every separator the platform accepts, not only os.sep
        module_name = os.path.basename(self.script_path)
        if module_name.endswith(".py"):
            module_name = module_name[:-3]

        return module_name

    @property
    def run_arguments(self) -> t.Dict[str, str]:
        """Parse `args` into a dictionary of keyword arguments.

        Only the first "=" separates the name from the value, so values may
        contain "=". Missing `args` yields an empty dictionary.

        Raises:
            ValueError: If an item does not contain "=".
        """
        run_options: t.Dict[str, str] = {}
        for arg in self.args or []:
            arg_name, separator, value = arg.partition("=")
            if not separator:
                raise ValueError(f'Invalid argument "{arg}", expected format is name=value')
            run_options[arg_name] = value

        return run_options


class PipelineRunner:
    """Runs a pipeline from a script whose own `pipeline.run` calls are stubbed out.

    On construction the runner validates the parsed script, removes every
    `pipeline.run(...)` call from its source, and executes the patched source
    as a module so that sources and resources become available as attributes.
    `run()` then recreates the pipelines from the parsed `dlt.pipeline(...)`
    calls and runs the selected one with the selected source or resource.
    """

    def __init__(self, inventory: "RunnerInventory", visitor: "PipelineScriptVisitor") -> None:
        """Validate the script, patch its source and load it as a module.

        Raises:
            RuntimeError: If the script has no pipeline or no source/resource.
        """
        self.inventory = inventory
        self.visitor = visitor
        self.raise_if_not_runnable()
        self.pipeline_source = self.strip_pipeline_runs()
        self.module = self.load_module()
        self.run_options = self.inventory.run_arguments
        self.workdir = os.path.dirname(os.path.abspath(self.inventory.script_path))

    def run(self) -> None:
        """Run the selected pipeline with the selected source or resource.

        When neither a pipeline name nor a source name was given, the first
        pipeline and the first known source/resource are used.

        Raises:
            RuntimeError: If only one of pipeline/source name was given, or if
                the named pipeline or source cannot be found.
        """
        config_path = os.path.join(self.workdir, ".dlt")
        ConfigProvidersContext.initial_providers = [
            EnvironProvider(),
            SecretsTomlProvider(project_dir=config_path, add_global_config=False),
            ConfigTomlProvider(project_dir=config_path, add_global_config=False),
        ]

        pipeline_name = self.inventory.pipeline_name
        resource_name = self.inventory.source_name
        pick_first = not pipeline_name and not resource_name
        if pick_first:
            fmt.echo(
                "Neiter of pipeline name or source were specified, "
                "we will pick first pipeline and a source to run"
            )

        # Evaluate the property once: every access creates new pipeline instances
        pipelines = self.pipelines
        if pick_first:
            resource_name = self.sources[0]
            pipeline_instance = next(iter(pipelines.values()))
        else:
            if not pipeline_name or not resource_name:
                raise RuntimeError(
                    "Both pipeline name and source name must be specified, or neither of them"
                )
            if pipeline_name not in pipelines:
                raise RuntimeError(f'Could not find pipeline "{pipeline_name}" in the given module')
            pipeline_instance = pipelines[pipeline_name]
            setattr(self.module, f"pipeline_{pipeline_name}", pipeline_instance)

        resource = getattr(self.module, resource_name, None)
        if resource is None:
            raise RuntimeError(
                f'Could not find source or resource "{resource_name}" in the given module'
            )

        pipeline_instance.run(resource(), **self.run_options)

    @property
    def run_nodes(self) -> t.List[ast.AST]:
        """Return the AST nodes of all `pipeline.run` calls (empty if none)."""
        # `.get` yields None when the script has no run calls at all
        pipeline_runs = self.visitor.known_calls_with_nodes.get(rn.RUN) or []
        return [run_node for _args, run_node in pipeline_runs]

    @property
    def sources(self) -> t.List[str]:
        """Return the names of known sources and resources as a list."""
        return list(self.visitor.known_sources_resources)

    @property
    def pipelines(self) -> "t.Dict[str, dlt.Pipeline]":
        """Create a pipeline for every parsed `dlt.pipeline(...)` call.

        Returns a mapping of pipeline name to a newly created pipeline whose
        `working_dir` is the directory of the script. New instances are
        created on every access.
        """
        pipeline_calls: t.List[inspect.BoundArguments] = (
            self.visitor.known_calls.get("pipeline") or []
        )
        working_dir = os.path.dirname(os.path.abspath(self.inventory.script_path))
        pipeline_items: t.Dict[str, t.Any] = {}
        for call in pipeline_calls:
            pipeline_options = {}
            for arg_name, bound_value in call.arguments.items():
                if bound_value is None:
                    continue
                if arg_name == "kwargs":
                    pipeline_options.update(bound_value)
                    continue
                # Unwrap objects exposing `.value` (presumably AST constant
                # nodes - TODO confirm); anything else is passed through as is
                pipeline_options[arg_name] = getattr(bound_value, "value", bound_value)

            pipeline = dlt.pipeline(**pipeline_options)
            pipeline.working_dir = working_dir
            # Fall back to the name held by the created pipeline when the
            # script did not pass `pipeline_name` explicitly
            name = pipeline_options.get("pipeline_name") or pipeline.pipeline_name
            pipeline_items[name] = pipeline

        return pipeline_items

    def strip_pipeline_runs(self) -> str:
        """Remove all `pipeline.run` calls and return the patched source code."""
        return self._stub_out_calls(self.visitor.source_lines, self.run_nodes)

    @staticmethod
    def _stub_out_calls(source_lines: t.Sequence[str], nodes: t.Iterable[ast.AST]) -> str:
        """Replace the lines covered by each node with an indented no-op statement.

        The first line of every node becomes a string-literal statement that
        keeps the original leading whitespace, so the enclosing block stays
        syntactically valid; continuation lines of multi-line calls are
        removed. Note that the whole line is replaced, so other code sharing
        a line with the call is dropped as well.
        """
        stub = '""" run method replaced """'
        # Work on a copy so the visitor's source lines stay untouched
        script_lines: t.List[t.Optional[str]] = list(source_lines)

        for node in nodes:
            first = node.lineno - 1  # AST line numbers are 1-based
            line_of_code = script_lines[first]
            if line_of_code is None:
                # Already removed as part of another multi-line call
                continue

            leading = len(line_of_code) - len(line_of_code.lstrip())
            script_lines[first] = line_of_code[:leading] + stub
            for index in range(node.lineno, node.end_lineno):
                script_lines[index] = None

        # Only removed lines are dropped; blank lines of the script are kept
        return "\n".join(line.rstrip() for line in script_lines if line is not None)

    def load_module(self) -> ModuleType:
        """Execute the patched source inside a new module and return it."""
        spec = im.ModuleSpec(name=self.inventory.module_name, loader=None)
        module = iu.module_from_spec(spec)
        # NOTE(review): this executes the user's script (minus its run calls);
        # only use it with scripts the user intends to run.
        exec(self.pipeline_source, module.__dict__)
        return module

    def raise_if_not_runnable(self) -> None:
        """Raise `RuntimeError` unless a pipeline and a source/resource were found."""
        if not self.visitor.known_calls.get("pipeline"):
            raise RuntimeError("Could not find any pipeline in the given module")

        if not self.visitor.known_sources_resources:
            raise RuntimeError("Could not find any source or resource in the given module")


class DltRunnerEnvironment:
    """Parses the pipeline script and wires up a `PipelineRunner` for it."""

    def __init__(self, inventory: RunnerInventory) -> None:
        """Parse the script referenced by `inventory` and create the runner."""
        self.inventory = inventory

        script_source = inventory.script_contents
        script_module_name = inventory.module_name
        visitor: PipelineScriptVisitor = parse_init_script(
            "run", script_source, script_module_name
        )

        self.runner = PipelineRunner(inventory, visitor)

    def run(self) -> None:
        """Delegate to the runner created in `__init__`."""
        self.runner.run()
7 changes: 5 additions & 2 deletions dlt/common/reflection/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,11 @@ def creates_func_def_name_node(func_def: ast.FunctionDef, source_lines: Sequence
def rewrite_python_script(
source_script_lines: List[str], transformed_nodes: List[Tuple[ast.AST, ast.AST]]
) -> List[str]:
"""Replaces all the nodes present in `transformed_nodes` in the `script_lines`. The `transformed_nodes` is a tuple where the first element
is must be a node with full location information created out of `script_lines`"""
"""Replaces all the nodes present in `transformed_nodes` in the `script_lines`.
The `transformed_nodes` is a tuple where the first element must be a node with
full location information created out of `script_lines`
"""
script_lines: List[str] = []
last_line = -1
last_offset = -1
Expand Down
5 changes: 4 additions & 1 deletion dlt/reflection/script_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import ast
import astunparse
from ast import NodeVisitor
from typing import Any, Dict, List
from collections import defaultdict
from typing import Any, Dict, List, Tuple
from dlt.common.reflection.utils import find_outer_func_def


Expand All @@ -19,6 +20,7 @@ def __init__(self, source: str):
# self.source_aliases: Dict[str, str] = {}
self.is_destination_imported: bool = False
self.known_calls: Dict[str, List[inspect.BoundArguments]] = {}
self.known_calls_with_nodes: Dict[str, List[Tuple[inspect.BoundArguments, ast.AST]]] = defaultdict(list)
self.known_sources: Dict[str, ast.FunctionDef] = {}
self.known_source_calls: Dict[str, List[ast.Call]] = {}
self.known_resources: Dict[str, ast.FunctionDef] = {}
Expand Down Expand Up @@ -104,6 +106,7 @@ def visit_Call(self, node: ast.Call) -> Any:
# print(f"ALIAS: {alias_name} of {self.func_aliases.get(alias_name)} with {bound_args}")
fun_calls = self.known_calls.setdefault(fn, [])
fun_calls.append(bound_args)
self.known_calls_with_nodes[fn].append((bound_args, node))
except TypeError:
# skip the signature
pass
Expand Down

0 comments on commit 9119001

Please sign in to comment.