Skip to content

Commit

Permalink
feat(proxy): enhance metrics for ReceiveMessageRequest (#334)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Oct 17, 2023
1 parent 6b7c525 commit fc79620
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ private <T> String getResponseStatus(T response) {
try {
Method getStatus = response.getClass().getDeclaredMethod("getStatus");
Status status = (Status) getStatus.invoke(response);
return status.getMessage();
return status.getCode().name().toLowerCase();
} catch (Exception e) {
return "unknown";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,6 @@ protected void init(MessagingProcessor messagingProcessor) {

this.routeActivity = new ExtendRouteActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
this.sendMessageActivity = new ExtendSendMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
this.receiveMessageActivity = new ExtendReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.proxy.grpc.activity;

import apache.rocketmq.v2.ReceiveMessageResponse;
import com.automq.rocketmq.proxy.grpc.v2.consumer.ExtendReceiveMessageResponseStreamWriter;
import io.grpc.stub.StreamObserver;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager;
import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager;
import org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity;
import org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageResponseStreamWriter;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;
import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor;

public class ExtendReceiveMessageActivity extends ReceiveMessageActivity {
public ExtendReceiveMessageActivity(MessagingProcessor messagingProcessor,
ReceiptHandleProcessor receiptHandleProcessor,
GrpcClientSettingsManager grpcClientSettingsManager,
GrpcChannelManager grpcChannelManager) {
super(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager);
}

@Override
protected ReceiveMessageResponseStreamWriter createWriter(ProxyContext ctx,
StreamObserver<ReceiveMessageResponse> responseObserver) {
return new ExtendReceiveMessageResponseStreamWriter(this.messagingProcessor, responseObserver);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.rocketmq.proxy.grpc.v2.consumer;

import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.ReceiveMessageRequest;
import apache.rocketmq.v2.ReceiveMessageResponse;
import apache.rocketmq.v2.Status;
import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager;
import com.automq.rocketmq.proxy.model.ProxyContextExt;
import io.grpc.stub.StreamObserver;
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder;
import org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageResponseStreamWriter;
import org.apache.rocketmq.proxy.processor.MessagingProcessor;

public class ExtendReceiveMessageResponseStreamWriter extends ReceiveMessageResponseStreamWriter {
public ExtendReceiveMessageResponseStreamWriter(
MessagingProcessor messagingProcessor,
StreamObserver<ReceiveMessageResponse> observer) {
super(messagingProcessor, observer);
}

private void recordRpcLatency(ProxyContext ctx, Code code) {
ProxyContextExt context = (ProxyContextExt) ctx;
ProxyMetricsManager.recordRpcLatency(ctx.getProtocolType(), ctx.getAction(), code.name().toLowerCase(), context.getElapsedTimeNanos(), context.suspended());
}

@Override
public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest request, PopResult popResult) {
super.writeAndComplete(ctx, request, popResult);
switch (popResult.getPopStatus()) {
case FOUND:
if (popResult.getMsgFoundList().isEmpty()) {
recordRpcLatency(ctx, Code.MESSAGE_NOT_FOUND);
} else {
recordRpcLatency(ctx, Code.OK);
}
case POLLING_FULL:
recordRpcLatency(ctx, Code.TOO_MANY_REQUESTS);
break;
case NO_NEW_MSG:
case POLLING_NOT_FOUND:
default:
recordRpcLatency(ctx, Code.MESSAGE_NOT_FOUND);
break;
}
}

@Override
public void writeAndComplete(ProxyContext ctx, Code code, String message) {
super.writeAndComplete(ctx, code, message);
recordRpcLatency(ctx, code);
}

@Override
public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest request, Throwable throwable) {
super.writeAndComplete(ctx, request, throwable);
Status status = ResponseBuilder.getInstance().buildStatus(throwable);
recordRpcLatency(ctx, status.getCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class ProxyMetricsManager implements MetricsManager {
public static final String LABEL_PROTOCOL_TYPE = "protocol_type";
public static final String LABEL_ACTION = "action";
public static final String LABEL_RESULT = "result";
public static final String LABEL_SUSPENDED = "suspended";
public static final String PROTOCOL_TYPE_GRPC = "grpc";

private static LongHistogram rpcLatency = new NopLongHistogram();
Expand Down Expand Up @@ -233,15 +234,18 @@ public static List<Pair<InstrumentSelector, View>> getMetricsView() {
metricsViewList.add(Pair.of(messageSizeSelector, messageSizeViewBuilder.build()));

List<Double> rpcCostTimeBuckets = Arrays.asList(
(double) Duration.ofMillis(1).toMillis(),
(double) Duration.ofMillis(3).toMillis(),
(double) Duration.ofMillis(5).toMillis(),
(double) Duration.ofMillis(7).toMillis(),
(double) Duration.ofMillis(10).toMillis(),
(double) Duration.ofMillis(100).toMillis(),
(double) Duration.ofSeconds(1).toMillis(),
(double) Duration.ofSeconds(2).toMillis(),
(double) Duration.ofSeconds(3).toMillis()
(double) Duration.ofNanos(100).toNanos(),
(double) Duration.ofNanos(1000).toNanos(),
(double) Duration.ofNanos(10_000).toNanos(),
(double) Duration.ofMillis(1).toNanos(),
(double) Duration.ofMillis(3).toNanos(),
(double) Duration.ofMillis(5).toNanos(),
(double) Duration.ofMillis(7).toNanos(),
(double) Duration.ofMillis(10).toNanos(),
(double) Duration.ofMillis(100).toNanos(),
(double) Duration.ofSeconds(1).toNanos(),
(double) Duration.ofSeconds(2).toNanos(),
(double) Duration.ofSeconds(3).toNanos()
);
InstrumentSelector selector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
Expand All @@ -255,12 +259,18 @@ public static List<Pair<InstrumentSelector, View>> getMetricsView() {
return metricsViewList;
}

public static void recordRpcLatency(String protocolType, String action, String result, long costTimeMillis) {
public static void recordRpcLatency(String protocolType, String action, String result, long costTimeNanos) {
recordRpcLatency(protocolType, action, result, costTimeNanos, false);
}

public static void recordRpcLatency(String protocolType, String action, String result, long costTimeNanos,
boolean suspended) {
AttributesBuilder attributesBuilder = newAttributesBuilder()
.put(LABEL_PROTOCOL_TYPE, protocolType)
.put(LABEL_ACTION, action)
.put(LABEL_RESULT, result);
rpcLatency.record(costTimeMillis, attributesBuilder.build());
.put(LABEL_RESULT, result)
.put(LABEL_SUSPENDED, suspended);
rpcLatency.record(costTimeNanos, attributesBuilder.build());
}

public static void recordIncomingMessages(String topic, TopicMessageType messageType, int count, long size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@

public class ProxyContextExt extends ProxyContext {
private final Stopwatch stopwatch = Stopwatch.createStarted();
private boolean suspended;

public static ProxyContext create() {
return new ProxyContextExt();
}

public boolean suspended() {
return suspended;
}

public void setSuspended(boolean suspended) {
this.suspended = suspended;
}

public long getElapsedTimeNanos() {
return stopwatch.elapsed().toNanos();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public CompletableFuture<PopResult> popMessage(ProxyContext ctx, AddressableMess

return popMessageFuture.thenCompose(messageList -> {
if (messageList.isEmpty()) {
return suspendPopRequestService.suspendPopRequest(requestHeader, topicReference.get().getTopicId(), virtualQueue.physicalQueueId(), filter,
return suspendPopRequestService.suspendPopRequest(ctx, requestHeader, topicReference.get().getTopicId(), virtualQueue.physicalQueueId(), filter,
() -> popSpecifiedQueue(consumerGroupReference.get(), clientId, topicReference.get(), virtualQueue.physicalQueueId(), filter,
requestHeader.getMaxMsgNums(), requestHeader.isOrder(), requestHeader.getInvisibleTime(), timeoutMillis));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.rocketmq.proxy.service;

import com.automq.rocketmq.common.model.FlatMessageExt;
import com.automq.rocketmq.proxy.model.ProxyContextExt;
import com.automq.rocketmq.proxy.util.FlatMessageUtil;
import com.automq.rocketmq.store.model.message.Filter;
import com.automq.rocketmq.store.model.message.TopicQueueId;
Expand All @@ -34,6 +35,7 @@
import org.apache.rocketmq.client.consumer.PopResult;
import org.apache.rocketmq.client.consumer.PopStatus;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -124,8 +126,10 @@ public void notifyMessageArrival(long topicId, int queueId, String tag) {
}
}

public CompletableFuture<PopResult> suspendPopRequest(PopMessageRequestHeader requestHeader, long topicId,
int queueId, Filter filter, Supplier<CompletableFuture<List<FlatMessageExt>>> messageSupplier) {
public CompletableFuture<PopResult> suspendPopRequest(ProxyContext context, PopMessageRequestHeader requestHeader,
long topicId, int queueId, Filter filter, Supplier<CompletableFuture<List<FlatMessageExt>>> messageSupplier) {
((ProxyContextExt) context).setSuspended(true);

// Check if the request is already expired.
if (requestHeader.getPollTime() <= 0) {
return CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.automq.rocketmq.metadata.api.ProxyMetadataService;
import com.automq.rocketmq.proxy.mock.MockMessageStore;
import com.automq.rocketmq.proxy.mock.MockProxyMetadataService;
import com.automq.rocketmq.proxy.model.ProxyContextExt;
import com.automq.rocketmq.proxy.model.VirtualQueue;
import com.automq.rocketmq.proxy.util.FlatMessageUtil;
import com.automq.rocketmq.proxy.util.ReceiptHandleUtil;
Expand Down Expand Up @@ -81,7 +82,7 @@ void sendMessage() {

AddressableMessageQueue messageQueue = new AddressableMessageQueue(new MessageQueue(topicName, virtualQueue.brokerName(), 0), null);

List<SendResult> resultList = messageService.sendMessage(ProxyContext.create(), messageQueue, List.of(message), header, 0).join();
List<SendResult> resultList = messageService.sendMessage(ProxyContextExt.create(), messageQueue, List.of(message), header, 0).join();
assertEquals(1, resultList.size());

SendResult result = resultList.get(0);
Expand All @@ -108,7 +109,7 @@ void popMessage() {
VirtualQueue virtualQueue = new VirtualQueue(2, 0);
AddressableMessageQueue messageQueue = new AddressableMessageQueue(new MessageQueue(topicName, virtualQueue.brokerName(), 0), null);

PopResult result = messageService.popMessage(ProxyContext.create(), messageQueue, header, 0L).join();
PopResult result = messageService.popMessage(ProxyContextExt.create(), messageQueue, header, 0L).join();
assertEquals(PopStatus.NO_NEW_MSG, result.getPopStatus());

header.setExpType(ExpressionType.TAG);
Expand All @@ -118,14 +119,14 @@ void popMessage() {
messageStore.put(FlatMessageUtil.convertTo(topicId, 0, "", new Message(topicName, "", new byte[] {})));
messageStore.put(FlatMessageUtil.convertTo(topicId, 0, "", new Message(topicName, "", new byte[] {})));

result = messageService.popMessage(ProxyContext.create(), messageQueue, header, 0L).join();
result = messageService.popMessage(ProxyContextExt.create(), messageQueue, header, 0L).join();
assertEquals(PopStatus.FOUND, result.getPopStatus());
assertEquals(2, result.getMsgFoundList().size());
// All messages in queue 0 has been consumed
assertEquals(2, messageStore.getConsumeOffset(consumerGroupId, topicId, 0).join());

// Pop again.
result = messageService.popMessage(ProxyContext.create(), messageQueue, header, 0L).join();
result = messageService.popMessage(ProxyContextExt.create(), messageQueue, header, 0L).join();
assertEquals(PopStatus.NO_NEW_MSG, result.getPopStatus());
assertEquals(0, result.getMsgFoundList().size());
}
Expand All @@ -146,7 +147,7 @@ void pop_withFifo() {
messageStore.put(FlatMessageUtil.convertTo(topicId, 0, "", new Message(topicName, "", new byte[] {})));

// Pop message with client id "client1".
ProxyContext context = ProxyContext.create();
ProxyContext context = ProxyContextExt.create();
context.setClientID("client1");

VirtualQueue virtualQueue = new VirtualQueue(2, 0);
Expand Down Expand Up @@ -189,11 +190,11 @@ void changeInvisibleTime() {
ChangeInvisibleTimeRequestHeader header = new ChangeInvisibleTimeRequestHeader();
header.setExtraInfo(ReceiptHandleUtil.encodeReceiptHandle(RECEIPT_HANDLE, 0L));
header.setInvisibleTime(100L);
AckResult ackResult = messageService.changeInvisibleTime(ProxyContext.create(), null, null, header, 0).join();
AckResult ackResult = messageService.changeInvisibleTime(ProxyContextExt.create(), null, null, header, 0).join();
assertEquals(AckStatus.OK, ackResult.getStatus());

header.setExtraInfo("");
ackResult = messageService.changeInvisibleTime(ProxyContext.create(), null, null, header, 0).join();
ackResult = messageService.changeInvisibleTime(ProxyContextExt.create(), null, null, header, 0).join();
assertEquals(AckStatus.NO_EXIST, ackResult.getStatus());
}

Expand All @@ -204,11 +205,11 @@ void ackMessage() {
header.setTopic("topic");
header.setQueueId(0);
header.setConsumerGroup("group");
AckResult ackResult = messageService.ackMessage(ProxyContext.create(), null, null, header, 0).join();
AckResult ackResult = messageService.ackMessage(ProxyContextExt.create(), null, null, header, 0).join();
assertEquals(AckStatus.OK, ackResult.getStatus());

header.setExtraInfo("");
ackResult = messageService.ackMessage(ProxyContext.create(), null, null, header, 0).join();
ackResult = messageService.ackMessage(ProxyContextExt.create(), null, null, header, 0).join();
assertEquals(AckStatus.NO_EXIST, ackResult.getStatus());
}

Expand All @@ -219,13 +220,13 @@ void offset() {
updateConsumerOffsetRequestHeader.setTopic("topic");
updateConsumerOffsetRequestHeader.setQueueId(0);
updateConsumerOffsetRequestHeader.setCommitOffset(100L);
messageService.updateConsumerOffset(ProxyContext.create(), null, updateConsumerOffsetRequestHeader, 0).join();
messageService.updateConsumerOffset(ProxyContextExt.create(), null, updateConsumerOffsetRequestHeader, 0).join();

QueryConsumerOffsetRequestHeader queryConsumerOffsetRequestHeader = new QueryConsumerOffsetRequestHeader();
queryConsumerOffsetRequestHeader.setConsumerGroup("group");
queryConsumerOffsetRequestHeader.setTopic("topic");
queryConsumerOffsetRequestHeader.setQueueId(0);
Long offset = messageService.queryConsumerOffset(ProxyContext.create(), null, queryConsumerOffsetRequestHeader, 0).join();
Long offset = messageService.queryConsumerOffset(ProxyContextExt.create(), null, queryConsumerOffsetRequestHeader, 0).join();
assertEquals(100L, offset);
}
}
Loading

0 comments on commit fc79620

Please sign in to comment.