Record the time between a task being triggered and it being successfully run #217
long timeTillProcessingStarted = epochMilliBeforeProcessing - taskTriggering.getTriggerAt().toEpochMilli();
meterCache.timer(METRIC_TASKS_TASK_GRABBING_TIME, tags).record(timeTillProcessingStarted, TimeUnit.MILLISECONDS);
The response code in the tags allows filtering for just the successfully run tasks.
coreMetricsTemplate.registerTaskGrabbingResponse(bucketId, type, priority, processTaskResponse);
long epochMilliBeforeProcessing = System.currentTimeMillis();
Record the time before `grabTaskForProcessing` to remove the task processing time from the metric.
Maybe it would be cleaner to add `epochMilliBeforeProcessing` to `ProcessTaskResponse` and have it returned from `grabTaskForProcessing` 🤔
Agree with the proposal; it could be part of the response.
So it turns out that in the successful flow (code = `OK`) the final task grabbing / execution is handed off to another thread which is not awaited, so we can just use the time at which the metric method is called 😄
Lines 339 to 345 in f1b9e2d
tasksGrabbingExecutor.submit(() -> {
  try {
    grabTaskForProcessing0(bucket, task, concurrencyPolicy, taskHandler);
  } catch (Throwable t) {
    log.error("Task grabbing failed for '{}'.", task.getVersionId(), t);
  }
});
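To illustrate why the time of the metric call is good enough here, the following is a minimal, JDK-only sketch of the "submit and do not await" behaviour described above. The class and method names (`SubmitDoesNotWait`, `submitReturnsBeforeTaskFinishes`) are hypothetical and not part of tw-tasks; the latch merely stands in for the grabbing and processing work.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of tw-tasks.
final class SubmitDoesNotWait {

    // Returns true if submit() handed control back while the task was still running.
    static boolean submitReturnsBeforeTaskFinishes() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        try {
            executor.submit(() -> {
                try {
                    // Stands in for the grabbing / processing work; blocks until released.
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.countDown();
            });
            // The task cannot have finished yet because it has not been released,
            // so reaching this line proves the caller was not blocked by submit().
            boolean stillRunning = finished.getCount() == 1;
            release.countDown();
            return stillRunning;
        } finally {
            // Lets the already submitted task complete, then stops the worker thread.
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitReturnsBeforeTaskFinishes()); // prints "true"
    }
}
```

Because the caller continues immediately after `submit`, a timestamp taken right after the hand-off does not include the processing time of the submitted work.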
.setTask(task)
.setBucketId(bucketId)
.setOffset(offset)
.setTriggerAt(Instant.ofEpochSecond(consumerRecord.timestamp()))
If triggered via Kafka, use the consumer record timestamp (the time the broker registered the event, or the time the producer set when triggering).
TaskTriggering taskTriggering = new TaskTriggering()
    .setTask(task)
    .setBucketId(processingBucketId)
    .setTriggerAt(Instant.now());
If the task is triggered in the same process (run on the pod that triggered it, without Kafka), just use the current time.
tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java
Looks good.
Would need changelog and version bump.
coreMetricsTemplate.registerTaskGrabbingResponse(bucketId, type, priority, processTaskResponse);
long epochMilliBeforeProcessing = System.currentTimeMillis();
Would it be better to name it `epochMilliGrabForProcessing`? That describes the current state instead of the outcome (grabbing the task vs. processing the task).
Removed as not needed
The approval(s) from maru-tw do(es)n't fulfill the approvers requirements because:
f1b9e2d to 96168e1
96168e1 to 3bdf72e
.setTask(task)
.setBucketId(bucketId)
.setOffset(offset)
.setTriggerAt(Instant.ofEpochMilli(consumerRecord.timestamp()))
While not clear in the docs, `consumerRecord.timestamp()` appears to be epoch time in millis:
https://kafka.apache.org/10/javadoc/org/apache/kafka/connect/data/Timestamp.html
https://stackoverflow.com/questions/67759248/what-is-the-kafka-message-timestamp-represents
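As a rough illustration of why the switch from `Instant.ofEpochSecond` to `Instant.ofEpochMilli` matters, the sketch below feeds the same millisecond value into both factory methods. The class name `KafkaTimestampUnits`, the helper `fromRecordTimestamp`, and the sample timestamp are assumptions made for this example only; the sketch uses just `java.time` and does not touch the Kafka client.

```java
import java.time.Instant;

// Hypothetical demo class, not part of tw-tasks or the Kafka client.
final class KafkaTimestampUnits {

    // Interprets a record timestamp expressed in epoch milliseconds.
    static Instant fromRecordTimestamp(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        // Assumed sample value standing in for consumerRecord.timestamp().
        long recordTimestamp = 1_700_000_000_000L;

        // Treated as milliseconds, this is a plausible recent instant.
        System.out.println(fromRecordTimestamp(recordTimestamp)); // 2023-11-14T22:13:20Z

        // Treated as seconds, the same number lands tens of thousands of years
        // in the future, so "now - triggerAt" would be hugely negative.
        System.out.println(Instant.ofEpochSecond(recordTimestamp));
    }
}
```

With the seconds interpretation, any latency computed as the current time minus the trigger time would be meaningless, which is consistent with the fix to use `ofEpochMilli`.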
meterCache.counter(METRIC_TASKS_TASK_GRABBING, tags).increment();

long millisSinceTaskTriggered = System.currentTimeMillis() - taskTriggeredAt.toEpochMilli();
meterCache.timer(METRIC_TASKS_TASK_GRABBING_TIME, tags).record(millisSinceTaskTriggered, TimeUnit.MILLISECONDS);
There is one other timer currently exposed by tw tasks
Lines 156 to 158 in 3bdf72e
meterCache.timer(METRIC_TASKS_PROCESSING_TIME, TagsSet.of(TAG_BUCKET_ID, resolvedBucketId, TAG_TASK_TYPE, taskType,
        TAG_PROCESSING_RESULT, processingResult))
    .record(TwContextClockHolder.getClock().millis() - processingStartTimeMs, TimeUnit.MILLISECONDS);
I cannot see it setting histogram buckets via a meter filter or anything similar, so I assume this is left to service owners to override.
@@ -213,8 +213,7 @@ protected ProcessTasksResponse processTasks(GlobalProcessingState.Bucket bucket)
 mdcService.put(task);
 try {
 ProcessTaskResponse processTaskResponse = grabTaskForProcessing(bucketId, task);

-coreMetricsTemplate.registerTaskGrabbingResponse(bucketId, type, priority, processTaskResponse);
+coreMetricsTemplate.registerTaskGrabbingResponse(bucketId, type, priority, processTaskResponse, taskTriggering.getTriggerAt());
Will this cause an NPE with an old version of the `taskTriggering` DTO? One that has no `triggeredAt` saved yet.
Looks like `TaskTriggering` is initialised in two places and we set `triggeredAt` in both of them.
The Kafka one comes from a `long` from the event timestamp and the other one comes from `Instant.now()`, so I think it should be fine.
I would prefer to make `triggeredAt` final and force its initialisation in the constructor, but the library seems to use the init-then-set pattern exclusively, so I followed that.
Lines 184 to 187 in e995b03
TaskTriggering taskTriggering = new TaskTriggering()
    .setTask(task)
    .setBucketId(processingBucketId)
    .setTriggerAt(Instant.now());
Lines 291 to 296 in e995b03
TaskTriggering taskTriggering = new TaskTriggering()
    .setTask(task)
    .setBucketId(bucketId)
    .setOffset(offset)
    .setTriggerAt(Instant.ofEpochMilli(consumerRecord.timestamp()))
    .setTopicPartition(topicPartition);
0aa8dfe
meterCache.counter(METRIC_TASKS_TASK_GRABBING, tags).increment();

long millisSinceTaskTriggered = System.currentTimeMillis() - taskTriggeredAt.toEpochMilli();
I just think that if you add a sane default here, even at some cost to the quality of the metric, it would be safer.
We don't know when someone will change that logic and a null will pop up here by accident.
long millisSinceTaskTriggered = taskTriggeredAt != null
    ? System.currentTimeMillis() - taskTriggeredAt.toEpochMilli()
    : 0;
Could also just have an if condition and not record if null 🤔
0ms is within reason for same pod triggering though
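The two options discussed here (default to 0 ms, or skip recording when the trigger time is null) could be sketched as follows. This is only an illustration under assumptions: `TriggerLatency` and `millisSinceTriggered` are hypothetical names, and an injected `java.time.Clock` replaces `System.currentTimeMillis()` purely so the behaviour can be checked deterministically. The actual timer call is left out because it depends on the library's `meterCache`.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.OptionalLong;

// Hypothetical helper, not part of tw-tasks.
final class TriggerLatency {

    // Returns the elapsed milliseconds since the trigger time,
    // or an empty value when no trigger time was recorded.
    static OptionalLong millisSinceTriggered(Instant taskTriggeredAt, Clock clock) {
        if (taskTriggeredAt == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(clock.millis() - taskTriggeredAt.toEpochMilli());
    }

    public static void main(String[] args) {
        Clock clock = Clock.fixed(Instant.ofEpochMilli(5_000L), ZoneOffset.UTC);

        // Option A: skip recording entirely when the trigger time is missing.
        millisSinceTriggered(null, clock)
            .ifPresent(ms -> System.out.println("would record " + ms + " ms"));

        // Option B: fall back to 0 ms, as in the suggested change above.
        long withDefault = millisSinceTriggered(null, clock).orElse(0L);
        System.out.println(withDefault); // prints "0"

        // Normal case: triggered 3 seconds before the fixed clock time.
        System.out.println(millisSinceTriggered(Instant.ofEpochMilli(2_000L), clock).getAsLong()); // prints "3000"
    }
}
```

Returning an empty value keeps the decision with the caller: skipping avoids skewing the timer towards zero, while the 0 ms default keeps the sample count aligned with the counter.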
Context
We would like to record the time between a task being triggered and the task processing being handed over to application code, i.e. the time taken for the task to
The metric should ideally account for any other random delays, e.g. rebalances / back-offs / retryAfters...