diff --git a/chipstream/manager.py b/chipstream/manager.py index 32932dd..58dfb37 100644 --- a/chipstream/manager.py +++ b/chipstream/manager.py @@ -15,7 +15,13 @@ class JobStillRunningError(BaseException): class ChipStreamJobManager: def __init__(self): - self._path_list = [] + #: Input path list containing tuples of (path, state-string) for + #: each input path. This is a list that is shared with the worker + #: thread and kept updated by the worker during the run. + self._path_in_list = [] + #: Output directory for data processing. If set to None, files + #: are created alongside the input files. + self._path_out = None self._runner_list = [] self._worker = None self.busy_lock = threading.Lock() @@ -24,15 +30,15 @@ def __getitem__(self, index): runner = self.get_runner(index) if runner is None: status = {"progress": 0, - "state": self._path_list[index][1], + "state": self._path_in_list[index][1], } else: status = runner.get_status() - status["path"] = str(self._path_list[index][0]) + status["path"] = str(self._path_in_list[index][0]) return status def __len__(self): - return len(self._path_list) + return len(self._path_in_list) @property def current_index(self): @@ -41,11 +47,11 @@ def current_index(self): def add_path(self, path): if not self.is_busy(): # Only append paths if we are currently not busy - self._path_list.append([path, "created"]) + self._path_in_list.append([path, "created"]) def clear(self): """Clear all data""" - self._path_list.clear() + self._path_in_list.clear() self._runner_list.clear() self._worker = None @@ -75,16 +81,59 @@ def get_info(self, index): # Fallback for debugging return traceback.format_exc() + def get_paths_out(self): + """Return output path list""" + pin = self._path_in_list + if self._path_out is None: + pout = [pp[0].with_name(pp[0].stem + "_dcn.rtdc") for pp in pin] + else: + # Get common stem for all paths + # First, check whether all input files are on the same anchor. 
+ anchors = set([pp.anchor for pp, _ in self._path_in_list]) + if len(anchors) == 1: + common_parent = self._path_in_list[0][0].parent + for pp, _ in self._path_in_list[1:]: + for parent in pp.parents: + if common_parent.is_relative_to(parent): + common_parent = parent + break + pout = [] + for pp, _ in self._path_in_list: + prel = pp.relative_to(common_parent) + prel_dcn = prel.with_name(prel.stem + "_dcn.rtdc") + pout.append(self._path_out / prel_dcn) + else: + # This is a very weird scenario on Windows. The user added + # files from different drives. We have to remove the anchor + # part and compute relative files with a placeholder for + # the anchor. + # TODO: Find a way to test this. + pout = [] + for pp, _ in self._path_in_list: + # relative path to anchor + prel = pp.relative_to(pp.anchor) + # placeholder for anchor + anch = "".join([ch for ch in pp.anchor if ch.isalnum()]) + pout.append(self._path_out / anch / prel) + return pout + + def get_paths_in(self): + """Return input path list""" + return [pp[0] for pp in self._path_in_list] + def get_runner(self, index): if index >= len(self._runner_list): return None else: return self._runner_list[index] - def run_all_in_thread(self, job_kwargs=None, callback_when_done=None): + def run_all_in_thread(self, + job_kwargs: Dict = None, + callback_when_done: callable = None): if job_kwargs is None: job_kwargs = {} - self._worker = JobWorker(paths=self._path_list, + self._worker = JobWorker(paths_in=self._path_in_list, + paths_out=self.get_paths_out(), job_kwargs=job_kwargs, runners=self._runner_list, busy_lock=self.busy_lock, @@ -92,6 +141,9 @@ def run_all_in_thread(self, job_kwargs=None, callback_when_done=None): ) self._worker.start() + def set_output_path(self, path_out): + self._path_out = pathlib.Path(path_out) + class ErrorredRunner: """Convenience class replacing a high-level failed runner""" @@ -106,7 +158,8 @@ def get_status(self): class JobWorker(threading.Thread): def __init__(self, - paths: 
List[List[pathlib.Path | str]], + paths_in: List[List[pathlib.Path | str]], + paths_out: List[List[pathlib.Path | str]], job_kwargs: Dict, runners: List, busy_lock: threading.Lock = None, @@ -117,10 +170,12 @@ def __init__(self, Parameters ---------- - paths: - List of input paths to process + paths_in: + List of tuples (path, state) of the input data + paths_out: + List of output paths for each item in `paths_in` job_kwargs: - List of keyword arguments for the DCNumJob instace + List of keyword arguments for the DCNumJob instance runners: Empty list which is filled with runner instances busy_lock: @@ -133,7 +188,8 @@ def __init__(self, pipeline identifiers. """ super(JobWorker, self).__init__(*args, **kwargs) - self.paths = paths + self.paths_in = paths_in + self.paths_out = paths_out self.jobs = [] self.runners = runners self.job_kwargs = job_kwargs @@ -145,22 +201,24 @@ def run(self): with self.busy_lock: self.runners.clear() # reset all job states - [pp.__setitem__(1, "created") for pp in self.paths] + [pp.__setitem__(1, "created") for pp in self.paths_in] # run jobs - for ii, (pp, _) in enumerate(self.paths): + for ii, (pp, _) in enumerate(self.paths_in): try: - self.run_job(path_in=pp) + self.run_job(path_in=pp, path_out=self.paths_out[ii]) except BaseException: # Create a dummy error runner self.runners.append(ErrorredRunner(traceback.format_exc())) # write final state to path list runner = self.runners[ii] - self.paths[ii][1] = runner.get_status()["state"] + self.paths_in[ii][1] = runner.get_status()["state"] if self.callback_when_done is not None: self.callback_when_done() - def run_job(self, path_in): - job = dclogic.DCNumPipelineJob(path_in=path_in, **self.job_kwargs) + def run_job(self, path_in, path_out): + job = dclogic.DCNumPipelineJob(path_in=path_in, + path_out=path_out, + **self.job_kwargs) self.jobs.append(job) with dclogic.DCNumJobRunner(job) as runner: self.runners.append(runner) diff --git a/tests/test_manager.py b/tests/test_manager.py index 
9da4d94..3d8856c 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -1,9 +1,46 @@ +import shutil + from chipstream import manager from helper_methods import retrieve_data +def test_manager_get_paths_out(tmp_path): + path = retrieve_data( + "fmt-hdf5_cytoshot_full-features_legacy_allev_2023.zip") + + p1 = tmp_path / "foo" / "bar" / "data.rtdc" + p2 = tmp_path / "foo" / "baz" / "data.rtdc" + pout = tmp_path / "far" + p1.parent.mkdir(exist_ok=True, parents=True) + p2.parent.mkdir(exist_ok=True, parents=True) + pout.mkdir(exist_ok=True, parents=True) + shutil.copy2(path, p1) + shutil.copy2(path, p2) + + mg = manager.ChipStreamJobManager() + mg.add_path(p1) + mg.add_path(p2) + + # Sanity check + assert mg.get_paths_in()[0] == p1 + assert mg.get_paths_in()[1] == p2 + + # If no output path is specified (None), then the returned + # path list should just be the paths with "_dcn" inserted in the stem. + assert mg.get_paths_out()[0] == p1.with_name(p1.stem + "_dcn.rtdc") + assert mg.get_paths_out()[1] == p2.with_name(p2.stem + "_dcn.rtdc") + + # We now set the output path. The manager should now compute the + # common parent path for all input paths and append relative + # subdirectories. + mg.set_output_path(pout) + # Note that the common parent "foo" is missing. 
+ assert mg.get_paths_out()[0] == pout / "bar" / "data_dcn.rtdc" + assert mg.get_paths_out()[1] == pout / "baz" / "data_dcn.rtdc" + + def test_manager_read_data(): path = retrieve_data( "fmt-hdf5_cytoshot_full-features_legacy_allev_2023.zip") @@ -49,3 +86,42 @@ def test_manager_run_defaults(): "thresh:t=-6:cle=1^f=1^clo=2|" "legacy:b=1^h=1|" "norm:o=0^s=10") + + +def test_manager_run_with_path_out(tmp_path): + path = retrieve_data( + "fmt-hdf5_cytoshot_full-features_legacy_allev_2023.zip") + + p1 = tmp_path / "foo" / "bar" / "data.rtdc" + p2 = tmp_path / "foo" / "baz" / "data.rtdc" + pout = tmp_path / "far" + p1.parent.mkdir(exist_ok=True, parents=True) + p2.parent.mkdir(exist_ok=True, parents=True) + pout.mkdir(exist_ok=True, parents=True) + shutil.copy2(path, p1) + shutil.copy2(path, p2) + + # set up manager + mg = manager.ChipStreamJobManager() + mg.add_path(p1) + mg.add_path(p2) + mg.set_output_path(pout) + + # start analysis + mg.run_all_in_thread() + # wait for the thread to join + mg.join() + + # sanity checks + assert mg[0]["progress"] == 1 + assert mg[0]["state"] == "done" + assert mg[0]["path"] == str(p1) + assert mg[1]["progress"] == 1 + assert mg[1]["state"] == "done" + assert mg[1]["path"] == str(p2) + assert mg.current_index == 1 + assert not mg.is_busy() + + # make sure the output paths exist + assert (pout / "bar" / "data_dcn.rtdc").exists() + assert (pout / "baz" / "data_dcn.rtdc").exists()