Skip to content

Commit

Permalink
PDG: initializations and job deletion moved in functions and called o…
Browse files Browse the repository at this point in the history
…n each cook start.

Before this change, the scheduler worked only once; after that, a Houdini restart was needed.

Comments added on virtual functions (callbacks).

References #514.
  • Loading branch information
timurhai committed Jul 7, 2021
1 parent aa66c7d commit 1499382
Showing 1 changed file with 67 additions and 20 deletions.
87 changes: 67 additions & 20 deletions plugins/houdini/pdg/types/afanasyscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ class AfanasyScheduler(CallbackServerMixin, PyScheduler):
def __init__(self, scheduler, name):
# Initialize both base classes explicitly, then set up the per-job bookkeeping.
PyScheduler.__init__(self, scheduler, name)
CallbackServerMixin.__init__(self, False)
# NOTE(review): this listing is a rendered diff without +/- markers; the four
# assignments below look like the pre-commit lines that the `_initData()` call
# replaces (note `active_jobs` is not set by `_initData`) - confirm against
# the real file.
self.job_id = None
self.job_block_name_id = {}
self.job_tasks_id_name = {}
self.active_jobs = {}
self._initData()


@classmethod
Expand All @@ -41,31 +38,55 @@ def templateBody(cls):
})


def _initData(self):
    """
    Reset the per-job bookkeeping: no job id and empty
    `job_block_name_id` / `job_tasks_id_name` dictionaries.
    """
    self.job_tasks_id_name = {}
    self.job_block_name_id = {}
    self.job_id = None


def _deleteJob(self):
# Delete the current job (when `job_id` is set) through
# `af.Cmd().deleteJobById` and reset the per-job data via `_initData()`.
if self.job_id is not None:
af.Cmd().deleteJobById(self.job_id)
# NOTE(review): indentation is lost in this listing, so it is unclear whether
# the reset below runs only when a job existed or unconditionally - confirm.
self._initData()


def applicationBin(self, i_app, i_work_item):
    """
    [virtual] Returns the path to the given application.

    This is needed to locate such programs as hython, python.
    CGRU has its own mechanism to locate software and has such commands
    as hython, python in PATH, so the application name itself is returned;
    no full application path is needed.
    `i_work_item` is not used.
    """
    app_name = i_app
    return app_name


def onStart(self):
    """
    [virtual] Called by PDG when scheduler is first created.
    Only logs the call and reports success by returning True.
    """
    logger.debug("onStart")
    started = True
    return started


def onStop(self):
"""
[virtual] Called by PDG when scheduler is cleaned up.
Stops the callback server, deletes the job and returns True.
"""
logger.debug("onStop")
self.stopCallbackServer()
# NOTE(review): this listing is a rendered diff without +/- markers; the three
# lines below duplicate what `_deleteJob()` does and look like the pre-commit
# code that the `_deleteJob()` call replaces - confirm against the real file.
if self.job_id is not None:
af.Cmd().deleteJobById(self.job_id)
self.job_id = None
self._deleteJob()
return True


def onStartCook(self, static, cook_set):
"""
onStartCook(self, static, cook_set) -> boolean
[virtual] Cook start callback. Starts a root job for the cook session
[virtual] Cook start callback. Starts the job for the cook session.
"""
logger.debug("onStartCook")
self._deleteJob()

pdg_workingdir = self["pdg_workingdir"].evaluateString()
self.setWorkingDir(pdg_workingdir, pdg_workingdir)
Expand Down Expand Up @@ -111,21 +132,20 @@ def onStartCook(self, static, cook_set):

def onStopCook(self, cancel):
"""
[virtual] Callback invoked by PDG when graph cook ends.
Can be called multiple times.
The `cancel` argument is not used here. Returns True.
"""
logger.debug("onStopCook")
self.stopCallbackServer()
# NOTE(review): this listing is a rendered diff without +/- markers; the commit
# summary says job deletion moved into functions called on each cook start, so
# the three lines below may be the removed pre-commit code - confirm against
# the real file.
if self.job_id is not None:
af.Cmd().deleteJobById(self.job_id)
self.job_id = None

return True


def onSchedule(self, work_item):
"""
onSchedule(self, pdg.PyWorkItem) -> pdg.SchedulerResult
[virtual] onSchedule(self, pdg.WorkItem) -> pdg.SchedulerResult
This callback is evaluated when the given pdg.WorkItem is ready to be executed.
"""
if len(work_item.command) == 0:
return pdg.scheduleResult.CookSucceeded
Expand Down Expand Up @@ -185,6 +205,15 @@ def onSchedule(self, work_item):


def onScheduleStatic(self, dependencies, dependents, ready_items):
"""
[virtual] Called when the scheduler should process a static dependency graph.
dependencies _pdg.WorkItem map of dependencies
dependents _pdg.WorkItem map of dependents
ready_items _pdg.WorkItem array of work items
- Not Supported
"""
logger.debug('onScheduleStatic:')
print('Counts:')
print('len(dependencies) = %d' % len(dependencies))
Expand All @@ -195,13 +224,19 @@ def onScheduleStatic(self, dependencies, dependents, ready_items):

def onTick(self):
"""
Called during a cook. Checks on jobs in flight to see if any have finished.
[virtual] Called during a cook.
Checks on jobs in flight to see if any have finished.
"""
#return pdg.tickResult.SchedulerReady

# check that the job was created
if self.job_id is None:
return tickResult.SchedulerCancelCook

# get job progress
cmd = af.Cmd()
job_progress = cmd.getJobProgress(self.job_id)
if job_progress is None:
print('Error getting job progress.')
return pdg.tickResult.SchedulerBusy

job_progress = job_progress['progress']
Expand All @@ -216,20 +251,21 @@ def onTick(self):

if state.find('RUN') != -1:
self.onWorkItemStartCook(work_item_name, -1)
continue

if state.find('ERR') != -1:
elif state.find('ERR') != -1:
self.onWorkItemFailed(work_item_name, -1)
ids_to_del[block_id].append(task_id)
continue

elif state.find('SKP') != -1:
self.onWorkItemCanceled(work_item_name, -1)
ids_to_del[block_id].append(task_id)

elif state.find('DON') != -1:
time_started = task_progress['tst']
time_done = task_progress['tdn']
cook_time = float(time_started - time_done)
self.onWorkItemSucceeded(work_item_name, -1, cook_time)
ids_to_del[block_id].append(task_id)
continue

for block_id in ids_to_del:
for task_id in ids_to_del[block_id]:
Expand All @@ -239,10 +275,21 @@ def onTick(self):


def submitAsJob(self, graph_file, node_path):
    """
    [virtual]
    Called when the scheduler should cook the entire TOP Network as a
    standalone job, i.e. when the 'Submit as Job' button is pressed on the
    scheduler node UI.
    Intended to create a job which cooks that TOP graph using hython and to
    return the status URI for the submitted job (to open the manager Web GUI).

    graph_file  Path to a .hip file containing the TOP Network,
                relative to $PDG_DIR.
    node_path   Op path to the TOP Network.

    Not supported yet: the call is only logged and an empty string is returned.
    """
    message = "submitAsJob({},{})".format(graph_file, node_path)
    logger.debug(message)
    status_uri = ""
    return status_uri


# Register Afanasy Scheduler type
def registerTypes(type_registry):
    """
    Register `AfanasyScheduler` with the given type registry
    under the label "Afanasy Scheduler".
    """
    scheduler_label = "Afanasy Scheduler"
    type_registry.registerScheduler(AfanasyScheduler, label=scheduler_label)

0 comments on commit 1499382

Please sign in to comment.