Skip to content

Commit

Permalink
Merge pull request #38 from flatironinstitute/dev
Browse files Browse the repository at this point in the history
v0.5.3
  • Loading branch information
asistradition authored Mar 25, 2021
2 parents 2814f4c + 94c54bd commit 22bd8dd
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 17 deletions.
2 changes: 1 addition & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Copyright (c) 2016-2020 The Simons Foundation, Inc.
Copyright (c) 2016-2021 The Simons Foundation, Inc.
All rights reserved.

Redistribution and use in source and binary forms, with or without
Expand Down
13 changes: 12 additions & 1 deletion docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
Change Log
==========

Inferelator v0.5.2 `January 29, 2021`
Inferelator v0.5.3 `March 22, 2021`
--------------------------------------

New Functionality:

- Added the ability to control threads-per-process when using dask

Bug Fixes:

- Fixed bug in result dataframe that failed to create columns in older versions of pandas

Inferelator v0.5.2 `January 29, 2021`
-------------------------------------

New Functionality:

- Added flag ``.set_shuffle_parameters(make_data_noise=True)`` to model on randomly generated noise
- Output TSV files are gzipped by default
- Added ``.set_output_file_names()`` as interface to change output file names
Expand Down
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
author = 'Chris Jackson'

# The full version, including alpha/beta/rc tags
release = 'v0.5.2'
release = 'v0.5.3'


# -- General configuration ---------------------------------------------------
Expand Down
60 changes: 48 additions & 12 deletions inferelator/distributed/dask_cluster_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@

_DEFAULT_CONDA_ACTIVATE = "source ~/.local/anaconda3/bin/activate"
_DEFAULT_NUM_JOBS = 1
_DEFAULT_THREADS_PER_WORKER = 1
_DEFAULT_WORKERS_PER_JOB = 20
_DEFAULT_MEM_PER_JOB = '62GB'
_DEFAULT_INTERFACE = 'ib0'
_DEFAULT_WALLTIME = '1:00:00'

_DEFAULT_ENV_EXTRA = ['module purge',
'export MKL_NUM_THREADS=1',
'export OPENBLAS_NUM_THREADS=1',
'export NUMEXPR_NUM_THREADS=1']
_DEFAULT_ENV_EXTRA = ['module purge']

_THREAD_CONTROL_ENV = ['export MKL_NUM_THREADS={t}',
'export OPENBLAS_NUM_THREADS={t}',
'export NUMEXPR_NUM_THREADS={t}']

_DEFAULT_CONTROLLER_EXTRA = ['--nodes 1', '--ntasks-per-node 1']

Expand All @@ -36,25 +38,41 @@
"_interface": "ib0",
"_job_extra_env_commands": copy.copy(_DEFAULT_ENV_EXTRA)
},

"rusty_ccb": {"_job_n_workers": 28,
"_num_local_workers": 25,
"_job_mem": "498GB",
"_job_time": "48:00:00",
"_interface": "ib0",
"_queue": "ccb",
"_job_extra_env_commands": copy.copy(_DEFAULT_ENV_EXTRA)},
"_job_extra_env_commands": copy.copy(_DEFAULT_ENV_EXTRA)
},

"rusty_preempt": {"_job_n_workers": 40,
"_num_local_workers": 35,
"_job_mem": "766GB",
"_job_time": "48:00:00",
"_interface": "ib0",
"_queue": "preempt",
"_job_extra_env_commands": copy.copy(_DEFAULT_ENV_EXTRA),
"_job_slurm_commands": copy.copy(_DEFAULT_ENV_EXTRA) + ["--qos=preempt",
"--constraint=info"]}
"_job_slurm_commands": copy.copy(_DEFAULT_CONTROLLER_EXTRA) + ["--qos=preempt",
"--constraint=info"]
},

"rusty_rome": {"_job_n_workers": 64,
"_job_n_threads": 2,
"_num_local_workers": 60,
"_job_mem": "990GB",
"_job_time": "24:00:00",
"_interface": "ib0",
"_queue": "ccb",
"_job_extra_env_commands": copy.copy(_DEFAULT_ENV_EXTRA),
"_job_slurm_commands": copy.copy(_DEFAULT_CONTROLLER_EXTRA) + ["--constraint=rome"]
},
}

_DEFAULT_LOCAL_WORKER_COMMAND = "dask-worker {a} --nprocs {p} --nthreads 1 --memory-limit 0 --local-directory {d}"

_DEFAULT_LOCAL_WORKER_COMMAND = "dask-worker {a} --nprocs {p} --nthreads {t} --memory-limit 0 --local-directory {d}"

try:
_DEFAULT_LOCAL_DIR = os.environ['TMPDIR']
Expand Down Expand Up @@ -114,10 +132,12 @@ class DaskHPCClusterController(AbstractController):
# Job variables
_job_n = _DEFAULT_NUM_JOBS
_job_n_workers = _DEFAULT_WORKERS_PER_JOB
_worker_n_threads = _DEFAULT_THREADS_PER_WORKER
_job_mem = _DEFAULT_MEM_PER_JOB
_job_time = _DEFAULT_WALLTIME
_job_slurm_commands = copy.copy(_DEFAULT_CONTROLLER_EXTRA)
_job_extra_env_commands = copy.copy(_DEFAULT_ENV_EXTRA)
_job_threading_commands = copy.copy(_THREAD_CONTROL_ENV)

@classmethod
def connect(cls, *args, **kwargs):
Expand All @@ -130,11 +150,12 @@ def connect(cls, *args, **kwargs):
project=cls._project,
interface=cls._interface,
walltime=cls._job_time,
job_cpu=cls._job_n_workers,
cores=cls._job_n_workers,
job_cpu=cls._job_n_workers * cls._worker_n_threads,
cores=cls._job_n_workers * cls._worker_n_threads,
processes=cls._job_n_workers,
threads = cls._worker_n_threads,
job_mem=cls._job_mem,
env_extra=cls._job_extra_env_commands,
env_extra=cls._config_env(),
local_directory=cls._local_directory,
memory=cls._job_mem,
job_extra=cls._job_slurm_commands,
Expand Down Expand Up @@ -180,7 +201,8 @@ def use_default_configuration(cls, known_config, n_jobs=1):
utils.Debug.vprint(cls._config_str(), level=1)

@classmethod
def set_job_size_params(cls, n_jobs=None, n_cores_per_job=None, mem_per_job=None, walltime=None):
def set_job_size_params(cls, n_jobs=None, n_cores_per_job=None, mem_per_job=None, walltime=None,
n_workers_per_job=None, n_threads_per_worker=None):
"""
Set the job size parameters
Expand All @@ -197,15 +219,24 @@ def set_job_size_params(cls, n_jobs=None, n_cores_per_job=None, mem_per_job=None
:param walltime: The time limit per worker job.
For SLURM, this is setting #SBATCH --time
:type walltime: str
:param n_workers_per_job: The number of dask workers to start per job.
SLURM will allocate n_workers_per_job * n_threads_per_worker cores per job.
:type n_workers_per_job: int
:param n_threads_per_worker: The number of threads to give each worker.
SLURM will allocate n_workers_per_job * n_threads_per_worker cores per job.
:type n_threads_per_worker: int
"""

check.argument_integer(n_jobs, allow_none=True)
check.argument_integer(n_cores_per_job, allow_none=True)
check.argument_integer(n_threads_per_worker, allow_none=True)

cls._job_n = n_jobs if n_jobs is not None else cls._job_n
cls._job_n_workers = n_cores_per_job if n_cores_per_job is not None else cls._job_n_workers
cls._job_mem = mem_per_job if mem_per_job is not None else cls._job_mem
cls._job_time = walltime if walltime is not None else cls._job_time
cls._worker_n_threads = n_threads_per_worker if n_threads_per_worker is not None else cls._worker_n_threads
cls._job_n_workers = n_workers_per_job if n_workers_per_job is not None else cls._job_n_workers

@classmethod
def set_cluster_params(cls, queue=None, project=None, interface=None, local_workers=None):
Expand Down Expand Up @@ -328,6 +359,10 @@ def _config_str(cls):
"ENV: " + "\n\t".join(cls._job_extra_env_commands)]) + "\n"
return status.format(n=cls._job_n, w=cls._job_n_workers, m=cls._job_mem, q=cls._queue, p=cls._project)

@classmethod
def _config_env(cls):
return [s.format(t=cls._worker_n_threads) for s in cls._job_threading_commands] + cls._job_extra_env_commands

@classmethod
def _scale_jobs(cls):
"""
Expand All @@ -354,6 +389,7 @@ def _add_local_node_workers(cls, num_workers):

if num_workers is not None and num_workers > 0:
cmd = _DEFAULT_LOCAL_WORKER_COMMAND.format(p=num_workers,
t=cls._worker_n_threads,
a=cls._local_cluster.scheduler_address,
d=cls._local_directory)
out_handle = open("slurm-{i}.out".format(i=_DEFAULT_SLURM_ID), mode="w")
Expand Down
2 changes: 1 addition & 1 deletion inferelator/postprocessing/model_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def compute_confusion_matrix(data, rank_col=CONFIDENCE_COLUMN, gs_col=GOLD_STAND
RankSummingMetric.transform_column(df, rank_col, FN, 'min')

# Stick confusion results back onto the data and return it
data[[TP, FP, TN, FN]] = pd.NA
data[TP], data[FP], data[TN], data[FN] = pd.NA, pd.NA, pd.NA, pd.NA
data.loc[valid_gs_idx, [TP, FP, TN, FN]] = df[[TP, FP, TN, FN]]

if _reindex is not None:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from setuptools import setup, find_packages

# Current Inferelator Version Number
version = "0.5.2"
version = "0.5.3"


# Description from README.md
Expand Down

0 comments on commit 22bd8dd

Please sign in to comment.