From d9c75ff1f94d0b25bd79532cc326eff5a994e47d Mon Sep 17 00:00:00 2001 From: AYue <40812847+AYue-94@users.noreply.github.com> Date: Tue, 26 Mar 2024 15:12:38 +0800 Subject: [PATCH] [ISSUE #7872] Fix Tiered Store Query Message Async return different view each time (#7874) Co-authored-by: ayue --- .../core/MessageStoreFetcherImpl.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java index 2ffad2e3f4c..5403ebdc311 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java @@ -394,8 +394,7 @@ public CompletableFuture queryMessageAsync( messageStore.getIndexService().queryAsync(topic, key, maxCount, begin, end); return future.thenCompose(indexItemList -> { - QueryMessageResult result = new QueryMessageResult(); - List> futureList = new ArrayList<>(maxCount); + List> futureList = new ArrayList<>(maxCount); for (IndexItem indexItem : indexItemList) { if (topicId != indexItem.getTopicId()) { continue; @@ -405,17 +404,20 @@ public CompletableFuture queryMessageAsync( if (flatFile == null) { continue; } - CompletableFuture getMessageFuture = flatFile + CompletableFuture getMessageFuture = flatFile .getCommitLogAsync(indexItem.getOffset(), indexItem.getSize()) - .thenAccept(messageBuffer -> result.addMessage( - new SelectMappedBufferResult( - indexItem.getOffset(), messageBuffer, indexItem.getSize(), null))); + .thenApply(messageBuffer -> new SelectMappedBufferResult( + indexItem.getOffset(), messageBuffer, indexItem.getSize(), null)); futureList.add(getMessageFuture); if (futureList.size() >= maxCount) { break; } } - return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result); + return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> { + QueryMessageResult result = new QueryMessageResult(); + futureList.forEach(f -> f.thenAccept(result::addMessage)); + return result; + }); }).whenComplete((result, throwable) -> { if (result != null) { log.info("MessageFetcher#queryMessageAsync, " +