diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 3cf21d6..c69cb01 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -73,26 +73,6 @@ class PollyObserver implements TraceObserver { log.info "----------Pipeline complete-------------" } - /* - * Invoked when the process is created. - */ - @Override - void onProcessCreate(TaskProcessor process ){ - log.info "-------------------Process Created-------------------" - log.info process.getName() - putRecordToObserverStream(ProcessStatus.CREATED, process.name) - } - - /* - * Invoked when all task have been executed and process ends. - */ - @Override - void onProcessTerminate( TaskProcessor process ){ - log.info "-------------------Process Terminated-------------------" - log.info process.toString() - putRecordToObserverStream(ProcessStatus.TERMINATED, process.name) - } - /** * This method when a new task is created and submitted in the nextflow * internal queue of pending task to be scheduled to the underlying @@ -108,7 +88,7 @@ class PollyObserver implements TraceObserver { log.info trace.toString() Map data = getDataFromHandlerAndTrace(handler, trace) log.info data.toMapString() - putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName()) + putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName(), data) } /** @@ -124,7 +104,7 @@ class PollyObserver implements TraceObserver { log.info "------Process Submitted----------" Map data = getDataFromHandlerAndTrace(handler, trace) log.info data.toMapString() - putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName()) + putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName(), data) } /** @@ -140,7 +120,8 @@ class PollyObserver implements TraceObserver { log.info "------Process Started----------" log.info handler.toString() log.info trace.toString() - putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName()) + Map data = getDataFromHandlerAndTrace(handler, trace) + putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName(), data) } /** @@ -157,7 +138,9 @@ class PollyObserver implements TraceObserver { log.info handler.toString() log.info trace.toString() trace.getProperty("native_id") - putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName()) + Map data = getDataFromHandlerAndTrace(handler, trace) + log.info data.toMapString() + putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName(), data) } /** @@ -175,7 +158,9 @@ class PollyObserver implements TraceObserver { log.info "------Process Cached----------" log.info handler.toString() log.info trace.toString() - putRecordToObserverStream(ProcessStatus.CACHED, handler.task.getName()) + Map data = getDataFromHandlerAndTrace(handler, trace) + log.info data.toMapString() + putRecordToObserverStream(ProcessStatus.CACHED, handler.task.getName(), data) trace.getMachineInfo() } @@ -193,20 +178,16 @@ class PollyObserver implements TraceObserver { data['inputs'] = input_map data['input_files_path'] = handler.task.getInputFilesMap() data['machine_config'] = [ - 'cpu': handler.task.getConfig().getCpus(), - 'container': handler.task.getConfig().getContainer(), - ] - data['trace'] = [ 'native_id': trace.getProperty('native_id'), - 'native_id_2': trace.get('native_id'), - 'cpu': trace.get('cpus'), - 'cpu2': trace.getProperty('cpus') + 'cpus': trace.getProperty('cpus'), + 'memory': trace.getProperty('memory'), + 'disk': trace.getProperty('disk') ] return data } - void putRecordToObserverStream(String status, String processName){ + void putRecordToObserverStream(String status, String processName, Map data){ String streamName = this.config.getGraphObserverStreamName() log.info "Stream Name: " + streamName @@ -223,7 +204,7 @@ class PollyObserver implements TraceObserver { String partitionKey = status.toString() try { - Map map = [job_id: jobId, status: status, process_name: processName] + Map map = [job_id: jobId, status: status, process_name: processName, task_detail: data] byte[] json = JsonOutput.toJson(map).getBytes() KinesisClient client = KinesisClient.builder().build() PutRecordRequest putRequest = PutRecordRequest.builder()