Skip to content

Commit

Permalink
[ISSUE apache#7872] Fix Tiered Store Query Message Async return diffe…
Browse files Browse the repository at this point in the history
…rent view each time (apache#7874)

Co-authored-by: ayue <[email protected]>
  • Loading branch information
AYue-94 and AYue-94 authored Mar 26, 2024
1 parent dde4fcc commit d9c75ff
Showing 1 changed file with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,7 @@ public CompletableFuture<QueryMessageResult> queryMessageAsync(
messageStore.getIndexService().queryAsync(topic, key, maxCount, begin, end);

return future.thenCompose(indexItemList -> {
QueryMessageResult result = new QueryMessageResult();
List<CompletableFuture<Void>> futureList = new ArrayList<>(maxCount);
List<CompletableFuture<SelectMappedBufferResult>> futureList = new ArrayList<>(maxCount);
for (IndexItem indexItem : indexItemList) {
if (topicId != indexItem.getTopicId()) {
continue;
Expand All @@ -405,17 +404,20 @@ public CompletableFuture<QueryMessageResult> queryMessageAsync(
if (flatFile == null) {
continue;
}
CompletableFuture<Void> getMessageFuture = flatFile
CompletableFuture<SelectMappedBufferResult> getMessageFuture = flatFile
.getCommitLogAsync(indexItem.getOffset(), indexItem.getSize())
.thenAccept(messageBuffer -> result.addMessage(
new SelectMappedBufferResult(
indexItem.getOffset(), messageBuffer, indexItem.getSize(), null)));
.thenApply(messageBuffer -> new SelectMappedBufferResult(
indexItem.getOffset(), messageBuffer, indexItem.getSize(), null));
futureList.add(getMessageFuture);
if (futureList.size() >= maxCount) {
break;
}
}
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> result);
return CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).thenApply(v -> {
QueryMessageResult result = new QueryMessageResult();
futureList.forEach(f -> f.thenAccept(result::addMessage));
return result;
});
}).whenComplete((result, throwable) -> {
if (result != null) {
log.info("MessageFetcher#queryMessageAsync, " +
Expand Down

0 comments on commit d9c75ff

Please sign in to comment.