Skip to content

Commit

Permalink
Merge pull request #118 from matyasselmeci/pr/software-4177.pilot-sec…
Browse files Browse the repository at this point in the history
…tions

Pilot sections
  • Loading branch information
brianhlin authored Jan 7, 2021
2 parents 9f7c209 + 28145b4 commit 2c2905d
Show file tree
Hide file tree
Showing 11 changed files with 540 additions and 297 deletions.
40 changes: 40 additions & 0 deletions config/35-pilot.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
;===================================================================
; Pilots
;===================================================================

; For each pilot type, add a new pilot section.
; Each pilot name must be unique for the entire grid, so make sure to not
; pick anything generic like "MAIN". The name will be used as-is as the
; "Name" attribute in the OSG_ResourceCatalog entry.

; This data is used to determine the resources requested by pilot jobs submitted by the OSG, so it's
; important to keep it up to date.


;[Pilot PILOT_NAME]
;; The number of cores for this pilot type.
;cpucount = 1
;; The amount of memory (in megabytes) for this pilot type.
;ram_mb = 2500
;; This is a whole node pilot; cpucount and ram_mb are ignored if this is true.
;whole_node = false
;; The number of GPUs available
;gpucount = 0
;; The maximum number of pilots of this type that can be sent
;max_pilots = CHANGEME
;; The maximum wall-clock time a job is allowed to run for this pilot type,
;; in minutes.
;max_wall_time = 1440
;; The queue or partition which jobs should be submitted to in order to run on this resource.
;; Equivalent to the HTCondor grid universe classad attribute "remote_queue"
;queue =
;; True if the pilot should require singularity on the workers.
;require_singularity = true
;; The OS of the workers; allowed values are "rhel6", "rhel7", "rhel8", or "ubuntu18".
;; This is required unless require_singularity = true
;os = rhel7
;; Send test pilots?
;send_tests = true
;; A comma-separated list of VOs that are allowed to submit to this subcluster;
;; If *, uses VOs that have accounts on this CE
;allowed_vos = vo1, vo2
6 changes: 3 additions & 3 deletions osg_configure/configure_modules/infoservices.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,17 +139,17 @@ def csgbool(section, option):
self.subcluster_sections = ConfigParser.SafeConfigParser()

for section in configuration.sections():
if section.lower().startswith('subcluster') or section.lower().startswith('resource entry'):
if subcluster.is_subcluster_like(section):
self.subcluster_sections.add_section(section)
for key, value in configuration.items(section):
self.subcluster_sections.set(section, key, value)

if utilities.ce_installed() and not subcluster.check_config(configuration):
self.log("On a CE but no valid 'Subcluster' or 'Resource Entry' sections defined."
self.log("On a CE but no valid 'Subcluster', 'Resource Entry', or 'Pilot' sections defined."
" This is required to advertise the capabilities of your cluster to the central collector."
" Jobs may not be sent to this CE.",
level=logging.ERROR)
raise exceptions.SettingError("No Subcluster or Resource Entry sections")
raise exceptions.SettingError("No Subcluster, Resource Entry, or Pilot sections")

# Check resource catalog
# This is a bit clunky to parse it here and not use the result in
Expand Down
220 changes: 113 additions & 107 deletions osg_configure/modules/resourcecatalog.py
Original file line number Diff line number Diff line change
@@ -1,138 +1,144 @@
import classad
import re
import logging
from collections import namedtuple
from . import utilities

log = logging.getLogger(__name__)

class RCEntry(object):
"""Contains the data in a ResourceCatalog entry
:var name: name of the resource
:var cpus: number of cores per node
:var memory: megabytes of memory per node
:var allowed_vos: a list or string containing the names of all the VOs that are allowed to run on this resource.
Optional; if not specified, all VOs can run on this resource.
:type allowed_vos: str or list or None
:var max_wall_time: optional max run time of job on these nodes in minutes
:var queue: optional remote queue name
:var subclusters: optional list of subclusters connected to this resource
:var vo_tag: optional
:var extra_requirements: optional string of extra requirements clauses (which are ANDed together)
:var extra_transforms; optional string of transform attributes (which are appended)
"""

def __init__(self, **kwargs):
self.name = kwargs.get('name', '')
self.cpus = kwargs.get('cpus', 0)
self.memory = kwargs.get('memory', 0)
self.allowed_vos = kwargs.get('allowed_vos', None)
self.max_wall_time = kwargs.get('max_wall_time', None)
self.queue = kwargs.get('queue', '')
self.subclusters = kwargs.get('subclusters', None)
self.vo_tag = kwargs.get('vo_tag', None)
self.extra_requirements = kwargs.get('extra_requirements', '')
self.extra_transforms = kwargs.get('extra_transforms', '')
def _noop(x):
return x

def validate(self):
"""Check that the values of the RCEntry fields match the requirements for a resource catalog entry.
Some fields must be specified; some fields must be convertable to
ints if specified; some fields must be in a certain range.

:raise ValueError, TypeError: in case validation fails
:return self:
def _extra_transforms_to_classad(extra_transforms):
"""Ensure extra_transforms is surrounded by exactly one pair of brackets
so it can be parsed as a classad
"""
return classad.parseOne('[' + extra_transforms.lstrip('[ \t').rstrip('] \t') + ']')

"""
# Several of these can raise TypeError or ValueError but that's expected
if not self.name:
raise ValueError("'name' not specified")
if int(self.cpus) <= 0:
raise ValueError("'cpus' out of range at %s; must be > 0" % self.cpus)
if int(self.memory) <= 0:
raise ValueError("'memory' out of range at %s; must be > 0" % self.cpus)

if self.max_wall_time is not None:
if not int(self.max_wall_time) >= 0:
raise ValueError("'max_wall_time' out of range at %s; must be >= 0" % self.max_wall_time)
def _to_classad_list(a_list):
return "{ " + ", ".join([utilities.classad_quote(it) for it in a_list if it]) + " }"

if self.allowed_vos is not None:
if not isinstance(self.allowed_vos, (list, tuple, set, str)):
raise TypeError("'allowed_vos' is a %s; must be a string or a list/tuple/set" % type(self.allowed_vos))

if self.subclusters is not None:
if not isinstance(self.subclusters, (list, tuple, set, str)):
raise TypeError("'subclusters' is a %s; must be a string or a list/tuple/set" % type(self.subclusters))
def _str_to_classad_list(a_str):
return _to_classad_list(utilities.split_comma_separated_list(a_str))

return self

def normalize(self):
"""Convert the vaules in the RCEntry fields to their most useful form.
For example, integers are parsed, and comma or space-separated strings
are split up into lists.
:return self:
"""
self.cpus = int(self.cpus)
self.memory = int(self.memory)
if self.max_wall_time is not None:
self.max_wall_time = int(self.max_wall_time)
if self.allowed_vos is not None and isinstance(self.allowed_vos, str):
self.allowed_vos = list(filter(None, re.split('[ ,]+', self.allowed_vos)))
if self.subclusters is not None and isinstance(self.subclusters, str):
self.subclusters = list(filter(None, re.split(r'\s*,\s*', self.subclusters)))
class RCAttribute(namedtuple("RCAttribute", "rce_field classad_attr format_fn")):
"""The mapping of an RCEntry field to a classad attribute, with a format function"""

return self

def as_attributes(self):
"""Return this entry as a list of classad attributes"""
attributes = {'Name': utilities.classad_quote(self.name),
'CPUs': self.cpus,
'Memory': self.memory}
ATTRIBUTE_MAPPINGS = [
RCAttribute("name", "Name", utilities.classad_quote),
RCAttribute("cpus", "CPUs", int),
RCAttribute("memory", "Memory", int),
RCAttribute("allowed_vos", "AllowedVOs", _to_classad_list),
RCAttribute("max_wall_time", "MaxWallTime", int),
# queue is special
RCAttribute("subclusters", "Subclusters", _to_classad_list),
RCAttribute("vo_tag", "VOTag", utilities.classad_quote),
# extra_requirements is special
# extra_transforms is special
RCAttribute("gpus", "GPUs", int),
RCAttribute("max_pilots", "MaxPilots", int),
RCAttribute("whole_node", "WholeNode", bool),
RCAttribute("require_singularity", "RequireSingularity", bool),
RCAttribute("os", "OS", utilities.classad_quote),
RCAttribute("send_tests", "SendTests", bool),
RCAttribute("is_pilot", "IsPilotEntry", bool),
]

if self.max_wall_time is not None:
attributes['MaxWallTime'] = self.max_wall_time

requirements_clauses = ['TARGET.RequestCPUs <= CPUs', 'TARGET.RequestMemory <= Memory']
class RCEntry(object):
def __init__(self, **kwargs):
self.name = kwargs.get('name', '')
self.cpus = kwargs.get('cpus', 0)
self.memory = kwargs.get('memory', 0)
self.allowed_vos = kwargs.get('allowed_vos', None)
self.max_wall_time = kwargs.get('max_wall_time', None)
self.queue = kwargs.get('queue', '')
self.subclusters = kwargs.get('subclusters', None)
self.vo_tag = kwargs.get('vo_tag', None)
self.extra_requirements = kwargs.get('extra_requirements', '')
self.extra_transforms = kwargs.get('extra_transforms', '')
self.gpus = kwargs.get('gpus', None)
self.max_pilots = kwargs.get('max_pilots', None)
self.whole_node = kwargs.get('whole_node', None)
self.require_singularity = kwargs.get('require_singularity', None)
self.os = kwargs.get('os', None)
self.send_tests = kwargs.get('send_tests', None)
self.is_pilot = kwargs.get('is_pilot', None)

def get_requirements(self, attributes):
if self.is_pilot:
return None
requirements_clauses = []
if "CPUs" in attributes:
requirements_clauses.append("TARGET.RequestCPUs <= CPUs")
if "Memory" in attributes:
requirements_clauses.append("TARGET.RequestMemory <= Memory")
if "GPUs" in attributes:
requirements_clauses.append('(TARGET.RequestGPUs ?: 0) <= GPUs')
if "AllowedVOs" in attributes:
requirements_clauses.append("member(TARGET.VO, AllowedVOs)")
if "VOTag" in attributes:
requirements_clauses.append(f"TARGET.VOTag == {attributes['VOTag']}")
if self.extra_requirements:
requirements_clauses.append(self.extra_requirements)

if self.allowed_vos:
allowed_vos = "{ " + ", ".join([utilities.classad_quote(vo) for vo in self.allowed_vos]) + " }"
attributes['AllowedVOs'] = allowed_vos
requirements_clauses.append("member(TARGET.VO, AllowedVOs)")

if self.subclusters:
subclusters = "{ " + ", ".join([utilities.classad_quote(sc) for sc in self.subclusters]) + " }"
attributes['Subclusters'] = subclusters

transform_classad = classad.parseOne('[set_xcount = RequestCPUs; set_MaxMemory = RequestMemory]')

if self.vo_tag:
quoted_vo_tag = utilities.classad_quote(self.vo_tag)
attributes['VOTag'] = quoted_vo_tag
requirements_clauses.append("TARGET.VOTag == " + quoted_vo_tag)
transform_classad['set_VOTag'] = quoted_vo_tag

attributes['Requirements'] = ' && '.join(requirements_clauses)

if requirements_clauses:
return ' && '.join(requirements_clauses)
return None

def get_transform(self, attributes):
if self.is_pilot:
return None
transform_classad = classad.ClassAd()
if "CPUs" in attributes:
transform_classad["set_xcount"] = "RequestCPUs"
if "Memory" in attributes:
transform_classad["set_MaxMemory"] = "RequestMemory"
if "VOTag" in attributes:
transform_classad["set_VOTag"] = attributes["VOTag"]
if self.queue:
transform_classad['set_remote_queue'] = utilities.classad_quote(self.queue)
if self.extra_transforms:
try:
extra_transforms_classad = classad.parseOne(self._munge_extra_transforms(self.extra_transforms))
transform_classad.update(_extra_transforms_to_classad(self.extra_transforms))
except SyntaxError as e:
raise ValueError("Unable to parse 'extra_transforms': %s" % e)
transform_classad.update(extra_transforms_classad)
attributes['Transform'] = '['
for key in sorted(transform_classad.keys()):
attributes['Transform'] += " %s = %s;" % (key, transform_classad[key])
attributes['Transform'] += ' ]'

if transform_classad:
transform = "["
for key in sorted(transform_classad.keys()):
transform += f" {key} = {transform_classad[key]};"
transform += " ]"

return transform
return None

def as_attributes(self):
"""Return this entry as a list of classad attributes"""
attributes = {}

for rce_field, classad_attr, format_fn in ATTRIBUTE_MAPPINGS:
try:
val = self.__getattribute__(rce_field)
if val is not None:
attributes[classad_attr] = format_fn(val)
except AttributeError:
continue

requirements = self.get_requirements(attributes)
if requirements:
attributes['Requirements'] = requirements
transform = self.get_transform(attributes)
if transform:
attributes['Transform'] = transform

return attributes

@staticmethod
def _munge_extra_transforms(extra_transforms):
"""Ensure extra_transforms is surrounded by exactly one pair of brackets
so it can be parsed as a classad
"""
return '[' + extra_transforms.lstrip('[ \t').rstrip('] \t') + ']'


class ResourceCatalog(object):
Expand All @@ -142,7 +148,7 @@ def __init__(self):
self.entries = {}

def add_rcentry(self, rcentry):
self.entries[rcentry.name] = rcentry.normalize().validate().as_attributes()
self.entries[rcentry.name] = rcentry.as_attributes()

return self

Expand Down
Loading

0 comments on commit 2c2905d

Please sign in to comment.