Skip to content

Commit

Permalink
Record the time between a task being triggered and it being successfu…
Browse files Browse the repository at this point in the history
…l run (#217)

* Example recording time from trigger timestamp to grab before successful process

* Clean up example

* Clean up example

* Checkstyle

* Checkstyle

* Simplify flow

* Add changelog / bump version

* Fix timestamp unit

* Fix triggeredAt name

* Default to 0 if triggeredAt not set
  • Loading branch information
aaronrosser authored Dec 18, 2024
1 parent 37a1559 commit 1795a34
Show file tree
Hide file tree
Showing 13 changed files with 43 additions and 19 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 1.48.0 - 2024/12/13

### Added

- Added timer `twTasks.tasks.taskGrabbingTime`, which tracks the time between a task being triggered and that same task being grabbed for processing.

## 1.47.0 - 2024/12/13

### Added
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import org.eclipse.jgit.api.errors.RefAlreadyExistsException
import com.github.spotbugs.snom.Confidence
import com.github.spotbugs.snom.Effort
import org.eclipse.jgit.api.errors.RefAlreadyExistsException

buildscript {
if (!project.hasProperty("springBootVersion")) {
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=1.47.0
version=1.48.0
org.gradle.internal.http.socketTimeout=120000
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootContextLoader;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.transferwise.tasks;

import com.transferwise.tasks.ITasksService.AddTaskResponse.Result;
import com.transferwise.tasks.domain.TaskContext;
import com.transferwise.tasks.domain.TaskStatus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
import java.time.ZonedDateTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.ZonedDateTime;
import java.util.Objects;
import java.util.UUID;
import lombok.Data;
import lombok.experimental.Accessors;
Expand All @@ -24,7 +23,7 @@ public class FullTaskRecord implements ITask {
private ZonedDateTime nextEventTime;
private String processingClientId;
private TaskContext taskContext;

@Override
public ITaskVersionId getVersionId() {
return new TaskVersionId(id, version);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.transferwise.tasks.domain;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.Objects;
import java.util.UUID;
import lombok.Data;
import lombok.experimental.Accessors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -39,6 +40,7 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate {
public static final String METRIC_TASKS_FAILED_STATUS_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedStatusChangeCount";
public static final String METRIC_TASKS_DEBUG_PRIORITY_QUEUE_CHECK = METRIC_PREFIX + "tasks.debug.priorityQueueCheck";
public static final String METRIC_TASKS_TASK_GRABBING = METRIC_PREFIX + "tasks.taskGrabbing";
public static final String METRIC_TASKS_TASK_GRABBING_TIME = METRIC_PREFIX + "tasks.taskGrabbingTime";
public static final String METRIC_TASKS_DEBUG_ROOM_MAP_ALREADY_HAS_TYPE = METRIC_PREFIX + "tasks.debug.roomMapAlreadyHasType";
public static final String METRIC_TASKS_DEBUG_TASK_TRIGGERING_QUEUE_EMPTY = METRIC_PREFIX + "tasks.debug.taskTriggeringQueueEmpty";
public static final String METRIC_TASKS_DUPLICATES_COUNT = METRIC_PREFIX + "tasks.duplicatesCount";
Expand Down Expand Up @@ -179,11 +181,23 @@ public void registerFailedNextEventTimeChange(String taskType, ZonedDateTime fro
}

@Override
public void registerTaskGrabbingResponse(String bucketId, String taskType, int priority, ProcessTaskResponse processTaskResponse) {
meterCache.counter(METRIC_TASKS_TASK_GRABBING, TagsSet.of(TAG_TASK_TYPE, taskType,
TAG_BUCKET_ID, bucketId, TAG_PRIORITY, String.valueOf(priority), TAG_GRABBING_RESPONSE, processTaskResponse.getResult().name(),
TAG_GRABBING_CODE, processTaskResponse.getCode() == null ? "UNKNOWN" : processTaskResponse.getCode().name()))
.increment();
public void registerTaskGrabbingResponse(String bucketId, String taskType, int priority, ProcessTaskResponse processTaskResponse,
Instant taskTriggeredAt) {
// Registers the outcome of one grabbing attempt: increments the grabbing counter and
// records the trigger-to-now delay in the grabbing-time timer, both under the same tags.

// Tag set shared by the counter and the timer. A null response code is reported as "UNKNOWN".
TagsSet tags = TagsSet.of(
TAG_TASK_TYPE, taskType,
TAG_BUCKET_ID, bucketId,
TAG_PRIORITY, String.valueOf(priority),
TAG_GRABBING_RESPONSE, processTaskResponse.getResult().name(),
TAG_GRABBING_CODE, processTaskResponse.getCode() == null ? "UNKNOWN" : processTaskResponse.getCode().name()
);

meterCache.counter(METRIC_TASKS_TASK_GRABBING, tags).increment();

// Wall-clock milliseconds between the trigger timestamp and this call.
// When taskTriggeredAt is null, 0 is recorded instead of skipping the sample.
// NOTE(review): the 0 fallback adds zero-length samples to the timer, and a trigger
// timestamp ahead of the local clock would produce a negative value - confirm both are acceptable.
long millisSinceTaskTriggered = taskTriggeredAt != null
? System.currentTimeMillis() - taskTriggeredAt.toEpochMilli()
: 0;
meterCache.timer(METRIC_TASKS_TASK_GRABBING_TIME, tags).record(millisSinceTaskTriggered, TimeUnit.MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.transferwise.tasks.domain.TaskStatus;
import com.transferwise.tasks.handler.interfaces.StuckDetectionSource;
import com.transferwise.tasks.processing.TasksProcessingService.ProcessTaskResponse;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -49,7 +50,7 @@ public interface ICoreMetricsTemplate {

void registerFailedNextEventTimeChange(String taskType, ZonedDateTime fromNextEventTime, ZonedDateTime toNextEventTime);

void registerTaskGrabbingResponse(String bucketId, String type, int priority, ProcessTaskResponse processTaskResponse);
void registerTaskGrabbingResponse(String bucketId, String type, int priority, ProcessTaskResponse processTaskResponse, Instant taskTriggeredAt);

void debugPriorityQueueCheck(String bucketId, int priority);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,7 @@ protected ProcessTasksResponse processTasks(GlobalProcessingState.Bucket bucket)
mdcService.put(task);
try {
ProcessTaskResponse processTaskResponse = grabTaskForProcessing(bucketId, task);

coreMetricsTemplate.registerTaskGrabbingResponse(bucketId, type, priority, processTaskResponse);
coreMetricsTemplate.registerTaskGrabbingResponse(bucketId, type, priority, processTaskResponse, taskTriggering.getTriggeredAt());

if (processTaskResponse.getResult() == ProcessTaskResponse.Result.NO_SPACE) {
noRoomMap.put(type, Boolean.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.transferwise.tasks.utils.WaitUtils;
import com.vdurmont.semver4j.Semver;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
Expand Down Expand Up @@ -180,7 +181,10 @@ public void trigger(BaseTask task) {
}

if (BooleanUtils.isTrue(bucketsManager.getBucketProperties(processingBucketId).getTriggerInSameProcess())) {
TaskTriggering taskTriggering = new TaskTriggering().setTask(task).setBucketId(processingBucketId);
TaskTriggering taskTriggering = new TaskTriggering()
.setTask(task)
.setBucketId(processingBucketId)
.setTriggeredAt(Instant.now());
ITasksProcessingService.AddTaskForProcessingResponse addTaskForProcessingResponse = tasksProcessingService.addTaskForProcessing(taskTriggering);

if (addTaskForProcessingResponse.getResult() == ITasksProcessingService.AddTaskForProcessingResponse.ResultCode.OK) {
Expand Down Expand Up @@ -284,7 +288,11 @@ public void poll(String bucketId) {
mdcService.with(() -> {
mdcService.put(task);

TaskTriggering taskTriggering = new TaskTriggering().setTask(task).setBucketId(bucketId).setOffset(offset)
TaskTriggering taskTriggering = new TaskTriggering()
.setTask(task)
.setBucketId(bucketId)
.setOffset(offset)
.setTriggeredAt(Instant.ofEpochMilli(consumerRecord.timestamp()))
.setTopicPartition(topicPartition);

coreMetricsTemplate.registerKafkaTasksExecutionTriggererTriggersReceive(bucketId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.transferwise.tasks.triggering;

import com.transferwise.tasks.domain.BaseTask;
import java.time.Instant;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.kafka.common.TopicPartition;
Expand All @@ -11,6 +12,7 @@ public class TaskTriggering {

private BaseTask task;
private long offset;
private Instant triggeredAt;
private long sequence;
private TopicPartition topicPartition;
private String bucketId;
Expand Down

0 comments on commit 1795a34

Please sign in to comment.