diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ccb4318..e390db63 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 1.48.0 - 2024/12/13 + +### Added + +- Added timer `twTasks.tasks.taskGrabbingTime` tracking the time between a task being triggered and a task being grabbed for processing + ## 1.47.0 - 2024/12/13 ### Added diff --git a/build.gradle b/build.gradle index 4a62a9b3..32ad5650 100644 --- a/build.gradle +++ b/build.gradle @@ -1,6 +1,6 @@ -import org.eclipse.jgit.api.errors.RefAlreadyExistsException import com.github.spotbugs.snom.Confidence import com.github.spotbugs.snom.Effort +import org.eclipse.jgit.api.errors.RefAlreadyExistsException buildscript { if (!project.hasProperty("springBootVersion")) { diff --git a/gradle.properties b/gradle.properties index 77bef476..e0d008c5 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -version=1.47.0 +version=1.48.0 org.gradle.internal.http.socketTimeout=120000 diff --git a/integration-tests/src/test/java/com/transferwise/tasks/BaseIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/BaseIntTest.java index 92775cdb..fa7c12d6 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/BaseIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/BaseIntTest.java @@ -17,7 +17,6 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootContextLoader; diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index 0515004c..065b618c 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -1,8 +1,6 @@ package com.transferwise.tasks; -import com.transferwise.tasks.ITasksService.AddTaskResponse.Result; import com.transferwise.tasks.domain.TaskContext; -import com.transferwise.tasks.domain.TaskStatus; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Duration; import java.time.ZonedDateTime; diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java index cc286d1d..64146ce8 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java @@ -2,7 +2,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.ZonedDateTime; -import java.util.Objects; import java.util.UUID; import lombok.Data; import lombok.experimental.Accessors; @@ -24,7 +23,7 @@ public class FullTaskRecord implements ITask { private ZonedDateTime nextEventTime; private String processingClientId; private TaskContext taskContext; - + @Override public ITaskVersionId getVersionId() { return new TaskVersionId(id, version); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java index 03f95d96..dda85807 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java @@ -1,7 +1,6 @@ package com.transferwise.tasks.domain; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.Objects; import java.util.UUID; import lombok.Data; import lombok.experimental.Accessors; diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java index 70dcae0c..ee96331d 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java @@ -11,6 +11,7 @@ import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.Tags; import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; +import java.time.Instant; import java.time.ZonedDateTime; import java.util.List; import java.util.Map; @@ -39,6 +40,7 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate { public static final String METRIC_TASKS_FAILED_STATUS_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedStatusChangeCount"; public static final String METRIC_TASKS_DEBUG_PRIORITY_QUEUE_CHECK = METRIC_PREFIX + "tasks.debug.priorityQueueCheck"; public static final String METRIC_TASKS_TASK_GRABBING = METRIC_PREFIX + "tasks.taskGrabbing"; + public static final String METRIC_TASKS_TASK_GRABBING_TIME = METRIC_PREFIX + "tasks.taskGrabbingTime"; public static final String METRIC_TASKS_DEBUG_ROOM_MAP_ALREADY_HAS_TYPE = METRIC_PREFIX + "tasks.debug.roomMapAlreadyHasType"; public static final String METRIC_TASKS_DEBUG_TASK_TRIGGERING_QUEUE_EMPTY = METRIC_PREFIX + "tasks.debug.taskTriggeringQueueEmpty"; public static final String METRIC_TASKS_DUPLICATES_COUNT = METRIC_PREFIX + "tasks.duplicatesCount"; @@ -179,11 +181,23 @@ public void registerFailedNextEventTimeChange(String taskType, ZonedDateTime fro } @Override - public void registerTaskGrabbingResponse(String bucketId, String taskType, int priority, ProcessTaskResponse processTaskResponse) { - meterCache.counter(METRIC_TASKS_TASK_GRABBING, TagsSet.of(TAG_TASK_TYPE, taskType, - TAG_BUCKET_ID, bucketId, TAG_PRIORITY, String.valueOf(priority), TAG_GRABBING_RESPONSE, processTaskResponse.getResult().name(), - TAG_GRABBING_CODE, processTaskResponse.getCode() == null ? "UNKNOWN" : processTaskResponse.getCode().name())) - .increment(); + public void registerTaskGrabbingResponse(String bucketId, String taskType, int priority, ProcessTaskResponse processTaskResponse, + Instant taskTriggeredAt) { + + TagsSet tags = TagsSet.of( + TAG_TASK_TYPE, taskType, + TAG_BUCKET_ID, bucketId, + TAG_PRIORITY, String.valueOf(priority), + TAG_GRABBING_RESPONSE, processTaskResponse.getResult().name(), + TAG_GRABBING_CODE, processTaskResponse.getCode() == null ? "UNKNOWN" : processTaskResponse.getCode().name() + ); + + meterCache.counter(METRIC_TASKS_TASK_GRABBING, tags).increment(); + + long millisSinceTaskTriggered = taskTriggeredAt != null + ? System.currentTimeMillis() - taskTriggeredAt.toEpochMilli() + : 0; + meterCache.timer(METRIC_TASKS_TASK_GRABBING_TIME, tags).record(millisSinceTaskTriggered, TimeUnit.MILLISECONDS); } @Override diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java index 249e0574..a11464d3 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java @@ -3,6 +3,7 @@ import com.transferwise.tasks.domain.TaskStatus; import com.transferwise.tasks.handler.interfaces.StuckDetectionSource; import com.transferwise.tasks.processing.TasksProcessingService.ProcessTaskResponse; +import java.time.Instant; import java.time.ZonedDateTime; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -49,7 +50,7 @@ public interface ICoreMetricsTemplate { void registerFailedNextEventTimeChange(String taskType, ZonedDateTime fromNextEventTime, ZonedDateTime toNextEventTime); - void registerTaskGrabbingResponse(String bucketId, String type, int priority, ProcessTaskResponse processTaskResponse); + void registerTaskGrabbingResponse(String bucketId, String type, int priority, ProcessTaskResponse processTaskResponse, Instant taskTriggeredAt); void debugPriorityQueueCheck(String bucketId, int priority); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java index 9736ab60..91633ebe 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java @@ -213,8 +213,7 @@ protected ProcessTasksResponse processTasks(GlobalProcessingState.Bucket bucket) mdcService.put(task); try { ProcessTaskResponse processTaskResponse = grabTaskForProcessing(bucketId, task); - - coreMetricsTemplate.registerTaskGrabbingResponse(bucketId, type, priority, processTaskResponse); + coreMetricsTemplate.registerTaskGrabbingResponse(bucketId, type, priority, processTaskResponse, taskTriggering.getTriggeredAt()); if (processTaskResponse.getResult() == ProcessTaskResponse.Result.NO_SPACE) { noRoomMap.put(type, Boolean.TRUE); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java index 69760124..cef03a28 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java @@ -30,6 +30,7 @@ import com.transferwise.tasks.utils.WaitUtils; import com.vdurmont.semver4j.Semver; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -180,7 +181,10 @@ public void trigger(BaseTask task) { } if (BooleanUtils.isTrue(bucketsManager.getBucketProperties(processingBucketId).getTriggerInSameProcess())) { - TaskTriggering taskTriggering = new TaskTriggering().setTask(task).setBucketId(processingBucketId); + TaskTriggering taskTriggering = new TaskTriggering() + .setTask(task) + .setBucketId(processingBucketId) + .setTriggeredAt(Instant.now()); ITasksProcessingService.AddTaskForProcessingResponse addTaskForProcessingResponse = tasksProcessingService.addTaskForProcessing(taskTriggering); if (addTaskForProcessingResponse.getResult() == ITasksProcessingService.AddTaskForProcessingResponse.ResultCode.OK) { @@ -284,7 +288,11 @@ public void poll(String bucketId) { mdcService.with(() -> { mdcService.put(task); - TaskTriggering taskTriggering = new TaskTriggering().setTask(task).setBucketId(bucketId).setOffset(offset) + TaskTriggering taskTriggering = new TaskTriggering() + .setTask(task) + .setBucketId(bucketId) + .setOffset(offset) + .setTriggeredAt(Instant.ofEpochMilli(consumerRecord.timestamp())) .setTopicPartition(topicPartition); coreMetricsTemplate.registerKafkaTasksExecutionTriggererTriggersReceive(bucketId); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/SeekToDurationOnRebalanceListener.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/SeekToDurationOnRebalanceListener.java index 91e811c3..27d3b6b0 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/SeekToDurationOnRebalanceListener.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/SeekToDurationOnRebalanceListener.java @@ -14,7 +14,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.TopicPartition; diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/TaskTriggering.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/TaskTriggering.java index 6ebd35e5..8ab12c73 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/TaskTriggering.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/TaskTriggering.java @@ -1,6 +1,7 @@ package com.transferwise.tasks.triggering; import com.transferwise.tasks.domain.BaseTask; +import java.time.Instant; import lombok.Data; import lombok.experimental.Accessors; import org.apache.kafka.common.TopicPartition; @@ -11,6 +12,7 @@ public class TaskTriggering { private BaseTask task; private long offset; + private Instant triggeredAt; private long sequence; private TopicPartition topicPartition; private String bucketId;