From 760d5c7b530dad536f5b320c0f337f2277e833d7 Mon Sep 17 00:00:00 2001 From: fstagni Date: Wed, 11 Oct 2023 11:32:32 +0200 Subject: [PATCH] fix: removed obsolete MultiLaunchAgent command --- Pilot/pilotCommands.py | 214 ----------------------------------------- 1 file changed, 214 deletions(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index 16779ace..8ba6012f 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -780,19 +780,8 @@ def execute(self): # pilotProcessors is basically the number of processors this pilot is "managing" self.pp.pilotProcessors = numberOfProcessorsOnWN - # payloadProcessors is the max number of processors used by the single payloads. - # We store payloadProcessors in the global parameters so that other commands can more easily use it. - # (MultiLaunchAgent is right now the only consumer) - self.pp.payloadProcessors = 1 - if "WholeNode" in self.pp.tags: - self.pp.payloadProcessors = self.pp.pilotProcessors - if self.pp.maxNumberOfProcessors > 0: - self.pp.payloadProcessors = min(self.pp.pilotProcessors, self.pp.maxNumberOfProcessors) - self.log.info("pilotProcessors = %d" % self.pp.pilotProcessors) - self.log.info("payloadProcessors = %d" % self.pp.payloadProcessors) self.cfg.append('-o "/Resources/Computing/CEDefaults/NumberOfProcessors=%d"' % self.pp.pilotProcessors) - self.cfg.append('-o "/Resources/Computing/CEDefaults/NumberOfPayloadProcessors=%d"' % self.pp.payloadProcessors) maxRAM = self.pp.queueParameters.get("MaxRAM", maxRAM) if maxRAM: @@ -1134,209 +1123,6 @@ def execute(self): sys.exit(0) -class MultiLaunchAgent(CommandBase): - """Prepare and launch multiple agents""" - - def __init__(self, pilotParams): - """c'tor""" - super(MultiLaunchAgent, self).__init__(pilotParams) - self.inProcessOpts = [] - self.jobAgentOpts = [] - - def __setInProcessOpts(self): - localUid = os.getuid() - try: - import pwd - - localUser = pwd.getpwuid(localUid)[0] - except KeyError: - localUser = "Unknown" - self.log.info("User Name = %s" % localUser) - self.log.info("User Id = %s" % localUid) - self.inProcessOpts = ["-s /Resources/Computing/CEDefaults"] - self.inProcessOpts.append("-o WorkingDirectory=%s" % self.pp.workingDir) - self.inProcessOpts.append("-o /LocalSite/CPUTime=%s" % (int(self.pp.jobCPUReq))) - # To prevent a wayward agent picking up and failing many jobs. - self.inProcessOpts.append("-o MaxTotalJobs=%s" % self.pp.maxCycles) - self.jobAgentOpts = [ - "-o MaxCycles=%s" % self.pp.maxCycles, - "-o PollingTime=%s" % self.pp.pollingTime, - "-o StopOnApplicationFailure=%s" % self.pp.stopOnApplicationFailure, - "-o StopAfterFailedMatches=0", - ] - - if self.debugFlag: - self.jobAgentOpts.append("-o LogLevel=DEBUG") - - if self.pp.userGroup: - self.log.debug('Setting DIRAC Group to "%s"' % self.pp.userGroup) - self.inProcessOpts.append('-o OwnerGroup="%s"' % self.pp.userGroup) - - if self.pp.userDN: - self.log.debug('Setting Owner DN to "%s"' % self.pp.userDN) - self.inProcessOpts.append('-o OwnerDN="%s"' % self.pp.userDN) - - if self.pp.useServerCertificate: - self.log.debug("Setting UseServerCertificate flag") - self.inProcessOpts.append("-o /DIRAC/Security/UseServerCertificate=yes") - - # The instancePath is where the agent works - self.inProcessOpts.append("-o /LocalSite/InstancePath=%s" % self.pp.workingDir) - - # The file pilot.cfg has to be created previously by ConfigureDIRAC - if self.pp.localConfigFile: - self.inProcessOpts.append(" -o /AgentJobRequirements/ExtraOptions=%s" % self.pp.localConfigFile) - self.inProcessOpts.extend(["--cfg", self.pp.localConfigFile]) - - def __startJobAgent(self): - """Starting of the JobAgent""" - - # Find any .cfg file uploaded with the sandbox or generated by previous commands - - diracAgentScript = "dirac-agent" - extraCFG = [] - for i in os.listdir(self.pp.rootPath): - cfg = os.path.join(self.pp.rootPath, i) - if os.path.isfile(cfg) and cfg.endswith(".cfg"): - extraCFG.append(cfg) - - if self.pp.executeCmd: - # Execute user command - self.log.info("Executing user defined command: %s" % self.pp.executeCmd) - if self.pp.pythonVersion == "27": - self.exitWithError(int(os.system("source bashrc; %s" % self.pp.executeCmd) / 256)) - else: - self.exitWithError(int(os.system("source diracos/diracosrc; %s" % self.pp.executeCmd) / 256)) - - self.log.info("Starting JobAgent") - os.environ["PYTHONUNBUFFERED"] = "yes" - - pid = {} - - for i in range(int(self.pp.pilotProcessors / self.pp.payloadProcessors)): - # One JobAgent per each set of payload processors, based on the - # number of processors allocated to this pilot, rounding downwards - - if self.pp.ceType == "Sudo": - # Available within the SudoComputingElement as BaseUsername in the ceParameters - sudoOpts = "-o /LocalSite/BaseUsername=%s%02dp00" % (os.environ["USER"], i) - else: - sudoOpts = "" - - jobAgent = "%s WorkloadManagement/JobAgent %s %s %s %s" % ( - diracAgentScript, - " ".join(self.jobAgentOpts), - " ".join(self.inProcessOpts), - sudoOpts, - " ".join(extraCFG), - ) - - pid[i] = self.forkAndExecute( - jobAgent, os.path.join(self.pp.workingDir, "jobagent.%02d.log" % i), self.pp.installEnv - ) - - if not pid[i]: - self.log.error("Error executing the JobAgent %d" % i) - else: - self.log.info( - "Forked JobAgent %02d (%d/%d) with PID %d" - % (i, i + 1, int(self.pp.pilotProcessors / self.pp.payloadProcessors), pid[i]) - ) - - # Not very subtle this. How about a time limit?? - for i in range(int(self.pp.pilotProcessors / self.pp.payloadProcessors)): - os.waitpid(pid[i], 0) - - for i in range(int(self.pp.pilotProcessors / self.pp.payloadProcessors)): - shutdownMessage = self.__parseJobAgentLog(os.path.join(self.pp.workingDir, "jobagent.%02d.log" % i)) - open(os.path.join(self.pp.workingDir, "shutdown_message.%02d" % i), "w").write(shutdownMessage) - print(shutdownMessage) - - # FIX ME: this effectively picks one at random. Should be the last one to finish chronologically. - # Not in order of being started. - open(os.path.join(self.pp.workingDir, "shutdown_message"), "w").write(shutdownMessage) - - fs = os.statvfs(self.pp.workingDir) - diskSpace = int(fs[4] * fs[0] / 1024 / 1024) - self.log.info("DiskSpace (MB) = %s" % diskSpace) - - def __parseJobAgentLog(self, logFile): - """Parse the JobAgent log and return shutdown message""" - - # catch-all in case nothing matches - shutdownMessage = "700 Failed, probably JobAgent or Application problem" - - # log file patterns to look for and corresponding messages - messageMappings = [ - # Variants of: "100 Shutdown as requested by the VM's host/hypervisor" - ###################################################################### - # There are other errors from the TimeLeft handling, but we let those go - # to the 600 Failed default - ['INFO: JobAgent will stop with message "No time left for slot', "100 No time left for slot"], - # Variants of: "200 Intended work completed ok" - ############################################### - # Our work is done. More work available in the queue? Who knows! - ['INFO: JobAgent will stop with message "Filling Mode is Disabled', "200 Filling Mode is Disabled"], - ["NOTICE: Cycle was successful", "200 Success"], - # - # !!! Codes 300-699 trigger Vac/Vcycle backoff procedure !!! - # - # Variants of: "300 No more work available from task queue" - ########################################################### - # We asked, but nothing more from the matcher. - ['INFO: JobAgent will stop with message "Nothing to do for more than', "300 Nothing to do"], - ["Job request OK: No match found", "300 Nothing to do"], - # Variants of: "400 Site/host/VM is currently banned/disabled from receiving more work" - ####################################################################################### - # Variants of: "500 Problem detected with environment/VM/contextualization provided by the site" - ################################################################################################ - # This detects using an RFC proxy to talk to legacy-only DIRAC - ['Error while handshaking [("Remote certificate hasn', "500 Certificate/proxy not acceptable"], - # Variants of: "600 Grid-wide problem with job agent or application within VM" - ############################################################################## - [ - "ERROR: Pilot version does not match the production version", - "600 Cannot match jobs with this pilot version", - ], - # Variants of: "700 Error related to job agent or application within VM" - ######################################################################## - # Some of the ways the JobAgent/Application can stop with errors. - # Otherwise we just get the default 700 Failed message. - ['INFO: JobAgent will stop with message "Job Rescheduled', "600 Problem so job rescheduled"], - ['INFO: JobAgent will stop with message "Matcher Failed', "600 Matcher Failed"], - ['INFO: JobAgent will stop with message "JDL Problem', "600 JDL Problem"], - ['INFO: JobAgent will stop with message "Payload Proxy Not Found', "600 Payload Proxy Not Found"], - ['INFO: JobAgent will stop with message "Problem Rescheduling Job', "600 Problem Rescheduling Job"], - [ - 'INFO: JobAgent will stop with message "Payload execution failed with error code', - "600 Payload execution failed with error", - ], - ] - - try: - with open(logFile, "r") as f: - oneline = f.readline() - while oneline: - for pair in messageMappings: - if pair[0] in oneline: - shutdownMessage = pair[1] - break - oneline = f.readline() - - except BaseException: - return "700 Internal VM logging failed" - - return shutdownMessage - - @logFinalizer - def execute(self): - """What is called all the time""" - self.__setInProcessOpts() - self.__startJobAgent() - - sys.exit(0) - - class NagiosProbes(CommandBase): """Run one or more Nagios probe scripts that follow the Nagios Plugin API: https://assets.nagios.com/downloads/nagioscore/docs/nagioscore/3/en/pluginapi.html