From fc796208a833b40400374ffc0f01e4eb1070470f Mon Sep 17 00:00:00 2001 From: SSpirits Date: Tue, 17 Oct 2023 20:20:06 +0800 Subject: [PATCH] feat(proxy): enhance metrics for ReceiveMessageRequest (#334) Signed-off-by: SSpirits --- .../grpc/ExtendGrpcMessagingApplication.java | 2 +- .../activity/ExtendGrpcMessingActivity.java | 1 + .../ExtendReceiveMessageActivity.java | 44 +++++++++++ ...endReceiveMessageResponseStreamWriter.java | 78 +++++++++++++++++++ .../proxy/metrics/ProxyMetricsManager.java | 34 +++++--- .../rocketmq/proxy/model/ProxyContextExt.java | 13 ++++ .../proxy/service/MessageServiceImpl.java | 2 +- .../service/SuspendPopRequestService.java | 8 +- .../proxy/service/MessageServiceImplTest.java | 23 +++--- .../service/SuspendPopRequestServiceTest.java | 7 +- 10 files changed, 182 insertions(+), 30 deletions(-) create mode 100644 proxy/src/main/java/com/automq/rocketmq/proxy/grpc/activity/ExtendReceiveMessageActivity.java create mode 100644 proxy/src/main/java/com/automq/rocketmq/proxy/grpc/v2/consumer/ExtendReceiveMessageResponseStreamWriter.java diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ExtendGrpcMessagingApplication.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ExtendGrpcMessagingApplication.java index 571750ed3..9eb856dca 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ExtendGrpcMessagingApplication.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/ExtendGrpcMessagingApplication.java @@ -59,7 +59,7 @@ private String getResponseStatus(T response) { try { Method getStatus = response.getClass().getDeclaredMethod("getStatus"); Status status = (Status) getStatus.invoke(response); - return status.getMessage(); + return status.getCode().name().toLowerCase(); } catch (Exception e) { return "unknown"; } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/activity/ExtendGrpcMessingActivity.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/activity/ExtendGrpcMessingActivity.java index 00ec267ac..656915a15 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/activity/ExtendGrpcMessingActivity.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/activity/ExtendGrpcMessingActivity.java @@ -31,5 +31,6 @@ protected void init(MessagingProcessor messagingProcessor) { this.routeActivity = new ExtendRouteActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); this.sendMessageActivity = new ExtendSendMessageActivity(messagingProcessor, grpcClientSettingsManager, grpcChannelManager); + this.receiveMessageActivity = new ExtendReceiveMessageActivity(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); } } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/activity/ExtendReceiveMessageActivity.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/activity/ExtendReceiveMessageActivity.java new file mode 100644 index 000000000..c154ba8c9 --- /dev/null +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/activity/ExtendReceiveMessageActivity.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.proxy.grpc.activity; + +import apache.rocketmq.v2.ReceiveMessageResponse; +import com.automq.rocketmq.proxy.grpc.v2.consumer.ExtendReceiveMessageResponseStreamWriter; +import io.grpc.stub.StreamObserver; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.grpc.v2.channel.GrpcChannelManager; +import org.apache.rocketmq.proxy.grpc.v2.common.GrpcClientSettingsManager; +import org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageActivity; +import org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageResponseStreamWriter; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; +import org.apache.rocketmq.proxy.processor.ReceiptHandleProcessor; + +public class ExtendReceiveMessageActivity extends ReceiveMessageActivity { + public ExtendReceiveMessageActivity(MessagingProcessor messagingProcessor, + ReceiptHandleProcessor receiptHandleProcessor, + GrpcClientSettingsManager grpcClientSettingsManager, + GrpcChannelManager grpcChannelManager) { + super(messagingProcessor, receiptHandleProcessor, grpcClientSettingsManager, grpcChannelManager); + } + + @Override + protected ReceiveMessageResponseStreamWriter createWriter(ProxyContext ctx, + StreamObserver responseObserver) { + return new ExtendReceiveMessageResponseStreamWriter(this.messagingProcessor, responseObserver); + } +} diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/v2/consumer/ExtendReceiveMessageResponseStreamWriter.java b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/v2/consumer/ExtendReceiveMessageResponseStreamWriter.java new file mode 100644 index 000000000..e72e0f81f --- /dev/null +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/grpc/v2/consumer/ExtendReceiveMessageResponseStreamWriter.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.automq.rocketmq.proxy.grpc.v2.consumer; + +import apache.rocketmq.v2.Code; +import apache.rocketmq.v2.ReceiveMessageRequest; +import apache.rocketmq.v2.ReceiveMessageResponse; +import apache.rocketmq.v2.Status; +import com.automq.rocketmq.proxy.metrics.ProxyMetricsManager; +import com.automq.rocketmq.proxy.model.ProxyContextExt; +import io.grpc.stub.StreamObserver; +import org.apache.rocketmq.client.consumer.PopResult; +import org.apache.rocketmq.proxy.common.ProxyContext; +import org.apache.rocketmq.proxy.grpc.v2.common.ResponseBuilder; +import org.apache.rocketmq.proxy.grpc.v2.consumer.ReceiveMessageResponseStreamWriter; +import org.apache.rocketmq.proxy.processor.MessagingProcessor; + +public class ExtendReceiveMessageResponseStreamWriter extends ReceiveMessageResponseStreamWriter { + public ExtendReceiveMessageResponseStreamWriter( + MessagingProcessor messagingProcessor, + StreamObserver observer) { + super(messagingProcessor, observer); + } + + private void recordRpcLatency(ProxyContext ctx, Code code) { + ProxyContextExt context = (ProxyContextExt) ctx; + ProxyMetricsManager.recordRpcLatency(ctx.getProtocolType(), ctx.getAction(), code.name().toLowerCase(), context.getElapsedTimeNanos(), context.suspended()); + } + + @Override + public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest request, PopResult popResult) { + super.writeAndComplete(ctx, request, popResult); + switch (popResult.getPopStatus()) { + case FOUND: + if (popResult.getMsgFoundList().isEmpty()) { + recordRpcLatency(ctx, Code.MESSAGE_NOT_FOUND); + } else { + recordRpcLatency(ctx, Code.OK); + } + case POLLING_FULL: + recordRpcLatency(ctx, Code.TOO_MANY_REQUESTS); + break; + case NO_NEW_MSG: + case POLLING_NOT_FOUND: + default: + recordRpcLatency(ctx, Code.MESSAGE_NOT_FOUND); + break; + } + } + + @Override + public void writeAndComplete(ProxyContext ctx, Code code, String message) { + super.writeAndComplete(ctx, code, message); + recordRpcLatency(ctx, code); + } + + @Override + public void writeAndComplete(ProxyContext ctx, ReceiveMessageRequest request, Throwable throwable) { + super.writeAndComplete(ctx, request, throwable); + Status status = ResponseBuilder.getInstance().buildStatus(throwable); + recordRpcLatency(ctx, status.getCode()); + } +} diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/metrics/ProxyMetricsManager.java b/proxy/src/main/java/com/automq/rocketmq/proxy/metrics/ProxyMetricsManager.java index cc5a6436a..8151ace45 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/metrics/ProxyMetricsManager.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/metrics/ProxyMetricsManager.java @@ -69,6 +69,7 @@ public class ProxyMetricsManager implements MetricsManager { public static final String LABEL_PROTOCOL_TYPE = "protocol_type"; public static final String LABEL_ACTION = "action"; public static final String LABEL_RESULT = "result"; + public static final String LABEL_SUSPENDED = "suspended"; public static final String PROTOCOL_TYPE_GRPC = "grpc"; private static LongHistogram rpcLatency = new NopLongHistogram(); @@ -233,15 +234,18 @@ public static List> getMetricsView() { metricsViewList.add(Pair.of(messageSizeSelector, messageSizeViewBuilder.build())); List rpcCostTimeBuckets = Arrays.asList( - (double) Duration.ofMillis(1).toMillis(), - (double) Duration.ofMillis(3).toMillis(), - (double) Duration.ofMillis(5).toMillis(), - (double) Duration.ofMillis(7).toMillis(), - (double) Duration.ofMillis(10).toMillis(), - (double) Duration.ofMillis(100).toMillis(), - (double) Duration.ofSeconds(1).toMillis(), - (double) Duration.ofSeconds(2).toMillis(), - (double) Duration.ofSeconds(3).toMillis() + (double) Duration.ofNanos(100).toNanos(), + (double) Duration.ofNanos(1000).toNanos(), + (double) Duration.ofNanos(10_000).toNanos(), + (double) Duration.ofMillis(1).toNanos(), + (double) Duration.ofMillis(3).toNanos(), + (double) Duration.ofMillis(5).toNanos(), + (double) Duration.ofMillis(7).toNanos(), + (double) Duration.ofMillis(10).toNanos(), + (double) Duration.ofMillis(100).toNanos(), + (double) Duration.ofSeconds(1).toNanos(), + (double) Duration.ofSeconds(2).toNanos(), + (double) Duration.ofSeconds(3).toNanos() ); InstrumentSelector selector = InstrumentSelector.builder() .setType(InstrumentType.HISTOGRAM) @@ -255,12 +259,18 @@ public static List> getMetricsView() { return metricsViewList; } - public static void recordRpcLatency(String protocolType, String action, String result, long costTimeMillis) { + public static void recordRpcLatency(String protocolType, String action, String result, long costTimeNanos) { + recordRpcLatency(protocolType, action, result, costTimeNanos, false); + } + + public static void recordRpcLatency(String protocolType, String action, String result, long costTimeNanos, + boolean suspended) { AttributesBuilder attributesBuilder = newAttributesBuilder() .put(LABEL_PROTOCOL_TYPE, protocolType) .put(LABEL_ACTION, action) - .put(LABEL_RESULT, result); - rpcLatency.record(costTimeMillis, attributesBuilder.build()); + .put(LABEL_RESULT, result) + .put(LABEL_SUSPENDED, suspended); + rpcLatency.record(costTimeNanos, attributesBuilder.build()); } public static void recordIncomingMessages(String topic, TopicMessageType messageType, int count, long size) { diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java b/proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java index 510d90987..bea077c50 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/model/ProxyContextExt.java @@ -22,6 +22,19 @@ public class ProxyContextExt extends ProxyContext { private final Stopwatch stopwatch = Stopwatch.createStarted(); + private boolean suspended; + + public static ProxyContext create() { + return new ProxyContextExt(); + } + + public boolean suspended() { + return suspended; + } + + public void setSuspended(boolean suspended) { + this.suspended = suspended; + } public long getElapsedTimeNanos() { return stopwatch.elapsed().toNanos(); diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java index 762cae19d..d20262eda 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java @@ -265,7 +265,7 @@ public CompletableFuture popMessage(ProxyContext ctx, AddressableMess return popMessageFuture.thenCompose(messageList -> { if (messageList.isEmpty()) { - return suspendPopRequestService.suspendPopRequest(requestHeader, topicReference.get().getTopicId(), virtualQueue.physicalQueueId(), filter, + return suspendPopRequestService.suspendPopRequest(ctx, requestHeader, topicReference.get().getTopicId(), virtualQueue.physicalQueueId(), filter, () -> popSpecifiedQueue(consumerGroupReference.get(), clientId, topicReference.get(), virtualQueue.physicalQueueId(), filter, requestHeader.getMaxMsgNums(), requestHeader.isOrder(), requestHeader.getInvisibleTime(), timeoutMillis)); } diff --git a/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendPopRequestService.java b/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendPopRequestService.java index d28a42c89..580e3f5bc 100644 --- a/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendPopRequestService.java +++ b/proxy/src/main/java/com/automq/rocketmq/proxy/service/SuspendPopRequestService.java @@ -18,6 +18,7 @@ package com.automq.rocketmq.proxy.service; import com.automq.rocketmq.common.model.FlatMessageExt; +import com.automq.rocketmq.proxy.model.ProxyContextExt; import com.automq.rocketmq.proxy.util.FlatMessageUtil; import com.automq.rocketmq.store.model.message.Filter; import com.automq.rocketmq.store.model.message.TopicQueueId; @@ -34,6 +35,7 @@ import org.apache.rocketmq.client.consumer.PopResult; import org.apache.rocketmq.client.consumer.PopStatus; import org.apache.rocketmq.common.ServiceThread; +import org.apache.rocketmq.proxy.common.ProxyContext; import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,8 +126,10 @@ public void notifyMessageArrival(long topicId, int queueId, String tag) { } } - public CompletableFuture suspendPopRequest(PopMessageRequestHeader requestHeader, long topicId, - int queueId, Filter filter, Supplier>> messageSupplier) { + public CompletableFuture suspendPopRequest(ProxyContext context, PopMessageRequestHeader requestHeader, + long topicId, int queueId, Filter filter, Supplier>> messageSupplier) { + ((ProxyContextExt) context).setSuspended(true); + // Check if the request is already expired. if (requestHeader.getPollTime() <= 0) { return CompletableFuture.completedFuture(new PopResult(PopStatus.NO_NEW_MSG, Collections.emptyList())); diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java b/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java index 2c97e532f..9f0fc593b 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/service/MessageServiceImplTest.java @@ -21,6 +21,7 @@ import com.automq.rocketmq.metadata.api.ProxyMetadataService; import com.automq.rocketmq.proxy.mock.MockMessageStore; import com.automq.rocketmq.proxy.mock.MockProxyMetadataService; +import com.automq.rocketmq.proxy.model.ProxyContextExt; import com.automq.rocketmq.proxy.model.VirtualQueue; import com.automq.rocketmq.proxy.util.FlatMessageUtil; import com.automq.rocketmq.proxy.util.ReceiptHandleUtil; @@ -81,7 +82,7 @@ void sendMessage() { AddressableMessageQueue messageQueue = new AddressableMessageQueue(new MessageQueue(topicName, virtualQueue.brokerName(), 0), null); - List resultList = messageService.sendMessage(ProxyContext.create(), messageQueue, List.of(message), header, 0).join(); + List resultList = messageService.sendMessage(ProxyContextExt.create(), messageQueue, List.of(message), header, 0).join(); assertEquals(1, resultList.size()); SendResult result = resultList.get(0); @@ -108,7 +109,7 @@ void popMessage() { VirtualQueue virtualQueue = new VirtualQueue(2, 0); AddressableMessageQueue messageQueue = new AddressableMessageQueue(new MessageQueue(topicName, virtualQueue.brokerName(), 0), null); - PopResult result = messageService.popMessage(ProxyContext.create(), messageQueue, header, 0L).join(); + PopResult result = messageService.popMessage(ProxyContextExt.create(), messageQueue, header, 0L).join(); assertEquals(PopStatus.NO_NEW_MSG, result.getPopStatus()); header.setExpType(ExpressionType.TAG); @@ -118,14 +119,14 @@ void popMessage() { messageStore.put(FlatMessageUtil.convertTo(topicId, 0, "", new Message(topicName, "", new byte[] {}))); messageStore.put(FlatMessageUtil.convertTo(topicId, 0, "", new Message(topicName, "", new byte[] {}))); - result = messageService.popMessage(ProxyContext.create(), messageQueue, header, 0L).join(); + result = messageService.popMessage(ProxyContextExt.create(), messageQueue, header, 0L).join(); assertEquals(PopStatus.FOUND, result.getPopStatus()); assertEquals(2, result.getMsgFoundList().size()); // All messages in queue 0 has been consumed assertEquals(2, messageStore.getConsumeOffset(consumerGroupId, topicId, 0).join()); // Pop again. - result = messageService.popMessage(ProxyContext.create(), messageQueue, header, 0L).join(); + result = messageService.popMessage(ProxyContextExt.create(), messageQueue, header, 0L).join(); assertEquals(PopStatus.NO_NEW_MSG, result.getPopStatus()); assertEquals(0, result.getMsgFoundList().size()); } @@ -146,7 +147,7 @@ void pop_withFifo() { messageStore.put(FlatMessageUtil.convertTo(topicId, 0, "", new Message(topicName, "", new byte[] {}))); // Pop message with client id "client1". - ProxyContext context = ProxyContext.create(); + ProxyContext context = ProxyContextExt.create(); context.setClientID("client1"); VirtualQueue virtualQueue = new VirtualQueue(2, 0); @@ -189,11 +190,11 @@ void changeInvisibleTime() { ChangeInvisibleTimeRequestHeader header = new ChangeInvisibleTimeRequestHeader(); header.setExtraInfo(ReceiptHandleUtil.encodeReceiptHandle(RECEIPT_HANDLE, 0L)); header.setInvisibleTime(100L); - AckResult ackResult = messageService.changeInvisibleTime(ProxyContext.create(), null, null, header, 0).join(); + AckResult ackResult = messageService.changeInvisibleTime(ProxyContextExt.create(), null, null, header, 0).join(); assertEquals(AckStatus.OK, ackResult.getStatus()); header.setExtraInfo(""); - ackResult = messageService.changeInvisibleTime(ProxyContext.create(), null, null, header, 0).join(); + ackResult = messageService.changeInvisibleTime(ProxyContextExt.create(), null, null, header, 0).join(); assertEquals(AckStatus.NO_EXIST, ackResult.getStatus()); } @@ -204,11 +205,11 @@ void ackMessage() { header.setTopic("topic"); header.setQueueId(0); header.setConsumerGroup("group"); - AckResult ackResult = messageService.ackMessage(ProxyContext.create(), null, null, header, 0).join(); + AckResult ackResult = messageService.ackMessage(ProxyContextExt.create(), null, null, header, 0).join(); assertEquals(AckStatus.OK, ackResult.getStatus()); header.setExtraInfo(""); - ackResult = messageService.ackMessage(ProxyContext.create(), null, null, header, 0).join(); + ackResult = messageService.ackMessage(ProxyContextExt.create(), null, null, header, 0).join(); assertEquals(AckStatus.NO_EXIST, ackResult.getStatus()); } @@ -219,13 +220,13 @@ void offset() { updateConsumerOffsetRequestHeader.setTopic("topic"); updateConsumerOffsetRequestHeader.setQueueId(0); updateConsumerOffsetRequestHeader.setCommitOffset(100L); - messageService.updateConsumerOffset(ProxyContext.create(), null, updateConsumerOffsetRequestHeader, 0).join(); + messageService.updateConsumerOffset(ProxyContextExt.create(), null, updateConsumerOffsetRequestHeader, 0).join(); QueryConsumerOffsetRequestHeader queryConsumerOffsetRequestHeader = new QueryConsumerOffsetRequestHeader(); queryConsumerOffsetRequestHeader.setConsumerGroup("group"); queryConsumerOffsetRequestHeader.setTopic("topic"); queryConsumerOffsetRequestHeader.setQueueId(0); - Long offset = messageService.queryConsumerOffset(ProxyContext.create(), null, queryConsumerOffsetRequestHeader, 0).join(); + Long offset = messageService.queryConsumerOffset(ProxyContextExt.create(), null, queryConsumerOffsetRequestHeader, 0).join(); assertEquals(100L, offset); } } \ No newline at end of file diff --git a/proxy/src/test/java/com/automq/rocketmq/proxy/service/SuspendPopRequestServiceTest.java b/proxy/src/test/java/com/automq/rocketmq/proxy/service/SuspendPopRequestServiceTest.java index ed88deb20..996454257 100644 --- a/proxy/src/test/java/com/automq/rocketmq/proxy/service/SuspendPopRequestServiceTest.java +++ b/proxy/src/test/java/com/automq/rocketmq/proxy/service/SuspendPopRequestServiceTest.java @@ -19,6 +19,7 @@ import com.automq.rocketmq.common.model.FlatMessageExt; import com.automq.rocketmq.proxy.mock.MockMessageUtil; +import com.automq.rocketmq.proxy.model.ProxyContextExt; import com.automq.rocketmq.store.model.message.TagFilter; import com.google.common.base.Supplier; import java.util.Collections; @@ -53,7 +54,7 @@ void suspendAndNotify() { Supplier>> supplier = () -> CompletableFuture.completedFuture(List.of(MockMessageUtil.buildMessage(0, 0, "tagA"))); // Try to suspend request with zero polling time. - CompletableFuture future = suspendPopRequestService.suspendPopRequest(header, 0, 0, + CompletableFuture future = suspendPopRequestService.suspendPopRequest(ProxyContextExt.create(), header, 0, 0, new TagFilter("tagA"), supplier); assertEquals(0, suspendPopRequestService.suspendRequestCount()); assertTrue(future.isDone()); @@ -64,7 +65,7 @@ void suspendAndNotify() { // Try to suspend request with non-zero polling time. header.setBornTime(System.currentTimeMillis()); header.setPollTime(100_000); - future = suspendPopRequestService.suspendPopRequest(header, 0, 0, + future = suspendPopRequestService.suspendPopRequest(ProxyContextExt.create(), header, 0, 0, new TagFilter("tagA"), supplier); assertEquals(1, suspendPopRequestService.suspendRequestCount()); assertFalse(future.isDone()); @@ -95,7 +96,7 @@ void cleanExpired() { header.setPollTime(100); header.setTopic("topic"); - CompletableFuture future = suspendPopRequestService.suspendPopRequest(header, 0, 0, + CompletableFuture future = suspendPopRequestService.suspendPopRequest(ProxyContextExt.create(), header, 0, 0, new TagFilter("tagA"), () -> CompletableFuture.completedFuture(Collections.emptyList())); assertEquals(1, suspendPopRequestService.suspendRequestCount()); assertFalse(future.isDone());