Skip to content

Commit

Permalink
Revise the measurement method of GROUP_GET_LATENCY to reveal its inte…
Browse files Browse the repository at this point in the history
…nded semantics (apache#7808)
  • Loading branch information
RongtongJin authored Feb 4, 2024
1 parent c833ff6 commit cb7fa3e
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ RemotingCommand handle(final GetMessageResult getMessageResult,
final boolean brokerAllowSuspend,
final MessageFilter messageFilter,
final RemotingCommand response,
final TopicQueueMappingContext mappingContext);
final TopicQueueMappingContext mappingContext,
final long beginTimeMills);
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
final boolean brokerAllowSuspend,
final MessageFilter messageFilter,
RemotingCommand response,
TopicQueueMappingContext mappingContext) {
TopicQueueMappingContext mappingContext,
long beginTimeMills) {
PullMessageProcessor processor = brokerController.getPullMessageProcessor();
final String clientAddress = RemotingHelper.parseChannelRemoteAddr(channel);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
Expand Down Expand Up @@ -137,8 +138,6 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
}

if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {

final long beginTimeMills = this.brokerController.getMessageStore().now();
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public boolean rejectRequest() {

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
final long beginTimeMills = this.brokerController.getMessageStore().now();
RemotingCommand response = RemotingCommand.createResponseCommand(PopMessageResponseHeader.class);
final PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.readCustomHeader();
final PeekMessageRequestHeader requestHeader =
Expand Down Expand Up @@ -188,7 +189,6 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
this.brokerController.getBrokerStatsManager().incBrokerGetNums(requestHeader.getTopic(), getMessageResult.getMessageCount());

if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public boolean notifyMessageArriving(final String topic, final String cid, final
@Override
public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final long beginTimeMills = this.brokerController.getMessageStore().now();
request.addExtFieldIfNotExist(BORN_TIME, String.valueOf(System.currentTimeMillis()));
if (Objects.equals(request.getExtFields().get(BORN_TIME), "0")) {
request.addExtField(BORN_TIME, String.valueOf(System.currentTimeMillis()));
Expand Down Expand Up @@ -435,7 +436,6 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
switch (finalResponse.getCode()) {
case ResponseCode.SUCCESS:
if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {
final long beginTimeMills = this.brokerController.getMessageStore().now();
final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(),
requestHeader.getTopic(), requestHeader.getQueueId());
this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ public boolean rejectRequest() {

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend, boolean brokerAllowFlowCtrSuspend)
throws RemotingCommandException {
final long beginTimeMills = this.brokerController.getMessageStore().now();
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
Expand Down Expand Up @@ -555,7 +556,8 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
brokerAllowSuspend,
messageFilter,
finalResponse,
mappingContext
mappingContext,
beginTimeMills
);
})
.thenAccept(result -> NettyRemotingAbstract.writeResponse(channel, request, result));
Expand All @@ -574,7 +576,8 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re
brokerAllowSuspend,
messageFilter,
response,
mappingContext
mappingContext,
beginTimeMills
);
}
return null;
Expand Down

0 comments on commit cb7fa3e

Please sign in to comment.