Skip to content

Commit

Permalink
added observer for nf processes, to emit status events
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-el committed Oct 4, 2024
1 parent 9b1eda7 commit b921c7b
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 0 deletions.
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@

config ?= compileClasspath
version ?= $(shell grep 'Plugin-Version' plugins/nf-polly/src/resources/META-INF/MANIFEST.MF | awk '{ print $$2 }')

ifdef module
mm = :${module}:
Expand Down Expand Up @@ -48,6 +49,12 @@ else
./gradlew ${mm}test --tests ${class}
endif

install:
./gradlew copyPluginZip
rm -rf ${HOME}/.nextflow/plugins/nf-polly-${version}
cp -r build/plugins/nf-polly-${version} ${HOME}/.nextflow/plugins/


assemble:
./gradlew assemble

Expand Down
6 changes: 6 additions & 0 deletions plugins/nf-polly/src/main/nextflow/polly/PollyConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@ import groovy.transform.PackageScope
class PollyConfig {

final private String metricsStreamName
final private String graphObserverStreamName
final private String jobId

PollyConfig(Map map) {
def config = map ?: Collections.emptyMap()
metricsStreamName = config.metricsStreamName ?: "NA"
graphObserverStreamName = config.graphObserverStreamName ?: "NA"
jobId = config.jobId ?: "NA"
}

String getMetricsStreamName() { metricsStreamName }

String getGraphObserverStreamName() { graphObserverStreamName }
}
37 changes: 37 additions & 0 deletions plugins/nf-polly/src/main/nextflow/polly/PollyFactory.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2021, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.polly

import groovy.transform.CompileStatic
import nextflow.Session
import nextflow.trace.TraceObserver
import nextflow.trace.TraceObserverFactory
/**
* Implements the validation observer factory
*
* @author Paolo Di Tommaso <[email protected]>
*/
@CompileStatic
class PollyFactory implements TraceObserverFactory {

@Override
Collection<TraceObserver> create(Session session) {
final result = new ArrayList()
result.add( new PollyObserver() )
return result
}
}
211 changes: 211 additions & 0 deletions plugins/nf-polly/src/main/nextflow/polly/PollyObserver.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@

package nextflow.polly

import groovy.json.JsonOutput
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.processor.TaskHandler
import nextflow.processor.TaskProcessor
import nextflow.trace.TraceObserver
import nextflow.trace.TraceRecord
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import software.amazon.awssdk.core.SdkBytes
import software.amazon.awssdk.services.kinesis.KinesisClient
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse


class ProcessStatus {
public static String CREATED = "created"
public static String SUBMITTED = "submitted"
public static String PENDING = "pending"
public static String STARTED = "started"
public static String CACHED = "cached"
public static String TERMINATED = "terminated"
public static String COMPLETED = "completed"
}


@Slf4j
@CompileStatic
class PollyObserver implements TraceObserver {

static final Logger logger = LoggerFactory.getLogger(PollyExtension.class)
private Session session

/*
* A Custom config extracted from nextflow.config under polly tag
* nextflow.config
* ---------------
* docker {
* enabled = true
* }
* ...
* polly {
* metricsStreamName = "my-kinesis-stream"
* }
*/
private PollyConfig config

/**
* A map of 'env' variables set in the Nextflow config file
*/
private Map env


@Override
void onFlowCreate(Session session) {
log.info "-------Pipeline is starting! 🚀-----------"
this.session = session
this.config = new PollyConfig(session.config.navigate('polly') as Map)
this.env = session.config.navigate('env') as Map
}

@Override
void onFlowComplete() {
log.info "----------Pipeline complete! 👋-------------"
}

/*
* Invoked when the process is created.
*/
@Override
void onProcessCreate(TaskProcessor process ){
log.info "-------------------Process Created! 👋-------------------"
log.info process.getName()
putRecordToObserverStream(ProcessStatus.CREATED, process.name)
}

/*
* Invoked when all task have been executed and process ends.
*/
@Override
void onProcessTerminate( TaskProcessor process ){
log.info "-------------------Process Terminated! 👋-------------------"
log.info process.toString()
putRecordToObserverStream(ProcessStatus.TERMINATED, process.name)
}

/**
* This method when a new task is created and submitted in the nextflow
* internal queue of pending task to be scheduled to the underlying
* execution backend
*
* @param handler
* @param trace
*/
@Override
void onProcessPending(TaskHandler handler, TraceRecord trace){
log.info "------Process Pending! 👋----------"
log.info handler.toString()
log.info trace.toString()
putRecordToObserverStream(ProcessStatus.PENDING, handler.task.getName())
}

/**
* This method is invoked before a process run is going to be submitted
*
* @param handler
* The {@link TaskHandler} instance for the current task.
* @param trace
* The associated {@link TraceRecord} for the current task.
*/
@Override
void onProcessSubmit(TaskHandler handler, TraceRecord trace){
log.info "------Process Submitted! 👋----------"
log.info handler.toString()
log.info trace.toString()
putRecordToObserverStream(ProcessStatus.SUBMITTED, handler.task.getName())
}

/**
* This method is invoked when a process run is going to start
*
* @param handler
* The {@link TaskHandler} instance for the current task.
* @param trace
* The associated {@link TraceRecord} for the current task.
*/
@Override
void onProcessStart(TaskHandler handler, TraceRecord trace){
log.info "------Process Started! 👋----------"
log.info handler.toString()
log.info trace.toString()
putRecordToObserverStream(ProcessStatus.STARTED, handler.task.getName())
}

/**
* This method is invoked when a process run completes
*
* @param handler
* The {@link TaskHandler} instance for the current task.
* @param trace
* The associated {@link TraceRecord} for the current task.
*/
@Override
void onProcessComplete(TaskHandler handler, TraceRecord trace){
log.info "------Process Completed! 👋----------"
log.info handler.toString()
log.info trace.toString()
putRecordToObserverStream(ProcessStatus.COMPLETED, handler.task.getName())
}

/**
* method invoked when a task execution is skipped because the result is cached (already computed)
* or stored (due to the usage of `storeDir` directive)
*
* @param handler
* The {@link TaskHandler} instance for the current task
* @param trace
* The trace record for the cached trace. When this event is invoked for a store task
* the {@code trace} record is expected to be {@code null}
*/
@Override
void onProcessCached(TaskHandler handler, TraceRecord trace){
log.info "------Process Cached! 👋----------"
log.info handler.toString()
log.info trace.toString()
putRecordToObserverStream(ProcessStatus.CACHED, handler.task.getName())

}

void putRecordToObserverStream(String status, String processName){
String streamName = this.config.getGraphObserverStreamName()

if (streamName == "NA") {
logger.error("No stream set for process to send metrics to. Unable to report metric.")
return
}

String jobId = this.env.get("JOB_ID") ?: "NA"
if (jobId == "NA") {
logger.error("No JOB_ID set for process. Unable to report metric.")
return
}

String partitionKey = status.toString()
try {
Map map = [job_id: jobId, status: status, process_name: processName]
byte[] json = JsonOutput.toJson(map).getBytes()
KinesisClient client = KinesisClient.builder().build()
PutRecordRequest putRequest = PutRecordRequest.builder()
.partitionKey(partitionKey)
.streamName(streamName)
.data(SdkBytes.fromByteArray(json))
.build() as PutRecordRequest
PutRecordResponse response = client.putRecord(putRequest)
logger.info(
String.format(
"Submitted record %s to stream shard %s",
response.sequenceNumber(),
response.shardId()
)
)
} catch (Exception e) {
logger.error("Failed to produce: " + e.getMessage())
}
}

}
1 change: 1 addition & 0 deletions plugins/nf-polly/src/resources/META-INF/extensions.idx
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
nextflow.polly.PollyFactory
nextflow.polly.PollyExtension

0 comments on commit b921c7b

Please sign in to comment.