Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
romain-intel committed Dec 6, 2024
1 parent 4aa85d2 commit fc1a3b8
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 53 deletions.
7 changes: 6 additions & 1 deletion metaflow/includefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,16 @@ def __init__(
self._includefile_overrides["is_text"] = is_text
if encoding is not None:
self._includefile_overrides["encoding"] = encoding
# NOTA: Right now, there is an issue where these can't be overridden by config
# in all circumstances. Ignoring for now.
super(IncludeFile, self).__init__(
name,
required=required,
help=help,
type=FilePathClass(is_text, encoding),
type=FilePathClass(
self._includefile_overrides.get("is_text", True),
self._includefile_overrides.get("encoding", "utf-8"),
),
**kwargs,
)

Expand Down
42 changes: 35 additions & 7 deletions metaflow/runner/click_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,20 @@ def _method_sanity_check(


def _lazy_load_command(
cli_collection: click.Group, flow_parameters: List[Parameter], _self, name: str
cli_collection: click.Group,
flow_parameters: Union[str, List[Parameter]],
_self,
name: str,
):

# Context is not used in get_command so we can pass None. Since we pin click,
# this won't change from under us.

if isinstance(flow_parameters, str):
# Resolve flow_parameters -- for start, this is a function which we
# need to call to figure out the actual parameters (may be changed by configs)
flow_parameters = getattr(_self, flow_parameters)()

cmd_obj = cli_collection.get_command(None, name)
if cmd_obj:
if isinstance(cmd_obj, click.Group):
Expand Down Expand Up @@ -205,9 +214,11 @@ def extract_flow_class_from_file(flow_file: str) -> FlowSpec:


class MetaflowAPI(object):
def __init__(self, parent=None, **kwargs):
def __init__(self, parent=None, flow_cls=None, **kwargs):
self._parent = parent
self._chain = [{self._API_NAME: kwargs}]
self._flow_cls = flow_cls
self._cached_computed_parameters = None

@property
def parent(self):
Expand All @@ -226,9 +237,7 @@ def name(self):
@classmethod
def from_cli(cls, flow_file: str, cli_collection: Callable) -> Callable:
flow_cls = extract_flow_class_from_file(flow_file)
flow_parameters = [
p for _, p in flow_cls._get_parameters() if not p.IS_CONFIG_PARAMETER
]

with flow_context(flow_cls) as _:
add_decorator_options(cli_collection)

Expand All @@ -240,7 +249,7 @@ def getattr_wrapper(_self, name):
"__module__": "metaflow",
"_API_NAME": flow_file,
"_internal_getattr": functools.partial(
_lazy_load_command, cli_collection, flow_parameters
_lazy_load_command, cli_collection, "_compute_flow_parameters"
),
"__getattr__": getattr_wrapper,
}
Expand All @@ -264,7 +273,7 @@ def _method(_self, **kwargs):
defaults,
**kwargs,
)
return to_return(parent=None, **method_params)
return to_return(parent=None, flow_cls=flow_cls, **method_params)

m = _method
m.__name__ = cli_collection.name
Expand Down Expand Up @@ -314,6 +323,25 @@ def execute(self) -> List[str]:

return components

def _compute_flow_parameters(self):
if self._flow_cls is None or self._parent is not None:
raise RuntimeError(
"Computing flow-level parameters for a non start API. "
"Please report to the Metaflow team."
)
# TODO: We need to actually compute the new parameters (based on configs) which
# would involve processing the options at least partially. We will do this
# before GA but for now making it work for regular parameters
if self._cached_computed_parameters is not None:
return self._cached_computed_parameters
self._cached_computed_parameters = []
for _, param in self._flow_cls._get_parameters():
if param.IS_CONFIG_PARAMETER:
continue
param.init()
self._cached_computed_parameters.append(param)
return self._cached_computed_parameters


def extract_all_params(cmd_obj: Union[click.Command, click.Group]):
arg_params_sigs = OrderedDict()
Expand Down
104 changes: 62 additions & 42 deletions metaflow/user_configs/config_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def convert(self, value, param, ctx):
return value
if value.startswith(_DEFAULT_PREFIX):
is_default = True
value = value[len(_DEFAULT_PREFIX) :]

return self.convert_value(value, is_default)

Expand Down Expand Up @@ -105,7 +106,8 @@ class MultipleTuple(click.Tuple):
# by whitespace which is totally not what we want
# You can now pass multiple configuration options through an environment variable
# using something like:
# METAFLOW_FLOW_CONFIG='{"config1": "filenameforconfig1.json", "config2": {"key1": "value1"}}'
# METAFLOW_FLOW_CONFIG_VALUE='{"config1": {"key0": "value0"}, "config2": {"key1": "value1"}}'
# or METAFLOW_FLOW_CONFIG='{"config1": "file1", "config2": "file2"}'

def split_envvar_value(self, rv):
loaded = json.loads(rv)
Expand Down Expand Up @@ -225,53 +227,71 @@ def process_configs(self, ctx, param, value):
return None

# The second go around, we process all the values and merge them.
# Check that the user didn't provide *both* a path and a value.
common_keys = set(self._value_values or []).intersection(
[k for k, v in self._path_values.items()] or []

# If we are processing options that start with kv., we know we are in a subprocess
# and ignore other stuff. In particular, environment variables used to pass
# down configurations (like METAFLOW_FLOW_CONFIG) could still be present and
# would cause an issue -- we can ignore those as the kv. values should trump
# everything else.
all_keys = set(self._value_values).union(self._path_values)
# Make sure we have at least some keys (ie: some non default values)
has_all_kv = all_keys and all(
self._value_values.get(k, "").startswith(_CONVERT_PREFIX + "kv.")
for k in all_keys
)
if common_keys:
raise click.UsageError(
"Cannot provide both a value and a file for the same configuration. "
"Found such values for '%s'" % "', '".join(common_keys)
)

all_values = dict(self._path_values or {})
all_values.update(self._value_values or {})
flow_cls._flow_state[_FlowState.CONFIGS] = {}
to_return = {}

debug.userconf_exec("All config values: %s" % str(all_values))
if not has_all_kv:
# Check that the user didn't provide *both* a path and a value.
common_keys = set(self._value_values or []).intersection(
[k for k, v in self._path_values.items()] or []
)
if common_keys:
raise click.UsageError(
"Cannot provide both a value and a file for the same configuration. "
"Found such values for '%s'" % "', '".join(common_keys)
)

flow_cls._flow_state[_FlowState.CONFIGS] = {}
all_values = dict(self._path_values or {})
all_values.update(self._value_values or {})

to_return = {}
merged_configs = {}
for name, (val, is_path) in self._defaults.items():
n = name.lower()
if n in all_values:
merged_configs[n] = all_values[n]
else:
if isinstance(val, DeployTimeField):
# This supports a default value that is a deploy-time field (similar
# to Parameter).)
# We will form our own context and pass it down -- note that you cannot
# use configs in the default value of configs as this introduces a bit
# of circularity. Note also that quiet and datastore are *eager*
# options so are available here.
param_ctx = ParameterContext(
flow_name=ctx.obj.flow.name,
user_name=get_username(),
parameter_name=n,
logger=echo_dev_null if ctx.params["quiet"] else echo_always,
ds_type=ctx.params["datastore"],
configs=None,
)
val = val.fun(param_ctx)
if is_path:
# This is a file path
merged_configs[n] = ConvertPath.convert_value(val, False)
else:
# This is a value
merged_configs[n] = ConvertDictOrStr.convert_value(val, False)
debug.userconf_exec("All config values: %s" % str(all_values))

merged_configs = {}
for name, (val, is_path) in self._defaults.items():
n = name.lower()
if n in all_values:
merged_configs[n] = all_values[n]
else:
if isinstance(val, DeployTimeField):
# This supports a default value that is a deploy-time field (similar
# to Parameter).)
# We will form our own context and pass it down -- note that you cannot
# use configs in the default value of configs as this introduces a bit
# of circularity. Note also that quiet and datastore are *eager*
# options so are available here.
param_ctx = ParameterContext(
flow_name=ctx.obj.flow.name,
user_name=get_username(),
parameter_name=n,
logger=(
echo_dev_null if ctx.params["quiet"] else echo_always
),
ds_type=ctx.params["datastore"],
configs=None,
)
val = val.fun(param_ctx)
if is_path:
# This is a file path
merged_configs[n] = ConvertPath.convert_value(val, False)
else:
# This is a value
merged_configs[n] = ConvertDictOrStr.convert_value(val, False)
else:
debug.userconf_exec("Fast path due to pre-processed values")
merged_configs = self._value_values
debug.userconf_exec("Configs merged with defaults: %s" % str(merged_configs))

missing_configs = set()
Expand Down
1 change: 1 addition & 0 deletions test/core/metaflow_test/formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self, graphspec, test):
self.steps = self._index_steps(test)
self.flow_code = self._pretty_print(self._flow_lines())
self.check_code = self._pretty_print(self._check_lines())
self.copy_files = getattr(test, "REQUIRED_FILES", [])
self.valid = True

for step in self.steps:
Expand Down
4 changes: 4 additions & 0 deletions test/core/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ def construct_arg_dicts_from_click_api():
os.path.join(cwd, "metaflow_test"), os.path.join(tempdir, "metaflow_test")
)

# Copy files required by the test
for file in formatter.copy_files:
shutil.copy2(os.path.join(cwd, "tests", file), os.path.join(tempdir, file))

path = os.path.join(tempdir, "test_flow.py")

original_env = os.environ.copy()
Expand Down
7 changes: 4 additions & 3 deletions test/core/tests/basic_config_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

class BasicConfigTest(MetaflowTest):
PRIORITY = 1
REQUIRED_FILES = ["basic_config_silly.txt"]
PARAMETERS = {
"default_from_config": {
"default": "config_expr('config2').default_param",
Expand All @@ -17,7 +18,7 @@ class BasicConfigTest(MetaflowTest):
"silly_config": {
"required": True,
"parser": "silly_parser",
"default": "silly.txt",
"default": "'silly.txt'",
},
"config2": {},
# Test using a function to get the value
Expand All @@ -32,7 +33,7 @@ class BasicConfigTest(MetaflowTest):
# Test passing values directly on the command line
os.environ['METAFLOW_FLOW_CONFIG_VALUE'] = json.dumps(
{
"config2": {"default_param": 123}
"config2": {"default_param": 123},
"config_env": {"vars": {"var1": "value1", "var2": "value2"}}
}
)
Expand All @@ -46,7 +47,7 @@ class BasicConfigTest(MetaflowTest):
def silly_parser(s):
k, v = s.split(":")
return {k: v}
return {k: v.strip()}
default_config = {
"value": 42,
Expand Down

0 comments on commit fc1a3b8

Please sign in to comment.