Skip to content

Commit

Permalink
A new feature: fetch_at_exec allows the delaying of fetching environm…
Browse files Browse the repository at this point in the history
…ents from deploy (#12)

time to execute time.

Various fixes from upstream:
  - fix an issue when creating an environment with a notebook
  - improved cross-platform resolution
  - fix various corner cases
  - improved selection of pip/conda decorator allowing for third party extension decorators
  - improved behavior of disabled=True so that if conda_base/pip_base has disabled=True, any
    conda/pip decorator with anything in it will turn disable to False
  - improved where files are downloaded on a remote machine (to be cleaner)
  - improved handling of compiled packages to download fewer times and also fix corner cases
  - fetch latest mutable aliases if needed instead of going off of local cache
  • Loading branch information
romain-intel authored Jun 13, 2023
1 parent 01f47a6 commit 4d4ec7e
Show file tree
Hide file tree
Showing 8 changed files with 464 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ def create(

cast(Conda, obj.conda).write_out_environments()

# We are going to be creating this new environment going forward (not the
# initial env we got)
_, env, _ = next(resolver.resolved_environments())

delta_time = int(time.time() - start)
obj.echo(" done in %d second%s." % (delta_time, plural_marker(delta_time)))

Expand Down
92 changes: 71 additions & 21 deletions metaflow_extensions/netflix_ext/plugins/conda/conda.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ def env_type_for_deps(deps: Sequence[TStr]) -> EnvType:
def default_conda_channels(self) -> List[str]:
if not self._found_binaries:
self._find_conda_binary()
return list(self._info["channels"])
return list(self._info["channels"] or [])

@property
def default_pip_sources(self) -> List[str]:
Expand Down Expand Up @@ -790,14 +790,15 @@ def env_id_from_alias(
% (env_alias, alias_type.value, " " if env_id else " not ", resolved_alias)
)

if env_id:
return env_id

if not local_only and self._storage is not None:
env_id = self._remote_fetch_alias([(alias_type, resolved_alias)], arch)
if env_id:
return env_id[0]
return None
if (
not local_only
and self._storage is not None
and (env_id is None or is_alias_mutable(alias_type, resolved_alias))
):
env_id_list = self._remote_fetch_alias([(alias_type, resolved_alias)], arch)
if env_id_list:
env_id = env_id_list[0]
return env_id

def environment_from_alias(
self, env_alias: str, arch: Optional[str] = None, local_only: bool = False
Expand Down Expand Up @@ -2032,7 +2033,7 @@ def _resolve_env_with_pip(
implementations = [x.interpreter for x in supported_tags]
extra_args = (
"--only-binary=:all:",
# This seems to overly constrain things so skipping for now
# Seems to overly constrain stuff
# *(
# chain.from_iterable(
# product(["--implementation"], set(implementations))
Expand All @@ -2048,7 +2049,38 @@ def _resolve_env_with_pip(
# pip to it using the `--find-links` argument.
local_packages_dict = {} # type: Dict[str, PackageSpecification]
if local_packages:
os.makedirs(os.path.join(pip_dir, "local_packages"))
# This is a bit convoluted but we try to avoid downloading packages
# that we already have but we also don't want to point pip to a directory
# full of packages that we may not want to use so we will create symlinks
# to packages we already have and download the others
base_local_pip_packages = os.path.join(self._package_dirs[0], "pip")
tmp_local_pip_packages = os.path.realpath(
os.path.join(pip_dir, "local_packages", "pip")
)
os.makedirs(tmp_local_pip_packages)
new_local_packages = [] # type: List[PackageSpecification]
for p in local_packages:
for fmt in PipPackageSpecification.allowed_formats():
filename = "%s%s" % (p.filename, fmt)
if os.path.isfile(
os.path.join(base_local_pip_packages, filename)
):
os.symlink(
os.path.join(base_local_pip_packages, filename),
os.path.join(tmp_local_pip_packages, filename),
)
local_packages_dict[
os.path.join(tmp_local_pip_packages, filename)
] = p
p.add_local_file(
fmt,
os.path.join(base_local_pip_packages, filename),
replace=True,
)
break
else:
new_local_packages.append(p)
local_packages = new_local_packages
# This will not fetch on the web so no need for auth object
self._lazy_fetch_packages(
local_packages, None, os.path.join(pip_dir, "local_packages")
Expand Down Expand Up @@ -2119,18 +2151,28 @@ def _resolve_env_with_pip(
packages_to_build.append(dl_info)
else:
# A local wheel or tarball
if url in local_packages_dict:
debug.conda_exec("This is a known local package")
if local_path in local_packages_dict:
# We are going to move this file to a less "temporary"
# location so that it can be installed if needed

pkg_spec = local_packages_dict[local_path]
filename = os.path.split(local_path)[1]
file_format = correct_splitext(filename)[1]
shutil.move(local_path, self._package_dirs[0])
pkg_spec.add_local_file(
file_format,
os.path.join(self._package_dirs[0], filename),
)
if not os.path.islink(local_path):
debug.conda_exec("Known package -- moving in place")
filename = os.path.split(local_path)[1]
file_format = correct_splitext(filename)[1]
shutil.move(
local_path,
os.path.join(self._package_dirs[0], "pip"),
)
pkg_spec.add_local_file(
file_format,
os.path.join(
self._package_dirs[0], "pip", filename
),
replace=True,
)
else:
debug.conda_exec("Known package already in place")
result.append(pkg_spec)
else:
parse_result = parse_explicit_path_pip(url)
Expand Down Expand Up @@ -3168,7 +3210,15 @@ def _install_remote_conda(self):
from metaflow.plugins import DATASTORES

# We download the installer and return a path to it
final_path = os.path.join(os.getcwd(), "__conda_installer")
# To be clean, we try to be in the parent of our current directory to avoid
# polluting the user code. If that is not possible though, we create something
# inside the current directory
parent_dir = os.path.dirname(os.getcwd())
if os.access(parent_dir, os.W_OK):
final_path = os.path.join(parent_dir, "conda_env", "__conda_installer")
else:
final_path = os.path.join(os.getcwd(), "conda_env", "__conda_installer")
os.makedirs(os.path.dirname(final_path))

path_to_fetch = os.path.join(
CONDA_REMOTE_INSTALLER_DIRNAME,
Expand Down
45 changes: 32 additions & 13 deletions metaflow_extensions/netflix_ext/plugins/conda/conda_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@
Optional,
Set,
Tuple,
Union,
cast,
)

from metaflow.plugins.datastores.local_storage import LocalStorage
from metaflow.flowspec import FlowSpec

from metaflow.metaflow_config import (
CONDA_MAGIC_FILE_V2,
)
from metaflow.exception import MetaflowException

from metaflow.metaflow_config import CONDA_MAGIC_FILE_V2

from metaflow.metaflow_environment import MetaflowEnvironment


from .envsresolver import EnvsResolver
from .utils import get_conda_manifest_path

Expand Down Expand Up @@ -71,7 +73,7 @@ def init_environment(self, echo: Callable[..., None]):
# Figure out the environments that we need to resolve for all steps
# We will resolve all unique environments in parallel
step_conda_dec = get_conda_decorator(self._flow, step.name)
if step_conda_dec.is_enabled():
if step_conda_dec.is_enabled() and not step_conda_dec.is_fetch_at_exec():
resolver.add_environment_for_step(step.name, step_conda_dec)

resolver.resolve_environments(echo)
Expand Down Expand Up @@ -120,12 +122,25 @@ def decospecs(self) -> Tuple[str, ...]:
# We will later resolve which to keep.
return ("conda", "pip") + self.base_env.decospecs()

def _get_env_id(self, step_name: str) -> Optional[EnvID]:
def _get_env_id(self, step_name: str) -> Optional[Union[str, EnvID]]:
conda_decorator = get_conda_decorator(self._flow, step_name)
if conda_decorator.is_enabled():
resolved_env = cast(Conda, self._conda).environment(conda_decorator.env_id)
if resolved_env:
return resolved_env.env_id
if not conda_decorator.is_fetch_at_exec():
resolved_env = cast(Conda, self._conda).environment(
conda_decorator.env_id
)
if resolved_env:
return resolved_env.env_id
else:
raise MetaflowException(
"Cannot find environment for step '%s'" % step_name
)
else:
resolved_env_id = os.environ.get("_METAFLOW_CONDA_ENV")
if resolved_env_id:
return EnvID(*json.loads(resolved_env_id))
# Here we will return the name of the environment
return conda_decorator.from_env_name_unresolved
return None

def _get_executable(self, step_name: str) -> Optional[str]:
Expand All @@ -140,6 +155,12 @@ def bootstrap_commands(self, step_name: str, datastore_type: str) -> List[str]:
# Bootstrap conda and execution environment for step
env_id = self._get_env_id(step_name)
if env_id is not None:
if isinstance(env_id, EnvID):
arg1 = env_id.req_id
arg2 = env_id.full_id
else:
arg1 = env_id
arg2 = "_fetch_exec"
return [
"export CONDA_START=$(date +%s)",
"echo 'Bootstrapping environment ...'",
Expand All @@ -148,17 +169,15 @@ def bootstrap_commands(self, step_name: str, datastore_type: str) -> List[str]:
"metaflow_extensions.netflix_ext.plugins.conda",
self._flow.name,
step_name,
env_id.req_id,
env_id.full_id,
arg1,
arg2,
datastore_type,
),
"export _METAFLOW_CONDA_ENV='%s'"
% json.dumps(env_id).replace('"', '\\"'),
"export _METAFLOW_CONDA_ENV=$(cat _env_id)",
"export PYTHONPATH=$(pwd)/_escape_trampolines:$(printenv PYTHONPATH)",
"echo 'Environment bootstrapped.'",
"export CONDA_END=$(date +%s)",
]
# TODO: Add the PATH part (need conda directory)
return []

def add_to_package(self) -> List[Tuple[str, str]]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ class CondaFlowDecorator(FlowDecorator):
python : Optional[str]
Version of Python to use, e.g. '3.7.4'. If not specified, the current Python
version will be used.
fetch_at_exec : bool, default False
If set to True, the environment will be fetched when the task is
executing as opposed to at the beginning of the flow (or at deploy time if
deploying to a scheduler). This option requires name or pathspec to be
specified. This is useful, for example, if you want this step to always use
the latest named environment when it runs as opposed to the latest when it
is deployed.
disabled : bool, default False
If set to True, disables Conda
"""
Expand All @@ -46,6 +53,7 @@ class CondaFlowDecorator(FlowDecorator):
"pip_packages": {},
"pip_sources": [],
"python": None,
"fetch_at_exec": None,
"disabled": None,
}

Expand Down Expand Up @@ -102,6 +110,13 @@ class PipFlowDecorator(FlowDecorator):
python : Optional[str]
Version of Python to use, e.g. '3.7.4'. If not specified, the current version
will be used.
fetch_at_exec : bool, default False
If set to True, the environment will be fetched when the task is
executing as opposed to at the beginning of the flow (or at deploy time if
deploying to a scheduler). This option requires name or pathspec to be
specified. This is useful, for example, if you want this step to always use
the latest named environment when it runs as opposed to the latest when it
is deployed.
disabled : bool, default False
If set to True, disables Pip
"""
Expand All @@ -114,6 +129,7 @@ class PipFlowDecorator(FlowDecorator):
"packages": {},
"sources": [],
"python": None,
"fetch_at_exec": None,
"disabled": None,
}

Expand Down
Loading

0 comments on commit 4d4ec7e

Please sign in to comment.