-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Record the time between a task being triggered and it being successful run #217
Changes from 8 commits
f4d51bd
7f5eeed
814185c
2653fd9
55f849e
bc7d5d5
3bdf72e
e995b03
0aa8dfe
e4c9124
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,2 @@ | ||
version=1.47.0 | ||
version=1.48.0 | ||
org.gradle.internal.http.socketTimeout=120000 |
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -11,6 +11,7 @@ | |||||||
import io.micrometer.core.instrument.Meter; | ||||||||
import io.micrometer.core.instrument.Tags; | ||||||||
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; | ||||||||
import java.time.Instant; | ||||||||
import java.time.ZonedDateTime; | ||||||||
import java.util.List; | ||||||||
import java.util.Map; | ||||||||
|
@@ -39,6 +40,7 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate { | |||||||
public static final String METRIC_TASKS_FAILED_STATUS_CHANGE_COUNT = METRIC_PREFIX + "tasks.failedStatusChangeCount"; | ||||||||
public static final String METRIC_TASKS_DEBUG_PRIORITY_QUEUE_CHECK = METRIC_PREFIX + "tasks.debug.priorityQueueCheck"; | ||||||||
public static final String METRIC_TASKS_TASK_GRABBING = METRIC_PREFIX + "tasks.taskGrabbing"; | ||||||||
public static final String METRIC_TASKS_TASK_GRABBING_TIME = METRIC_PREFIX + "tasks.taskGrabbingTime"; | ||||||||
public static final String METRIC_TASKS_DEBUG_ROOM_MAP_ALREADY_HAS_TYPE = METRIC_PREFIX + "tasks.debug.roomMapAlreadyHasType"; | ||||||||
public static final String METRIC_TASKS_DEBUG_TASK_TRIGGERING_QUEUE_EMPTY = METRIC_PREFIX + "tasks.debug.taskTriggeringQueueEmpty"; | ||||||||
public static final String METRIC_TASKS_DUPLICATES_COUNT = METRIC_PREFIX + "tasks.duplicatesCount"; | ||||||||
|
@@ -179,11 +181,21 @@ public void registerFailedNextEventTimeChange(String taskType, ZonedDateTime fro | |||||||
} | ||||||||
|
||||||||
@Override | ||||||||
public void registerTaskGrabbingResponse(String bucketId, String taskType, int priority, ProcessTaskResponse processTaskResponse) { | ||||||||
meterCache.counter(METRIC_TASKS_TASK_GRABBING, TagsSet.of(TAG_TASK_TYPE, taskType, | ||||||||
TAG_BUCKET_ID, bucketId, TAG_PRIORITY, String.valueOf(priority), TAG_GRABBING_RESPONSE, processTaskResponse.getResult().name(), | ||||||||
TAG_GRABBING_CODE, processTaskResponse.getCode() == null ? "UNKNOWN" : processTaskResponse.getCode().name())) | ||||||||
.increment(); | ||||||||
public void registerTaskGrabbingResponse(String bucketId, String taskType, int priority, ProcessTaskResponse processTaskResponse, | ||||||||
Instant taskTriggeredAt) { | ||||||||
|
||||||||
TagsSet tags = TagsSet.of( | ||||||||
TAG_TASK_TYPE, taskType, | ||||||||
TAG_BUCKET_ID, bucketId, | ||||||||
TAG_PRIORITY, String.valueOf(priority), | ||||||||
TAG_GRABBING_RESPONSE, processTaskResponse.getResult().name(), | ||||||||
TAG_GRABBING_CODE, processTaskResponse.getCode() == null ? "UNKNOWN" : processTaskResponse.getCode().name() | ||||||||
); | ||||||||
|
||||||||
meterCache.counter(METRIC_TASKS_TASK_GRABBING, tags).increment(); | ||||||||
|
||||||||
long millisSinceTaskTriggered = System.currentTimeMillis() - taskTriggeredAt.toEpochMilli(); | ||||||||
meterCache.timer(METRIC_TASKS_TASK_GRABBING_TIME, tags).record(millisSinceTaskTriggered, TimeUnit.MILLISECONDS); | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is one other timer currently exposed by tw tasks Lines 156 to 158 in 3bdf72e
Cannot see it setting histogram buckets via a meter filter or anything so assume leave it to service owners to override |
||||||||
} | ||||||||
|
||||||||
@Override | ||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -213,8 +213,7 @@ protected ProcessTasksResponse processTasks(GlobalProcessingState.Bucket bucket) | |||||||||||||||||||||
mdcService.put(task); | ||||||||||||||||||||||
try { | ||||||||||||||||||||||
ProcessTaskResponse processTaskResponse = grabTaskForProcessing(bucketId, task); | ||||||||||||||||||||||
|
||||||||||||||||||||||
coreMetricsTemplate.registerTaskGrabbingResponse(bucketId, type, priority, processTaskResponse); | ||||||||||||||||||||||
coreMetricsTemplate.registerTaskGrabbingResponse(bucketId, type, priority, processTaskResponse, taskTriggering.getTriggerAt()); | ||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this cause a NPE with old version of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like Kafka one comes from a Would prefer to make triggeredAt final and force its initiation during constructor but seems the library exclusively uses init then set pattern so followed this Lines 184 to 187 in e995b03
Lines 291 to 296 in e995b03
|
||||||||||||||||||||||
|
||||||||||||||||||||||
if (processTaskResponse.getResult() == ProcessTaskResponse.Result.NO_SPACE) { | ||||||||||||||||||||||
noRoomMap.put(type, Boolean.TRUE); | ||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ | |
import com.transferwise.tasks.utils.WaitUtils; | ||
import com.vdurmont.semver4j.Semver; | ||
import java.time.Duration; | ||
import java.time.Instant; | ||
import java.util.ArrayList; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
|
@@ -180,7 +181,10 @@ public void trigger(BaseTask task) { | |
} | ||
|
||
if (BooleanUtils.isTrue(bucketsManager.getBucketProperties(processingBucketId).getTriggerInSameProcess())) { | ||
TaskTriggering taskTriggering = new TaskTriggering().setTask(task).setBucketId(processingBucketId); | ||
TaskTriggering taskTriggering = new TaskTriggering() | ||
.setTask(task) | ||
.setBucketId(processingBucketId) | ||
.setTriggerAt(Instant.now()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If task triggered in same process - ran on the pod that triggered it without kafka just use time now |
||
ITasksProcessingService.AddTaskForProcessingResponse addTaskForProcessingResponse = tasksProcessingService.addTaskForProcessing(taskTriggering); | ||
|
||
if (addTaskForProcessingResponse.getResult() == ITasksProcessingService.AddTaskForProcessingResponse.ResultCode.OK) { | ||
|
@@ -284,7 +288,11 @@ public void poll(String bucketId) { | |
mdcService.with(() -> { | ||
mdcService.put(task); | ||
|
||
TaskTriggering taskTriggering = new TaskTriggering().setTask(task).setBucketId(bucketId).setOffset(offset) | ||
TaskTriggering taskTriggering = new TaskTriggering() | ||
.setTask(task) | ||
.setBucketId(bucketId) | ||
.setOffset(offset) | ||
.setTriggerAt(Instant.ofEpochMilli(consumerRecord.timestamp())) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While not clear in the docs https://kafka.apache.org/10/javadoc/org/apache/kafka/connect/data/Timestamp.html https://stackoverflow.com/questions/67759248/what-is-the-kafka-message-timestamp-represents |
||
.setTopicPartition(topicPartition); | ||
|
||
coreMetricsTemplate.registerKafkaTasksExecutionTriggererTriggersReceive(bucketId); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just think if you add a sane default here, even affecting quality of metric that way, it would be safer.
We don't know, when someone changes that logic and null pops here by accident.