This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

[Maulik] batch commits from offset commit worker #45

Merged (1 commit, Apr 24, 2020)
24 changes: 17 additions & 7 deletions src/main/java/com/gojek/beast/commiter/OffsetState.java
@@ -6,31 +6,41 @@
 import java.time.Instant;
 import java.util.Map;
 import java.util.Set;
 
 public class OffsetState {
     @Getter
     private final long acknowledgeTimeoutMs;
+    @Getter
+    private final long offsetBatchDuration;
     private boolean start;
-    private Map<TopicPartition, OffsetAndMetadata> lastCommitOffset;
     private Instant lastCommittedTime;
+    private Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck;
 
-    public OffsetState(long acknowledgeTimeoutMs) {
+    public OffsetState(Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck, long acknowledgeTimeoutMs, long offsetBatchDuration) {
+        this.partitionOffsetAck = partitionOffsetAck;
         this.acknowledgeTimeoutMs = acknowledgeTimeoutMs;
+        this.offsetBatchDuration = offsetBatchDuration;
         lastCommittedTime = Instant.now();
     }
 
     public boolean shouldCloseConsumer(Map<TopicPartition, OffsetAndMetadata> currentOffset) {
         if (!start) {
             return false;
         }
-        boolean sameOffset = lastCommitOffset == currentOffset || currentOffset.equals(lastCommitOffset);
         boolean ackTimedOut = (Instant.now().toEpochMilli() - lastCommittedTime.toEpochMilli()) > acknowledgeTimeoutMs;
-        boolean neverAcknowledged = lastCommitOffset == null && ackTimedOut;
-        return (sameOffset && ackTimedOut) || neverAcknowledged;
+        return ackTimedOut;
     }
 
+    public int partitionOffsetAckSize() {
+        return partitionOffsetAck.size();
+    }
+
+    public boolean removeFromOffsetAck(Map<TopicPartition, OffsetAndMetadata> commitOffset) {
+        return partitionOffsetAck.remove(commitOffset);
+    }
+
-    public void resetOffset(Map<TopicPartition, OffsetAndMetadata> acknowledgedOffset) {
-        lastCommitOffset = acknowledgedOffset;
+    public void resetOffset() {
         lastCommittedTime = Instant.now();
     }
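For orientation, a minimal usage sketch of the reworked class. This is not PR code: the OffsetStateSketch wrapper, the topic name, and the 10s/2s values are illustrative; in the real wiring the shared ack set is BeastFactory's partitionsAck, populated by the sink workers.

// Hedged sketch, not PR code: exercising the new OffsetState contract.
import com.gojek.beast.commiter.OffsetState;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class OffsetStateSketch {
    public static void main(String[] args) {
        // Shared with the sink workers, which add one offset map per acknowledged batch.
        Set<Map<TopicPartition, OffsetAndMetadata>> acks = ConcurrentHashMap.newKeySet();
        OffsetState state = new OffsetState(acks, 10000L, 2000L); // 10s ack timeout, 2s batch window

        Map<TopicPartition, OffsetAndMetadata> offset = Collections.singletonMap(
                new TopicPartition("bookings", 0), new OffsetAndMetadata(42L));
        acks.add(offset);                  // a sink worker acknowledges its batch

        state.removeFromOffsetAck(offset); // the committer consumes the ack...
        state.resetOffset();               // ...and restarts the acknowledgement timer
        // Once the timer is started, shouldCloseConsumer(...) returns true only when
        // no ack has been consumed for longer than acknowledgeTimeoutMs.
    }
}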
4 changes: 4 additions & 0 deletions src/main/java/com/gojek/beast/config/AppConfig.java
@@ -75,4 +75,8 @@ public interface AppConfig extends Config {
 
     @Key("GCS_WRITER_PROJECT_NAME")
     String getGcsWriterProject();
+
+    @DefaultValue("2000")
+    @Key("OFFSET_BATCH_DURATION")
+    long getOffsetBatchDuration();
 }
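The @Key/@DefaultValue annotations suggest AppConfig is an OWNER-style Config interface. Assuming so, the new knob can be supplied via the environment and resolved as below; the ConfigSketch wrapper and the 5000ms value are illustrative, and the default of 2000ms applies when the variable is unset.

// Hedged sketch: resolving OFFSET_BATCH_DURATION, assuming AppConfig extends
// org.aeonbits.owner.Config and is created via ConfigFactory.
// e.g. export OFFSET_BATCH_DURATION=5000 before starting Beast
import com.gojek.beast.config.AppConfig;
import org.aeonbits.owner.ConfigFactory;

public class ConfigSketch {
    public static void main(String[] args) {
        AppConfig appConfig = ConfigFactory.create(AppConfig.class, System.getenv());
        // How long the committer batches acknowledged offsets before one commitSync.
        System.out.println(appConfig.getOffsetBatchDuration()); // 5000, or 2000 by default
    }
}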
5 changes: 3 additions & 2 deletions src/main/java/com/gojek/beast/factory/BeastFactory.java
@@ -1,5 +1,6 @@
 package com.gojek.beast.factory;
 
+import com.gojek.beast.Clock;
 import com.gojek.beast.backoff.BackOff;
 import com.gojek.beast.backoff.ExponentialBackOffProvider;
 import com.gojek.beast.config.AppConfig;
@@ -160,8 +161,8 @@ public OffsetCommitWorker createOffsetCommitter() {
         if (committer != null) {
             return committer;
         }
-        OffsetState offsetState = new OffsetState(appConfig.getOffsetAckTimeoutMs());
-        committer = new OffsetCommitWorker("committer", partitionsAck, createKafkaConsumer(), offsetState, commitQueue, workerState);
+        OffsetState offsetState = new OffsetState(partitionsAck, appConfig.getOffsetAckTimeoutMs(), appConfig.getOffsetBatchDuration());
+        committer = new OffsetCommitWorker("committer", new QueueConfig(appConfig.getBqWorkerPollTimeoutMs(), "commit"), createKafkaConsumer(), offsetState, commitQueue, workerState, new Clock());
         return committer;
     }
3 changes: 2 additions & 1 deletion src/main/java/com/gojek/beast/models/FailureStatus.java
@@ -28,7 +28,8 @@ public Optional<Exception> getException() {
     @Override
     public String toString() {
         return "FailureStatus{"
-                + "cause=" + cause.getCause()
+                + "exception=" + cause.getClass().getName()
+                + ", cause=" + cause.getCause()
                 + ", message='" + ((message != null) ? message : cause.getMessage()) + '\'' + '}';
     }
 }
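A sketch of what the new toString surfaces. The single-exception constructor is inferred from its use in OffsetCommitWorker below, and the printed lines are indicative rather than captured output.

// Hedged sketch: the exception class name now appears in the rendered status.
import com.gojek.beast.models.FailureStatus;

public class ToStringSketch {
    public static void main(String[] args) {
        // Constructor shape assumed from new FailureStatus(new RuntimeException(...)) in this PR.
        FailureStatus status = new FailureStatus(new RuntimeException("Acknowledgement Timeout exceeded: 10000"));
        System.out.println(status);
        // before: FailureStatus{cause=null, message='Acknowledgement Timeout exceeded: 10000'}
        // after:  FailureStatus{exception=java.lang.RuntimeException, cause=null,
        //         message='Acknowledgement Timeout exceeded: 10000'}
    }
}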
@@ -1,15 +1,13 @@
 package com.gojek.beast.protomapping;
 
 import com.gojek.beast.Clock;
-import com.gojek.beast.config.StencilConfig;
 import com.gojek.beast.config.BQConfig;
 import com.gojek.beast.config.ColumnMapping;
 import com.gojek.beast.config.ProtoMappingConfig;
+import com.gojek.beast.config.StencilConfig;
 import com.gojek.beast.converter.ConsumerRecordConverter;
 import com.gojek.beast.converter.RowMapper;
 import com.gojek.beast.exception.BQTableUpdateFailure;
-import com.gojek.beast.exception.BQSchemaMappingException;
-import com.gojek.beast.exception.ProtoMappingException;
 import com.gojek.beast.exception.ProtoNotFoundException;
 import com.gojek.beast.models.BQField;
 import com.gojek.beast.models.ProtoField;
@@ -88,10 +86,9 @@ public void onProtoUpdate(String url, Map<String, DescriptorAndTypeName> newDescriptors) {
             ProtoField protoField = protoFieldFactory.getProtoField();
             protoField = protoMappingParser.parseFields(protoField, proto, StencilUtils.getAllProtobufDescriptors(newDescriptors), StencilUtils.getTypeNameToPackageNameMap(newDescriptors));
             updateProtoParser(protoField);
-        } catch (ProtoNotFoundException | BQSchemaMappingException | BigQueryException | IOException e) {
+        } catch (BigQueryException | ProtoNotFoundException | IOException e) {
             String errMsg = "Error while updating bigquery table on callback:" + e.getMessage();
             log.error(errMsg);
-            e.printStackTrace();
             statsClient.increment("bq.table.upsert.failures");
             throw new BQTableUpdateFailure(errMsg);
         }
@@ -112,14 +109,7 @@ private void updateProtoParser(final ProtoField protoField) throws IOException {
 
     private ColumnMapping getProtoMapping() throws IOException {
         ProtoField protoField = new ProtoField();
-        try {
-            protoField = protoMappingParser.parseFields(protoField, proto, stencilClient.getAll(), stencilClient.getTypeNameToPackageNameMap());
-        } catch (ProtoNotFoundException e) {
-            String errMsg = "Error while generating proto to column mapping:" + e.getMessage();
-            log.error(errMsg);
-            e.printStackTrace();
-            throw new ProtoMappingException(errMsg);
-        }
+        protoField = protoMappingParser.parseFields(protoField, proto, stencilClient.getAll(), stencilClient.getTypeNameToPackageNameMap());
         String protoMapping = protoMappingConverter.generateColumnMappings(protoField.getFields());
         protoMappingConfig.setProperty("PROTO_COLUMN_MAPPING", protoMapping);
         return protoMappingConfig.getProtoColumnMapping();
101 changes: 68 additions & 33 deletions src/main/java/com/gojek/beast/worker/OffsetCommitWorker.java
@@ -1,84 +1,119 @@
 package com.gojek.beast.worker;
 
+import com.gojek.beast.Clock;
 import com.gojek.beast.commiter.KafkaCommitter;
 import com.gojek.beast.commiter.OffsetState;
+import com.gojek.beast.config.QueueConfig;
 import com.gojek.beast.models.FailureStatus;
-import com.gojek.beast.models.Records;
-import com.gojek.beast.models.Status;
+import com.gojek.beast.models.OffsetMetadata;
 import com.gojek.beast.models.SuccessStatus;
 import com.gojek.beast.stats.Stats;
+import com.gojek.beast.models.Records;
+import com.gojek.beast.models.Status;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 
 @Slf4j
 public class OffsetCommitWorker extends Worker {
     private static final int DEFAULT_SLEEP_MS = 100;
     private final Stats statsClient = Stats.client();
-    private BlockingQueue<Records> commitQueue;
-    private Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck;
-    private KafkaCommitter kafkaCommitter;
+    private final BlockingQueue<Records> commitQueue;
+    private final QueueConfig queueConfig;
+    private final KafkaCommitter kafkaCommitter;
     @Setter
     private long defaultSleepMs;
 
 
     private boolean stopped;
     private OffsetState offsetState;
+    private Clock clock;
 
-    public OffsetCommitWorker(String name, Set<Map<TopicPartition, OffsetAndMetadata>> partitionOffsetAck, KafkaCommitter kafkaCommitter, OffsetState offsetState, BlockingQueue<Records> commitQueue, WorkerState workerState) {
+    public OffsetCommitWorker(String name, QueueConfig queueConfig, KafkaCommitter kafkaCommitter, OffsetState offsetState, BlockingQueue<Records> commitQueue, WorkerState workerState, Clock clock) {
         super(name, workerState);
+        this.clock = clock;
+        this.queueConfig = queueConfig;
         this.commitQueue = commitQueue;
-        this.partitionOffsetAck = partitionOffsetAck;
         this.kafkaCommitter = kafkaCommitter;
        this.defaultSleepMs = DEFAULT_SLEEP_MS;
         this.offsetState = offsetState;
         this.stopped = false;
     }
 
     @Override
     public void stop(String reason) {
         log.info("Closing committer: {}", reason);
         this.stopped = true;
         kafkaCommitter.wakeup(reason);
     }
 
     @Override
     public Status job() {
         offsetState.startTimer();
         try {
-            Instant start = Instant.now();
-            Records commitOffset = commitQueue.peek();
-            if (commitOffset == null) {
-                return new SuccessStatus();
-            }
-            Map<TopicPartition, OffsetAndMetadata> currentOffset = commitOffset.getPartitionsCommitOffset();
-            if (partitionOffsetAck.contains(currentOffset)) {
-                commit();
-            } else {
-                if (offsetState.shouldCloseConsumer(currentOffset)) {
-                    statsClient.increment("committer.ack.timeout");
-                    return new FailureStatus(new RuntimeException("Acknowledgement Timeout exceeded: " + offsetState.getAcknowledgeTimeoutMs()));
+            Instant startTime = Instant.now();
+            long start = clock.currentEpochMillis();
+            Map<TopicPartition, OffsetAndMetadata> partitionsCommitOffset = new HashMap<>();
+
+            int offsetClubbedBatches = 0;
+            while (true) {
+                Records commitOffset = commitQueue.poll(queueConfig.getTimeout(), queueConfig.getTimeoutUnit());
+                if (stopped || clock.currentEpochMillis() - start > offsetState.getOffsetBatchDuration()) {
+                    break;
                 }
-                log.debug("waiting for {} acknowledgement for offset {}", defaultSleepMs, currentOffset);
-                sleep(defaultSleepMs);
-                statsClient.gauge("committer.queue.wait.ms", defaultSleepMs);
+
+                if (commitOffset == null) {
+                    continue;
+                }
+
+                Map<TopicPartition, OffsetAndMetadata> currentOffset = commitOffset.getPartitionsCommitOffset();
+                Instant commitQueuePollStartTime = Instant.now();
+                while (true) {
+                    if (offsetState.removeFromOffsetAck(currentOffset)) {
+
+                        currentOffset.keySet().forEach(topicPartition -> {
+                            OffsetAndMetadata offsetAndMetadata = currentOffset.get(topicPartition);
+                            OffsetMetadata previousOffset = (OffsetMetadata) partitionsCommitOffset.getOrDefault(topicPartition, new OffsetMetadata(Integer.MIN_VALUE));
+                            OffsetMetadata newOffset = new OffsetMetadata(offsetAndMetadata.offset());
+                            if (previousOffset.compareTo(newOffset) < 0) {
+                                partitionsCommitOffset.put(topicPartition, newOffset);
+                            }
+                        });
+                        offsetState.resetOffset();
+                        offsetClubbedBatches++;
+                        break;
+                    } else {
+                        if (offsetState.shouldCloseConsumer(partitionsCommitOffset)) {
+                            statsClient.increment("committer.ack.timeout");
+                            return new FailureStatus(new RuntimeException("Acknowledgement Timeout exceeded: " + offsetState.getAcknowledgeTimeoutMs()));
+                        }
+
+                        log.debug("waiting for {} acknowledgement for offset {}, {}, {}", defaultSleepMs, currentOffset, offsetState.partitionOffsetAckSize(), currentOffset);
+                        sleep(defaultSleepMs);
+                    }
                 }
+                statsClient.timeIt("committer.queue.wait.ms", commitQueuePollStartTime);
             }
-            statsClient.timeIt("committer.processing.time", start);
+            commit(partitionsCommitOffset);
+
+            statsClient.gauge("committer.clubbed.offsets", offsetClubbedBatches);
+            statsClient.timeIt("committer.processing.time", startTime);
         } catch (InterruptedException | RuntimeException e) {
             log.info("Received {} exception: {}, resetting committer", e.getClass(), e.getMessage());
             e.printStackTrace();
             return new FailureStatus(new RuntimeException("Exception in offset committer: " + e.getMessage()));
         }
         return new SuccessStatus();
     }
 
-    private void commit() {
-        Map<TopicPartition, OffsetAndMetadata> partitionsCommitOffset = commitQueue.remove().getPartitionsCommitOffset();
-        kafkaCommitter.commitSync(partitionsCommitOffset);
-        partitionOffsetAck.remove(partitionsCommitOffset);
-        Records nextOffset = commitQueue.peek();
-        if (nextOffset != null) offsetState.resetOffset(nextOffset.getPartitionsCommitOffset());
-        log.info("commit partition {} size {}", partitionsCommitOffset.toString(), partitionsCommitOffset.size());
+    private void commit(Map<TopicPartition, OffsetAndMetadata> partitionsCommitOffset) {
+        if (partitionsCommitOffset.size() != 0) {
+            kafkaCommitter.commitSync(partitionsCommitOffset);
+        }
+        log.info("committed offsets partition {} size {}", partitionsCommitOffset.toString(), partitionsCommitOffset.size());
     }
 }

Review thread on the info log in commit():

Contributor: Is this log really helpful? Can we either mark it debug or stop serializing the whole map to a string?

Contributor (author): Since we will be committing after batching commits, we want to log when the offsets were actually committed; there is no other info log in the OffsetCommitter.

Contributor: @mauliksoneji Yeah, that's fine. I am just wondering what the use of seeing per-partition offset values in the log is; can we just log the size of that map?

Contributor (author): The map doesn't carry much information: it has at most one key per topic-partition subscribed by the pod, and each value is only the max offset for that partition.
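The essence of the change is the clubbing step in job(): instead of committing every acknowledged batch individually, the worker folds all acks arriving within OFFSET_BATCH_DURATION into one map that keeps only the highest offset per partition, then issues a single commitSync. A standalone sketch of that fold follows; OffsetMetadata in the PR appears to be a Comparable wrapper over OffsetAndMetadata, so a plain offset comparison stands in for it here, and ClubOffsetsSketch and the topic name are illustrative.

// Hedged sketch of the offset-clubbing fold, independent of Beast's own classes.
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ClubOffsetsSketch {
    // Keep only the highest acknowledged offset per partition.
    static Map<TopicPartition, OffsetAndMetadata> club(List<Map<TopicPartition, OffsetAndMetadata>> acked) {
        Map<TopicPartition, OffsetAndMetadata> merged = new HashMap<>();
        for (Map<TopicPartition, OffsetAndMetadata> ack : acked) {
            ack.forEach((tp, next) -> merged.merge(tp, next,
                    (prev, candidate) -> candidate.offset() > prev.offset() ? candidate : prev));
        }
        return merged; // one commitSync(merged) replaces one commit per acknowledged batch
    }

    public static void main(String[] args) {
        TopicPartition p0 = new TopicPartition("bookings", 0);
        Map<TopicPartition, OffsetAndMetadata> merged = club(Arrays.asList(
                Collections.singletonMap(p0, new OffsetAndMetadata(10L)),
                Collections.singletonMap(p0, new OffsetAndMetadata(25L))));
        System.out.println(merged.get(p0).offset()); // 25
    }
}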