Skip to content

Commit

Permalink
stash
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng committed Oct 16, 2023
1 parent 213ec58 commit e2ddc07
Showing 1 changed file with 8 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ public InsertRowsResponse insertRecord(SinkRecord record) {
this.latestConsumerOffset.set(record.kafkaOffset());
}

// Ignore adding to the buffer until we see the expected offset value
if (shouldIgnoreAddingRecordToBuffer(record, currentProcessedOffset)) {
// Ignore records until we see the expected offset value
if (shouldIgnoreRecord(record, currentProcessedOffset)) {
return response;
}

Expand Down Expand Up @@ -383,7 +383,7 @@ public InsertRowsResponse insertRecord(SinkRecord record) {
} else {
// TODO @rcheng: update this comment
LOGGER.debug(
"Skip adding offset:{} to buffer for channel:{} because"
"Skip ingesting current record with offset:{} for channel:{} because"
+ " offsetPersistedInSnowflake:{}, processedOffset:{}",
record.kafkaOffset(),
this.getChannelName(),
Expand All @@ -406,7 +406,7 @@ public InsertRowsResponse insertRecord(SinkRecord record) {
* (isOffsetResetInKafka = true)
* @return true if this record should be ignored (not ingested), false otherwise.
*/
private boolean shouldIgnoreAddingRecordToBuffer(
private boolean shouldIgnoreRecord(
SinkRecord kafkaSinkRecord, long currentProcessedOffset) {
// Don't skip rows if there is no offset reset or there is no offset token information in the
// channel
Expand All @@ -418,14 +418,14 @@ private boolean shouldIgnoreAddingRecordToBuffer(
// Don't ignore if we see the expected offset; otherwise log and skip
if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) {
LOGGER.debug(
"Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}",
"Got the desired offset:{} from Kafka, we can ingest using this channel:{}",
kafkaSinkRecord.kafkaOffset(),
this.getChannelName());
isOffsetResetInKafka = false;
return false;
} else {
LOGGER.debug(
"Ignore adding offset:{} to buffer for channel:{} because we recently encountered"
"Ignore current record at offset:{} because we recently encountered"
+ " error and reset offset in Kafka. currentProcessedOffset:{}",
kafkaSinkRecord.kafkaOffset(),
this.getChannelName(),
Expand Down Expand Up @@ -537,7 +537,7 @@ private InsertRowsResponse insertRowWithFallback(SinkRecord sinkRecord, Map<Stri
this.channel, sinkRecord, record, offset, this.enableSchemaEvolution, this.conn));
}

/** Invokes the API given the channel and streaming Buffer. */
/** Invokes the API given the channel. */
private static class InsertRowApiResponseSupplier
implements CheckedSupplier<InsertRowsResponse> {

Expand Down Expand Up @@ -811,7 +811,7 @@ private long streamingApiFallbackSupplier(
}

/**
* Resets the offset in kafka, resets metadata related to offsets and clears the buffer. If we
* Resets the offset in kafka and resets metadata related to offsets. If we
* don't get a valid offset token (because of a table recreation or channel inactivity), we will
* rely on kafka to send us the correct offset
*
Expand Down

0 comments on commit e2ddc07

Please sign in to comment.