Skip to content

Commit

Permalink
fix: removed obsolete MultiLaunchAgent command
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Oct 11, 2023
1 parent 45235f5 commit 760d5c7
Showing 1 changed file with 0 additions and 214 deletions.
214 changes: 0 additions & 214 deletions Pilot/pilotCommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,19 +780,8 @@ def execute(self):
# pilotProcessors is basically the number of processors this pilot is "managing"
self.pp.pilotProcessors = numberOfProcessorsOnWN

# payloadProcessors is the max number of processors used by the single payloads.
# We store payloadProcessors in the global parameters so that other commands can more easily use it.
# (MultiLaunchAgent is right now the only consumer)
self.pp.payloadProcessors = 1
if "WholeNode" in self.pp.tags:
self.pp.payloadProcessors = self.pp.pilotProcessors
if self.pp.maxNumberOfProcessors > 0:
self.pp.payloadProcessors = min(self.pp.pilotProcessors, self.pp.maxNumberOfProcessors)

self.log.info("pilotProcessors = %d" % self.pp.pilotProcessors)
self.log.info("payloadProcessors = %d" % self.pp.payloadProcessors)
self.cfg.append('-o "/Resources/Computing/CEDefaults/NumberOfProcessors=%d"' % self.pp.pilotProcessors)
self.cfg.append('-o "/Resources/Computing/CEDefaults/NumberOfPayloadProcessors=%d"' % self.pp.payloadProcessors)

maxRAM = self.pp.queueParameters.get("MaxRAM", maxRAM)
if maxRAM:
Expand Down Expand Up @@ -1134,209 +1123,6 @@ def execute(self):
sys.exit(0)


class MultiLaunchAgent(CommandBase):
    """Prepare and launch multiple JobAgents.

    One JobAgent is forked per set of payload processors, i.e.
    ``int(pilotProcessors / payloadProcessors)`` agents in total. Once all the
    agents have finished, their logs are parsed and the resulting shutdown
    messages are written in the pilot working directory.
    """

    def __init__(self, pilotParams):
        """c'tor

        :param pilotParams: pilot parameters object, handed over to CommandBase
        """
        super(MultiLaunchAgent, self).__init__(pilotParams)
        # Options describing the local environment of each agent
        self.inProcessOpts = []
        # Options steering the JobAgent itself (cycles, polling, ...)
        self.jobAgentOpts = []

    def __setInProcessOpts(self):
        """Fill self.inProcessOpts and self.jobAgentOpts from the pilot parameters."""
        localUid = os.getuid()
        try:
            import pwd

            localUser = pwd.getpwuid(localUid)[0]
        except KeyError:
            # The uid has no entry in the password database
            localUser = "Unknown"
        self.log.info("User Name = %s" % localUser)
        self.log.info("User Id = %s" % localUid)

        self.inProcessOpts = ["-s /Resources/Computing/CEDefaults"]
        self.inProcessOpts.append("-o WorkingDirectory=%s" % self.pp.workingDir)
        self.inProcessOpts.append("-o /LocalSite/CPUTime=%s" % (int(self.pp.jobCPUReq)))
        # To prevent a wayward agent picking up and failing many jobs.
        self.inProcessOpts.append("-o MaxTotalJobs=%s" % self.pp.maxCycles)

        self.jobAgentOpts = [
            "-o MaxCycles=%s" % self.pp.maxCycles,
            "-o PollingTime=%s" % self.pp.pollingTime,
            "-o StopOnApplicationFailure=%s" % self.pp.stopOnApplicationFailure,
            "-o StopAfterFailedMatches=0",
        ]

        if self.debugFlag:
            self.jobAgentOpts.append("-o LogLevel=DEBUG")

        if self.pp.userGroup:
            self.log.debug('Setting DIRAC Group to "%s"' % self.pp.userGroup)
            self.inProcessOpts.append('-o OwnerGroup="%s"' % self.pp.userGroup)

        if self.pp.userDN:
            self.log.debug('Setting Owner DN to "%s"' % self.pp.userDN)
            self.inProcessOpts.append('-o OwnerDN="%s"' % self.pp.userDN)

        if self.pp.useServerCertificate:
            self.log.debug("Setting UseServerCertificate flag")
            self.inProcessOpts.append("-o /DIRAC/Security/UseServerCertificate=yes")

        # The instancePath is where the agent works
        self.inProcessOpts.append("-o /LocalSite/InstancePath=%s" % self.pp.workingDir)

        # The file pilot.cfg has to be created previously by ConfigureDIRAC
        if self.pp.localConfigFile:
            self.inProcessOpts.append(" -o /AgentJobRequirements/ExtraOptions=%s" % self.pp.localConfigFile)
            self.inProcessOpts.extend(["--cfg", self.pp.localConfigFile])

    def __startJobAgent(self):
        """Fork the JobAgents, wait for them and record their shutdown messages.

        For each agent ``NN`` the log goes to ``jobagent.NN.log`` and the parsed
        message to ``shutdown_message.NN`` (both in the working directory); an
        overall ``shutdown_message`` file is written too.
        """

        # Find any .cfg file uploaded with the sandbox or generated by previous commands
        diracAgentScript = "dirac-agent"
        extraCFG = []
        for i in os.listdir(self.pp.rootPath):
            cfg = os.path.join(self.pp.rootPath, i)
            if os.path.isfile(cfg) and cfg.endswith(".cfg"):
                extraCFG.append(cfg)

        if self.pp.executeCmd:
            # Execute user command; os.system returns the wait status, so / 256 yields the exit code.
            # NOTE(review): presumably exitWithError terminates the pilot here - confirm in CommandBase
            self.log.info("Executing user defined command: %s" % self.pp.executeCmd)
            if self.pp.pythonVersion == "27":
                self.exitWithError(int(os.system("source bashrc; %s" % self.pp.executeCmd) / 256))
            else:
                self.exitWithError(int(os.system("source diracos/diracosrc; %s" % self.pp.executeCmd) / 256))

        self.log.info("Starting JobAgent")
        os.environ["PYTHONUNBUFFERED"] = "yes"

        # One JobAgent per each set of payload processors, based on the
        # number of processors allocated to this pilot, rounding downwards
        numberOfAgents = int(self.pp.pilotProcessors / self.pp.payloadProcessors)

        pid = {}

        for i in range(numberOfAgents):
            if self.pp.ceType == "Sudo":
                # Available within the SudoComputingElement as BaseUsername in the ceParameters
                sudoOpts = "-o /LocalSite/BaseUsername=%s%02dp00" % (os.environ["USER"], i)
            else:
                sudoOpts = ""

            jobAgent = "%s WorkloadManagement/JobAgent %s %s %s %s" % (
                diracAgentScript,
                " ".join(self.jobAgentOpts),
                " ".join(self.inProcessOpts),
                sudoOpts,
                " ".join(extraCFG),
            )

            pid[i] = self.forkAndExecute(
                jobAgent, os.path.join(self.pp.workingDir, "jobagent.%02d.log" % i), self.pp.installEnv
            )

            if not pid[i]:
                self.log.error("Error executing the JobAgent %d" % i)
            else:
                self.log.info("Forked JobAgent %02d (%d/%d) with PID %d" % (i, i + 1, numberOfAgents, pid[i]))

        # Not very subtle this. How about a time limit??
        for i in range(numberOfAgents):
            # Only wait for agents that were really forked: waiting on a falsy pid
            # would either reap an unrelated child (0) or raise (None).
            if pid[i]:
                os.waitpid(pid[i], 0)

        # Same catch-all that __parseJobAgentLog uses, so that the overall message
        # is defined even when no agent at all could be started.
        shutdownMessage = "700 Failed, probably JobAgent or Application problem"

        for i in range(numberOfAgents):
            shutdownMessage = self.__parseJobAgentLog(os.path.join(self.pp.workingDir, "jobagent.%02d.log" % i))
            with open(os.path.join(self.pp.workingDir, "shutdown_message.%02d" % i), "w") as fd:
                fd.write(shutdownMessage)
            print(shutdownMessage)

        # FIX ME: this reports the message of the last agent started. Should be the last one
        # to finish chronologically. Not in order of being started.
        with open(os.path.join(self.pp.workingDir, "shutdown_message"), "w") as fd:
            fd.write(shutdownMessage)

        # statvfs: index 4 is f_bavail (free blocks for unprivileged users), index 0 is f_bsize
        fs = os.statvfs(self.pp.workingDir)
        diskSpace = int(fs[4] * fs[0] / 1024 / 1024)
        self.log.info("DiskSpace (MB) = %s" % diskSpace)

    def __parseJobAgentLog(self, logFile):
        """Parse the JobAgent log and return shutdown message

        The whole log is scanned; when several lines match, the last matching
        line decides. Within one line the first matching pattern wins.

        :param str logFile: path of the JobAgent log file
        :return: shutdown message, i.e. a numeric code followed by a short text
        """

        # catch-all in case nothing matches
        shutdownMessage = "700 Failed, probably JobAgent or Application problem"

        # log file patterns to look for and corresponding messages
        messageMappings = [
            # Variants of: "100 Shutdown as requested by the VM's host/hypervisor"
            ######################################################################
            # There are other errors from the TimeLeft handling, but we let those go
            # to the 700 Failed default
            ['INFO: JobAgent will stop with message "No time left for slot', "100 No time left for slot"],
            # Variants of: "200 Intended work completed ok"
            ###############################################
            # Our work is done. More work available in the queue? Who knows!
            ['INFO: JobAgent will stop with message "Filling Mode is Disabled', "200 Filling Mode is Disabled"],
            ["NOTICE: Cycle was successful", "200 Success"],
            #
            # !!! Codes 300-699 trigger Vac/Vcycle backoff procedure !!!
            #
            # Variants of: "300 No more work available from task queue"
            ###########################################################
            # We asked, but nothing more from the matcher.
            ['INFO: JobAgent will stop with message "Nothing to do for more than', "300 Nothing to do"],
            ["Job request OK: No match found", "300 Nothing to do"],
            # Variants of: "400 Site/host/VM is currently banned/disabled from receiving more work"
            #######################################################################################
            # Variants of: "500 Problem detected with environment/VM/contextualization provided by the site"
            ################################################################################################
            # This detects using an RFC proxy to talk to legacy-only DIRAC
            ['Error while handshaking [("Remote certificate hasn', "500 Certificate/proxy not acceptable"],
            # Variants of: "600 Grid-wide problem with job agent or application within VM"
            ##############################################################################
            [
                "ERROR: Pilot version does not match the production version",
                "600 Cannot match jobs with this pilot version",
            ],
            # Variants of: "700 Error related to job agent or application within VM"
            ########################################################################
            # Some of the ways the JobAgent/Application can stop with errors.
            # Otherwise we just get the default 700 Failed message.
            # NOTE(review): the entries below sit under the 700 heading but emit 600 codes - confirm intended
            ['INFO: JobAgent will stop with message "Job Rescheduled', "600 Problem so job rescheduled"],
            ['INFO: JobAgent will stop with message "Matcher Failed', "600 Matcher Failed"],
            ['INFO: JobAgent will stop with message "JDL Problem', "600 JDL Problem"],
            ['INFO: JobAgent will stop with message "Payload Proxy Not Found', "600 Payload Proxy Not Found"],
            ['INFO: JobAgent will stop with message "Problem Rescheduling Job', "600 Problem Rescheduling Job"],
            [
                'INFO: JobAgent will stop with message "Payload execution failed with error code',
                "600 Payload execution failed with error",
            ],
        ]

        try:
            with open(logFile, "r") as f:
                for oneline in f:
                    for pattern, message in messageMappings:
                        if pattern in oneline:
                            shutdownMessage = message
                            break

        except Exception:
            # Missing or unreadable log. Exception (rather than BaseException) is caught
            # so that KeyboardInterrupt and SystemExit are not swallowed.
            return "700 Internal VM logging failed"

        return shutdownMessage

    @logFinalizer
    def execute(self):
        """What is called all the time"""
        self.__setInProcessOpts()
        self.__startJobAgent()

        sys.exit(0)


class NagiosProbes(CommandBase):
"""Run one or more Nagios probe scripts that follow the Nagios Plugin API:
https://assets.nagios.com/downloads/nagioscore/docs/nagioscore/3/en/pluginapi.html
Expand Down

0 comments on commit 760d5c7

Please sign in to comment.