diff --git a/src/main/java/com/gojek/beast/commiter/OffsetState.java b/src/main/java/com/gojek/beast/commiter/OffsetState.java
index af6cfe3..6a6b14f 100644
--- a/src/main/java/com/gojek/beast/commiter/OffsetState.java
+++ b/src/main/java/com/gojek/beast/commiter/OffsetState.java
@@ -6,16 +6,21 @@
 import java.time.Instant;
 import java.util.Map;
+import java.util.Set;
 
 public class OffsetState {
     @Getter
     private final long acknowledgeTimeoutMs;
+    @Getter
+    private final long offsetBatchDuration;
     private boolean start;
-    private Map<TopicPartition, OffsetAndMetadata> lastCommitOffset;
     private Instant lastCommittedTime;
+    private Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck;
 
-    public OffsetState(long acknowledgeTimeoutMs) {
+    public OffsetState(Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck, long acknowledgeTimeoutMs, long offsetBatchDuration) {
+        this.partitionOffsetAck = partitionOffsetAck;
         this.acknowledgeTimeoutMs = acknowledgeTimeoutMs;
+        this.offsetBatchDuration = offsetBatchDuration;
         lastCommittedTime = Instant.now();
     }
 
@@ -23,14 +28,19 @@ public boolean shouldCloseConsumer(Map<TopicPartition, OffsetAndMetadata> curren
         if (!start) {
             return false;
         }
-        boolean sameOffset = lastCommitOffset == currentOffset || currentOffset.equals(lastCommitOffset);
         boolean ackTimedOut = (Instant.now().toEpochMilli() - lastCommittedTime.toEpochMilli()) > acknowledgeTimeoutMs;
-        boolean neverAcknowledged = lastCommitOffset == null && ackTimedOut;
-        return (sameOffset && ackTimedOut) || neverAcknowledged;
+        return ackTimedOut;
+    }
+
+    public int partitionOffsetAckSize() {
+        return partitionOffsetAck.size();
+    }
+
+    public boolean removeFromOffsetAck(Map<TopicPartition, OffsetAndMetadata> commitOffset) {
+        return partitionOffsetAck.remove(commitOffset);
     }
 
-    public void resetOffset(Map<TopicPartition, OffsetAndMetadata> acknowledgedOffset) {
-        lastCommitOffset = acknowledgedOffset;
+    public void resetOffset() {
         lastCommittedTime = Instant.now();
     }
 
diff --git a/src/main/java/com/gojek/beast/config/AppConfig.java b/src/main/java/com/gojek/beast/config/AppConfig.java
index f8e597b..381e5e6 100644
--- a/src/main/java/com/gojek/beast/config/AppConfig.java
+++ b/src/main/java/com/gojek/beast/config/AppConfig.java
@@ -75,4 +75,8 @@ public interface AppConfig extends Config {
 
     @Key("GCS_WRITER_PROJECT_NAME")
     String getGcsWriterProject();
+
+    @DefaultValue("2000")
+    @Key("OFFSET_BATCH_DURATION")
+    long getOffsetBatchDuration();
 }
diff --git a/src/main/java/com/gojek/beast/factory/BeastFactory.java b/src/main/java/com/gojek/beast/factory/BeastFactory.java
index 035f146..d5b45dc 100644
--- a/src/main/java/com/gojek/beast/factory/BeastFactory.java
+++ b/src/main/java/com/gojek/beast/factory/BeastFactory.java
@@ -1,5 +1,6 @@
 package com.gojek.beast.factory;
 
+import com.gojek.beast.Clock;
 import com.gojek.beast.backoff.BackOff;
 import com.gojek.beast.backoff.ExponentialBackOffProvider;
 import com.gojek.beast.config.AppConfig;
@@ -160,8 +161,8 @@ public OffsetCommitWorker createOffsetCommitter() {
         if (committer != null) {
             return committer;
         }
-        OffsetState offsetState = new OffsetState(appConfig.getOffsetAckTimeoutMs());
-        committer = new OffsetCommitWorker("committer", partitionsAck, createKafkaConsumer(), offsetState, commitQueue, workerState);
+        OffsetState offsetState = new OffsetState(partitionsAck, appConfig.getOffsetAckTimeoutMs(), appConfig.getOffsetBatchDuration());
+        committer = new OffsetCommitWorker("committer", new QueueConfig(appConfig.getBqWorkerPollTimeoutMs(), "commit"), createKafkaConsumer(), offsetState, commitQueue, workerState, new Clock());
         return committer;
     }
 
diff --git a/src/main/java/com/gojek/beast/models/FailureStatus.java b/src/main/java/com/gojek/beast/models/FailureStatus.java
index afc6d19..1fc2da4 100644
--- a/src/main/java/com/gojek/beast/models/FailureStatus.java
+++ b/src/main/java/com/gojek/beast/models/FailureStatus.java
@@ -28,7 +28,8 @@ public Optional<Exception> getException() {
     @Override
     public String toString() {
         return "FailureStatus{"
-                + "cause=" + cause.getCause()
+                + "exception=" + cause.getClass().getName()
+                + ", cause=" + cause.getCause()
                 + ", message='" + ((message != null) ? message : cause.getMessage()) + '\''
                 + '}';
     }
 }
diff --git a/src/main/java/com/gojek/beast/protomapping/ProtoUpdateListener.java b/src/main/java/com/gojek/beast/protomapping/ProtoUpdateListener.java
index 4847fe3..6299891 100644
--- a/src/main/java/com/gojek/beast/protomapping/ProtoUpdateListener.java
+++ b/src/main/java/com/gojek/beast/protomapping/ProtoUpdateListener.java
@@ -1,15 +1,13 @@
 package com.gojek.beast.protomapping;
 
 import com.gojek.beast.Clock;
-import com.gojek.beast.config.StencilConfig;
 import com.gojek.beast.config.BQConfig;
 import com.gojek.beast.config.ColumnMapping;
 import com.gojek.beast.config.ProtoMappingConfig;
+import com.gojek.beast.config.StencilConfig;
 import com.gojek.beast.converter.ConsumerRecordConverter;
 import com.gojek.beast.converter.RowMapper;
 import com.gojek.beast.exception.BQTableUpdateFailure;
-import com.gojek.beast.exception.BQSchemaMappingException;
-import com.gojek.beast.exception.ProtoMappingException;
 import com.gojek.beast.exception.ProtoNotFoundException;
 import com.gojek.beast.models.BQField;
 import com.gojek.beast.models.ProtoField;
@@ -88,10 +86,9 @@ public void onProtoUpdate(String url, Map newDesc
             ProtoField protoField = protoFieldFactory.getProtoField();
             protoField = protoMappingParser.parseFields(protoField, proto, StencilUtils.getAllProtobufDescriptors(newDescriptors), StencilUtils.getTypeNameToPackageNameMap(newDescriptors));
             updateProtoParser(protoField);
-        } catch (ProtoNotFoundException | BQSchemaMappingException | BigQueryException | IOException e) {
+        } catch (BigQueryException | ProtoNotFoundException | IOException e) {
             String errMsg = "Error while updating bigquery table on callback:" + e.getMessage();
             log.error(errMsg);
-            e.printStackTrace();
             statsClient.increment("bq.table.upsert.failures");
             throw new BQTableUpdateFailure(errMsg);
         }
@@ -112,14 +109,7 @@ private void updateProtoParser(final ProtoField protoField) throws IOException {
 
     private ColumnMapping getProtoMapping() throws IOException {
         ProtoField protoField = new ProtoField();
-        try {
-            protoField = protoMappingParser.parseFields(protoField, proto, stencilClient.getAll(), stencilClient.getTypeNameToPackageNameMap());
-        } catch (ProtoNotFoundException e) {
-            String errMsg = "Error while generating proto to column mapping:" + e.getMessage();
-            log.error(errMsg);
-            e.printStackTrace();
-            throw new ProtoMappingException(errMsg);
-        }
+        protoField = protoMappingParser.parseFields(protoField, proto, stencilClient.getAll(), stencilClient.getTypeNameToPackageNameMap());
         String protoMapping = protoMappingConverter.generateColumnMappings(protoField.getFields());
         protoMappingConfig.setProperty("PROTO_COLUMN_MAPPING", protoMapping);
         return protoMappingConfig.getProtoColumnMapping();
diff --git a/src/main/java/com/gojek/beast/worker/OffsetCommitWorker.java b/src/main/java/com/gojek/beast/worker/OffsetCommitWorker.java
index 822e293..8a20e33 100644
--- a/src/main/java/com/gojek/beast/worker/OffsetCommitWorker.java
+++ b/src/main/java/com/gojek/beast/worker/OffsetCommitWorker.java
@@ -1,47 +1,53 @@
 package com.gojek.beast.worker;
 
+import com.gojek.beast.Clock;
 import com.gojek.beast.commiter.KafkaCommitter;
 import com.gojek.beast.commiter.OffsetState;
+import com.gojek.beast.config.QueueConfig;
 import com.gojek.beast.models.FailureStatus;
-import com.gojek.beast.models.Records;
-import com.gojek.beast.models.Status;
+import com.gojek.beast.models.OffsetMetadata;
 import com.gojek.beast.models.SuccessStatus;
 import com.gojek.beast.stats.Stats;
+import com.gojek.beast.models.Records;
+import com.gojek.beast.models.Status;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 
 @Slf4j
 public class OffsetCommitWorker extends Worker {
     private static final int DEFAULT_SLEEP_MS = 100;
     private final Stats statsClient = Stats.client();
-    private BlockingQueue<Records> commitQueue;
-    private Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck;
-    private KafkaCommitter kafkaCommitter;
+    private final BlockingQueue<Records> commitQueue;
+    private final QueueConfig queueConfig;
+    private final KafkaCommitter kafkaCommitter;
     @Setter
     private long defaultSleepMs;
-
-
+    private boolean stopped;
     private OffsetState offsetState;
+    private Clock clock;
 
-    public OffsetCommitWorker(String name, Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck, KafkaCommitter kafkaCommitter, OffsetState offsetState, BlockingQueue<Records> commitQueue, WorkerState workerState) {
+    public OffsetCommitWorker(String name, QueueConfig queueConfig, KafkaCommitter kafkaCommitter, OffsetState offsetState, BlockingQueue<Records> commitQueue, WorkerState workerState, Clock clock) {
         super(name, workerState);
+        this.clock = clock;
+        this.queueConfig = queueConfig;
         this.commitQueue = commitQueue;
-        this.partitionOffsetAck = partitionOffsetAck;
         this.kafkaCommitter = kafkaCommitter;
         this.defaultSleepMs = DEFAULT_SLEEP_MS;
         this.offsetState = offsetState;
+        this.stopped = false;
     }
 
     @Override
     public void stop(String reason) {
         log.info("Closing committer: {}", reason);
+        this.stopped = true;
         kafkaCommitter.wakeup(reason);
     }
 
@@ -49,36 +55,65 @@ public void stop(String reason) {
     public Status job() {
         offsetState.startTimer();
         try {
-            Instant start = Instant.now();
-            Records commitOffset = commitQueue.peek();
-            if (commitOffset == null) {
-                return new SuccessStatus();
-            }
-            Map<TopicPartition, OffsetAndMetadata> currentOffset = commitOffset.getPartitionsCommitOffset();
-            if (partitionOffsetAck.contains(currentOffset)) {
-                commit();
-            } else {
-                if (offsetState.shouldCloseConsumer(currentOffset)) {
-                    statsClient.increment("committer.ack.timeout");
-                    return new FailureStatus(new RuntimeException("Acknowledgement Timeout exceeded: " + offsetState.getAcknowledgeTimeoutMs()));
+            Instant startTime = Instant.now();
+            long start = clock.currentEpochMillis();
+            Map<TopicPartition, OffsetAndMetadata> partitionsCommitOffset = new HashMap<>();
+
+            int offsetClubbedBatches = 0;
+            while (true) {
+                Records commitOffset = commitQueue.poll(queueConfig.getTimeout(), queueConfig.getTimeoutUnit());
+                if (stopped || clock.currentEpochMillis() - start > offsetState.getOffsetBatchDuration()) {
+                    break;
                 }
-                log.debug("waiting for {} acknowledgement for offset {}", defaultSleepMs, currentOffset);
-                sleep(defaultSleepMs);
-                statsClient.gauge("committer.queue.wait.ms", defaultSleepMs);
+
+                if (commitOffset == null) {
+                    continue;
+                }
+
+                Map<TopicPartition, OffsetAndMetadata> currentOffset = commitOffset.getPartitionsCommitOffset();
+                Instant commitQueuePollStartTime = Instant.now();
+                while (true) {
+                    if (offsetState.removeFromOffsetAck(currentOffset)) {
+                        currentOffset.keySet().forEach(topicPartition -> {
+                            OffsetAndMetadata offsetAndMetadata = currentOffset.get(topicPartition);
+                            OffsetMetadata previousOffset = (OffsetMetadata) partitionsCommitOffset.getOrDefault(topicPartition, new OffsetMetadata(Integer.MIN_VALUE));
+                            OffsetMetadata newOffset = new OffsetMetadata(offsetAndMetadata.offset());
+                            if (previousOffset.compareTo(newOffset) < 0) {
+                                partitionsCommitOffset.put(topicPartition, newOffset);
+                            }
+                        });
+                        offsetState.resetOffset();
+                        offsetClubbedBatches++;
+                        break;
+                    } else {
+                        if (offsetState.shouldCloseConsumer(partitionsCommitOffset)) {
+                            statsClient.increment("committer.ack.timeout");
+                            return new FailureStatus(new RuntimeException("Acknowledgement Timeout exceeded: " + offsetState.getAcknowledgeTimeoutMs()));
+                        }
+
+                        log.debug("waiting for {} acknowledgement for offset {}, {}, {}", defaultSleepMs, currentOffset, offsetState.partitionOffsetAckSize(), currentOffset);
+                        sleep(defaultSleepMs);
+                    }
+                }
+                statsClient.timeIt("committer.queue.wait.ms", commitQueuePollStartTime);
             }
-            statsClient.timeIt("committer.processing.time", start);
+            commit(partitionsCommitOffset);
+
+            statsClient.gauge("committer.clubbed.offsets", offsetClubbedBatches);
+            statsClient.timeIt("committer.processing.time", startTime);
         } catch (InterruptedException | RuntimeException e) {
+            log.info("Received {} exception: {}, resetting committer", e.getClass(), e.getMessage());
+            e.printStackTrace();
             return new FailureStatus(new RuntimeException("Exception in offset committer: " + e.getMessage()));
         }
         return new SuccessStatus();
     }
 
-    private void commit() {
-        Map<TopicPartition, OffsetAndMetadata> partitionsCommitOffset = commitQueue.remove().getPartitionsCommitOffset();
-        kafkaCommitter.commitSync(partitionsCommitOffset);
-        partitionOffsetAck.remove(partitionsCommitOffset);
-        Records nextOffset = commitQueue.peek();
-        if (nextOffset != null) offsetState.resetOffset(nextOffset.getPartitionsCommitOffset());
-        log.info("commit partition {} size {}", partitionsCommitOffset.toString(), partitionsCommitOffset.size());
+    private void commit(Map<TopicPartition, OffsetAndMetadata> partitionsCommitOffset) {
+        if (partitionsCommitOffset.size() != 0) {
+            kafkaCommitter.commitSync(partitionsCommitOffset);
+        }
+        log.info("committed offsets partition {} size {}", partitionsCommitOffset.toString(), partitionsCommitOffset.size());
     }
 }
diff --git a/src/test/java/com/gojek/beast/commiter/OffsetCommitWorkerIntegrationTest.java b/src/test/java/com/gojek/beast/commiter/OffsetCommitWorkerIntegrationTest.java
index 836d649..86bf2c2 100644
--- a/src/test/java/com/gojek/beast/commiter/OffsetCommitWorkerIntegrationTest.java
+++ b/src/test/java/com/gojek/beast/commiter/OffsetCommitWorkerIntegrationTest.java
@@ -1,5 +1,7 @@
 package com.gojek.beast.commiter;
 
+import com.gojek.beast.Clock;
+import com.gojek.beast.config.QueueConfig;
 import com.gojek.beast.consumer.KafkaConsumer;
 import com.gojek.beast.models.Records;
 import com.gojek.beast.util.RecordsUtil;
@@ -10,6 +12,8 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -20,9 +24,11 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.Comparator;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.assertEquals;
@@ -30,6 +36,7 @@
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.never;
 
@@ -39,7 +46,8 @@ public class OffsetCommitWorkerIntegrationTest {
 
     @Mock
     private KafkaConsumer kafkaConsumer;
-    private OffsetCommitWorker offsetCommitWorker;
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitPartitionsOffsetCaptor;
     private int acknowledgeTimeoutMs;
     private LinkedBlockingQueue<Records> commitQueue;
     private RecordsUtil recordsUtil;
@@ -47,18 +55,20 @@ public class OffsetCommitWorkerIntegrationTest {
     private OffsetState offsetState;
     private Acknowledger offsetAcknowledger;
     private WorkerState workerState;
+    private Clock clock;
 
     @Before
     public void setUp() {
         commitQueue = new LinkedBlockingQueue<>();
+        clock = new Clock();
         CopyOnWriteArraySet<Map<TopicPartition, OffsetAndMetadata>> ackSet = new CopyOnWriteArraySet<>();
         acknowledgements = Collections.synchronizedSet(ackSet);
-        acknowledgeTimeoutMs = 3000;
+        acknowledgeTimeoutMs = 2000;
         recordsUtil = new RecordsUtil();
-        offsetState = new OffsetState(acknowledgeTimeoutMs);
+        offsetState = new OffsetState(acknowledgements, acknowledgeTimeoutMs, 1000);
         offsetAcknowledger = new OffsetAcknowledger(acknowledgements);
         workerState = new WorkerState();
-        committer = new OffsetCommitWorker("committer", acknowledgements, kafkaConsumer, offsetState, commitQueue, workerState);
+        committer = new OffsetCommitWorker("committer", new QueueConfig(200), kafkaConsumer, offsetState, commitQueue, workerState, clock);
     }
 
     @Test
@@ -86,15 +96,27 @@ public void shouldCommitPartitionsOfAllRecordsInSequence() throws InterruptedExc
         committerThread.join();
 
         InOrder inOrder = inOrder(kafkaConsumer);
-        inOrder.verify(kafkaConsumer).commitSync(records1.getPartitionsCommitOffset());
-        inOrder.verify(kafkaConsumer).commitSync(records2.getPartitionsCommitOffset());
-        inOrder.verify(kafkaConsumer).commitSync(records3.getPartitionsCommitOffset());
+        verify(kafkaConsumer).commitSync(commitPartitionsOffsetCaptor.capture());
+        List<Map.Entry<TopicPartition, OffsetAndMetadata>> committedOffsets = commitPartitionsOffsetCaptor.getValue().entrySet().stream().sorted(Comparator.comparing(e -> e.getKey().topic())).collect(Collectors.toList());
+        assertEquals(3, committedOffsets.size());
+        Map.Entry<TopicPartition, OffsetAndMetadata> offset1 = committedOffsets.get(0);
+        Map.Entry<TopicPartition, OffsetAndMetadata> offset2 = committedOffsets.get(1);
+        Map.Entry<TopicPartition, OffsetAndMetadata> offset3 = committedOffsets.get(2);
+        assertEquals("topic_customer-", offset1.getKey().topic());
+        assertEquals(0, offset1.getKey().partition());
+        assertEquals(6, offset1.getValue().offset());
+        assertEquals("topic_driver-", offset2.getKey().topic());
+        assertEquals(0, offset2.getKey().partition());
+        assertEquals(3, offset2.getValue().offset());
+        assertEquals("topic_merchant-", offset3.getKey().topic());
+        assertEquals(0, offset3.getKey().partition());
+        assertEquals(9, offset3.getValue().offset());
         inOrder.verify(kafkaConsumer, atLeastOnce()).wakeup(anyString());
         assertTrue(acknowledgements.isEmpty());
     }
 
     @Test
-    public void shouldStopConsumerWhenAckTimeOutHappensForNextOffset() throws InterruptedException {
+    public void shouldStopConsumerWhenAckTimeOutHappens() throws InterruptedException {
         Records records1 = recordsUtil.createRecords("driver-", 3);
         Records records2 = recordsUtil.createRecords("customer-", 3);
         Records records3 = recordsUtil.createRecords("merchant-", 3);
@@ -119,11 +141,8 @@ public void shouldStopConsumerWhenAckTimeOutHappensForNextOffset() throws Interr
         committerThread.join();
 
         InOrder inOrder = inOrder(kafkaConsumer);
-        inOrder.verify(kafkaConsumer).commitSync(records1.getPartitionsCommitOffset());
-        inOrder.verify(kafkaConsumer, never()).commitSync(records2.getPartitionsCommitOffset());
-        inOrder.verify(kafkaConsumer, never()).commitSync(records3.getPartitionsCommitOffset());
-        assertEquals(2, commitQueue.size());
-        assertEquals(records2, commitQueue.take());
+        inOrder.verify(kafkaConsumer, never()).commitSync(anyMap());
+        assertEquals(1, commitQueue.size());
         assertEquals(records3, commitQueue.take());
         inOrder.verify(kafkaConsumer, atLeastOnce()).wakeup(anyString());
         assertEquals(1, acknowledgements.size());
@@ -145,7 +164,7 @@ public void shouldStopWhenNoAcknowledgements() throws InterruptedException {
 
         InOrder inOrder = inOrder(kafkaConsumer);
         inOrder.verify(kafkaConsumer, never()).commitSync(anyMap());
-        assertEquals(3, commitQueue.size());
+        assertEquals(2, commitQueue.size());
         inOrder.verify(kafkaConsumer, atLeastOnce()).wakeup(anyString());
         assertTrue(acknowledgements.isEmpty());
     }
diff --git a/src/test/java/com/gojek/beast/commiter/OffsetStateTest.java b/src/test/java/com/gojek/beast/commiter/OffsetStateTest.java
index e42eac8..3efaa74 100644
--- a/src/test/java/com/gojek/beast/commiter/OffsetStateTest.java
+++ b/src/test/java/com/gojek/beast/commiter/OffsetStateTest.java
@@ -4,32 +4,36 @@
 import org.apache.kafka.common.TopicPartition;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 public class OffsetStateTest {
-
+    private CopyOnWriteArraySet<Map<TopicPartition, OffsetAndMetadata>> ackSet = new CopyOnWriteArraySet<>();
+    private Set<Map<TopicPartition, OffsetAndMetadata>> acknowledgements = Collections.synchronizedSet(ackSet);
 
     @Test
     public void shouldReturnFalseWhenAcknowledgedRecently() {
         Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
         offset.put(new TopicPartition("topic", 1), new OffsetAndMetadata(101));
-        OffsetState state = new OffsetState(10000);
+        OffsetState state = new OffsetState(acknowledgements, 200, 100);
         state.startTimer();
-        state.resetOffset(offset);
+        state.resetOffset();
 
         assertFalse(state.shouldCloseConsumer(offset));
     }
 
     @Test
     public void shouldReturnFalseAfterFirstRecordAcknowledgement() throws InterruptedException {
-        int ackTimeout = 100;
+        int ackTimeout = 200;
         Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
         offset.put(new TopicPartition("topic", 1), new OffsetAndMetadata(101));
-        OffsetState state = new OffsetState(ackTimeout);
+        OffsetState state = new OffsetState(acknowledgements, ackTimeout, 100);
         state.startTimer();
 
         Thread.sleep(ackTimeout - 10);
@@ -38,10 +42,10 @@ public void shouldReturnFalseAfterFirstRecordAcknowledgement() throws Interrupte
 
     @Test
     public void shouldReturnTrueIfNoneAcknowledgedAndTimedOut() throws InterruptedException {
-        int ackTimeout = 70;
+        int ackTimeout = 200;
         Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
         offset.put(new TopicPartition("topic", 1), new OffsetAndMetadata(101));
-        OffsetState state = new OffsetState(ackTimeout);
+        OffsetState state = new OffsetState(acknowledgements, ackTimeout, 100);
         state.startTimer();
 
         Thread.sleep(ackTimeout + 100);
@@ -51,14 +55,14 @@ public void shouldReturnTrueIfNoneAcknowledgedAndTimedOut() throws InterruptedEx
 
     @Test
     public void shouldReturnFalseWhenAcknowledgedWithDifferentOffset() throws InterruptedException {
-        int ackTimeout = 100;
+        int ackTimeout = 200;
         Map<TopicPartition, OffsetAndMetadata> oldOffset = new HashMap<>();
         Map<TopicPartition, OffsetAndMetadata> currOffset = new HashMap<>();
         currOffset.put(new TopicPartition("topic", 1), new OffsetAndMetadata(101));
-        OffsetState state = new OffsetState(ackTimeout);
+        OffsetState state = new OffsetState(acknowledgements, ackTimeout, 100);
         state.startTimer();
-        state.resetOffset(oldOffset);
-        state.resetOffset(currOffset);
+        state.resetOffset();
+        state.resetOffset();
 
         Thread.sleep(ackTimeout - 20);
         assertFalse(state.shouldCloseConsumer(currOffset));
@@ -66,12 +70,12 @@ public void shouldReturnFalseWhenAcknowledgedWithDifferentOffset() throws Interr
 
     @Test
     public void shouldReturnTrueWhenLastAckOffsetIsSameAndTimedOut() throws InterruptedException {
-        int ackTimeout = 100;
+        int ackTimeout = 200;
         Map<TopicPartition, OffsetAndMetadata> currOffset = new HashMap<>();
         currOffset.put(new TopicPartition("topic", 1), new OffsetAndMetadata(101));
-        OffsetState state = new OffsetState(ackTimeout);
+        OffsetState state = new OffsetState(acknowledgements, ackTimeout, 100);
         state.startTimer();
-        state.resetOffset(currOffset);
+        state.resetOffset();
 
         Thread.sleep(ackTimeout + 10);
         assertTrue(state.shouldCloseConsumer(currOffset));
@@ -79,10 +83,10 @@ public void shouldReturnTrueWhenLastAckOffsetIsSameAndTimedOut() throws Interrup
 
     @Test
     public void shouldReturnFalseWhenTimerNotStarted() throws InterruptedException {
-        int ackTimeout = 10;
+        int ackTimeout = 200;
         Map<TopicPartition, OffsetAndMetadata> currOffset = new HashMap<>();
         currOffset.put(new TopicPartition("topic", 1), new OffsetAndMetadata(101));
-        OffsetState state = new OffsetState(ackTimeout);
+        OffsetState state = new OffsetState(acknowledgements, ackTimeout, 10);
 
         Thread.sleep(ackTimeout + 10);
         assertFalse(state.shouldCloseConsumer(currOffset));
diff --git a/src/test/java/com/gojek/beast/worker/OffsetCommitWorkerTest.java b/src/test/java/com/gojek/beast/worker/OffsetCommitWorkerTest.java
index f68b9e5..05475a4 100644
--- a/src/test/java/com/gojek/beast/worker/OffsetCommitWorkerTest.java
+++ b/src/test/java/com/gojek/beast/worker/OffsetCommitWorkerTest.java
@@ -1,9 +1,11 @@
 package com.gojek.beast.worker;
 
+import com.gojek.beast.Clock;
 import com.gojek.beast.commiter.Acknowledger;
+import com.gojek.beast.commiter.KafkaCommitter;
 import com.gojek.beast.commiter.OffsetAcknowledger;
 import com.gojek.beast.commiter.OffsetState;
-import com.gojek.beast.consumer.KafkaConsumer;
+import com.gojek.beast.config.QueueConfig;
 import com.gojek.beast.models.Records;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
@@ -11,86 +13,74 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InOrder;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.ConcurrentModificationException;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.atLeastOnce;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 
 @RunWith(MockitoJUnitRunner.class)
 public class OffsetCommitWorkerTest {
 
     @Mock
     private Records records;
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitPartitionsOffsetCaptor;
     @Mock
     private Map<TopicPartition, OffsetAndMetadata> commitPartitionsOffset;
     @Mock
-    private KafkaConsumer kafkaConsumer;
-    private BlockingQueue<Records> commitQ;
+    private KafkaCommitter kafkaCommitter;
+    private QueueConfig queueConfig;
+    private int pollTimeout;
     private Set<Map<TopicPartition, OffsetAndMetadata>> acknowledgements;
-    private int acknowledgeTimeoutMs;
-    private OffsetCommitWorker offsetCommitWorker;
-    @Mock
-    private Set<Map<TopicPartition, OffsetAndMetadata>> acknowledgeSetMock;
-    @Mock
     private OffsetState offsetState;
     private Acknowledger offsetAcknowledger;
     private WorkerState workerState;
+    private long offsetBatchDuration;
+    private long ackTimeoutTime;
+    @Mock
+    private Clock clock;
 
     @Before
     public void setUp() {
-        commitQ = new LinkedBlockingQueue<>();
+        pollTimeout = 200;
+        offsetBatchDuration = 1000;
+        ackTimeoutTime = 2000;
+        queueConfig = new QueueConfig(pollTimeout);
+        commitPartitionsOffset = new HashMap<TopicPartition, OffsetAndMetadata>() {{
+            put(new TopicPartition("topic", 0), new OffsetAndMetadata(1));
+        }};
         CopyOnWriteArraySet<Map<TopicPartition, OffsetAndMetadata>> ackSet = new CopyOnWriteArraySet<>();
         acknowledgements = Collections.synchronizedSet(ackSet);
-        when(offsetState.shouldCloseConsumer(any())).thenReturn(false);
+        offsetState = new OffsetState(acknowledgements, ackTimeoutTime, offsetBatchDuration);
         workerState = new WorkerState();
-        offsetCommitWorker = new OffsetCommitWorker("committer", acknowledgements, kafkaConsumer, offsetState, commitQ, workerState);
-        acknowledgeTimeoutMs = 15000;
         offsetAcknowledger = new OffsetAcknowledger(acknowledgements);
     }
 
     @After
-    public void tearDown() throws Exception {
-        commitQ.clear();
+    public void tearDown() {
         acknowledgements.clear();
-        offsetCommitWorker.stop("some reason");
     }
 
     @Test
     public void shouldCommitFirstOffsetWhenAcknowledged() throws InterruptedException {
         when(records.getPartitionsCommitOffset()).thenReturn(commitPartitionsOffset);
-        CopyOnWriteArraySet<Map<TopicPartition, OffsetAndMetadata>> ackSet = spy(new CopyOnWriteArraySet<>());
-        Set<Map<TopicPartition, OffsetAndMetadata>> acks = Collections.synchronizedSet(ackSet);
+        when(clock.currentEpochMillis()).thenReturn(0L, 0L, 0L, 1001L);
         BlockingQueue<Records> commitQueue = spy(new LinkedBlockingQueue<>());
-        offsetAcknowledger = new OffsetAcknowledger(ackSet);
-        OffsetCommitWorker committer = new OffsetCommitWorker("committer", acks, kafkaConsumer, offsetState, commitQueue, workerState);
+        OffsetCommitWorker committer = new OffsetCommitWorker("committer", new QueueConfig(200), kafkaCommitter, offsetState, commitQueue, workerState, clock);
         committer.setDefaultSleepMs(10);
         commitQueue.put(records);
         offsetAcknowledger.acknowledge(commitPartitionsOffset);
@@ -100,29 +90,40 @@ public void shouldCommitFirstOffsetWhenAcknowledged() throws InterruptedExceptio
         await().until(commitQueue::isEmpty);
         workerState.closeWorker();
-
-        verify(commitQueue, atLeast(1)).peek();
-
-        InOrder callOrder = inOrder(kafkaConsumer, records);
-        callOrder.verify(records, atLeast(1)).getPartitionsCommitOffset();
-        callOrder.verify(kafkaConsumer, times(1)).commitSync(commitPartitionsOffset);
+        commitThread.join();
+
+        verify(commitQueue, atLeast(1)).poll(queueConfig.getTimeout(), queueConfig.getTimeoutUnit());
+        verify(records, atLeast(1)).getPartitionsCommitOffset();
+        verify(kafkaCommitter).commitSync(commitPartitionsOffsetCaptor.capture());
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitPartitionsOffsetCaptor.getValue();
+        assertEquals(1, committedOffsets.size());
+        Map.Entry<TopicPartition, OffsetAndMetadata> offset = committedOffsets.entrySet().iterator().next();
+        assertEquals(offset.getKey().topic(), "topic");
+        assertEquals(offset.getKey().partition(), 0);
+        assertEquals(offset.getValue().offset(), 1);
         assertTrue(commitQueue.isEmpty());
-        assertTrue(acks.isEmpty());
     }
 
     @Test
-    public void shouldCommitOffsetsInSequenceWhenAcknowledgedRandom() throws InterruptedException {
-        Map<TopicPartition, OffsetAndMetadata> record1CommitOffset = mock(Map.class);
-        Map<TopicPartition, OffsetAndMetadata> record2CommitOffset = mock(Map.class);
-        Map<TopicPartition, OffsetAndMetadata> record3CommitOffset = mock(Map.class);
+    public void shouldBatchCommitOffsets() throws InterruptedException {
+        Map<TopicPartition, OffsetAndMetadata> record1CommitOffset = new HashMap<TopicPartition, OffsetAndMetadata>() {{
+            put(new TopicPartition("topic", 0), new OffsetAndMetadata(1));
+        }};
+        Map<TopicPartition, OffsetAndMetadata> record2CommitOffset = new HashMap<TopicPartition, OffsetAndMetadata>() {{
+            put(new TopicPartition("topic", 0), new OffsetAndMetadata(2));
+        }};
+        Map<TopicPartition, OffsetAndMetadata> record3CommitOffset = new HashMap<TopicPartition, OffsetAndMetadata>() {{
+            put(new TopicPartition("topic", 0), new OffsetAndMetadata(3));
+        }};
         Records records1 = mock(Records.class);
         Records records2 = mock(Records.class);
         Records records3 = mock(Records.class);
         when(records1.getPartitionsCommitOffset()).thenReturn(record1CommitOffset);
         when(records2.getPartitionsCommitOffset()).thenReturn(record2CommitOffset);
         when(records3.getPartitionsCommitOffset()).thenReturn(record3CommitOffset);
-        LinkedBlockingQueue<Records> commitQueue = new LinkedBlockingQueue<>();
-        OffsetCommitWorker committer = new OffsetCommitWorker("committer", acknowledgements, kafkaConsumer, offsetState, commitQueue, workerState);
+        when(clock.currentEpochMillis()).thenReturn(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 1001L);
+        BlockingQueue<Records> commitQueue = spy(new LinkedBlockingQueue<>());
+        OffsetCommitWorker committer = new OffsetCommitWorker("committer", new QueueConfig(200), kafkaCommitter, offsetState, commitQueue, workerState, clock);
 
         for (Records rs : Arrays.asList(records1, records2, records3)) {
             commitQueue.offer(rs, 1, TimeUnit.SECONDS);
@@ -137,27 +138,31 @@ public void shouldCommitOffsetsInSequenceWhenAcknowledgedRandom() throws Interru
         workerState.closeWorker();
         committerThread.join();
 
-        InOrder inOrder = inOrder(kafkaConsumer, offsetState);
-        inOrder.verify(offsetState).startTimer();
-        inOrder.verify(kafkaConsumer).commitSync(record1CommitOffset);
-        inOrder.verify(offsetState, atLeastOnce()).resetOffset(record2CommitOffset);
-
-        inOrder.verify(kafkaConsumer).commitSync(record2CommitOffset);
-        inOrder.verify(offsetState, atLeastOnce()).resetOffset(record3CommitOffset);
-
-        inOrder.verify(kafkaConsumer).commitSync(record3CommitOffset);
-        inOrder.verify(offsetState, never()).resetOffset(null);
-        inOrder.verify(offsetState, never()).shouldCloseConsumer(record3CommitOffset);
-
-        assertTrue(commitQ.isEmpty());
-        assertTrue(acknowledgements.isEmpty());
+        verify(commitQueue, atLeast(3)).poll(queueConfig.getTimeout(), queueConfig.getTimeoutUnit());
+        verify(records1, atLeast(1)).getPartitionsCommitOffset();
+        verify(records2, atLeast(1)).getPartitionsCommitOffset();
+        verify(records3, atLeast(1)).getPartitionsCommitOffset();
+        verify(kafkaCommitter).commitSync(commitPartitionsOffsetCaptor.capture());
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitPartitionsOffsetCaptor.getValue();
+        assertEquals(1, committedOffsets.size());
+        Map.Entry<TopicPartition, OffsetAndMetadata> offset = committedOffsets.entrySet().iterator().next();
+        assertEquals(offset.getKey().topic(), "topic");
+        assertEquals(offset.getKey().partition(), 0);
+        assertEquals(offset.getValue().offset(), 3);
+        assertTrue(commitQueue.isEmpty());
     }
 
     @Test
     public void shouldCommitInSequenceWithParallelAcknowledgements() throws InterruptedException {
-        Map<TopicPartition, OffsetAndMetadata> record1CommitOffset = mock(Map.class);
-        Map<TopicPartition, OffsetAndMetadata> record2CommitOffset = mock(Map.class);
-        Map<TopicPartition, OffsetAndMetadata> record3CommitOffset = mock(Map.class);
+        Map<TopicPartition, OffsetAndMetadata> record1CommitOffset = new HashMap<TopicPartition, OffsetAndMetadata>() {{
+            put(new TopicPartition("topic", 0), new OffsetAndMetadata(1));
+        }};
+        Map<TopicPartition, OffsetAndMetadata> record2CommitOffset = new HashMap<TopicPartition, OffsetAndMetadata>() {{
+            put(new TopicPartition("topic", 0), new OffsetAndMetadata(2));
+        }};
+        Map<TopicPartition, OffsetAndMetadata> record3CommitOffset = new HashMap<TopicPartition, OffsetAndMetadata>() {{
+            put(new TopicPartition("topic", 0), new OffsetAndMetadata(3));
+        }};
         Records records2 = mock(Records.class);
         Records records3 = mock(Records.class);
         Records records1 = mock(Records.class);
@@ -165,13 +170,16 @@ public void shouldCommitInSequenceWithParallelAcknowledgements() throws Interrup
         when(records1.getPartitionsCommitOffset()).thenReturn(record1CommitOffset);
         when(records2.getPartitionsCommitOffset()).thenReturn(record2CommitOffset);
         when(records3.getPartitionsCommitOffset()).thenReturn(record3CommitOffset);
-        offsetCommitWorker.setDefaultSleepMs(50);
+        when(clock.currentEpochMillis()).thenReturn(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 1001L);
+        BlockingQueue<Records> commitQueue = spy(new LinkedBlockingQueue<>());
+        OffsetCommitWorker committer = new OffsetCommitWorker("committer", new QueueConfig(200), kafkaCommitter, offsetState, commitQueue, workerState, clock);
+        committer.setDefaultSleepMs(50);
 
         List<Records> incomingRecords = Arrays.asList(records1, records2, records3);
         for (Records incomingRecord : incomingRecords) {
-            commitQ.offer(incomingRecord, 1, TimeUnit.SECONDS);
+            commitQueue.offer(incomingRecord, 1, TimeUnit.SECONDS);
         }
-        Thread commiterThread = new Thread(offsetCommitWorker);
+        Thread commiterThread = new Thread(committer);
         commiterThread.start();
 
         AcknowledgeHelper acknowledger1 = new AcknowledgeHelper(record2CommitOffset, offsetAcknowledger, new Random().nextInt(ackDelayRandomMs));
@@ -180,41 +188,50 @@
         List acknowledgers = Arrays.asList(acknowledger1, acknowledger2, acknowledger3);
         acknowledgers.forEach(Thread::start);
 
-        await().until(() -> commitQ.isEmpty());
+        await().until(commitQueue::isEmpty);
         workerState.closeWorker();
         commiterThread.join();
 
-        InOrder inOrder = inOrder(kafkaConsumer);
-        incomingRecords.forEach(rs -> inOrder.verify(kafkaConsumer).commitSync(rs.getPartitionsCommitOffset()));
-        assertTrue(commitQ.isEmpty());
-        assertTrue(acknowledgements.isEmpty());
+        verify(records1, atLeast(1)).getPartitionsCommitOffset();
+        verify(records2, atLeast(1)).getPartitionsCommitOffset();
+        verify(records3, atLeast(1)).getPartitionsCommitOffset();
+        verify(kafkaCommitter).commitSync(commitPartitionsOffsetCaptor.capture());
+        Map<TopicPartition, OffsetAndMetadata> committedOffsets = commitPartitionsOffsetCaptor.getValue();
+        assertEquals(1, committedOffsets.size());
+        Map.Entry<TopicPartition, OffsetAndMetadata> offset = committedOffsets.entrySet().iterator().next();
+        assertEquals(offset.getKey().topic(), "topic");
+        assertEquals(offset.getKey().partition(), 0);
+        assertEquals(offset.getValue().offset(), 3);
     }
 
     @Test
     public void shouldStopProcessWhenCommitterGetException() throws InterruptedException {
-        doThrow(ConcurrentModificationException.class).when(kafkaConsumer).commitSync(anyMap());
-        commitQ.put(records);
+        doThrow(ConcurrentModificationException.class).when(kafkaCommitter).commitSync(anyMap());
         when(records.getPartitionsCommitOffset()).thenReturn(commitPartitionsOffset);
-        acknowledgements.add(commitPartitionsOffset);
+        offsetAcknowledger.acknowledge(commitPartitionsOffset);
+        BlockingQueue<Records> commitQueue = spy(new LinkedBlockingQueue<>());
+        when(clock.currentEpochMillis()).thenReturn(0L, 0L, 0L, 1001L);
+        OffsetCommitWorker committer = new OffsetCommitWorker("committer", new QueueConfig(200), kafkaCommitter, offsetState, commitQueue, workerState, clock);
+        commitQueue.put(records);
 
-        Thread commiterThread = new Thread(offsetCommitWorker);
+        Thread commiterThread = new Thread(committer);
         commiterThread.start();
         commiterThread.join();
 
-        verify(kafkaConsumer).commitSync(anyMap());
-        verify(kafkaConsumer, atLeastOnce()).wakeup(anyString());
+        verify(kafkaCommitter).commitSync(anyMap());
+        verify(kafkaCommitter, atLeastOnce()).wakeup(anyString());
     }
 }
 
 class AcknowledgeHelper extends Thread {
     private Map<TopicPartition, OffsetAndMetadata> offset;
-    private Acknowledger committer;
+    private Acknowledger acknowledger;
     private long sleepMs;
 
-    AcknowledgeHelper(Map<TopicPartition, OffsetAndMetadata> offset, Acknowledger committer, long sleepMs) {
+    AcknowledgeHelper(Map<TopicPartition, OffsetAndMetadata> offset, Acknowledger acknowledger, long sleepMs) {
         this.offset = offset;
-        this.committer = committer;
+        this.acknowledger = acknowledger;
         this.sleepMs = sleepMs;
     }
 
@@ -225,6 +242,6 @@ public void run() {
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
-        committer.acknowledge(offset);
+        acknowledger.acknowledge(offset);
     }
 }
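
The net effect of the OffsetCommitWorker changes above: instead of committing one Records batch at a time, the worker now drains acknowledged batches from the commit queue for up to OFFSET_BATCH_DURATION milliseconds, keeps only the highest acknowledged offset per partition, and issues a single commitSync for the clubbed map. Below is a minimal, self-contained sketch of that clubbing step; it uses only kafka-clients types, inlines the offset comparison that the worker delegates to OffsetMetadata.compareTo, and the class and method names (OffsetClubbingSketch, mergeAcknowledgedBatch) are illustrative, not part of the Beast codebase.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Illustrative sketch: club several acknowledged per-batch offset maps into one
    // commit map that keeps only the highest offset seen for each partition.
    final class OffsetClubbingSketch {

        // Merge one acknowledged batch into the accumulated commit map.
        static void mergeAcknowledgedBatch(Map<TopicPartition, OffsetAndMetadata> clubbed,
                                           Map<TopicPartition, OffsetAndMetadata> acknowledged) {
            acknowledged.forEach((partition, candidate) ->
                    clubbed.merge(partition, candidate,
                            (current, next) -> next.offset() > current.offset() ? next : current));
        }

        public static void main(String[] args) {
            TopicPartition driver0 = new TopicPartition("topic_driver", 0);

            // Acknowledgements can arrive out of order; the lower offset must not win.
            List<Map<TopicPartition, OffsetAndMetadata>> acknowledgedBatches = Arrays.asList(
                    Collections.singletonMap(driver0, new OffsetAndMetadata(3)),
                    Collections.singletonMap(driver0, new OffsetAndMetadata(1)),
                    Collections.singletonMap(driver0, new OffsetAndMetadata(9)));

            Map<TopicPartition, OffsetAndMetadata> clubbed = new HashMap<>();
            acknowledgedBatches.forEach(batch -> mergeAcknowledgedBatch(clubbed, batch));

            // A single consumer.commitSync(clubbed) would now commit offset 9 for topic_driver-0.
            System.out.println(clubbed);
        }
    }

Committing only the per-partition maximum is what makes the single commitSync safe: a committed Kafka offset marks everything before it as processed, so committing 9 after 1 and 3 also covers the earlier batches.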