diff --git a/src/qtoolkit/io/pbs.py b/src/qtoolkit/io/pbs.py index 98acfc5..eec2ea0 100644 --- a/src/qtoolkit/io/pbs.py +++ b/src/qtoolkit/io/pbs.py @@ -88,21 +88,18 @@ class PBSIO(PBSIOBase): SUBMIT_CMD: str | None = "qsub" CANCEL_CMD: str | None = "qdel" - - def __init__(self): - super().__init__() - self.system_name = "PBS" - self.default_unit = "mb" - self.power_labels = {"kb": 0, "mb": 1, "gb": 2, "tb": 3} - self._qresources_mapping = { - "queue_name": "queue", - "job_name": "job_name", - "account": "account", - "priority": "priority", - "output_filepath": "qout_path", - "error_filepath": "qerr_path", - "project": "group_list", - } + system_name: str = "PBS" + default_unit: str = "mb" + power_labels: dict = {"kb": 0, "mb": 1, "gb": 2, "tb": 3} + _qresources_mapping: dict = { + "queue_name": "queue", + "job_name": "job_name", + "account": "account", + "priority": "priority", + "output_filepath": "qout_path", + "error_filepath": "qerr_path", + "project": "group_list", + } def extract_job_id(self, stdout): return stdout.strip() diff --git a/src/qtoolkit/io/pbs_base.py b/src/qtoolkit/io/pbs_base.py index 86a7e2b..d414dfa 100644 --- a/src/qtoolkit/io/pbs_base.py +++ b/src/qtoolkit/io/pbs_base.py @@ -24,12 +24,10 @@ class PBSIOBase(BaseSchedulerIO, ABC): SUBMIT_CMD: str | None = "qsub" CANCEL_CMD: str | None = "qdel" - - def __init__(self): - self._qresources_mapping = None - self.system_name = None - self.default_unit = None - self.power_labels = None + _qresources_mapping: dict + system_name: str + default_unit: str + power_labels: dict def parse_submit_output(self, exit_code, stdout, stderr) -> SubmissionResult: if isinstance(stdout, bytes): diff --git a/src/qtoolkit/io/sge.py b/src/qtoolkit/io/sge.py index ea0dd89..b9730be 100644 --- a/src/qtoolkit/io/sge.py +++ b/src/qtoolkit/io/sge.py @@ -125,21 +125,21 @@ class SGEIO(PBSIOBase): SUBMIT_CMD: str | None = "qsub" CANCEL_CMD: str | None = "qdel" + system_name: str = "SGE" + default_unit: str = "M" + power_labels: dict = {"k": 0, "m": 1, "g": 2, "t": 3} + _qresources_mapping: dict = { + "queue_name": "queue", + "job_name": "job_name", + "priority": "priority", + "output_filepath": "qout_path", + "error_filepath": "qerr_path", + "project": "group_list", + } def __init__(self, get_job_executable: str = "qstat"): super().__init__() self.get_job_executable = get_job_executable - self.system_name = "SGE" - self.default_unit = "M" - self.power_labels = {"k": 0, "m": 1, "g": 2, "t": 3} - self._qresources_mapping = { - "queue_name": "queue", - "job_name": "job_name", - "priority": "priority", - "output_filepath": "qout_path", - "error_filepath": "qerr_path", - "project": "group_list", - } def extract_job_id(self, stdout): match = re.search(r'Your job (\d+) \(".*?"\) has been submitted', stdout)