diff --git a/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py b/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py index c4854bf..bd44842 100644 --- a/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py +++ b/metaflow_extensions/netflix_ext/cmd/environment/environment_cmd.py @@ -355,6 +355,10 @@ def create( cast(Conda, obj.conda).write_out_environments() + # We are going to be creating this new environment going forward (not the + # initial env we got) + _, env, _ = next(resolver.resolved_environments()) + delta_time = int(time.time() - start) obj.echo(" done in %d second%s." % (delta_time, plural_marker(delta_time))) diff --git a/metaflow_extensions/netflix_ext/plugins/conda/conda.py b/metaflow_extensions/netflix_ext/plugins/conda/conda.py index f83d4c0..6e20014 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/conda.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/conda.py @@ -230,7 +230,7 @@ def env_type_for_deps(deps: Sequence[TStr]) -> EnvType: def default_conda_channels(self) -> List[str]: if not self._found_binaries: self._find_conda_binary() - return list(self._info["channels"]) + return list(self._info["channels"] or []) @property def default_pip_sources(self) -> List[str]: @@ -790,14 +790,15 @@ def env_id_from_alias( % (env_alias, alias_type.value, " " if env_id else " not ", resolved_alias) ) - if env_id: - return env_id - - if not local_only and self._storage is not None: - env_id = self._remote_fetch_alias([(alias_type, resolved_alias)], arch) - if env_id: - return env_id[0] - return None + if ( + not local_only + and self._storage is not None + and (env_id is None or is_alias_mutable(alias_type, resolved_alias)) + ): + env_id_list = self._remote_fetch_alias([(alias_type, resolved_alias)], arch) + if env_id_list: + env_id = env_id_list[0] + return env_id def environment_from_alias( self, env_alias: str, arch: Optional[str] = None, local_only: bool = False @@ -2032,7 +2033,7 @@ def 
_resolve_env_with_pip( implementations = [x.interpreter for x in supported_tags] extra_args = ( "--only-binary=:all:", - # This seems to overly constrain things so skipping for now + # Seems to overly constrain stuff # *( # chain.from_iterable( # product(["--implementation"], set(implementations)) @@ -2048,7 +2049,38 @@ def _resolve_env_with_pip( # pip to it using the `--find-links` argument. local_packages_dict = {} # type: Dict[str, PackageSpecification] if local_packages: - os.makedirs(os.path.join(pip_dir, "local_packages")) + # This is a bit convoluted but we try to avoid downloading packages + # that we already have but we also don't want to point pip to a directory + # full of packages that we may not want to use so we will create symlinks + # to packages we already have and download the others + base_local_pip_packages = os.path.join(self._package_dirs[0], "pip") + tmp_local_pip_packages = os.path.realpath( + os.path.join(pip_dir, "local_packages", "pip") + ) + os.makedirs(tmp_local_pip_packages) + new_local_packages = [] # type: List[PackageSpecification] + for p in local_packages: + for fmt in PipPackageSpecification.allowed_formats(): + filename = "%s%s" % (p.filename, fmt) + if os.path.isfile( + os.path.join(base_local_pip_packages, filename) + ): + os.symlink( + os.path.join(base_local_pip_packages, filename), + os.path.join(tmp_local_pip_packages, filename), + ) + local_packages_dict[ + os.path.join(tmp_local_pip_packages, filename) + ] = p + p.add_local_file( + fmt, + os.path.join(base_local_pip_packages, filename), + replace=True, + ) + break + else: + new_local_packages.append(p) + local_packages = new_local_packages # This will not fetch on the web so no need for auth object self._lazy_fetch_packages( local_packages, None, os.path.join(pip_dir, "local_packages") @@ -2119,18 +2151,28 @@ def _resolve_env_with_pip( packages_to_build.append(dl_info) else: # A local wheel or tarball - if url in local_packages_dict: - debug.conda_exec("This is a known 
local package") + if local_path in local_packages_dict: # We are going to move this file to a less "temporary" # location so that it can be installed if needed + pkg_spec = local_packages_dict[local_path] - filename = os.path.split(local_path)[1] - file_format = correct_splitext(filename)[1] - shutil.move(local_path, self._package_dirs[0]) - pkg_spec.add_local_file( - file_format, - os.path.join(self._package_dirs[0], filename), - ) + if not os.path.islink(local_path): + debug.conda_exec("Known package -- moving in place") + filename = os.path.split(local_path)[1] + file_format = correct_splitext(filename)[1] + shutil.move( + local_path, + os.path.join(self._package_dirs[0], "pip"), + ) + pkg_spec.add_local_file( + file_format, + os.path.join( + self._package_dirs[0], "pip", filename + ), + replace=True, + ) + else: + debug.conda_exec("Known package already in place") result.append(pkg_spec) else: parse_result = parse_explicit_path_pip(url) @@ -3168,7 +3210,15 @@ def _install_remote_conda(self): from metaflow.plugins import DATASTORES # We download the installer and return a path to it - final_path = os.path.join(os.getcwd(), "__conda_installer") + # To be clean, we try to be in the parent of our current directory to avoid + # polluting the user code. 
If that is not possible though, we create something + # inside the current directory + parent_dir = os.path.dirname(os.getcwd()) + if os.access(parent_dir, os.W_OK): + final_path = os.path.join(parent_dir, "conda_env", "__conda_installer") + else: + final_path = os.path.join(os.getcwd(), "conda_env", "__conda_installer") + os.makedirs(os.path.dirname(final_path)) path_to_fetch = os.path.join( CONDA_REMOTE_INSTALLER_DIRNAME, diff --git a/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py b/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py index 65e878f..9501d0b 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py @@ -13,18 +13,20 @@ Optional, Set, Tuple, + Union, cast, ) from metaflow.plugins.datastores.local_storage import LocalStorage from metaflow.flowspec import FlowSpec -from metaflow.metaflow_config import ( - CONDA_MAGIC_FILE_V2, -) +from metaflow.exception import MetaflowException + +from metaflow.metaflow_config import CONDA_MAGIC_FILE_V2 from metaflow.metaflow_environment import MetaflowEnvironment + from .envsresolver import EnvsResolver from .utils import get_conda_manifest_path @@ -71,7 +73,7 @@ def init_environment(self, echo: Callable[..., None]): # Figure out the environments that we need to resolve for all steps # We will resolve all unique environments in parallel step_conda_dec = get_conda_decorator(self._flow, step.name) - if step_conda_dec.is_enabled(): + if step_conda_dec.is_enabled() and not step_conda_dec.is_fetch_at_exec(): resolver.add_environment_for_step(step.name, step_conda_dec) resolver.resolve_environments(echo) @@ -120,12 +122,25 @@ def decospecs(self) -> Tuple[str, ...]: # We will later resolve which to keep. 
return ("conda", "pip") + self.base_env.decospecs() - def _get_env_id(self, step_name: str) -> Optional[EnvID]: + def _get_env_id(self, step_name: str) -> Optional[Union[str, EnvID]]: conda_decorator = get_conda_decorator(self._flow, step_name) if conda_decorator.is_enabled(): - resolved_env = cast(Conda, self._conda).environment(conda_decorator.env_id) - if resolved_env: - return resolved_env.env_id + if not conda_decorator.is_fetch_at_exec(): + resolved_env = cast(Conda, self._conda).environment( + conda_decorator.env_id + ) + if resolved_env: + return resolved_env.env_id + else: + raise MetaflowException( + "Cannot find environment for step '%s'" % step_name + ) + else: + resolved_env_id = os.environ.get("_METAFLOW_CONDA_ENV") + if resolved_env_id: + return EnvID(*json.loads(resolved_env_id)) + # Here we will return the name of the environment + return conda_decorator.from_env_name_unresolved return None def _get_executable(self, step_name: str) -> Optional[str]: @@ -140,6 +155,12 @@ def bootstrap_commands(self, step_name: str, datastore_type: str) -> List[str]: # Bootstrap conda and execution environment for step env_id = self._get_env_id(step_name) if env_id is not None: + if isinstance(env_id, EnvID): + arg1 = env_id.req_id + arg2 = env_id.full_id + else: + arg1 = env_id + arg2 = "_fetch_exec" return [ "export CONDA_START=$(date +%s)", "echo 'Bootstrapping environment ...'", @@ -148,17 +169,15 @@ def bootstrap_commands(self, step_name: str, datastore_type: str) -> List[str]: "metaflow_extensions.netflix_ext.plugins.conda", self._flow.name, step_name, - env_id.req_id, - env_id.full_id, + arg1, + arg2, datastore_type, ), - "export _METAFLOW_CONDA_ENV='%s'" - % json.dumps(env_id).replace('"', '\\"'), + "export _METAFLOW_CONDA_ENV=$(cat _env_id)", "export PYTHONPATH=$(pwd)/_escape_trampolines:$(printenv PYTHONPATH)", "echo 'Environment bootstrapped.'", "export CONDA_END=$(date +%s)", ] - # TODO: Add the PATH part (need conda directory) return [] def 
add_to_package(self) -> List[Tuple[str, str]]: diff --git a/metaflow_extensions/netflix_ext/plugins/conda/conda_flow_decorator.py b/metaflow_extensions/netflix_ext/plugins/conda/conda_flow_decorator.py index b91be26..5745e00 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/conda_flow_decorator.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/conda_flow_decorator.py @@ -33,6 +33,13 @@ class CondaFlowDecorator(FlowDecorator): python : Optional[str] Version of Python to use, e.g. '3.7.4'. If not specified, the current Python version will be used. + fetch_at_exec : bool, default False + If set to True, the environment will be fetched when the task is + executing as opposed to at the beginning of the flow (or at deploy time if + deploying to a scheduler). This option requires name or pathspec to be + specified. This is useful, for example, if you want this step to always use + the latest named environment when it runs as opposed to the latest when it + is deployed. disabled : bool, default False If set to True, disables Conda """ @@ -46,6 +53,7 @@ class CondaFlowDecorator(FlowDecorator): "pip_packages": {}, "pip_sources": [], "python": None, + "fetch_at_exec": None, "disabled": None, } @@ -102,6 +110,13 @@ class PipFlowDecorator(FlowDecorator): python : Optional[str] Version of Python to use, e.g. '3.7.4'. If not specified, the current version will be used. + fetch_at_exec : bool, default False + If set to True, the environment will be fetched when the task is + executing as opposed to at the beginning of the flow (or at deploy time if + deploying to a scheduler). This option requires name or pathspec to be + specified. This is useful, for example, if you want this step to always use + the latest named environment when it runs as opposed to the latest when it + is deployed. 
disabled : bool, default False If set to True, disables Pip """ @@ -114,6 +129,7 @@ class PipFlowDecorator(FlowDecorator): "packages": {}, "sources": [], "python": None, + "fetch_at_exec": None, "disabled": None, } diff --git a/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py b/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py index 054b62c..54d865f 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/conda_step_decorator.py @@ -4,6 +4,7 @@ import json import os import platform +import re import shutil import sys import tempfile @@ -19,6 +20,7 @@ Sequence, Set, Tuple, + Union, cast, ) @@ -74,11 +76,13 @@ class CondaStepDecorator(StepDecorator): name : Optional[str] If specified, can refer to a named environment. The environment referred to here will be the one used for this step. If specified, nothing else can be - specified in this decorator + specified in this decorator. In the name, you can use `@{}` values and + environment variables will be used to substitute. pathspec : Optional[str] If specified, can refer to the pathspec of an existing step. The environment of this referred step will be used here. If specified, nothing else can be - specified in this decorator. + specified in this decorator. In the pathspec, you can use `@{}` values and + environment variables will be used to substitute. libraries : Optional[Dict[str, str]] Libraries to use for this step. The key is the name of the package and the value is the version to use (default: `{}`). Note that versions can @@ -93,11 +97,20 @@ class CondaStepDecorator(StepDecorator): python : Optional[str] Version of Python to use, e.g. '3.7.4'. If not specified, the current version will be used. 
+ fetch_at_exec : bool, default False + If set to True, the environment will be fetched when the task is + executing as opposed to at the beginning of the flow (or at deploy time if + deploying to a scheduler). This option requires name or pathspec to be + specified. This is useful, for example, if you want this step to always use + the latest named environment when it runs as opposed to the latest when it + is deployed. disabled : bool, default False If set to True, disables Conda. """ name = "conda" + TYPE = "conda" + defaults = { "name": None, "pathspec": None, @@ -106,6 +119,7 @@ class CondaStepDecorator(StepDecorator): "pip_packages": {}, "pip_sources": [], "python": None, + "fetch_at_exec": None, "disabled": None, } # type: Dict[str, Any] @@ -118,13 +132,26 @@ def is_enabled(self, ubf_context: Optional[str] = None) -> bool: return not next( x for x in [ - self.attributes["disabled"], + self._self_disabled(), self._base_attributes["disabled"], False, ] if x is not None ) + def is_fetch_at_exec(self, ubf_context: Optional[str] = None) -> bool: + if ubf_context == UBF_CONTROL: + return False + return next( + x + for x in [ + self.attributes["fetch_at_exec"], + self._base_attributes["fetch_at_exec"], + False, + ] + if x is not None + ) + @property def env_ids(self) -> List[EnvID]: # Note this returns a list because initially we had support to specify @@ -190,6 +217,10 @@ def local_root(self) -> Optional[str]: def from_env_name(self) -> Optional[str]: return self._from() + @property + def from_env_name_unresolved(self) -> Optional[str]: + return self._from(True) + @property def from_env(self) -> Optional[ResolvedEnvironment]: from_alias = self._from() @@ -216,6 +247,27 @@ def from_env(self) -> Optional[ResolvedEnvironment]: def set_conda(self, conda: Conda): self.conda = conda + @staticmethod + def sub_envvars_in_envname( + name: str, addl_env: Optional[Dict[str, Union[str, Callable[[], str]]]] = None + ) -> str: + init_name = name + if addl_env is None: + 
addl_env = {} + envvars_to_sub = re.findall(r"\@{(\w+)}", name) + for envvar in set(envvars_to_sub): + replacement = os.environ.get(envvar, addl_env.get(envvar)) + if callable(replacement): + replacement = replacement() + if replacement is not None: + name = name.replace("@{%s}" % envvar, replacement) + else: + raise InvalidEnvironmentException( + "Could not find '%s' in the environment -- needed to resolve '%s'" + % (envvar, name) + ) + return name + def step_init( self, flow: FlowSpec, @@ -238,6 +290,7 @@ def step_init( self._flow = flow self._step_name = step_name self._flow_datastore_type = flow_datastore.TYPE # type: str + self._flow_datastore = flow_datastore self._base_attributes = self._get_base_attributes() self._is_remote = any( @@ -259,12 +312,14 @@ def step_init( self._resolved_deps = None # type: Optional[Sequence[TStr]] self._resolved_sources = None # type: Optional[Sequence[TStr]] self._env_type = None # type: Optional[EnvType] + self._env_for_fetch = {} # type: Dict[str, Union[str, Callable[[], str]]] + self._flow = None # type: Optional[FlowSpec] - if (self.attributes["name"] or self.attributes["pathspec"]) and len( + if (self.attributes["name"] or self.attributes["pathspec"]) and any( [ - k + True for k, v in self.attributes.items() - if v and k not in ("name", "pathspec") + if v and k not in ("name", "pathspec", "fetch_at_exec") ] ): raise InvalidEnvironmentException( @@ -272,6 +327,32 @@ def step_init( % self.name ) + if self.is_fetch_at_exec(): + if not self._from(raw_name=True): + raise InvalidEnvironmentException( + "You cannot specify a `fetch_at_exec` environment and no environment " + "to fetch (either through `name` or `pathspec`) in @%s" % self.name + ) + # We are also very strict that the environment should be *only* a name + # and nothing else as we won't re-resolve + if any( + [ + True + for k, v in self.attributes.items() + if v and k not in ("name", "pathspec", "fetch_at_exec") + ] + ) or any( + [ + True + for k, v in 
self._base_attributes.items() + if v and k not in ("name", "pathspec", "fetch_at_exec") + ] + ): + raise InvalidEnvironmentException( + "You cannot specify a `fetch_at_exec` environment with anything " + "other than a pure named environment in @%s" % self.name + ) + os.environ["PYTHONNOUSERSITE"] = "1" def runtime_init(self, flow: FlowSpec, graph: FlowGraph, package: Any, run_id: str): @@ -334,6 +415,53 @@ def runtime_init(self, flow: FlowSpec, graph: FlowGraph, package: Any, run_id: s # the escape to work even in non metaflow-created subprocesses generate_trampolines(self._metaflow_home) + # If we need to fetch the environment on exec, save the information we need + # so that we can resolve it using information such as run id, step name, task + # id and parameter values + if self.is_enabled() and self.is_fetch_at_exec(): + self._flow = flow + self._env_for_fetch["METAFLOW_RUN_ID"] = run_id + self._env_for_fetch["METAFLOW_STEP_NAME"] = self.name + + def runtime_task_created( + self, + task_datastore: TaskDataStore, + task_id: str, + split_index: int, + input_paths: List[str], + is_cloned: bool, + ubf_context: str, + ): + if self.is_enabled(ubf_context) and self.is_fetch_at_exec(ubf_context): + # We need to ensure we can properly find the environment we are + # going to run in + run_id, step_name, task_id = input_paths[0].split("/") + parent_ds = self._flow_datastore.get_task_datastore( + run_id, step_name, task_id + ) + for var, _ in self._flow._get_parameters(): + self._env_for_fetch[ + "METAFLOW_INIT_%s" % var.upper().replace("-", "_") + ] = lambda _param=getattr( + self._flow, var + ), _var=var, _ds=parent_ds: str( + _param.load_parameter(_ds[_var]) + ) + self._env_for_fetch["METAFLOW_TASK_ID"] = task_id + + self._get_conda(self._echo, self._flow_datastore_type) + assert self.conda + # Calling from_env_name will resolve the environment name using all the + # additional variables injected above. 
+ resolved_env_id = self.conda.env_id_from_alias( + cast(str, self.from_env_name), arch=self._arch + ) + if resolved_env_id is None: + raise RuntimeError( + "Cannot find environment '%s' (from '%s') for arch '%s'" + % (self.from_env_name, self.from_env_name_unresolved, self._arch) + ) + def runtime_step_cli( self, cli_args: Any, # Importing CLIArgs causes an issue so ignore for now @@ -341,11 +469,10 @@ def runtime_step_cli( max_user_code_retries: int, ubf_context: str, ): - # If remote -- we don't do anything - if self._is_remote: - return - - if self.is_enabled(UBF_TASK): + # We also set the env var in remote case for is_fetch_at_exec + # so that it can be used to fill out the bootstrap command with + # the proper environment + if self.is_enabled(UBF_TASK) or self.is_fetch_at_exec(ubf_context): self._get_conda(self._echo, self._flow_datastore_type) assert self.conda resolved_env = cast( @@ -359,7 +486,7 @@ def runtime_step_cli( # the environment for the control task -- just for the actual tasks. 
cli_args.env["_METAFLOW_CONDA_ENV"] = json.dumps(my_env_id) - if not self.is_enabled(ubf_context): + if not self.is_enabled(ubf_context) or self._is_remote: return # Create the environment we are going to use @@ -439,6 +566,20 @@ def _get_base_attributes(self) -> Dict[str, Any]: return self._flow._flow_decorators["conda_base"][0].attributes return self.defaults + def _self_disabled(self) -> bool: + self_disabled = self.attributes["disabled"] + if self_disabled is None: + # If the user sets anything we consider that disabled = False + if ( + self.attributes["name"] + or self.attributes["pathspec"] + or self.attributes["libraries"] + or self.attributes["pip_packages"] + or self.attributes["python"] + ): + self_disabled = False + return self_disabled + def _python_version(self) -> str: return next( x @@ -451,8 +592,8 @@ def _python_version(self) -> str: if x is not None ) - def _from(self) -> Optional[str]: - return ( + def _from(self, raw_name: bool = False) -> Optional[str]: + possible_name = ( next( x for x in [ @@ -471,6 +612,16 @@ def _from(self) -> Optional[str]: or None ) + if possible_name is None: + return None + + possible_name = cast(str, possible_name) + if raw_name: + return possible_name + + # Substitute environment variables + return self.sub_envvars_in_envname(possible_name, self._env_for_fetch) + def _from_env_python(self) -> Optional[str]: from_env = self.from_env if from_env: @@ -649,54 +800,105 @@ def _resolve_pip_or_conda_deco( ) -> bool: has_pip_base = "pip_base" in flow._flow_decorators has_conda_base = "conda_base" in flow._flow_decorators - conda_decs = [(d, idx) for idx, d in enumerate(decorators) if d.name == "conda"] - pip_decs = [(d, idx) for idx, d in enumerate(decorators) if d.name == "pip"] - - # It is possible we don't have both if we call step_init twice (which can - # happen when deploying to schedulers since we attach an additional deco - # and then call step_init again. 
In that case, we just continue for the - decorator and ignore this function -- we already properly checked the - first time around since both conda and pip decorators are added to all - steps + + # Note that other decorators *extend* either a conda decorator or a pip decorator + # so we look for those decorators as well. + # The pip decorator also extends the conda decorator but here we mean more + # in terms of functionality: + # - extending a conda decorator means providing both pip and conda dependencies + # (potentially) + # - extending a pip decorator means providing only pip packages + all_decs = [d for d in decorators if isinstance(d, CondaStepDecorator)] + last_deco = all_decs[-1] + conda_decs = [d for d in all_decs if d.TYPE == "conda"] + pip_decs = [d for d in all_decs if d.TYPE == "pip"] + + to_remove = [] + if len(conda_decs) > 1: + # There is at least one user defined decorator so we remove all the others + to_remove = [x for x in conda_decs if not x.statically_defined] + conda_decs = [x for x in conda_decs if x.statically_defined] + if len(pip_decs) > 1: + # Ditto for pip + to_remove += [x for x in pip_decs if not x.statically_defined] + pip_decs = [x for x in pip_decs if x.statically_defined] + + # In the environment with decospecs, we add both conda and pip + # decorators so that we can choose the best one based on the presence of the + # base decorators for example. In this function, we clean up the + # decorators and remove all the extraneous ones. In some cases, however, + # this function can be called multiple times (when deploying to a scheduler, + # the step_init (which calls this) is called twice). In that case, we just + # continue along as we already cleaned things out the first time. 
if len(conda_decs) == 0 or len(pip_decs) == 0: return True - conda_step_decorator, conda_idx = conda_decs[0] - pip_step_decorator, pip_idx = pip_decs[0] + if len(conda_decs) > 1: + # We can only have one Conda-type decorator + raise InvalidEnvironmentException( + "Multiple decorators (%s) provide @conda-like functionality for " + "step '%s'. Please add only one such decorator and include any " + "additional dependencies using the same arguments as for @conda." + % (", ".join(["@%s" % d.name for d in conda_decs]), self.name) + ) + if len(pip_decs) > 1: + # Ditto with Pip-type decorators + raise InvalidEnvironmentException( + "Multiple decorators (%s) provide @pip-like functionality for " + "step '%s'. Please add only one such decorator and include any " + "additional dependencies using the same arguments as for @pip." + % (", ".join(["@%s" % d.name for d in pip_decs]), self.name) + ) - my_idx = pip_idx if self.name == "pip" else conda_idx + conda_deco = conda_decs[0] + pip_deco = pip_decs[0] debug.conda_exec( - "In %s decorator: pip_base(%s), conda_base(%s), conda_idx(%d), pip_idx(%d)" - % (self.name, has_pip_base, has_conda_base, conda_idx, pip_idx) + "In %s decorator: pip_base(%s), conda_base(%s), conda_deco(%s), pip_deco(%s)" + % (self.name, has_pip_base, has_conda_base, conda_deco.name, pip_deco.name) ) - if ( - conda_step_decorator.statically_defined - and pip_step_decorator.statically_defined - ): + if conda_deco.statically_defined and pip_deco.statically_defined: + raise InvalidEnvironmentException( + "Cannot specify both @%s (Conda decorator) and @%s (Pip decorator) " + "in step '%s'. If you need both pip and conda dependencies, " + "use @%s and pass in the pip dependencies as `pip_packages` and " + "the sources as `pip_sources`" + % ( + conda_deco.name, + pip_deco.name, + self.name, + conda_deco.name, + ) + ) + if has_pip_base and conda_deco.statically_defined: raise InvalidEnvironmentException( - "Cannot specify both @conda and @pip on a step. 
" - "If you need both pip and conda dependencies, use @conda and " - "pass in the pip dependencies as `pip_packages` and the sources as " - "`pip_sources`" + "@pip_base is not compatible with @%s. " + "Use @conda_base instead (using `pip_packages` instead of `packages` " + "and `pip_sources` instead of `sources`)" % conda_deco.name + ) + if has_conda_base and pip_deco.statically_defined: + raise InvalidEnvironmentException( + "@conda_base is not compatible with @%s. Use @pip_base if only using " + "pip dependencies or replace @%s with a @conda decorator (using " + "`pip_packages` instead of `packages` and `pip_sources` instead of " + "`sources)" % (pip_deco.name, pip_deco.name) ) - if has_pip_base and conda_step_decorator.statically_defined: - raise InvalidEnvironmentException("@pip_base is not compatible with @conda") - if has_conda_base and pip_step_decorator.statically_defined: - raise InvalidEnvironmentException("@conda_base is not compatible with @pip") # At this point, we have at most one statically defined so we keep that one # or the one derived from the base decorator. - # If we have none, we keep the conda one (base one). We remove only when - # we are the second decorator + # If we have none, we keep the conda one (base one). # Return true if we should continue the function. 
False if we return (ie: # we are going to be deleted) - del_idx = pip_idx - if pip_step_decorator.statically_defined or has_pip_base: - del_idx = conda_idx - if my_idx == max(pip_idx, conda_idx): - del decorators[del_idx] - return my_idx != del_idx + del_deco = pip_deco + if pip_deco.statically_defined or has_pip_base: + del_deco = conda_deco + to_remove.append(del_deco) + # We remove only when we are the last decorator since this is called while + # we are iterating on the list + if self.name == last_deco.name: + for d in to_remove: + decorators.remove(d) + return self.name not in [d.name for d in to_remove] @classmethod def _get_conda(cls, echo: Callable[..., None], datastore_type: str) -> None: @@ -722,11 +924,13 @@ class PipStepDecorator(CondaStepDecorator): name : Optional[str] If specified, can refer to a named environment. The environment referred to here will be the one used for this step. If specified, nothing else can be - specified in this decorator + specified in this decorator. In the name, you can use `@{}` values and + environment variables will be used to substitute. pathspec : Optional[str] If specified, can refer to the pathspec of an existing step. The environment of this referred step will be used here. If specified, nothing else can be - specified in this decorator. + specified in this decorator. In the name, you can use `@{}` values and + environment variables will be used to substitute. packages : Optional[Dict[str, str]] Packages to use for this step. The key is the name of the package and the value is the version to use (default: `{}`). @@ -735,11 +939,19 @@ class PipStepDecorator(CondaStepDecorator): python : Optional[str] Version of Python to use, e.g. '3.7.4'. If not specified, the current python version will be used. + fetch_at_exec : bool, default False + If set to True, the environment will be fetched when the task is + executing as opposed to at the beginning of the flow (or at deploy time if + deploying to a scheduler). 
This option requires name or pathspec to be + specified. This is useful, for example, if you want this step to always use + the latest named environment when it runs as opposed to the latest when it + is deployed. disabled : bool, default False If set to True, disables Pip. """ name = "pip" + TYPE = "pip" defaults = { "name": None, @@ -747,9 +959,23 @@ class PipStepDecorator(CondaStepDecorator): "packages": {}, "sources": [], "python": None, + "fetch_at_exec": None, "disabled": None, } + def _self_disabled(self) -> bool: + self_disabled = self.attributes["disabled"] + if self_disabled is None: + # If the user sets anything we consider that disabled = False + if ( + self.attributes["name"] + or self.attributes["pathspec"] + or self.attributes["packages"] + or self.attributes["python"] + ): + self_disabled = False + return self_disabled + def _np_conda_deps(self) -> Dict[str, str]: return {} diff --git a/metaflow_extensions/netflix_ext/plugins/conda/env_descr.py b/metaflow_extensions/netflix_ext/plugins/conda/env_descr.py index b5f82d2..3de98e9 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/env_descr.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/env_descr.py @@ -390,8 +390,10 @@ def local_file(self, pkg_format: str) -> Optional[str]: @property def local_files(self) -> Iterable[Tuple[str, str]]: - for pkg_fmt, local_path in self._local_path.items(): - yield (pkg_fmt, local_path) + for pkg_fmt in self.allowed_formats(): + local_path = self._local_path.get(pkg_fmt) + if local_path: + yield (pkg_fmt, local_path) def is_fetched(self, pkg_format: str) -> bool: # Return whether the local tar-ball for this package had to be fetched from @@ -440,31 +442,29 @@ def add_local_file( local_path: str, pkg_hash: Optional[str] = None, downloaded: bool = False, + replace: bool = False, ): # Add a local file for this package indicating whether it was downloaded existing_path = self.local_file(pkg_format) - if existing_path: - if local_path != existing_path: - raise 
ValueError( - "Attempting to add inconsistent local files of format %s for a package %s; " - "adding %s when already have %s" - % (pkg_format, self.filename, local_path, existing_path) - ) - else: - self._dirty = True - self._local_path[pkg_format] = local_path + if not replace and existing_path and local_path != existing_path: + raise ValueError( + "Attempting to add inconsistent local files of format %s for a package %s; " + "adding %s when already have %s" + % (pkg_format, self.filename, local_path, existing_path) + ) + known_hash = self._hashes.get(pkg_format) added_hash = pkg_hash or self.hash_pkg(local_path) - if known_hash: - if known_hash != added_hash: - raise ValueError( - "Attempting to add inconsistent local files of format %s for package %s; " - "got a hash of %s but expected %s" - % (pkg_format, self.filename, added_hash, known_hash) - ) - else: - self._dirty = True - self._hashes[pkg_format] = added_hash + if not replace and known_hash and known_hash != added_hash: + raise ValueError( + "Attempting to add inconsistent local files of format %s for package %s; " + "got a hash of %s but expected %s" + % (pkg_format, self.filename, added_hash, known_hash) + ) + self._dirty = replace or existing_path is None or known_hash is None + self._local_path[pkg_format] = local_path + self._hashes[pkg_format] = added_hash + if downloaded and pkg_format not in self._is_fetched: self._dirty = True self._is_fetched.append(pkg_format) diff --git a/metaflow_extensions/netflix_ext/plugins/conda/remote_bootstrap.py b/metaflow_extensions/netflix_ext/plugins/conda/remote_bootstrap.py index 4c7ecb7..ae4d8f7 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/remote_bootstrap.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/remote_bootstrap.py @@ -1,4 +1,5 @@ # pyright: strict, reportTypeCommentUsage=false, reportMissingTypeStubs=false +import json import os import shutil import sys @@ -14,6 +15,7 @@ from metaflow.plugins.env_escape import generate_trampolines, 
ENV_ESCAPE_PY from .conda import Conda +from .conda_step_decorator import CondaStepDecorator from .env_descr import EnvID from .utils import arch_id @@ -32,6 +34,17 @@ def bootstrap_environment( my_conda = Conda(my_echo_always, datastore_type, mode="remote") my_echo_always(" done in %d seconds." % int(time.time() - start)) + # Resolve a late environment if full_id is a special string _fetch_exec + if full_id == "_fetch_exec": + alias_to_fetch = CondaStepDecorator.sub_envvars_in_envname(req_id) + env_id = my_conda.env_id_from_alias(alias_to_fetch) + if env_id is None: + raise RuntimeError( + "Cannot find environment '%s' (from '%s') for arch '%s'" + % (alias_to_fetch, req_id, arch_id()) + ) + req_id = env_id.req_id + full_id = env_id.full_id resolved_env = my_conda.environment( EnvID(req_id=req_id, full_id=full_id, arch=arch_id()) ) @@ -53,6 +66,10 @@ def bootstrap_environment( pass # print("Could not find a environment escape interpreter") + # We write out the env_id to _env_id so it can be read by the outer bash script + with open("_env_id", mode="w", encoding="utf-8") as f: + json.dump(EnvID(req_id, full_id, arch_id()), f) + def setup_conda_manifest(): manifest_folder = os.path.join(os.getcwd(), DATASTORE_LOCAL_DIR) diff --git a/metaflow_extensions/netflix_ext/plugins/conda/utils.py b/metaflow_extensions/netflix_ext/plugins/conda/utils.py index d55534c..faf58ec 100644 --- a/metaflow_extensions/netflix_ext/plugins/conda/utils.py +++ b/metaflow_extensions/netflix_ext/plugins/conda/utils.py @@ -34,6 +34,7 @@ compatible_tags, _cpython_abis, cpython_tags, + mac_platforms, Tag, ) @@ -60,8 +61,8 @@ _ALL_CONDA_FORMATS = (".tar.bz2", ".conda") # NOTE: Order is important as it is a preference order _ALL_PIP_FORMATS = (".whl", ".tar.gz", ".zip") -_VALID_IMAGE_NAME = "[^a-z0-9_/]" -_VALID_TAG_NAME = "[^a-z0-9_]" +_VALID_IMAGE_NAME = "[^-a-z0-9_/]" +_VALID_TAG_NAME = "[^-a-z0-9_]" class AliasType(Enum): @@ -154,25 +155,30 @@ def pip_tags_from_arch(python_version: str, arch: 
str) -> List[Tag]: # This is inspired by what pip does: # https://github.com/pypa/pip/blob/0442875a68f19b0118b0b88c747bdaf6b24853ba/src/pip/_internal/utils/compatibility_tags.py py_version = tuple(map(int, python_version.split(".")[:2])) - if arch.startswith("linux-"): - detail = arch.split("-")[-1] - if detail == "64": - detail = "x86_64" + if arch == "linux-64": platforms = [ - "manylinux%s_%s" % (tag, arch) for tag in ["_2_17", "2014", "2010", "1"] + "manylinux%s_x86_64" % s + for s in ( + "1", + "2010", + "2014", + "_2_17", + "_2_18", + "_2_19", + "_2_20", + "_2_21", + "_2_23", + "_2_24", + "_2_25", + "_2_26", + "_2_27", + ) ] - platforms.append("linux_%s" % detail) + platforms.append("linux_x86_64") elif arch == "osx-64": - platforms = [ - "macosx_10_9_x86_64", - *("macosx_10_%s_universal2" % v for v in range(16, 3, -1)), - *("macosx_10_%s_universal" % v for v in range(16, 3, -1)), - ] + platforms = mac_platforms((11, 0), "x86_64") elif arch == "osx-arm64": - platforms = [ - "macosx_11_0_arm64", - *("macosx_10_%s_universal2" % v for v in range(16, 3, -1)), - ] + platforms = mac_platforms((11, 0), "arm64") else: raise InvalidEnvironmentException("Unsupported platform: %s" % arch) @@ -302,7 +308,7 @@ def resolve_env_alias(env_alias: str) -> Tuple[AliasType, str]: if re.search(_VALID_IMAGE_NAME, image_name): raise MetaflowException( "An environment name must contain only " - "lowercase alphanumeric characters, underscores and forward slashes." + "lowercase alphanumeric characters, dashes, underscores and forward slashes." ) if image_name[0] == "/" or image_name[-1] == "/": raise MetaflowException( @@ -311,7 +317,7 @@ def resolve_env_alias(env_alias: str) -> Tuple[AliasType, str]: if re.search(_VALID_TAG_NAME, image_tag): raise MetaflowException( "An environment tag name must contain only " - "lowercase alphanumeric characters and underscores." + "lowercase alphanumeric characters, dashes and underscores." 
) return AliasType.GENERIC, "/".join([image_name, image_tag]) raise MetaflowException("Invalid format for environment alias: '%s'" % env_alias) @@ -428,7 +434,7 @@ def auth_from_urls(urls: List[str]) -> Optional[AuthBase]: } for url in urls: - up = urlparse(url) + up = urlparse(url.strip("'\" \t\n")) if up.hostname and up.username: auths_per_hostname[up.hostname] = HTTPBasicAuth( up.username, up.password or ""