Skip to content

Commit

Permalink
emit task details to Kinesis too
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-el committed Oct 24, 2024
1 parent 384eb0a commit 2648e71
Showing 1 changed file with 15 additions and 34 deletions.
49 changes: 15 additions & 34 deletions plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -73,26 +73,6 @@ class PollyObserver implements TraceObserver {
log.info "----------Pipeline complete-------------"
}

/*
* Invoked when the process is created.
*/
@Override
void onProcessCreate(TaskProcessor process ){
log.info "-------------------Process Created-------------------"
log.info process.getName()
putRecordToObserverStream(ProcessStatus.CREATED, process.name)
}

/*
* Invoked when all task have been executed and process ends.
*/
@Override
void onProcessTerminate( TaskProcessor process ){
log.info "-------------------Process Terminated-------------------"
log.info process.toString()
putRecordToObserverStream(ProcessStatus.TERMINATED, process.name)
}

/**
* This method when a new task is created and submitted in the nextflow
* internal queue of pending task to be scheduled to the underlying
Expand All @@ -108,7 +88,7 @@ class PollyObserver implements TraceObserver {
log.info trace.toString()
Map<String, Object> data = getDataFromHandlerAndTrace(handler, trace)
log.info data.toMapString()
putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName())
putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName(), data)
}

/**
Expand All @@ -124,7 +104,7 @@ class PollyObserver implements TraceObserver {
log.info "------Process Submitted----------"
Map<String, Object> data = getDataFromHandlerAndTrace(handler, trace)
log.info data.toMapString()
putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName())
putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName(), data)
}

/**
Expand All @@ -140,7 +120,8 @@ class PollyObserver implements TraceObserver {
log.info "------Process Started----------"
log.info handler.toString()
log.info trace.toString()
putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName())
Map<String, Object> data = getDataFromHandlerAndTrace(handler, trace)
putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName(), data)
}

/**
Expand All @@ -157,7 +138,9 @@ class PollyObserver implements TraceObserver {
log.info handler.toString()
log.info trace.toString()
trace.getProperty("native_id")
putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName())
Map<String, Object> data = getDataFromHandlerAndTrace(handler, trace)
log.info data.toMapString()
putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName(), data)
}

/**
Expand All @@ -175,7 +158,9 @@ class PollyObserver implements TraceObserver {
log.info "------Process Cached----------"
log.info handler.toString()
log.info trace.toString()
putRecordToObserverStream(ProcessStatus.CACHED, handler.task.getName())
Map<String, Object> data = getDataFromHandlerAndTrace(handler, trace)
log.info data.toMapString()
putRecordToObserverStream(ProcessStatus.CACHED, handler.task.getName(), data)
trace.getMachineInfo()
}

Expand All @@ -193,20 +178,16 @@ class PollyObserver implements TraceObserver {
data['inputs'] = input_map
data['input_files_path'] = handler.task.getInputFilesMap()
data['machine_config'] = [
'cpu': handler.task.getConfig().getCpus(),
'container': handler.task.getConfig().getContainer(),
]
data['trace'] = [
'native_id': trace.getProperty('native_id'),
'native_id_2': trace.get('native_id'),
'cpu': trace.get('cpus'),
'cpu2': trace.getProperty('cpus')
'cpus': trace.getProperty('cpus'),
'memory': trace.getProperty('memory'),
'disk': trace.getProperty('disk')
]
return data
}


void putRecordToObserverStream(String status, String processName){
void putRecordToObserverStream(String status, String processName, Map<String, Object> data){
String streamName = this.config.getGraphObserverStreamName()
log.info "Stream Name: " + streamName

Expand All @@ -223,7 +204,7 @@ class PollyObserver implements TraceObserver {

String partitionKey = status.toString()
try {
Map map = [job_id: jobId, status: status, process_name: processName]
Map map = [job_id: jobId, status: status, process_name: processName, task_detail: data]
byte[] json = JsonOutput.toJson(map).getBytes()
KinesisClient client = KinesisClient.builder().build()
PutRecordRequest putRequest = PutRecordRequest.builder()
Expand Down

0 comments on commit 2648e71

Please sign in to comment.