From b921c7bae6441027287fed9884a84f947658a3d2 Mon Sep 17 00:00:00 2001
From: aditya-el
Date: Fri, 4 Oct 2024 11:49:03 +0530
Subject: [PATCH] added observer for nf processes, to emit status events

---
 Makefile                                      |   7 +
 .../main/nextflow/polly/PollyConfig.groovy    |   9 +
 .../main/nextflow/polly/PollyFactory.groovy   |  37 +++
 .../main/nextflow/polly/PollyObserver.groovy  | 281 ++++++++++++++++++
 .../src/resources/META-INF/extensions.idx     |   1 +
 5 files changed, 335 insertions(+)
 create mode 100644 plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy
 create mode 100644 plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy

diff --git a/Makefile b/Makefile
index ad40b01..111e19f 100644
--- a/Makefile
+++ b/Makefile
@@ -1,5 +1,6 @@
 
 config ?= compileClasspath
+version ?= $(shell grep 'Plugin-Version' plugins/nf-polly/src/resources/META-INF/MANIFEST.MF | awk '{ print $$2 }')
 
 ifdef module
 mm = :${module}:
@@ -48,6 +49,12 @@ else
 	./gradlew ${mm}test --tests ${class}
 endif
 
+install:
+	./gradlew copyPluginZip
+	rm -rf ${HOME}/.nextflow/plugins/nf-polly-${version}
+	cp -r build/plugins/nf-polly-${version} ${HOME}/.nextflow/plugins/
+
+
 assemble:
 	./gradlew assemble
 
diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyConfig.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyConfig.groovy
index 1d07966..9935188 100644
--- a/plugins/nf-polly/src/main/nextflow/polly/PollyConfig.groovy
+++ b/plugins/nf-polly/src/main/nextflow/polly/PollyConfig.groovy
@@ -13,11 +13,20 @@ import groovy.transform.PackageScope
 class PollyConfig {
 
     final private String metricsStreamName
+    final private String graphObserverStreamName
+    final private String jobId
 
     PollyConfig(Map map) {
         def config = map ?: Collections.emptyMap()
         metricsStreamName = config.metricsStreamName ?: "NA"
+        graphObserverStreamName = config.graphObserverStreamName ?: "NA"
+        jobId = config.jobId ?: "NA"
     }
 
     String getMetricsStreamName() { metricsStreamName }
+
+    String getGraphObserverStreamName() { graphObserverStreamName }
+
+    /** Job id read from {@code polly.jobId}; "NA" when it is not configured. */
+    String getJobId() { jobId }
 }
diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy
new file mode 100644
index 0000000..aebc851
--- /dev/null
+++ b/plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2021, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.polly
+
+import groovy.transform.CompileStatic
+import nextflow.Session
+import nextflow.trace.TraceObserver
+import nextflow.trace.TraceObserverFactory
+/**
+ * Factory that registers the {@link PollyObserver} with the Nextflow session
+ *
+ * @author Paolo Di Tommaso
+ */
+@CompileStatic
+class PollyFactory implements TraceObserverFactory {
+
+    @Override
+    Collection<TraceObserver> create(Session session) {
+        final result = new ArrayList<TraceObserver>()
+        result.add( new PollyObserver() )
+        return result
+    }
+}
diff --git a/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy
new file mode 100644
index 0000000..07d4581
--- /dev/null
+++ b/plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy
@@ -0,0 +1,281 @@
+
+package nextflow.polly
+
+import groovy.json.JsonOutput
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import nextflow.Session
+import nextflow.processor.TaskHandler
+import nextflow.processor.TaskProcessor
+import nextflow.trace.TraceObserver
+import nextflow.trace.TraceRecord
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import software.amazon.awssdk.core.SdkBytes
+import software.amazon.awssdk.services.kinesis.KinesisClient
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse
+
+
+/**
+ * Status values carried in the {@code status} field of every observer event.
+ */
+class ProcessStatus {
+    public static final String CREATED = "created"
+    public static final String SUBMITTED = "submitted"
+    public static final String PENDING = "pending"
+    public static final String STARTED = "started"
+    public static final String CACHED = "cached"
+    public static final String TERMINATED = "terminated"
+    public static final String COMPLETED = "completed"
+}
+
+
+/**
+ * Trace observer that publishes process and task status events to the
+ * Kinesis stream named by {@code polly.graphObserverStreamName}.
+ */
+@Slf4j
+@CompileStatic
+class PollyObserver implements TraceObserver {
+
+    // Bound to this class so that log records are attributed to the observer
+    static final Logger logger = LoggerFactory.getLogger(PollyObserver.class)
+    private Session session
+
+    /*
+     * A Custom config extracted from nextflow.config under polly tag
+     * nextflow.config
+     * ---------------
+     * docker {
+     *   enabled = true
+     * }
+     * ...
+     * polly {
+     *   metricsStreamName = "my-kinesis-stream"
+     *   graphObserverStreamName = "my-observer-stream"
+     * }
+     */
+    private PollyConfig config
+
+    /**
+     * A map of 'env' variables set in the Nextflow config file
+     */
+    private Map env
+
+    /**
+     * Kinesis client shared by all events; built on first use and closed in
+     * {@link #onFlowComplete()}.
+     */
+    private KinesisClient client
+
+
+    /**
+     * Captures the session and reads the {@code polly} and {@code env} config scopes.
+     */
+    @Override
+    void onFlowCreate(Session session) {
+        log.info "-------Pipeline is starting! 🚀-----------"
+        this.session = session
+        this.config = new PollyConfig(session.config.navigate('polly') as Map)
+        // the `env` scope is optional: default to an empty map so lookups never hit null
+        this.env = (session.config.navigate('env') as Map) ?: Collections.emptyMap()
+    }
+
+    /**
+     * Logs pipeline completion and releases the shared Kinesis client.
+     */
+    @Override
+    void onFlowComplete() {
+        log.info "----------Pipeline complete! 👋-------------"
+        closeClient()
+    }
+
+    /*
+     * Invoked when the process is created.
+     */
+    @Override
+    void onProcessCreate(TaskProcessor process ){
+        log.info "-------------------Process Created! 👋-------------------"
+        log.info process.getName()
+        putRecordToObserverStream(ProcessStatus.CREATED, process.name)
+    }
+
+    /*
+     * Invoked when all task have been executed and process ends.
+     */
+    @Override
+    void onProcessTerminate( TaskProcessor process ){
+        log.info "-------------------Process Terminated! 👋-------------------"
+        log.info process.toString()
+        putRecordToObserverStream(ProcessStatus.TERMINATED, process.name)
+    }
+
+    /**
+     * This method is invoked when a new task is created and submitted in the nextflow
+     * internal queue of pending task to be scheduled to the underlying
+     * execution backend
+     *
+     * @param handler
+     * @param trace
+     */
+    @Override
+    void onProcessPending(TaskHandler handler, TraceRecord trace){
+        log.info "------Process Pending! 👋----------"
+        reportTaskStatus(ProcessStatus.PENDING, handler, trace)
+    }
+
+    /**
+     * This method is invoked before a process run is going to be submitted
+     *
+     * @param handler
+     *      The {@link TaskHandler} instance for the current task.
+     * @param trace
+     *      The associated {@link TraceRecord} for the current task.
+     */
+    @Override
+    void onProcessSubmit(TaskHandler handler, TraceRecord trace){
+        log.info "------Process Submitted! 👋----------"
+        reportTaskStatus(ProcessStatus.SUBMITTED, handler, trace)
+    }
+
+    /**
+     * This method is invoked when a process run is going to start
+     *
+     * @param handler
+     *      The {@link TaskHandler} instance for the current task.
+     * @param trace
+     *      The associated {@link TraceRecord} for the current task.
+     */
+    @Override
+    void onProcessStart(TaskHandler handler, TraceRecord trace){
+        log.info "------Process Started! 👋----------"
+        reportTaskStatus(ProcessStatus.STARTED, handler, trace)
+    }
+
+    /**
+     * This method is invoked when a process run completes
+     *
+     * @param handler
+     *      The {@link TaskHandler} instance for the current task.
+     * @param trace
+     *      The associated {@link TraceRecord} for the current task.
+     */
+    @Override
+    void onProcessComplete(TaskHandler handler, TraceRecord trace){
+        log.info "------Process Completed! 👋----------"
+        reportTaskStatus(ProcessStatus.COMPLETED, handler, trace)
+    }
+
+    /**
+     * method invoked when a task execution is skipped because the result is cached (already computed)
+     * or stored (due to the usage of `storeDir` directive)
+     *
+     * @param handler
+     *      The {@link TaskHandler} instance for the current task
+     * @param trace
+     *      The trace record for the cached trace. When this event is invoked for a store task
+     *      the {@code trace} record is expected to be {@code null}
+     */
+    @Override
+    void onProcessCached(TaskHandler handler, TraceRecord trace){
+        log.info "------Process Cached! 👋----------"
+        reportTaskStatus(ProcessStatus.CACHED, handler, trace)
+    }
+
+    /**
+     * Logs the task handler and trace record, then publishes the status event.
+     * Both are rendered with {@code String.valueOf} so that a {@code null} trace
+     * (stored tasks) is logged as "null" instead of raising an NPE.
+     */
+    private void reportTaskStatus(String status, TaskHandler handler, TraceRecord trace){
+        log.info(String.valueOf(handler))
+        log.info(String.valueOf(trace))
+        putRecordToObserverStream(status, handler.task.getName())
+    }
+
+    /**
+     * Publishes one status event as a JSON record with the keys {@code job_id},
+     * {@code status} and {@code process_name} to the observer stream.
+     * Missing configuration and Kinesis failures are logged, not thrown.
+     *
+     * @param status
+     *      One of the {@link ProcessStatus} values.
+     * @param processName
+     *      Name of the process or task the event refers to.
+     */
+    void putRecordToObserverStream(String status, String processName){
+        String streamName = this.config.getGraphObserverStreamName()
+
+        if (streamName == "NA") {
+            logger.error("No stream set for process to send metrics to. Unable to report metric.")
+            return
+        }
+
+        String jobId = resolveJobId()
+        if (jobId == "NA") {
+            logger.error("No JOB_ID set for process. Unable to report metric.")
+            return
+        }
+
+        // Keyed by job id so that all events of one run land on the same shard
+        // and keep their order; keying by status spread a run across shards.
+        String partitionKey = jobId
+        try {
+            Map map = [job_id: jobId, status: status, process_name: processName]
+            // UTF-8 explicitly, instead of the platform default charset
+            SdkBytes payload = SdkBytes.fromUtf8String(JsonOutput.toJson(map))
+            PutRecordRequest putRequest = PutRecordRequest.builder()
+                    .partitionKey(partitionKey)
+                    .streamName(streamName)
+                    .data(payload)
+                    .build() as PutRecordRequest
+            PutRecordResponse response = kinesisClient().putRecord(putRequest)
+            logger.info(
+                String.format(
+                    "Submitted record %s to stream shard %s",
+                    response.sequenceNumber(),
+                    response.shardId()
+                )
+            )
+        } catch (Exception e) {
+            // pass the exception along so the stack trace is not lost
+            logger.error("Failed to produce: " + e.getMessage(), e)
+        }
+    }
+
+    /**
+     * Returns the job id for outgoing events: the {@code JOB_ID} entry of the
+     * {@code env} scope when set, otherwise {@code polly.jobId} ("NA" if unset).
+     */
+    private String resolveJobId(){
+        Object fromEnv = this.env.get("JOB_ID")
+        return fromEnv ? fromEnv.toString() : this.config.getJobId()
+    }
+
+    /**
+     * Returns the shared Kinesis client, building it on first use. Synchronized
+     * in case observer callbacks arrive on different threads.
+     */
+    private synchronized KinesisClient kinesisClient(){
+        if (client == null) {
+            client = KinesisClient.builder().build()
+        }
+        return client
+    }
+
+    /**
+     * Closes the shared Kinesis client, if one was built, so its connections are released.
+     */
+    private synchronized void closeClient(){
+        if (client != null) {
+            try {
+                client.close()
+            } catch (Exception e) {
+                logger.warn("Failed to close Kinesis client: " + e.getMessage(), e)
+            }
+            client = null
+        }
+    }
+
+}
diff --git a/plugins/nf-polly/src/resources/META-INF/extensions.idx b/plugins/nf-polly/src/resources/META-INF/extensions.idx
index 4ed52e5..85201fe 100644
--- a/plugins/nf-polly/src/resources/META-INF/extensions.idx
+++ b/plugins/nf-polly/src/resources/META-INF/extensions.idx
@@ -1 +1,2 @@
+nextflow.polly.PollyFactory
 nextflow.polly.PollyExtension
\ No newline at end of file