-
Notifications
You must be signed in to change notification settings - Fork 781
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This will allow parameters to be added when processing the `start` step (for the upcomming config change).
- Loading branch information
1 parent
05f9756
commit 407c701
Showing
9 changed files
with
823 additions
and
715 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
import pickle | ||
|
||
from metaflow._vendor import click | ||
|
||
from ..cli import echo_always, echo | ||
from ..datastore import TaskDataStoreSet | ||
from ..exception import CommandException | ||
|
||
|
||
@click.command( | ||
help="Get data artifacts of a task or all tasks in a step. " | ||
"The format for input-path is either <run_id>/<step_name> or " | ||
"<run_id>/<step_name>/<task_id>." | ||
) | ||
@click.argument("input-path") | ||
@click.option( | ||
"--private/--no-private", | ||
default=False, | ||
show_default=True, | ||
help="Show also private attributes.", | ||
) | ||
@click.option( | ||
"--max-value-size", | ||
default=1000, | ||
show_default=True, | ||
type=int, | ||
help="Show only values that are smaller than this number. " | ||
"Set to 0 to see only keys.", | ||
) | ||
@click.option( | ||
"--include", | ||
type=str, | ||
default="", | ||
help="Include only artifacts in the given comma-separated list.", | ||
) | ||
@click.option( | ||
"--file", type=str, default=None, help="Serialize artifacts in the given file." | ||
) | ||
@click.pass_obj | ||
def dump(obj, input_path, private=None, max_value_size=None, include=None, file=None): | ||
output = {} | ||
kwargs = { | ||
"show_private": private, | ||
"max_value_size": max_value_size, | ||
"include": {t for t in include.split(",") if t}, | ||
} | ||
|
||
# Pathspec can either be run_id/step_name or run_id/step_name/task_id. | ||
parts = input_path.split("/") | ||
if len(parts) == 2: | ||
run_id, step_name = parts | ||
task_id = None | ||
elif len(parts) == 3: | ||
run_id, step_name, task_id = parts | ||
else: | ||
raise CommandException( | ||
"input_path should either be run_id/step_name or run_id/step_name/task_id" | ||
) | ||
|
||
datastore_set = TaskDataStoreSet( | ||
obj.flow_datastore, | ||
run_id, | ||
steps=[step_name], | ||
prefetch_data_artifacts=kwargs.get("include"), | ||
) | ||
if task_id: | ||
ds_list = [datastore_set.get_with_pathspec(input_path)] | ||
else: | ||
ds_list = list(datastore_set) # get all tasks | ||
|
||
for ds in ds_list: | ||
echo( | ||
"Dumping output of run_id=*{run_id}* " | ||
"step=*{step}* task_id=*{task_id}*".format( | ||
run_id=ds.run_id, step=ds.step_name, task_id=ds.task_id | ||
), | ||
fg="magenta", | ||
) | ||
|
||
if file is None: | ||
echo_always( | ||
ds.format(**kwargs), highlight="green", highlight_bold=False, err=False | ||
) | ||
else: | ||
output[ds.pathspec] = ds.to_dict(**kwargs) | ||
|
||
if file is not None: | ||
with open(file, "wb") as f: | ||
pickle.dump(output, f, protocol=pickle.HIGHEST_PROTOCOL) | ||
echo("Artifacts written to *%s*" % file) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
from metaflow._vendor import click | ||
|
||
from .. import parameters | ||
from ..runtime import NativeRuntime | ||
|
||
|
||
@parameters.add_custom_parameters(deploy_mode=False) | ||
@click.command(help="Internal command to initialize a run.", hidden=True) | ||
@click.option( | ||
"--run-id", | ||
default=None, | ||
required=True, | ||
help="ID for one execution of all steps in the flow.", | ||
) | ||
@click.option( | ||
"--task-id", default=None, required=True, help="ID for this instance of the step." | ||
) | ||
@click.option( | ||
"--tag", | ||
"tags", | ||
multiple=True, | ||
default=None, | ||
help="Tags for this instance of the step.", | ||
) | ||
@click.pass_obj | ||
def init(obj, run_id=None, task_id=None, tags=None, **kwargs): | ||
# init is a separate command instead of an option in 'step' | ||
# since we need to capture user-specified parameters with | ||
# @add_custom_parameters. Adding custom parameters to 'step' | ||
# is not desirable due to the possibility of name clashes between | ||
# user-specified parameters and our internal options. Note that | ||
# user-specified parameters are often defined as environment | ||
# variables. | ||
|
||
obj.metadata.add_sticky_tags(tags=tags) | ||
|
||
runtime = NativeRuntime( | ||
obj.flow, | ||
obj.graph, | ||
obj.flow_datastore, | ||
obj.metadata, | ||
obj.environment, | ||
obj.package, | ||
obj.logger, | ||
obj.entrypoint, | ||
obj.event_logger, | ||
obj.monitor, | ||
run_id=run_id, | ||
) | ||
obj.flow._set_constants(obj.graph, kwargs) | ||
runtime.persist_constants(task_id=task_id) |
Oops, something went wrong.