-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] Decouple pulsar_storage_backlog_age_seconds metric with backlogQuota check #23619
base: master
Are you sure you want to change the base?
Conversation
…from backlogQuota policy
8b8ffc8
to
e2a102f
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #23619 +/- ##
============================================
+ Coverage 73.57% 74.42% +0.85%
- Complexity 32624 35003 +2379
============================================
Files 1877 1944 +67
Lines 139502 147136 +7634
Branches 15299 16224 +925
============================================
+ Hits 102638 109512 +6874
- Misses 28908 29184 +276
- Partials 7956 8440 +484
Flags with carried forward coverage won't be shown. Click here to find out more.
|
} | ||
return CompletableFuture.completedFuture(false); | ||
return CompletableFuture.completedFuture(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am considering whether the value of the metric will be fine in the following case:
- ledgers:
[1:0~1:99, 2:0~2:99]
- create a cursor
s1
at1:0
- the value of
pulsar_storage_backlog_age_seconds
relates to1:0
- unsubscribe
s1
- the value will be never updated anymore, in other words, it always relates to
1:0
|
||
boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp); | ||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] Time based backlog quota check. Oldest unacked entry read from BK. " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this log needed?
if (oldestPositionInfo == null) { | ||
return CompletableFuture.completedFuture(false); | ||
} | ||
if (!hasBacklogs(brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
CompletableFuture<Boolean> future = new CompletableFuture<>(); | ||
CompletableFuture<Void> future = new CompletableFuture<>(); | ||
// Check if first unconsumed message(first message after mark delete position) | ||
// for slowest cursor's has expired. | ||
Position position = ledger.getNextValidPosition(oldestMarkDeletePosition); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this position be x:-1
? If yes, it may cause an EntryNotExists error. Could you add a test for this scenario?
Motivation
#21709 introduced a valuable metric: pulsar_storage_backlog_age_seconds, but it only updates when a topic has the message_age backlog quota policy.
Modifications
backlogQuotaChecker
is also used to periodically update thepulsar_storage_backlog_age_seconds
metric, regardless of whether the topic has a backlog quota policy.Verifying this change
backlogsAgeMetricsNoPreciseWithoutBacklogQuota
andbacklogsAgeMetricsPreciseWithoutBacklogQuota
to cover this change.Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: