Skip to content

Commit

Permalink
Various improvements/fixes
Browse files Browse the repository at this point in the history
- Data polling call bugfix.
- Print cube summary.
- Staging files fix and improved verbose messages.
  • Loading branch information
cpelley committed Nov 15, 2024
1 parent dc4a6c1 commit 816671c
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 44 deletions.
9 changes: 7 additions & 2 deletions dagrunner/execute_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from dask.utils import apply

from dagrunner.config import CONFIG
from dagrunner.plugin_framework import NodeAwarePlugin, IGNORE_EVENT, SKIP_EVENT
from dagrunner.plugin_framework import IGNORE_EVENT, SKIP_EVENT, NodeAwarePlugin
from dagrunner.runner.schedulers import SCHEDULERS
from dagrunner.utils import (
CaptureProcMemory,
Expand Down Expand Up @@ -194,7 +194,12 @@ def plugin_executor(
logging.info(msg)

if verbose:
print(f"result: {res}")
try:
# cube looking UI
print(f"result: {res.summary(shorten=True)}")
except (TypeError, AttributeError):
# fallback
print(f"result: {res}")
return res


Expand Down
36 changes: 23 additions & 13 deletions dagrunner/plugin_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
import json
import os
import pickle
import shutil
import string
import subprocess
import warnings
from abc import ABC, abstractmethod
from glob import glob

from dagrunner.utils import process_path, Singleton, data_polling, stage_to_dir
from dagrunner.utils import Singleton, data_polling, process_path, stage_to_dir


class _EventBase:
Expand Down Expand Up @@ -153,7 +151,9 @@ def __call__(self, *args, **kwargs):

if self._staging_dir and args:
try:
args = stage_to_dir(*args, staging_dir=self._staging_dir, verbose=self._verbose)
args = stage_to_dir(
*args, staging_dir=self._staging_dir, verbose=self._verbose
)
except FileNotFoundError as e:
if self._ignore_missing:
warnings.warn(str(e))
Expand All @@ -175,22 +175,32 @@ def __call__(self, *args, **kwargs):

class DataPolling(Plugin):
    """A trigger plugin that completes when data is successfully polled.

    Repeatedly checks for the existence of the file paths given at call time
    and returns once they are all found (or raises on timeout).
    """

    def __init__(self, timeout=60 * 2, polling=1, file_count=None, verbose=False):
        """
        Args:
        - `timeout` (int): Maximum seconds to wait for the files to appear.
        - `polling` (int): Seconds between successive polls.
        - `file_count` (int | None): Expected number of files for globular
          patterns; None means any non-zero match completes the poll.
        - `verbose` (bool): Print the set of files found on success.
        """
        self._timeout = timeout
        self._polling = polling
        self._file_count = file_count
        self._verbose = verbose

    def __call__(self, *args):
        """Poll for `args` paths; raise FileNotFoundError on timeout.

        Note: `data_polling` takes these options as keywords — passing them
        positionally would bind them to the variadic paths argument.
        """
        fpaths_found, fpaths_not_found = data_polling(
            *args,
            timeout=self._timeout,
            polling=self._polling,
            file_count=self._file_count,
            fail_fast=True,
            verbose=self._verbose,
        )
        if fpaths_not_found:
            # Sorted for a deterministic, readable error message.
            raise FileNotFoundError(
                f"Timeout waiting for: {'; '.join(sorted(fpaths_not_found))}"
            )
        if self._verbose:
            msg = (
                f"These files were polled and found: {'; '.join(sorted(fpaths_found))}"
            )
            print(msg)
        return


class Input(NodeAwarePlugin):
Expand Down
63 changes: 34 additions & 29 deletions dagrunner/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,26 @@
# This file is part of 'dagrunner' and is released under the BSD 3-Clause license.
# See LICENSE in the root of the repository for full licensing details.
import argparse
from glob import glob
import inspect
import itertools
import os
import socket
import shutil
import socket
import subprocess
import threading
import time
from typing import Iterable
import warnings
from abc import ABC, abstractmethod
from glob import glob
from typing import Iterable

import dagrunner.utils._doc_styles as doc_styles


def as_iterable(obj):
    """Coerce `obj` into an iterable.

    - `None` becomes an empty list.
    - Non-iterables, and the iterable-but-atomic `str`/`bytes`, are wrapped
      in a single-element list.
    - Any other iterable is returned unchanged.
    """
    if obj is None:
        return []
    elif not isinstance(obj, Iterable) or isinstance(obj, (str, bytes)):
        obj = [obj]
    return obj

Expand Down Expand Up @@ -415,7 +413,9 @@ def function_to_argparse_parse_args(*args, **kwargs):
return args, kwargs


def data_polling(*args, timeout=60 * 2, polling=1, file_count=None, fail_fast=True, verbose=False):
def data_polling(
*args, timeout=60 * 2, polling=1, file_count=None, fail_fast=True, verbose=False
):
"""
Poll for the availability of files
Expand All @@ -435,6 +435,7 @@ def data_polling(*args, timeout=60 * 2, polling=1, file_count=None, fail_fast=Tr
- fail_fast (bool): Stop when a file is not found (default is True).
- verbose (bool): Print verbose output.
"""

# Define a key function
def host_and_glob_key(path):
psplit = path.split(":")
Expand Down Expand Up @@ -471,17 +472,15 @@ def host_and_glob_key(path):
if expanded_paths:
expanded_paths = expanded_paths.split("\n")
else:
expanded_paths = list(
itertools.chain.from_iterable(map(glob, paths))
)
expanded_paths = list(itertools.chain.from_iterable(map(glob, paths)))
if expanded_paths:
if host:
fpaths_found = fpaths_found.union(set([f"{host}:{path}" for path in expanded_paths]))
fpaths_found = fpaths_found.union(
set([f"{host}:{path}" for path in expanded_paths])
)
else:
fpaths_found = fpaths_found.union(expanded_paths)
if globular and (
not file_count or len(expanded_paths) >= file_count
):
if globular and (not file_count or len(expanded_paths) >= file_count):
# globular expansion completed
paths = set()
else:
Expand All @@ -504,7 +503,9 @@ def host_and_glob_key(path):

if paths:
if host:
fpaths_not_found = fpaths_not_found.union(set([f"{host}:{path}" for path in paths]))
fpaths_not_found = fpaths_not_found.union(
set([f"{host}:{path}" for path in paths])
)
else:
fpaths_not_found = fpaths_not_found.union(paths)

Expand Down Expand Up @@ -535,9 +536,12 @@ def __str__(self):
def exists(self):
    """Return True if the handled path exists.

    For a remote path (`self._host` set), existence is probed over SSH with
    `test -e`; `check=False` so a missing file yields a non-zero returncode
    rather than an exception.  Local paths use `os.path.exists`.
    """
    if self._host:
        # check if file exists on remote host
        exists = (
            subprocess.run(
                ["ssh", self._host, "test", "-e", self._lpath], check=False
            ).returncode
            == 0
        )
    else:
        exists = os.path.exists(self._lpath)
    return exists
Expand All @@ -552,15 +556,15 @@ def get_identity(self):
capture_output=True,
).stdout.strip()
else:
mtime = f"{int(os.path.getmtime(self._lpath))}_{os.path.getsize(self._lpath)}"
mtime = (
f"{int(os.path.getmtime(self._lpath))}_{os.path.getsize(self._lpath)}"
)
return mtime

def copy(self, target):
if self._host:
rsync_command = ["scp", "-p", f"{self._host}:{self._lpath}", target]
subprocess.run(
rsync_command, check=True, text=True, capture_output=True
)
subprocess.run(rsync_command, check=True, text=True, capture_output=True)
else:
try:
os.link(self._lpath, target)
def stage_to_dir(*args, staging_dir, verbose=False):
    """Copy each of `args` (local or `host:`-prefixed remote paths) into
    `staging_dir` and return the list of staged target paths.

    Targets are named `<mtime>_<size>_<basename>` so that an unchanged source
    file maps to the same staged file and the copy can be skipped.

    Raises:
    - FileNotFoundError: If a source path does not exist.
    """
    os.makedirs(staging_dir, exist_ok=True)
    args = list(args)
    for ind, arg in enumerate(args):
        fpath = _RemotePathHandler(arg)
        if not fpath.exists():
            raise FileNotFoundError(f"File '{fpath}' not found.")

        source_mtime_size = fpath.get_identity()

        # str() since fpath is a handler object, not a plain path string.
        target = os.path.join(
            staging_dir, f"{source_mtime_size}_{os.path.basename(str(fpath))}"
        )
        if not os.path.exists(target):
            if verbose:
                print(f"Staged '{arg}' to '{target}'")
            fpath.copy(target)
        else:
            warnings.warn(
                f"'{arg}' staged file '{target}' already exists. Skipping copy."
            )

        args[ind] = target
    return args

0 comments on commit 816671c

Please sign in to comment.