Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

include flowdir in syspath while loading file in Runner API #2182

Merged
merged 4 commits into from
Dec 18, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 36 additions & 22 deletions metaflow/runner/click_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,28 +188,42 @@ def get_inspect_param_obj(p: Union[click.Argument, click.Option], kind: str):
def extract_flow_class_from_file(flow_file: str) -> FlowSpec:
if not os.path.exists(flow_file):
raise FileNotFoundError("Flow file not present at '%s'" % flow_file)
# Check if the module has already been loaded
if flow_file in loaded_modules:
module = loaded_modules[flow_file]
else:
# Load the module if it's not already loaded
spec = importlib.util.spec_from_file_location("module", flow_file)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Cache the loaded module
loaded_modules[flow_file] = module
classes = inspect.getmembers(module, inspect.isclass)

flow_cls = None
for _, kls in classes:
if kls != FlowSpec and issubclass(kls, FlowSpec):
if flow_cls is not None:
raise MetaflowException(
"Multiple FlowSpec classes found in %s" % flow_file
)
flow_cls = kls

return flow_cls

flow_dir = os.path.dirname(os.path.abspath(flow_file))
path_was_added = False

# Only add to path if it's not already there
if flow_dir not in sys.path:
sys.path.insert(0, flow_dir)
path_was_added = True

try:
# Check if the module has already been loaded
if flow_file in loaded_modules:
module = loaded_modules[flow_file]
else:
# Load the module if it's not already loaded
spec = importlib.util.spec_from_file_location("module", flow_file)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Cache the loaded module
loaded_modules[flow_file] = module
classes = inspect.getmembers(module, inspect.isclass)

flow_cls = None
for _, kls in classes:
if kls != FlowSpec and issubclass(kls, FlowSpec):
if flow_cls is not None:
raise MetaflowException(
"Multiple FlowSpec classes found in %s" % flow_file
)
flow_cls = kls

return flow_cls
finally:
# Only remove from path if we added it
if path_was_added:
sys.path.remove(flow_dir)
madhur-ob marked this conversation as resolved.
Show resolved Hide resolved


class MetaflowAPI(object):
Expand Down
Loading