Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
[Maulik] batch commits from offset commit worker
Browse files Browse the repository at this point in the history
Signed-off-by: mauliksoneji <[email protected]>
  • Loading branch information
mauliksoneji committed Apr 24, 2020
1 parent 304883e commit f24e943
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 172 deletions.
24 changes: 17 additions & 7 deletions src/main/java/com/gojek/beast/commiter/OffsetState.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,41 @@

import java.time.Instant;
import java.util.Map;
import java.util.Set;

public class OffsetState {
@Getter
private final long acknowledgeTimeoutMs;
@Getter
private final long offsetCommitTime;
private boolean start;
private Map<TopicPartition, OffsetAndMetadata> lastCommitOffset;
private Instant lastCommittedTime;
private Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck;

public OffsetState(long acknowledgeTimeoutMs) {
public OffsetState(Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck, long acknowledgeTimeoutMs, long offsetCommitTime) {
this.partitionOffsetAck = partitionOffsetAck;
this.acknowledgeTimeoutMs = acknowledgeTimeoutMs;
this.offsetCommitTime = offsetCommitTime;
lastCommittedTime = Instant.now();
}

public boolean shouldCloseConsumer(Map<TopicPartition, OffsetAndMetadata> currentOffset) {
if (!start) {
return false;
}
boolean sameOffset = lastCommitOffset == currentOffset || currentOffset.equals(lastCommitOffset);
boolean ackTimedOut = (Instant.now().toEpochMilli() - lastCommittedTime.toEpochMilli()) > acknowledgeTimeoutMs;
boolean neverAcknowledged = lastCommitOffset == null && ackTimedOut;
return (sameOffset && ackTimedOut) || neverAcknowledged;
return ackTimedOut;
}

public int partitionOffsetAckSize() {
return partitionOffsetAck.size();
}

public boolean removeFromOffsetAck(Map<TopicPartition, OffsetAndMetadata> commitOffset) {
return partitionOffsetAck.remove(commitOffset);
}

public void resetOffset(Map<TopicPartition, OffsetAndMetadata> acknowledgedOffset) {
lastCommitOffset = acknowledgedOffset;
public void resetOffset() {
lastCommittedTime = Instant.now();
}

Expand Down
4 changes: 4 additions & 0 deletions src/main/java/com/gojek/beast/config/AppConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,8 @@ public interface AppConfig extends Config {

@Key("GCS_WRITER_PROJECT_NAME")
String getGcsWriterProject();

@DefaultValue("2000")
@Key("OFFSET_COMMIT_TIME")
long getOffsetCommitTime();
}
5 changes: 3 additions & 2 deletions src/main/java/com/gojek/beast/factory/BeastFactory.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.gojek.beast.factory;

import com.gojek.beast.Clock;
import com.gojek.beast.backoff.BackOff;
import com.gojek.beast.backoff.ExponentialBackOffProvider;
import com.gojek.beast.config.AppConfig;
Expand Down Expand Up @@ -160,8 +161,8 @@ public OffsetCommitWorker createOffsetCommitter() {
if (committer != null) {
return committer;
}
OffsetState offsetState = new OffsetState(appConfig.getOffsetAckTimeoutMs());
committer = new OffsetCommitWorker("committer", partitionsAck, createKafkaConsumer(), offsetState, commitQueue, workerState);
OffsetState offsetState = new OffsetState(partitionsAck, appConfig.getOffsetAckTimeoutMs(), appConfig.getOffsetCommitTime());
committer = new OffsetCommitWorker("committer", new QueueConfig(appConfig.getBqWorkerPollTimeoutMs(), "commit"), createKafkaConsumer(), offsetState, commitQueue, workerState, new Clock());
return committer;
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/gojek/beast/models/FailureStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public Optional<Exception> getException() {
@Override
public String toString() {
return "FailureStatus{"
+ "cause=" + cause.getCause()
+ "exception=" + cause.getClass().getName()
+ ", cause=" + cause.getCause()
+ ", message='" + ((message != null) ? message : cause.getMessage()) + '\'' + '}';
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package com.gojek.beast.protomapping;

import com.gojek.beast.Clock;
import com.gojek.beast.config.StencilConfig;
import com.gojek.beast.config.BQConfig;
import com.gojek.beast.config.ColumnMapping;
import com.gojek.beast.config.ProtoMappingConfig;
import com.gojek.beast.config.StencilConfig;
import com.gojek.beast.converter.ConsumerRecordConverter;
import com.gojek.beast.converter.RowMapper;
import com.gojek.beast.exception.BQTableUpdateFailure;
import com.gojek.beast.exception.BQSchemaMappingException;
import com.gojek.beast.exception.ProtoMappingException;
import com.gojek.beast.exception.ProtoNotFoundException;
import com.gojek.beast.models.BQField;
import com.gojek.beast.models.ProtoField;
Expand Down Expand Up @@ -88,10 +86,9 @@ public void onProtoUpdate(String url, Map<String, DescriptorAndTypeName> newDesc
ProtoField protoField = protoFieldFactory.getProtoField();
protoField = protoMappingParser.parseFields(protoField, proto, StencilUtils.getAllProtobufDescriptors(newDescriptors), StencilUtils.getTypeNameToPackageNameMap(newDescriptors));
updateProtoParser(protoField);
} catch (ProtoNotFoundException | BQSchemaMappingException | BigQueryException | IOException e) {
} catch (BigQueryException | ProtoNotFoundException | IOException e) {
String errMsg = "Error while updating bigquery table on callback:" + e.getMessage();
log.error(errMsg);
e.printStackTrace();
statsClient.increment("bq.table.upsert.failures");
throw new BQTableUpdateFailure(errMsg);
}
Expand All @@ -112,14 +109,7 @@ private void updateProtoParser(final ProtoField protoField) throws IOException {

private ColumnMapping getProtoMapping() throws IOException {
ProtoField protoField = new ProtoField();
try {
protoField = protoMappingParser.parseFields(protoField, proto, stencilClient.getAll(), stencilClient.getTypeNameToPackageNameMap());
} catch (ProtoNotFoundException e) {
String errMsg = "Error while generating proto to column mapping:" + e.getMessage();
log.error(errMsg);
e.printStackTrace();
throw new ProtoMappingException(errMsg);
}
protoField = protoMappingParser.parseFields(protoField, proto, stencilClient.getAll(), stencilClient.getTypeNameToPackageNameMap());
String protoMapping = protoMappingConverter.generateColumnMappings(protoField.getFields());
protoMappingConfig.setProperty("PROTO_COLUMN_MAPPING", protoMapping);
return protoMappingConfig.getProtoColumnMapping();
Expand Down
101 changes: 68 additions & 33 deletions src/main/java/com/gojek/beast/worker/OffsetCommitWorker.java
Original file line number Diff line number Diff line change
@@ -1,84 +1,119 @@
package com.gojek.beast.worker;

import com.gojek.beast.Clock;
import com.gojek.beast.commiter.KafkaCommitter;
import com.gojek.beast.commiter.OffsetState;
import com.gojek.beast.config.QueueConfig;
import com.gojek.beast.models.FailureStatus;
import com.gojek.beast.models.Records;
import com.gojek.beast.models.Status;
import com.gojek.beast.models.OffsetMetadata;
import com.gojek.beast.models.SuccessStatus;
import com.gojek.beast.stats.Stats;
import com.gojek.beast.models.Records;
import com.gojek.beast.models.Status;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

@Slf4j
public class OffsetCommitWorker extends Worker {
private static final int DEFAULT_SLEEP_MS = 100;
private final Stats statsClient = Stats.client();
private BlockingQueue<Records> commitQueue;
private Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck;
private KafkaCommitter kafkaCommitter;
private final BlockingQueue<Records> commitQueue;
private final QueueConfig queueConfig;
private final KafkaCommitter kafkaCommitter;
@Setter
private long defaultSleepMs;


private boolean stopped;
private OffsetState offsetState;
private Clock clock;

public OffsetCommitWorker(String name, Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck, KafkaCommitter kafkaCommitter, OffsetState offsetState, BlockingQueue<Records> commitQueue, WorkerState workerState) {
public OffsetCommitWorker(String name, QueueConfig queueConfig, KafkaCommitter kafkaCommitter, OffsetState offsetState, BlockingQueue<Records> commitQueue, WorkerState workerState, Clock clock) {
super(name, workerState);
this.clock = clock;
this.queueConfig = queueConfig;
this.commitQueue = commitQueue;
this.partitionOffsetAck = partitionOffsetAck;
this.kafkaCommitter = kafkaCommitter;
this.defaultSleepMs = DEFAULT_SLEEP_MS;
this.offsetState = offsetState;
this.stopped = false;
}

@Override
public void stop(String reason) {
log.info("Closing committer: {}", reason);
this.stopped = true;
kafkaCommitter.wakeup(reason);
}

@Override
public Status job() {
offsetState.startTimer();
try {
Instant start = Instant.now();
Records commitOffset = commitQueue.peek();
if (commitOffset == null) {
return new SuccessStatus();
}
Map<TopicPartition, OffsetAndMetadata> currentOffset = commitOffset.getPartitionsCommitOffset();
if (partitionOffsetAck.contains(currentOffset)) {
commit();
} else {
if (offsetState.shouldCloseConsumer(currentOffset)) {
statsClient.increment("committer.ack.timeout");
return new FailureStatus(new RuntimeException("Acknowledgement Timeout exceeded: " + offsetState.getAcknowledgeTimeoutMs()));
Instant startTime = Instant.now();
long start = clock.currentEpochMillis();
Map<TopicPartition, OffsetAndMetadata> partitionsCommitOffset = new HashMap<>();

int offsetClubbedBatches = 0;
while (true) {
Records commitOffset = commitQueue.poll(queueConfig.getTimeout(), queueConfig.getTimeoutUnit());
if (stopped || clock.currentEpochMillis() - start > offsetState.getOffsetCommitTime()) {
break;
}
log.debug("waiting for {} acknowledgement for offset {}", defaultSleepMs, currentOffset);
sleep(defaultSleepMs);
statsClient.gauge("committer.queue.wait.ms", defaultSleepMs);

if (commitOffset == null) {
continue;
}

Map<TopicPartition, OffsetAndMetadata> currentOffset = commitOffset.getPartitionsCommitOffset();
Instant commitQueuePollStartTime = Instant.now();
while (true) {
if (offsetState.removeFromOffsetAck(currentOffset)) {

currentOffset.keySet().forEach(topicPartition -> {
OffsetAndMetadata offsetAndMetadata = currentOffset.get(topicPartition);
OffsetMetadata previousOffset = (OffsetMetadata) partitionsCommitOffset.getOrDefault(topicPartition, new OffsetMetadata(Integer.MIN_VALUE));
OffsetMetadata newOffset = new OffsetMetadata(offsetAndMetadata.offset());
if (previousOffset.compareTo(newOffset) < 0) {
partitionsCommitOffset.put(topicPartition, newOffset);
}
});
offsetState.resetOffset();
offsetClubbedBatches++;
break;
} else {
if (offsetState.shouldCloseConsumer(partitionsCommitOffset)) {
statsClient.increment("committer.ack.timeout");
return new FailureStatus(new RuntimeException("Acknowledgement Timeout exceeded: " + offsetState.getAcknowledgeTimeoutMs()));
}

log.debug("waiting for {} acknowledgement for offset {}, {}, {}", defaultSleepMs, currentOffset, offsetState.partitionOffsetAckSize(), currentOffset);
sleep(defaultSleepMs);
}
}
statsClient.timeIt("committer.queue.wait.ms", commitQueuePollStartTime);
}
statsClient.timeIt("committer.processing.time", start);
commit(partitionsCommitOffset);

statsClient.gauge("committer.clubbed.offsets", offsetClubbedBatches);
statsClient.timeIt("committer.processing.time", startTime);
} catch (InterruptedException | RuntimeException e) {
log.info("Received {} exception: {}, resetting committer", e.getClass(), e.getMessage());
e.printStackTrace();
return new FailureStatus(new RuntimeException("Exception in offset committer: " + e.getMessage()));
}
return new SuccessStatus();
}

private void commit() {
Map<TopicPartition, OffsetAndMetadata> partitionsCommitOffset = commitQueue.remove().getPartitionsCommitOffset();
kafkaCommitter.commitSync(partitionsCommitOffset);
partitionOffsetAck.remove(partitionsCommitOffset);
Records nextOffset = commitQueue.peek();
if (nextOffset != null) offsetState.resetOffset(nextOffset.getPartitionsCommitOffset());
log.info("commit partition {} size {}", partitionsCommitOffset.toString(), partitionsCommitOffset.size());
private void commit(Map<TopicPartition, OffsetAndMetadata> partitionsCommitOffset) {
if (partitionsCommitOffset.size() != 0) {
kafkaCommitter.commitSync(partitionsCommitOffset);
}
log.info("committed offsets partition {} size {}", partitionsCommitOffset.toString(), partitionsCommitOffset.size());
}
}
Loading

0 comments on commit f24e943

Please sign in to comment.