feat: support setting output directory in manager (#3)
paulmueller committed Jan 19, 2024
1 parent df36c37 commit a3399d5
Showing 2 changed files with 153 additions and 19 deletions.
96 changes: 77 additions & 19 deletions chipstream/manager.py
@@ -15,7 +15,13 @@ class JobStillRunningError(BaseException):

class ChipStreamJobManager:
def __init__(self):
self._path_list = []
#: Input path list containing tuples of (path, state-string) for
#: each input path. This is a list that is shared with the worker
#: thread and kept updated by the worker during the run.
self._path_in_list = []
#: Output directory for data processing. If set to None, files
#: are created alongside the input files.
self._path_out = None
self._runner_list = []
self._worker = None
self.busy_lock = threading.Lock()
@@ -24,15 +30,15 @@ def __getitem__(self, index):
runner = self.get_runner(index)
if runner is None:
status = {"progress": 0,
"state": self._path_list[index][1],
"state": self._path_in_list[index][1],
}
else:
status = runner.get_status()
status["path"] = str(self._path_list[index][0])
status["path"] = str(self._path_in_list[index][0])
return status

def __len__(self):
return len(self._path_list)
return len(self._path_in_list)

@property
def current_index(self):
@@ -41,11 +47,11 @@ def current_index(self):
def add_path(self, path):
if not self.is_busy():
# Only append paths if we are currently not busy
self._path_list.append([path, "created"])
self._path_in_list.append([path, "created"])

def clear(self):
"""Clear all data"""
self._path_list.clear()
self._path_in_list.clear()
self._runner_list.clear()
self._worker = None

@@ -75,23 +81,69 @@ def get_info(self, index):
# Fallback for debugging
return traceback.format_exc()

def get_paths_out(self):
"""Return output path list"""
pin = self._path_in_list
if self._path_out is None:
pout = [pp[0].with_name(pp[0].stem + "_dcn.rtdc") for pp in pin]
else:
# Get the common parent for all paths
# First, check whether all input files are on the same anchor.
anchors = set([pp.anchor for pp, _ in self._path_in_list])
if len(anchors) == 1:
common_parent = self._path_in_list[0][0].parent
for pp, _ in self._path_in_list[1:]:
for parent in pp.parents:
if common_parent.is_relative_to(parent):
common_parent = parent
break
pout = []
for pp, _ in self._path_in_list:
prel = pp.relative_to(common_parent)
prel_dcn = prel.with_name(prel.stem + "_dcn.rtdc")
pout.append(self._path_out / prel_dcn)
else:
# This is a very weird scenario on Windows. The user added
# files from different drives. We have to remove the anchor
# part and compute relative files with a placeholder for
# the anchor.
# TODO: Find a way to test this.
pout = []
for pp, _ in self._path_in_list:
# relative path to anchor
prel = pp.relative_to(pp.anchor)
# placeholder for anchor
anch = "".join([ch for ch in pp.anchor if ch.isalnum()])
pout.append(self._path_out / anch / prel)
return pout

def get_paths_in(self):
"""Return input path list"""
return [pp[0] for pp in self._path_in_list]

def get_runner(self, index):
if index >= len(self._runner_list):
return None
else:
return self._runner_list[index]

def run_all_in_thread(self, job_kwargs=None, callback_when_done=None):
def run_all_in_thread(self,
job_kwargs: Dict = None,
callback_when_done: callable = None):
if job_kwargs is None:
job_kwargs = {}
self._worker = JobWorker(paths=self._path_list,
self._worker = JobWorker(paths_in=self._path_in_list,
paths_out=self.get_paths_out(),
job_kwargs=job_kwargs,
runners=self._runner_list,
busy_lock=self.busy_lock,
callback_when_done=callback_when_done,
)
self._worker.start()

def set_output_path(self, path_out):
self._path_out = pathlib.Path(path_out)


class ErrorredRunner:
"""Convenience class replacing a high-level failed runner"""
@@ -106,7 +158,8 @@ def get_status(self):

class JobWorker(threading.Thread):
def __init__(self,
paths: List[List[pathlib.Path | str]],
paths_in: List[List[pathlib.Path | str]],
paths_out: List[List[pathlib.Path | str]],
job_kwargs: Dict,
runners: List,
busy_lock: threading.Lock = None,
@@ -117,10 +170,12 @@ def __init__(self,
Parameters
----------
paths:
List of input paths to process
paths_in:
List of tuples (path, state) of the input data
paths_out:
List of output paths for each item in `paths_in`
job_kwargs:
List of keyword arguments for the DCNumJob instace
List of keyword arguments for the DCNumJob instance
runners:
Empty list which is filled with runner instances
busy_lock:
@@ -133,7 +188,8 @@ def __init__(self,
pipeline identifiers.
"""
super(JobWorker, self).__init__(*args, **kwargs)
self.paths = paths
self.paths_in = paths_in
self.paths_out = paths_out
self.jobs = []
self.runners = runners
self.job_kwargs = job_kwargs
@@ -145,22 +201,24 @@ def run(self):
with self.busy_lock:
self.runners.clear()
# reset all job states
[pp.__setitem__(1, "created") for pp in self.paths]
[pp.__setitem__(1, "created") for pp in self.paths_in]
# run jobs
for ii, (pp, _) in enumerate(self.paths):
for ii, (pp, _) in enumerate(self.paths_in):
try:
self.run_job(path_in=pp)
self.run_job(path_in=pp, path_out=self.paths_out[ii])
except BaseException:
# Create a dummy error runner
self.runners.append(ErrorredRunner(traceback.format_exc()))
# write final state to path list
runner = self.runners[ii]
self.paths[ii][1] = runner.get_status()["state"]
self.paths_in[ii][1] = runner.get_status()["state"]
if self.callback_when_done is not None:
self.callback_when_done()

def run_job(self, path_in):
job = dclogic.DCNumPipelineJob(path_in=path_in, **self.job_kwargs)
def run_job(self, path_in, path_out):
job = dclogic.DCNumPipelineJob(path_in=path_in,
path_out=path_out,
**self.job_kwargs)
self.jobs.append(job)
with dclogic.DCNumJobRunner(job) as runner:
self.runners.append(runner)
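The path mapping introduced in get_paths_out() can be illustrated with a short, self-contained sketch (not part of the commit; the file paths below are made up). It reproduces the same-anchor branch: find the deepest parent directory shared by all inputs, then re-root each input below the output directory with "_dcn" appended to the stem. PurePath.is_relative_to() requires Python 3.9 or newer.

import pathlib

path_out = pathlib.PurePosixPath("/results")
paths_in = [pathlib.PurePosixPath("/data/foo/bar/data.rtdc"),
            pathlib.PurePosixPath("/data/foo/baz/data.rtdc")]

# Deepest directory that contains all input files
common_parent = paths_in[0].parent
for pp in paths_in[1:]:
    for parent in pp.parents:
        if common_parent.is_relative_to(parent):
            common_parent = parent
            break

# Re-root each input below the output directory, renaming the stem
pout = []
for pp in paths_in:
    prel = pp.relative_to(common_parent)  # e.g. "bar/data.rtdc"
    pout.append(path_out / prel.with_name(prel.stem + "_dcn.rtdc"))

print(pout[0])  # /results/bar/data_dcn.rtdc
print(pout[1])  # /results/baz/data_dcn.rtdc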
76 changes: 76 additions & 0 deletions tests/test_manager.py
@@ -1,9 +1,46 @@
import shutil

from chipstream import manager


from helper_methods import retrieve_data


def test_manager_get_paths_out(tmp_path):
path = retrieve_data(
"fmt-hdf5_cytoshot_full-features_legacy_allev_2023.zip")

p1 = tmp_path / "foo" / "bar" / "data.rtdc"
p2 = tmp_path / "foo" / "baz" / "data.rtdc"
pout = tmp_path / "far"
p1.parent.mkdir(exist_ok=True, parents=True)
p2.parent.mkdir(exist_ok=True, parents=True)
pout.mkdir(exist_ok=True, parents=True)
shutil.copy2(path, p1)
shutil.copy2(path, p2)

mg = manager.ChipStreamJobManager()
mg.add_path(p1)
mg.add_path(p2)

# Sanity check
assert mg.get_paths_in()[0] == p1
assert mg.get_paths_in()[1] == p2

# If no output path is specified (None), then the returned
# path list should just be the paths with "_dcn" appended to the stem.
assert mg.get_paths_out()[0] == p1.with_name(p1.stem + "_dcn.rtdc")
assert mg.get_paths_out()[1] == p2.with_name(p2.stem + "_dcn.rtdc")

# We now set the output path. The manager should now compute the
# common parent path for all input paths and append relative
# subdirectories.
mg.set_output_path(pout)
# Note that the common parent "foo" is missing.
assert mg.get_paths_out()[0] == pout / "bar" / "data_dcn.rtdc"
assert mg.get_paths_out()[1] == pout / "baz" / "data_dcn.rtdc"


def test_manager_read_data():
path = retrieve_data(
"fmt-hdf5_cytoshot_full-features_legacy_allev_2023.zip")
@@ -49,3 +86,42 @@ def test_manager_run_defaults():
"thresh:t=-6:cle=1^f=1^clo=2|"
"legacy:b=1^h=1|"
"norm:o=0^s=10")


def test_manager_run_with_path_out(tmp_path):
path = retrieve_data(
"fmt-hdf5_cytoshot_full-features_legacy_allev_2023.zip")

p1 = tmp_path / "foo" / "bar" / "data.rtdc"
p2 = tmp_path / "foo" / "baz" / "data.rtdc"
pout = tmp_path / "far"
p1.parent.mkdir(exist_ok=True, parents=True)
p2.parent.mkdir(exist_ok=True, parents=True)
pout.mkdir(exist_ok=True, parents=True)
shutil.copy2(path, p1)
shutil.copy2(path, p2)

# set up manager
mg = manager.ChipStreamJobManager()
mg.add_path(p1)
mg.add_path(p2)
mg.set_output_path(pout)

# start analysis
mg.run_all_in_thread()
# wait for the thread to join
mg.join()

# sanity checks
assert mg[0]["progress"] == 1
assert mg[0]["state"] == "done"
assert mg[0]["path"] == str(p1)
assert mg[1]["progress"] == 1
assert mg[1]["state"] == "done"
assert mg[1]["path"] == str(p2)
assert mg.current_index == 1
assert not mg.is_busy()

# make sure the output paths exist
assert (pout / "bar" / "data_dcn.rtdc").exists()
assert (pout / "baz" / "data_dcn.rtdc").exists()

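The multi-drive branch of get_paths_out() (marked "TODO: Find a way to test this" in manager.py) is not covered by these tests. The sketch below shows what it would produce, using PureWindowsPath so it can be run on any platform; the drive letters and directories are hypothetical. Mirroring the code as written, this branch keeps the original file name instead of appending "_dcn" to the stem.

import pathlib

path_out = pathlib.PureWindowsPath(r"E:\results")
paths_in = [pathlib.PureWindowsPath(r"C:\measurements\run1\data.rtdc"),
            pathlib.PureWindowsPath(r"D:\backup\run2\data.rtdc")]

pout = []
for pp in paths_in:
    # relative path to the drive anchor, e.g. "measurements\run1\data.rtdc"
    prel = pp.relative_to(pp.anchor)
    # placeholder directory derived from the anchor, e.g. "C" or "D"
    anch = "".join(ch for ch in pp.anchor if ch.isalnum())
    pout.append(path_out / anch / prel)

print(pout[0])  # E:\results\C\measurements\run1\data.rtdc
print(pout[1])  # E:\results\D\backup\run2\data.rtdc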