Memory support (Issue #42) #153

Open · wants to merge 9 commits into base: master
22 changes: 14 additions & 8 deletions scripts/smart-dispatch
@@ -146,18 +146,22 @@ def main():
log_folder=path_job_logs, worker_call_suffix=worker_call_suffix)
commands = [COMMAND_STRING.format(ID=i) for i in range(args.pool)]

- # TODO: use args.memPerNode instead of args.memPerNode
- queue = Queue(args.queueName, CLUSTER_NAME, args.walltime, args.coresPerNode, args.gpusPerNode, float('inf'), args.modules)
+ queue = Queue(args.queueName, CLUSTER_NAME, args.walltime, args.coresPerNode, args.gpusPerNode, args.memPerNode, args.modules)

- # Check that requested core number does not exceed node total
+ # Check that requested per command resources do not exceed node total
if args.coresPerCommand > queue.nb_cores_per_node:
sys.stderr.write("smart-dispatch: error: coresPerCommand exceeds nodes total: asked {req_cores} cores, nodes have {node_cores}\n"
.format(req_cores=args.coresPerCommand, node_cores=queue.nb_cores_per_node))
sys.exit(2)

+ if args.memPerCommand > queue.mem_per_node:
+     sys.stderr.write("smart-dispatch: error: memPerCommand exceeds nodes total: asked {req_mem} Gb, nodes have {node_mem} Gb\n"
+                      .format(req_mem=args.memPerCommand, node_mem=queue.mem_per_node))
+     sys.exit(2)

command_params = {'nb_cores_per_command': args.coresPerCommand,
'nb_gpus_per_command': args.gpusPerCommand,
- 'mem_per_command': None # args.memPerCommand
+ 'mem_per_command': args.memPerCommand
}

prolog = []
@@ -172,7 +176,7 @@ def main():
for pbs_id, pbs in enumerate(job_generator.pbs_list):
proper_size_name = utils.jobname_generator(jobname, pbs_id)
pbs.add_options(N=proper_size_name)

if args.pbsFlags is not None:
job_generator.add_pbs_flags(args.pbsFlags.split(' '))
pbs_filenames = job_generator.write_pbs_files(path_job_commands)
@@ -193,11 +197,11 @@ def parse_arguments():
parser.add_argument('-L', '--launcher', choices=['qsub', 'msub'], required=False, help='Which launcher to use. Default: qsub')
parser.add_argument('-C', '--coresPerNode', type=int, required=False, help='How many cores there are per node.')
parser.add_argument('-G', '--gpusPerNode', type=int, required=False, help='How many gpus there are per node.')
- # parser.add_argument('-M', '--memPerNode', type=int, required=False, help='How much memory there are per node (in Gb).')
+ parser.add_argument('-M', '--memPerNode', type=float, required=False, help='How much memory there are per node (in Gb).')

parser.add_argument('-c', '--coresPerCommand', type=int, required=False, help='How many cores a command needs.', default=1)
parser.add_argument('-g', '--gpusPerCommand', type=int, required=False, help='How many gpus a command needs.', default=1)
- # parser.add_argument('-m', '--memPerCommand', type=float, required=False, help='How much memory a command needs (in Gb).')
+ parser.add_argument('-m', '--memPerCommand', type=float, required=False, help='How much memory a command needs (in Gb).')
parser.add_argument('-f', '--commandsFile', type=file, required=False, help='File containing commands to launch. Each command must be on a seperate line. (Replaces commandAndOptions)')

parser.add_argument('-l', '--modules', type=str, required=False, help='List of additional modules to load.', nargs='+')
@@ -224,7 +228,9 @@ def parse_arguments():
if args.queueName not in AVAILABLE_QUEUES and ((args.coresPerNode is None and args.gpusPerNode is None) or args.walltime is None):
parser.error("Unknown queue, --coresPerNode/--gpusPerNode and --walltime must be set.")
if args.coresPerCommand < 1:
parser.error("coresPerNode must be at least 1")
parser.error("coresPerCommand must be at least 1")
Member: Thx, good catch.

+ if args.memPerCommand is not None and args.memPerCommand <= 0:
+     parser.error("memPerCommand must be positive")
Member: maybe: 'strictly' positive?

Member: Yeah.

Member: oops it is


return args

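With these flags enabled, memory can be requested on the command line alongside cores and gpus. A hypothetical invocation (the queue name, walltime, and launched command below are illustrative, not taken from this PR; the flags are the ones defined above):

    # Nodes advertise 8 cores and 32 Gb; each command gets 2 cores and 4 Gb (illustrative values).
    smart-dispatch -q qtest -t 5:00 -C 8 -M 32 -c 2 -m 4 launch python train.py

If a per-command request exceeds the node total, the new checks above abort with exit status 2 before any PBS files are written.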
11 changes: 9 additions & 2 deletions smartdispatch/job_generator.py
@@ -46,7 +46,7 @@ def __init__(self, queue, commands, prolog=[], epilog=[], command_params={}, bas

self.nb_cores_per_command = command_params.get('nb_cores_per_command', 1)
self.nb_gpus_per_command = command_params.get('nb_gpus_per_command', 1)
- #self.mem_per_command = command_params.get('mem_per_command', 0.0)
+ self.mem_per_command = command_params.get('mem_per_command', None)

self.pbs_list = self._generate_base_pbs()
self._add_cluster_specific_rules()
@@ -80,6 +80,10 @@ def _generate_base_pbs(self):
if self.queue.nb_gpus_per_node > 0 and self.nb_gpus_per_command > 0:
nb_commands_per_node = min(nb_commands_per_node, self.queue.nb_gpus_per_node // self.nb_gpus_per_command)

+ # Limit number of running commands by the amount of available memory on the node.
+ if self.mem_per_command is not None:
+     nb_commands_per_node = min(nb_commands_per_node, self.queue.mem_per_node // self.mem_per_command)

pbs_files = []
# Distribute equally the jobs among the PBS files and generate those files
for i, commands in enumerate(utils.chunks(self.commands, n=nb_commands_per_node)):
@@ -92,9 +96,12 @@ def _generate_base_pbs(self):
resource = "1:ppn={ppn}".format(ppn=len(commands) * self.nb_cores_per_command)
if self.queue.nb_gpus_per_node > 0:
resource += ":gpus={gpus}".format(gpus=len(commands) * self.nb_gpus_per_command)

pbs.add_resources(nodes=resource)

+ if self.mem_per_command is not None:
+     resource = "{mem}Gb".format(mem=len(commands) * self.mem_per_command)
+     pbs.add_resources(mem=resource)

pbs.add_modules_to_load(*self.queue.modules)
pbs.add_to_prolog(*self.prolog)
pbs.add_commands(*commands)
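The packing rule in _generate_base_pbs caps the number of commands placed on one node by each resource in turn, and memory now joins cores and gpus. A minimal sketch of that computation with made-up numbers (the node sizes below are illustrative, not from the PR):

    # Illustrative node: 16 cores, 32 Gb. Each command wants 2 cores and 12 Gb.
    nb_cores_per_node = 16
    mem_per_node = 32.0
    nb_cores_per_command = 2
    mem_per_command = 12.0

    # Cores alone would allow 8 commands per node...
    nb_commands_per_node = nb_cores_per_node // nb_cores_per_command  # 8
    # ...but memory caps it at 2. Note: floor division on floats yields
    # a float (32.0 // 12.0 == 2.0).
    nb_commands_per_node = min(nb_commands_per_node,
                               mem_per_node // mem_per_command)       # 2.0

    # Each PBS file then requests the total memory for its chunk of commands,
    # e.g. two 12 Gb commands on one node:
    resource = "{mem}Gb".format(mem=2 * mem_per_command)              # "24.0Gb"

The float result of // may be worth double-checking, since nb_commands_per_node is later passed as the chunk size n to utils.chunks.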
30 changes: 26 additions & 4 deletions smartdispatch/tests/test_job_generator.py
@@ -43,15 +43,15 @@ def test_generate_pbs(self):
assert_equal(job_generator.pbs_list[0].epilog, self.epilog)

def test_generate_pbs2_cpu(self):
- # Should needs two PBS file
+ # Should need two PBS files
command_params = {'nb_cores_per_command': self.cores // 2}
job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
assert_equal(len(job_generator.pbs_list), 2)
assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])

def test_generate_pbs4_cpu(self):
- # Should needs four PBS file
+ # Should need four PBS files
command_params = {'nb_cores_per_command': self.cores}
job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
assert_equal(len(job_generator.pbs_list), 4)
@@ -64,17 +64,39 @@ def test_generate_pbs4_cpu(self):
# Check if needed modules for this queue are included in the PBS file
assert_equal(job_generator.pbs_list[0].modules, self.modules)

+ def test_generate_pbs2_mem(self):
+     # Should need two PBS files
+     command_params = {'mem_per_command': self.mem_per_node // 2}
+     job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
+     assert_equal(len(job_generator.pbs_list), 2)
+     assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
+     assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])
+
+ def test_generate_pbs4_mem(self):
+     # Should need four PBS files
+     command_params = {'mem_per_command': self.mem_per_node}
+     job_generator = JobGenerator(self.queue, self.commands, command_params=command_params)
+     assert_equal(len(job_generator.pbs_list), 4)
+     assert_equal([pbs.commands[0] for pbs in job_generator.pbs_list], self.commands)
+
+     # Since queue has no gpus it should not be specified in PBS resource `nodes`
+     assert_true('gpus' not in job_generator.pbs_list[0].resources['nodes'])
+
+     # Test modules to load
+     # Check if needed modules for this queue are included in the PBS file
+     assert_equal(job_generator.pbs_list[0].modules, self.modules)

def test_generate_pbs2_gpu(self):
# Test nb_gpus_per_command argument
- # Should needs two PBS file
+ # Should need two PBS files
command_params = {'nb_gpus_per_command': self.gpus // 2}
job_generator = JobGenerator(self.queue_gpu, self.commands, command_params=command_params)
assert_equal(len(job_generator.pbs_list), 2)
assert_equal(job_generator.pbs_list[0].commands, self.commands[:2])
assert_equal(job_generator.pbs_list[1].commands, self.commands[2:])

def test_generate_pbs4_gpu(self):
- # Should needs four PBS files
+ # Should need four PBS files
command_params = {'nb_gpus_per_command': self.gpus}
job_generator = JobGenerator(self.queue_gpu, self.commands, command_params=command_params)
assert_equal(len(job_generator.pbs_list), 4)
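The expected PBS-file counts in the new memory tests follow directly from the packing rule: with four commands and mem_per_command at half of mem_per_node, two commands fit per node (two files); at the full mem_per_node, one fits (four files). A quick arithmetic check, with an assumed fixture value (the real self.mem_per_node is set in setUp, which is not shown in this diff):

    # Assumed fixture value, for illustration only.
    mem_per_node = 42
    nb_commands = 4

    per_node = mem_per_node // (mem_per_node // 2)   # 42 // 21 == 2
    assert nb_commands // per_node == 2              # two PBS files

    per_node = mem_per_node // mem_per_node          # 1
    assert nb_commands // per_node == 4              # four PBS files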
21 changes: 18 additions & 3 deletions tests/test_smart_dispatch.py
@@ -23,17 +23,20 @@ def setUp(self):
self.nb_commands = len(self.commands)

scripts_path = abspath(pjoin(os.path.dirname(__file__), os.pardir, "scripts"))
- self.smart_dispatch_command = '{} -C 1 -q test -t 5:00 -x'.format(pjoin(scripts_path, 'smart-dispatch'))
+ self.smart_dispatch_command = '{} -C 1 -M 1 -q test -t 5:00 -x'.format(pjoin(scripts_path, 'smart-dispatch'))
self.launch_command = "{0} launch {1}".format(self.smart_dispatch_command, self.folded_commands)
self.resume_command = "{0} resume {{0}}".format(self.smart_dispatch_command)

- smart_dispatch_command_with_pool = '{} --pool 10 -C 1 -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
+ smart_dispatch_command_with_pool = '{} --pool 10 -C 1 -M 1 -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
self.launch_command_with_pool = smart_dispatch_command_with_pool.format('launch ' + self.folded_commands)
self.nb_workers = 10

- smart_dispatch_command_with_cores = '{} -C 1 -c {{cores}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
+ smart_dispatch_command_with_cores = '{} -C 1 -M 1 -c {{cores}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
self.launch_command_with_cores = smart_dispatch_command_with_cores.format('launch ' + self.folded_commands, cores='{cores}')

+ smart_dispatch_command_with_memory = '{} -C 1 -M 1 -m {{memory}} -q test -t 5:00 -x {{0}}'.format(pjoin(scripts_path, 'smart-dispatch'))
+ self.launch_command_with_memory = smart_dispatch_command_with_memory.format('launch ' + self.folded_commands, memory='{memory}')

self._cwd = os.getcwd()
os.chdir(self.testing_dir)

@@ -95,6 +98,18 @@ def test_main_launch_with_cores_command(self):
assert_equal(exit_status_100, 2)
assert_true(os.path.isdir(self.logs_dir))

+ def test_main_launch_with_memory_command(self):
+     # Actual test
+     exit_status_0 = call(self.launch_command_with_memory.format(memory=0), shell=True)
+     exit_status_05 = call(self.launch_command_with_memory.format(memory=0.5), shell=True)
+     exit_status_100 = call(self.launch_command_with_memory.format(memory=100), shell=True)
+
+     # Test validation
+     assert_equal(exit_status_0, 2)
+     assert_equal(exit_status_05, 0)
+     assert_equal(exit_status_100, 2)
+     assert_true(os.path.isdir(self.logs_dir))

def test_main_resume(self):
# Setup
call(self.launch_command, shell=True)
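For context on the expected exit statuses above: argparse's parser.error() prints a usage message and exits with status 2, and the explicit sys.exit(2) checks in smart-dispatch do the same, so both invalid requests (0 Gb, and 100 Gb against the 1 Gb test node set by -M 1) should return 2 while the valid 0.5 Gb request returns 0. A standalone illustration of the argparse half of that:

    # argparse's parser.error() raises SystemExit with code 2 (standard library behavior).
    import argparse

    parser = argparse.ArgumentParser()
    try:
        parser.error("memPerCommand must be positive")
    except SystemExit as e:
        assert e.code == 2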