From b921c7bae6441027287fed9884a84f947658a3d2 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Fri, 4 Oct 2024 11:49:03 +0530 Subject: [PATCH 01/22] added observer for nf processes, to emit status events --- Makefile | 7 + .../main/nextflow/polly/PollyConfig.groovy | 6 + .../main/nextflow/polly/PollyFactory.groovy | 37 +++ .../main/nextflow/polly/PollyObserver.groovy | 211 ++++++++++++++++++ .../src/resources/META-INF/extensions.idx | 1 + 5 files changed, 262 insertions(+) create mode 100644 plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy create mode 100644 plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy diff --git a/Makefile b/Makefile index ad40b01..111e19f 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,6 @@ config ?= compileClasspath +version ?= $(shell grep 'Plugin-Version' plugins/nf-polly/src/resources/META-INF/MANIFEST.MF | awk '{ print $$2 }') ifdef module mm = :${module}: @@ -48,6 +49,12 @@ else ./gradlew ${mm}test --tests ${class} endif +install: + ./gradlew copyPluginZip + rm -rf ${HOME}/.nextflow/plugins/nf-polly-${version} + cp -r build/plugins/nf-polly-${version} ${HOME}/.nextflow/plugins/ + + assemble: ./gradlew assemble diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyConfig.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyConfig.groovy index 1d07966..9935188 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyConfig.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyConfig.groovy @@ -13,11 +13,17 @@ import groovy.transform.PackageScope class PollyConfig { final private String metricsStreamName + final private String graphObserverStreamName + final private String jobId PollyConfig(Map map) { def config = map ?: Collections.emptyMap() metricsStreamName = config.metricsStreamName ?: "NA" + graphObserverStreamName = config.graphObserverStreamName ?: "NA" + jobId = config.jobId ?: "NA" } String getMetricsStreamName() { metricsStreamName } + + String getGraphObserverStreamName() { graphObserverStreamName } } diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy new file mode 100644 index 0000000..aebc851 --- /dev/null +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy @@ -0,0 +1,37 @@ +/* + * Copyright 2021, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.polly + +import groovy.transform.CompileStatic +import nextflow.Session +import nextflow.trace.TraceObserver +import nextflow.trace.TraceObserverFactory +/** + * Implements the validation observer factory + * + * @author Paolo Di Tommaso + */ +@CompileStatic +class PollyFactory implements TraceObserverFactory { + + @Override + Collection create(Session session) { + final result = new ArrayList() + result.add( new PollyObserver() ) + return result + } +} diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy new file mode 100644 index 0000000..07d4581 --- /dev/null +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -0,0 +1,211 @@ + +package nextflow.polly + +import groovy.json.JsonOutput +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import nextflow.Session +import nextflow.processor.TaskHandler +import nextflow.processor.TaskProcessor +import nextflow.trace.TraceObserver +import nextflow.trace.TraceRecord +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import software.amazon.awssdk.core.SdkBytes +import software.amazon.awssdk.services.kinesis.KinesisClient +import software.amazon.awssdk.services.kinesis.model.PutRecordRequest +import software.amazon.awssdk.services.kinesis.model.PutRecordResponse + + +class ProcessStatus { + public static String CREATED = "created" + public static String SUBMITTED = "submitted" + public static String PENDING = "pending" + public static String STARTED = "started" + public static String CACHED = "cached" + public static String TERMINATED = "terminated" + public static String COMPLETED = "completed" +} + + +@Slf4j +@CompileStatic +class PollyObserver implements TraceObserver { + + static final Logger logger = LoggerFactory.getLogger(PollyExtension.class) + private Session session + + /* + * A Custom config extracted from nextflow.config under polly tag + * nextflow.config + * --------------- + * docker { + * enabled = true + * } + * ... + * polly { + * metricsStreamName = "my-kinesis-stream" + * } + */ + private PollyConfig config + + /** + * A map of 'env' variables set in the Nextflow config file + */ + private Map env + + + @Override + void onFlowCreate(Session session) { + log.info "-------Pipeline is starting! 🚀-----------" + this.session = session + this.config = new PollyConfig(session.config.navigate('polly') as Map) + this.env = session.config.navigate('env') as Map + } + + @Override + void onFlowComplete() { + log.info "----------Pipeline complete! 👋-------------" + } + + /* + * Invoked when the process is created. + */ + @Override + void onProcessCreate(TaskProcessor process ){ + log.info "-------------------Process Created! 👋-------------------" + log.info process.getName() + putRecordToObserverStream(ProcessStatus.CREATED, process.name) + } + + /* + * Invoked when all task have been executed and process ends. + */ + @Override + void onProcessTerminate( TaskProcessor process ){ + log.info "-------------------Process Terminated! 👋-------------------" + log.info process.toString() + putRecordToObserverStream(ProcessStatus.TERMINATED, process.name) + } + + /** + * This method when a new task is created and submitted in the nextflow + * internal queue of pending task to be scheduled to the underlying + * execution backend + * + * @param handler + * @param trace + */ + @Override + void onProcessPending(TaskHandler handler, TraceRecord trace){ + log.info "------Process Pending! 👋----------" + log.info handler.toString() + log.info trace.toString() + putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName()) + } + + /** + * This method is invoked before a process run is going to be submitted + * + * @param handler + * The {@link TaskHandler} instance for the current task. + * @param trace + * The associated {@link TraceRecord} for the current task. + */ + @Override + void onProcessSubmit(TaskHandler handler, TraceRecord trace){ + log.info "------Process Submitted! 👋----------" + log.info handler.toString() + log.info trace.toString() + putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName()) + } + + /** + * This method is invoked when a process run is going to start + * + * @param handler + * The {@link TaskHandler} instance for the current task. + * @param trace + * The associated {@link TraceRecord} for the current task. + */ + @Override + void onProcessStart(TaskHandler handler, TraceRecord trace){ + log.info "------Process Started! 👋----------" + log.info handler.toString() + log.info trace.toString() + putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName()) + } + + /** + * This method is invoked when a process run completes + * + * @param handler + * The {@link TaskHandler} instance for the current task. + * @param trace + * The associated {@link TraceRecord} for the current task. + */ + @Override + void onProcessComplete(TaskHandler handler, TraceRecord trace){ + log.info "------Process Completed! 👋----------" + log.info handler.toString() + log.info trace.toString() + putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName()) + } + + /** + * method invoked when a task execution is skipped because the result is cached (already computed) + * or stored (due to the usage of `storeDir` directive) + * + * @param handler + * The {@link TaskHandler} instance for the current task + * @param trace + * The trace record for the cached trace. When this event is invoked for a store task + * the {@code trace} record is expected to be {@code null} + */ + @Override + void onProcessCached(TaskHandler handler, TraceRecord trace){ + log.info "------Process Cached! 👋----------" + log.info handler.toString() + log.info trace.toString() + putRecordToObserverStream(ProcessStatus.CACHED, handler.task.getName()) + + } + + void putRecordToObserverStream(String status, String processName){ + String streamName = this.config.getGraphObserverStreamName() + + if (streamName == "NA") { + logger.error("No stream set for process to send metrics to. Unable to report metric.") + return + } + + String jobId = this.env.get("JOB_ID") ?: "NA" + if (jobId == "NA") { + logger.error("No JOB_ID set for process. Unable to report metric.") + return + } + + String partitionKey = status.toString() + try { + Map map = [job_id: jobId, status: status, process_name: processName] + byte[] json = JsonOutput.toJson(map).getBytes() + KinesisClient client = KinesisClient.builder().build() + PutRecordRequest putRequest = PutRecordRequest.builder() + .partitionKey(partitionKey) + .streamName(streamName) + .data(SdkBytes.fromByteArray(json)) + .build() as PutRecordRequest + PutRecordResponse response = client.putRecord(putRequest) + logger.info( + String.format( + "Submitted record %s to stream shard %s", + response.sequenceNumber(), + response.shardId() + ) + ) + } catch (Exception e) { + logger.error("Failed to produce: " + e.getMessage()) + } + } + +} diff --git a/plugins/nf-polly/src/resources/META-INF/extensions.idx b/plugins/nf-polly/src/resources/META-INF/extensions.idx index 4ed52e5..85201fe 100644 --- a/plugins/nf-polly/src/resources/META-INF/extensions.idx +++ b/plugins/nf-polly/src/resources/META-INF/extensions.idx @@ -1 +1,2 @@ +nextflow.polly.PollyFactory nextflow.polly.PollyExtension \ No newline at end of file From bec4c74d255913d7e766344e1536e48a29017ab8 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Fri, 4 Oct 2024 12:01:30 +0530 Subject: [PATCH 02/22] fixed comments' --- .../main/nextflow/polly/PollyFactory.groovy | 19 +------------------ .../main/nextflow/polly/PollyObserver.groovy | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 27 deletions(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy index aebc851..cdc44ac 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy @@ -1,29 +1,12 @@ -/* - * Copyright 2021, Seqera Labs - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package nextflow.polly import groovy.transform.CompileStatic import nextflow.Session import nextflow.trace.TraceObserver import nextflow.trace.TraceObserverFactory + /** * Implements the validation observer factory - * - * @author Paolo Di Tommaso */ @CompileStatic class PollyFactory implements TraceObserverFactory { diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 07d4581..c0f8acf 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -45,6 +45,7 @@ class PollyObserver implements TraceObserver { * ... * polly { * metricsStreamName = "my-kinesis-stream" + * graphObserverStreamName = "pravaah-dev-graph-observer-stream-v1" * } */ private PollyConfig config @@ -57,7 +58,7 @@ class PollyObserver implements TraceObserver { @Override void onFlowCreate(Session session) { - log.info "-------Pipeline is starting! 🚀-----------" + log.info "-------Pipeline is starting-----------" this.session = session this.config = new PollyConfig(session.config.navigate('polly') as Map) this.env = session.config.navigate('env') as Map @@ -65,7 +66,7 @@ class PollyObserver implements TraceObserver { @Override void onFlowComplete() { - log.info "----------Pipeline complete! 👋-------------" + log.info "----------Pipeline complete-------------" } /* @@ -73,7 +74,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessCreate(TaskProcessor process ){ - log.info "-------------------Process Created! 👋-------------------" + log.info "-------------------Process Created-------------------" log.info process.getName() putRecordToObserverStream(ProcessStatus.CREATED, process.name) } @@ -83,7 +84,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessTerminate( TaskProcessor process ){ - log.info "-------------------Process Terminated! 👋-------------------" + log.info "-------------------Process Terminated-------------------" log.info process.toString() putRecordToObserverStream(ProcessStatus.TERMINATED, process.name) } @@ -98,7 +99,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessPending(TaskHandler handler, TraceRecord trace){ - log.info "------Process Pending! 👋----------" + log.info "------Process Pending----------" log.info handler.toString() log.info trace.toString() putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName()) @@ -114,7 +115,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessSubmit(TaskHandler handler, TraceRecord trace){ - log.info "------Process Submitted! 👋----------" + log.info "------Process Submitted----------" log.info handler.toString() log.info trace.toString() putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName()) @@ -130,7 +131,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessStart(TaskHandler handler, TraceRecord trace){ - log.info "------Process Started! 👋----------" + log.info "------Process Started----------" log.info handler.toString() log.info trace.toString() putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName()) @@ -146,7 +147,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessComplete(TaskHandler handler, TraceRecord trace){ - log.info "------Process Completed! 👋----------" + log.info "------Process Completed----------" log.info handler.toString() log.info trace.toString() putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName()) @@ -164,7 +165,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessCached(TaskHandler handler, TraceRecord trace){ - log.info "------Process Cached! 👋----------" + log.info "------Process Cached----------" log.info handler.toString() log.info trace.toString() putRecordToObserverStream(ProcessStatus.CACHED, handler.task.getName()) From fd4f699c47a5473bca5f72a8c45b20b5129d5b05 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 17 Oct 2024 12:11:59 +0530 Subject: [PATCH 03/22] added debug statements to observer --- plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index c0f8acf..9d34c7d 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -62,6 +62,8 @@ class PollyObserver implements TraceObserver { this.session = session this.config = new PollyConfig(session.config.navigate('polly') as Map) this.env = session.config.navigate('env') as Map + log.info this.config.toString() + log.info this.env.toString() } @Override From 1b6eb94d968b29c184b37d2348857d7dc7b5656c Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 17 Oct 2024 12:25:03 +0530 Subject: [PATCH 04/22] added more messages to debug issue --- plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 9d34c7d..57c2f38 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -64,6 +64,7 @@ class PollyObserver implements TraceObserver { this.env = session.config.navigate('env') as Map log.info this.config.toString() log.info this.env.toString() + log.info "_______________________________________" } @Override @@ -176,6 +177,7 @@ class PollyObserver implements TraceObserver { void putRecordToObserverStream(String status, String processName){ String streamName = this.config.getGraphObserverStreamName() + log.info "Stream Name: " + streamName if (streamName == "NA") { logger.error("No stream set for process to send metrics to. Unable to report metric.") From 94581679094ff3bd68f0351d4c593ddb4f7309b0 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 17 Oct 2024 13:05:20 +0530 Subject: [PATCH 05/22] fixed env issue for fetching run_id --- plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 57c2f38..8533615 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -184,7 +184,7 @@ class PollyObserver implements TraceObserver { return } - String jobId = this.env.get("JOB_ID") ?: "NA" + String jobId = this.env.get("RUN_ID") ?: "NA" if (jobId == "NA") { logger.error("No JOB_ID set for process. Unable to report metric.") return From ef23a5bedbe99c9060b2d849b4059a44c80c7922 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 24 Oct 2024 14:11:07 +0530 Subject: [PATCH 06/22] added print for config params --- .../src/main/nextflow/polly/PollyObserver.groovy | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 8533615..7bd896d 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -105,6 +105,12 @@ class PollyObserver implements TraceObserver { log.info "------Process Pending----------" log.info handler.toString() log.info trace.toString() + String task_hash = handler.task.getHash().toString() + Map params = handler.task.getInputs() + log.info "__CONFIG__" + log.info handler.task.getConfig().toMapString() + log.info params.toMapString() + log.info task_hash putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName()) } @@ -121,6 +127,12 @@ class PollyObserver implements TraceObserver { log.info "------Process Submitted----------" log.info handler.toString() log.info trace.toString() + String task_hash = handler.task.getHash().toString() + Map params = handler.task.getInputs() + log.info "__CONFIG__" + log.info handler.task.getConfig().toMapString() + log.info params.toMapString() + log.info task_hash putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName()) } From 7c91ca7981cdce8e2532ba6363fb47dd8a898521 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 24 Oct 2024 15:19:31 +0530 Subject: [PATCH 07/22] added getDataFromHandlerAndTrace --- .../main/nextflow/polly/PollyObserver.groovy | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 7bd896d..33381f0 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -130,9 +130,8 @@ class PollyObserver implements TraceObserver { String task_hash = handler.task.getHash().toString() Map params = handler.task.getInputs() log.info "__CONFIG__" - log.info handler.task.getConfig().toMapString() - log.info params.toMapString() - log.info task_hash + Map data = getDataFromHandlerAndTrace(handler, trace) + log.info data.toMapString() putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName()) } @@ -187,6 +186,19 @@ class PollyObserver implements TraceObserver { } + + Map getDataFromHandlerAndTrace(TaskHandler handler, TraceRecord trace){ + Map data = [:] + data['inputs'] = handler.task.getInputs() + data['input_files_path'] = handler.task.getInputFilesMap() + data['machine_config'] = [ + 'cpu': handler.task.getConfig().getCpus(), + 'memory': handler.task.getConfig().getContainer(), + ] + return data + } + + void putRecordToObserverStream(String status, String processName){ String streamName = this.config.getGraphObserverStreamName() log.info "Stream Name: " + streamName From 6d53e0d93a83919ae7017017718220b7b9fab7b0 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 24 Oct 2024 16:31:05 +0530 Subject: [PATCH 08/22] minor change --- plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 33381f0..e24dea8 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -111,6 +111,8 @@ class PollyObserver implements TraceObserver { log.info handler.task.getConfig().toMapString() log.info params.toMapString() log.info task_hash + Map data = getDataFromHandlerAndTrace(handler, trace) + log.info data.toMapString() putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName()) } From 59190a7ef9c485efff4d6936f0beb427c39e9252 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 24 Oct 2024 18:09:06 +0530 Subject: [PATCH 09/22] correctly map input params in groovy obj before serialisation --- .../src/main/nextflow/polly/PollyObserver.groovy | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index e24dea8..2229b9f 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -7,6 +7,7 @@ import groovy.util.logging.Slf4j import nextflow.Session import nextflow.processor.TaskHandler import nextflow.processor.TaskProcessor +import nextflow.script.params.InParam import nextflow.trace.TraceObserver import nextflow.trace.TraceRecord import org.slf4j.Logger @@ -191,7 +192,14 @@ class PollyObserver implements TraceObserver { Map getDataFromHandlerAndTrace(TaskHandler handler, TraceRecord trace){ Map data = [:] - data['inputs'] = handler.task.getInputs() + Map input_map = [:] + def inputs = handler.task.getInputs() + for ( input in inputs ) { + InParam param = input.key + input_map[param.getName()] = input.value + } + + data['inputs'] = input_map data['input_files_path'] = handler.task.getInputFilesMap() data['machine_config'] = [ 'cpu': handler.task.getConfig().getCpus(), From 384eb0a2c402ec8ef3064d141a74269115959178 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 24 Oct 2024 19:48:04 +0530 Subject: [PATCH 10/22] debug tracerecord --- .../main/nextflow/polly/PollyObserver.groovy | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 2229b9f..3cf21d6 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -106,12 +106,6 @@ class PollyObserver implements TraceObserver { log.info "------Process Pending----------" log.info handler.toString() log.info trace.toString() - String task_hash = handler.task.getHash().toString() - Map params = handler.task.getInputs() - log.info "__CONFIG__" - log.info handler.task.getConfig().toMapString() - log.info params.toMapString() - log.info task_hash Map data = getDataFromHandlerAndTrace(handler, trace) log.info data.toMapString() putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName()) @@ -128,11 +122,6 @@ class PollyObserver implements TraceObserver { @Override void onProcessSubmit(TaskHandler handler, TraceRecord trace){ log.info "------Process Submitted----------" - log.info handler.toString() - log.info trace.toString() - String task_hash = handler.task.getHash().toString() - Map params = handler.task.getInputs() - log.info "__CONFIG__" Map data = getDataFromHandlerAndTrace(handler, trace) log.info data.toMapString() putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName()) @@ -167,6 +156,7 @@ class PollyObserver implements TraceObserver { log.info "------Process Completed----------" log.info handler.toString() log.info trace.toString() + trace.getProperty("native_id") putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName()) } @@ -186,7 +176,7 @@ class PollyObserver implements TraceObserver { log.info handler.toString() log.info trace.toString() putRecordToObserverStream(ProcessStatus.CACHED, handler.task.getName()) - + trace.getMachineInfo() } @@ -196,14 +186,21 @@ class PollyObserver implements TraceObserver { def inputs = handler.task.getInputs() for ( input in inputs ) { InParam param = input.key - input_map[param.getName()] = input.value + input_map[param.getName()] = input.value.getClass().getName() } + data['task_hash'] = handler.task.getHash().toString() data['inputs'] = input_map data['input_files_path'] = handler.task.getInputFilesMap() data['machine_config'] = [ 'cpu': handler.task.getConfig().getCpus(), - 'memory': handler.task.getConfig().getContainer(), + 'container': handler.task.getConfig().getContainer(), + ] + data['trace'] = [ + 'native_id': trace.getProperty('native_id'), + 'native_id_2': trace.get('native_id'), + 'cpu': trace.get('cpus'), + 'cpu2': trace.getProperty('cpus') ] return data } From 2648e71cd23dc92b6113ddbb67b6363ff11034ed Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 24 Oct 2024 20:24:17 +0530 Subject: [PATCH 11/22] emit task details to Kinesis too --- .../main/nextflow/polly/PollyObserver.groovy | 49 ++++++------------- 1 file changed, 15 insertions(+), 34 deletions(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 3cf21d6..c69cb01 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -73,26 +73,6 @@ class PollyObserver implements TraceObserver { log.info "----------Pipeline complete-------------" } - /* - * Invoked when the process is created. - */ - @Override - void onProcessCreate(TaskProcessor process ){ - log.info "-------------------Process Created-------------------" - log.info process.getName() - putRecordToObserverStream(ProcessStatus.CREATED, process.name) - } - - /* - * Invoked when all task have been executed and process ends. - */ - @Override - void onProcessTerminate( TaskProcessor process ){ - log.info "-------------------Process Terminated-------------------" - log.info process.toString() - putRecordToObserverStream(ProcessStatus.TERMINATED, process.name) - } - /** * This method when a new task is created and submitted in the nextflow * internal queue of pending task to be scheduled to the underlying @@ -108,7 +88,7 @@ class PollyObserver implements TraceObserver { log.info trace.toString() Map data = getDataFromHandlerAndTrace(handler, trace) log.info data.toMapString() - putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName()) + putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName(), data) } /** @@ -124,7 +104,7 @@ class PollyObserver implements TraceObserver { log.info "------Process Submitted----------" Map data = getDataFromHandlerAndTrace(handler, trace) log.info data.toMapString() - putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName()) + putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName(), data) } /** @@ -140,7 +120,8 @@ class PollyObserver implements TraceObserver { log.info "------Process Started----------" log.info handler.toString() log.info trace.toString() - putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName()) + Map data = getDataFromHandlerAndTrace(handler, trace) + putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName(), data) } /** @@ -157,7 +138,9 @@ class PollyObserver implements TraceObserver { log.info handler.toString() log.info trace.toString() trace.getProperty("native_id") - putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName()) + Map data = getDataFromHandlerAndTrace(handler, trace) + log.info data.toMapString() + putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName(), data) } /** @@ -175,7 +158,9 @@ class PollyObserver implements TraceObserver { log.info "------Process Cached----------" log.info handler.toString() log.info trace.toString() - putRecordToObserverStream(ProcessStatus.CACHED, handler.task.getName()) + Map data = getDataFromHandlerAndTrace(handler, trace) + log.info data.toMapString() + putRecordToObserverStream(ProcessStatus.CACHED, handler.task.getName(), data) trace.getMachineInfo() } @@ -193,20 +178,16 @@ class PollyObserver implements TraceObserver { data['inputs'] = input_map data['input_files_path'] = handler.task.getInputFilesMap() data['machine_config'] = [ - 'cpu': handler.task.getConfig().getCpus(), - 'container': handler.task.getConfig().getContainer(), - ] - data['trace'] = [ 'native_id': trace.getProperty('native_id'), - 'native_id_2': trace.get('native_id'), - 'cpu': trace.get('cpus'), - 'cpu2': trace.getProperty('cpus') + 'cpus': trace.getProperty('cpus'), + 'memory': trace.getProperty('memory'), + 'disk': trace.getProperty('disk') ] return data } - void putRecordToObserverStream(String status, String processName){ + void putRecordToObserverStream(String status, String processName, Map data){ String streamName = this.config.getGraphObserverStreamName() log.info "Stream Name: " + streamName @@ -223,7 +204,7 @@ class PollyObserver implements TraceObserver { String partitionKey = status.toString() try { - Map map = [job_id: jobId, status: status, process_name: processName] + Map map = [job_id: jobId, status: status, process_name: processName, task_detail: data] byte[] json = JsonOutput.toJson(map).getBytes() KinesisClient client = KinesisClient.builder().build() PutRecordRequest putRequest = PutRecordRequest.builder() From e41672cb4e078dcf38030624a02f866d47521e77 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Fri, 25 Oct 2024 12:25:42 +0530 Subject: [PATCH 12/22] added null checks --- .../src/main/nextflow/polly/PollyObserver.groovy | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index c69cb01..7953f00 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -176,12 +176,11 @@ class PollyObserver implements TraceObserver { data['task_hash'] = handler.task.getHash().toString() data['inputs'] = input_map - data['input_files_path'] = handler.task.getInputFilesMap() data['machine_config'] = [ - 'native_id': trace.getProperty('native_id'), - 'cpus': trace.getProperty('cpus'), - 'memory': trace.getProperty('memory'), - 'disk': trace.getProperty('disk') + 'native_id': trace.getProperty('native_id') ?: 'null', + 'cpus': trace.getProperty('cpus') ?: 'null', + 'memory': trace.getProperty('memory') ?: 'null', + 'disk': trace.getProperty('disk') ?: 'null' ] return data } From 70d1d9a61dbae260588de46643e49ee9a0e86b8f Mon Sep 17 00:00:00 2001 From: aditya-el Date: Wed, 30 Oct 2024 17:20:38 +0530 Subject: [PATCH 13/22] key changes for observer events --- plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 7953f00..5cf7ed6 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -174,8 +174,7 @@ class PollyObserver implements TraceObserver { input_map[param.getName()] = input.value.getClass().getName() } - data['task_hash'] = handler.task.getHash().toString() - data['inputs'] = input_map + data['process_hash'] = handler.task.getHash().toString() data['machine_config'] = [ 'native_id': trace.getProperty('native_id') ?: 'null', 'cpus': trace.getProperty('cpus') ?: 'null', From 45ebe3575dbbee1af995fcda76734c380f00747b Mon Sep 17 00:00:00 2001 From: aditya-el Date: Fri, 15 Nov 2024 03:10:59 +0530 Subject: [PATCH 14/22] removed logs from the plugin logic --- .../main/nextflow/polly/PollyObserver.groovy | 23 ------------------- 1 file changed, 23 deletions(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 5cf7ed6..6e794dd 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -59,13 +59,9 @@ class PollyObserver implements TraceObserver { @Override void onFlowCreate(Session session) { - log.info "-------Pipeline is starting-----------" this.session = session this.config = new PollyConfig(session.config.navigate('polly') as Map) this.env = session.config.navigate('env') as Map - log.info this.config.toString() - log.info this.env.toString() - log.info "_______________________________________" } @Override @@ -83,11 +79,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessPending(TaskHandler handler, TraceRecord trace){ - log.info "------Process Pending----------" - log.info handler.toString() - log.info trace.toString() Map data = getDataFromHandlerAndTrace(handler, trace) - log.info data.toMapString() putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName(), data) } @@ -101,9 +93,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessSubmit(TaskHandler handler, TraceRecord trace){ - log.info "------Process Submitted----------" Map data = getDataFromHandlerAndTrace(handler, trace) - log.info data.toMapString() putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName(), data) } @@ -117,9 +107,6 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessStart(TaskHandler handler, TraceRecord trace){ - log.info "------Process Started----------" - log.info handler.toString() - log.info trace.toString() Map data = getDataFromHandlerAndTrace(handler, trace) putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName(), data) } @@ -134,12 +121,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessComplete(TaskHandler handler, TraceRecord trace){ - log.info "------Process Completed----------" - log.info handler.toString() - log.info trace.toString() - trace.getProperty("native_id") Map data = getDataFromHandlerAndTrace(handler, trace) - log.info data.toMapString() putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName(), data) } @@ -155,13 +137,8 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessCached(TaskHandler handler, TraceRecord trace){ - log.info "------Process Cached----------" - log.info handler.toString() - log.info trace.toString() Map data = getDataFromHandlerAndTrace(handler, trace) - log.info data.toMapString() putRecordToObserverStream(ProcessStatus.CACHED, handler.task.getName(), data) - trace.getMachineInfo() } From 1d3ff19e6b9c2c5c6ad6bb33eb71de361a20d66d Mon Sep 17 00:00:00 2001 From: aditya-el Date: Tue, 10 Dec 2024 15:33:05 +0530 Subject: [PATCH 15/22] added log for jobs --- plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 6e794dd..a32488f 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -79,6 +79,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessPending(TaskHandler handler, TraceRecord trace){ + log.info "Process Pending: " + handler.task.getName() Map data = getDataFromHandlerAndTrace(handler, trace) putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName(), data) } @@ -94,6 +95,7 @@ class PollyObserver implements TraceObserver { @Override void onProcessSubmit(TaskHandler handler, TraceRecord trace){ Map data = getDataFromHandlerAndTrace(handler, trace) + log.info "Submitting the process: " + handler.task.getName() + " hash: " + handler.task.getHash().toString() putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName(), data) } @@ -107,6 +109,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessStart(TaskHandler handler, TraceRecord trace){ + log.info "Process Started: " + handler.task.getName() Map data = getDataFromHandlerAndTrace(handler, trace) putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName(), data) } @@ -121,6 +124,7 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessComplete(TaskHandler handler, TraceRecord trace){ + log.info "Process Completed: " + handler.task.getName() Map data = getDataFromHandlerAndTrace(handler, trace) putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName(), data) } From e1eb6dd4107af8744e2f2d95379fd86e570f75b2 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Wed, 18 Dec 2024 14:05:12 +0530 Subject: [PATCH 16/22] additional check for streamname --- plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index a32488f..043e097 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -170,6 +170,10 @@ class PollyObserver implements TraceObserver { String streamName = this.config.getGraphObserverStreamName() log.info "Stream Name: " + streamName + if (streamName == "NA"){ + streamName = this.env.get("GRAPH_OBSERVER_STREAM_NAME") ?: "NA" + } + if (streamName == "NA") { logger.error("No stream set for process to send metrics to. Unable to report metric.") return From b7bfe217012ed564b2e376d768938320f09ed308 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 26 Dec 2024 12:39:10 +0530 Subject: [PATCH 17/22] added infra flag in the event --- plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 043e097..e57da37 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -155,6 +155,9 @@ class PollyObserver implements TraceObserver { input_map[param.getName()] = input.value.getClass().getName() } + String infra = this.env.get("INFRA") ?: "NA" + + data['infra'] = infra data['process_hash'] = handler.task.getHash().toString() data['machine_config'] = [ 'native_id': trace.getProperty('native_id') ?: 'null', From 29692f4e1740fa77aee4319c5ce73a248a9bbedd Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 26 Dec 2024 14:17:14 +0530 Subject: [PATCH 18/22] revert infra key and add it on root --- .../nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index e57da37..0820847 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -155,9 +155,6 @@ class PollyObserver implements TraceObserver { input_map[param.getName()] = input.value.getClass().getName() } - String infra = this.env.get("INFRA") ?: "NA" - - data['infra'] = infra data['process_hash'] = handler.task.getHash().toString() data['machine_config'] = [ 'native_id': trace.getProperty('native_id') ?: 'null', @@ -188,9 +185,11 @@ class PollyObserver implements TraceObserver { return } + String infraName = this.env.get("INFRA") ?: "NA" + String partitionKey = status.toString() try { - Map map = [job_id: jobId, status: status, process_name: processName, task_detail: data] + Map map = [job_id: jobId, status: status, process_name: processName, task_detail: data, infra: infraName] byte[] json = JsonOutput.toJson(map).getBytes() KinesisClient client = KinesisClient.builder().build() PutRecordRequest putRequest = PutRecordRequest.builder() From e443b0c90ef8bb56ccb168a486edd48ae28faa05 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 26 Dec 2024 14:40:43 +0530 Subject: [PATCH 19/22] change key --- .../nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 0820847..68c41b6 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -185,11 +185,11 @@ class PollyObserver implements TraceObserver { return } - String infraName = this.env.get("INFRA") ?: "NA" - + String infraName = this.env.get("INFRA_TYPE") ?: "NA" + log.info "INFRA_TYPE: " + infraName String partitionKey = status.toString() try { - Map map = [job_id: jobId, status: status, process_name: processName, task_detail: data, infra: infraName] + Map map = [job_id: jobId, status: status, process_name: processName, task_detail: data, infra_type: infraName] byte[] json = JsonOutput.toJson(map).getBytes() KinesisClient client = KinesisClient.builder().build() PutRecordRequest putRequest = PutRecordRequest.builder() From 92df8ad1d0298758ae4701ff855b2d3b71ec7fbe Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 26 Dec 2024 15:06:14 +0530 Subject: [PATCH 20/22] print env --- plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 68c41b6..b1d26eb 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -185,6 +185,8 @@ class PollyObserver implements TraceObserver { return } + log.info this.env.toMapString() + String infraName = this.env.get("INFRA_TYPE") ?: "NA" log.info "INFRA_TYPE: " + infraName String partitionKey = status.toString() From 9997088f8e2ddb25d80912bf0df57391ce546f78 Mon Sep 17 00:00:00 2001 From: aditya-el Date: Mon, 6 Jan 2025 14:19:19 +0530 Subject: [PATCH 21/22] change debug statements for completed process --- .../nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index b1d26eb..4c0950d 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -109,7 +109,6 @@ class PollyObserver implements TraceObserver { */ @Override void onProcessStart(TaskHandler handler, TraceRecord trace){ - log.info "Process Started: " + handler.task.getName() Map data = getDataFromHandlerAndTrace(handler, trace) putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName(), data) } @@ -126,6 +125,7 @@ class PollyObserver implements TraceObserver { void onProcessComplete(TaskHandler handler, TraceRecord trace){ log.info "Process Completed: " + handler.task.getName() Map data = getDataFromHandlerAndTrace(handler, trace) + log.info "____PROCESS_COMPLETE____" + data.toMapString() putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName(), data) } @@ -168,7 +168,6 @@ class PollyObserver implements TraceObserver { void putRecordToObserverStream(String status, String processName, Map data){ String streamName = this.config.getGraphObserverStreamName() - log.info "Stream Name: " + streamName if (streamName == "NA"){ streamName = this.env.get("GRAPH_OBSERVER_STREAM_NAME") ?: "NA" @@ -185,10 +184,7 @@ class PollyObserver implements TraceObserver { return } - log.info this.env.toMapString() - String infraName = this.env.get("INFRA_TYPE") ?: "NA" - log.info "INFRA_TYPE: " + infraName String partitionKey = status.toString() try { Map map = [job_id: jobId, status: status, process_name: processName, task_detail: data, infra_type: infraName] From a04d5791ec624d41c64fcd3175e753b14f2980da Mon Sep 17 00:00:00 2001 From: aditya-el Date: Thu, 9 Jan 2025 12:24:38 +0530 Subject: [PATCH 22/22] added timestamp in the event --- plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy index 4c0950d..768c841 100644 --- a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy +++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy @@ -168,6 +168,7 @@ class PollyObserver implements TraceObserver { void putRecordToObserverStream(String status, String processName, Map data){ String streamName = this.config.getGraphObserverStreamName() + long unixTimestampMs = System.currentTimeMillis() if (streamName == "NA"){ streamName = this.env.get("GRAPH_OBSERVER_STREAM_NAME") ?: "NA" @@ -187,7 +188,7 @@ class PollyObserver implements TraceObserver { String infraName = this.env.get("INFRA_TYPE") ?: "NA" String partitionKey = status.toString() try { - Map map = [job_id: jobId, status: status, process_name: processName, task_detail: data, infra_type: infraName] + Map map = [job_id: jobId, status: status, process_name: processName, task_detail: data, infra_type: infraName, timestamp: unixTimestampMs] byte[] json = JsonOutput.toJson(map).getBytes() KinesisClient client = KinesisClient.builder().build() PutRecordRequest putRequest = PutRecordRequest.builder()