Skip to content

Commit

Permalink
Fix partition reset (micronaut-projects#1028)
Browse files Browse the repository at this point in the history
  • Loading branch information
henriquelsmti committed May 28, 2024
1 parent 90637af commit 3858226
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -586,14 +586,15 @@ private boolean processConsumerRecords(final ConsumerState consumerState,
}

private void resetTheFollowingPartitions(ConsumerRecord<?, ?> errorConsumerRecord, ConsumerState consumerState, Iterator<? extends ConsumerRecord<?, ?>> iterator) {
Set<Integer> processedPartition = new HashSet<>();
processedPartition.add(errorConsumerRecord.partition());
Set<TopicPartition> processedPartition = new HashSet<>();
TopicPartition topicPartition = new TopicPartition(errorConsumerRecord.topic(), errorConsumerRecord.partition());
processedPartition.add(topicPartition);
while (iterator.hasNext()) {
ConsumerRecord<?, ?> consumerRecord = iterator.next();
if (!processedPartition.add(consumerRecord.partition())) {
topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
if (!processedPartition.add(topicPartition)) {
continue;
}
TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
consumerState.kafkaConsumer.seek(topicPartition, consumerRecord.offset());
}
}
Expand Down

0 comments on commit 3858226

Please sign in to comment.