Commit

autoformatting
sfc-gh-rcheng committed Oct 16, 2023
1 parent e2ddc07 commit b438829
Showing 2 changed files with 38 additions and 50 deletions.
@@ -15,7 +15,6 @@
import com.google.common.collect.Iterables;
import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
@@ -320,10 +320,7 @@ public InsertRowsResponse insertRecord(SinkRecord record) {
if (isRecordBroken(snowflakeRecord)) {
// check for error tolerance and log tolerance values
// errors.log.enable and errors.tolerance
LOGGER.debug(
"Broken record offset:{}, topic:{}",
record.kafkaOffset(),
record.topic());
LOGGER.debug("Broken record offset:{}, topic:{}", record.kafkaOffset(), record.topic());
kafkaRecordErrorReporter.reportError(record, new DataException("Broken Record"));
} else {
// lag telemetry, note that sink record timestamp might be null
@@ -337,11 +334,14 @@ public InsertRowsResponse insertRecord(SinkRecord record) {
try {
this.channelLock.lock();

response = insertRowWithFallback(record, recordService.getProcessedRecordForStreamingIngest(snowflakeRecord), snowflakeRecord.kafkaOffset());
response =
insertRowWithFallback(
record,
recordService.getProcessedRecordForStreamingIngest(snowflakeRecord),
snowflakeRecord.kafkaOffset());

if (response.hasErrors()) {
handleInsertRowsFailures(
response.getInsertErrors(), record);
handleInsertRowsFailures(response.getInsertErrors(), record);
}

LOGGER.info(
@@ -351,13 +351,14 @@ public InsertRowsResponse insertRecord(SinkRecord record) {
response.hasErrors(),
response.needToResetOffset());

// Due to schema evolution, we may need to reopen the channel and reset the offset in kafka
// Due to schema evolution, we may need to reopen the channel and reset the offset in
// kafka
// since it's possible that not all rows are ingested
if (response.needToResetOffset()) {
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
} else {
this.processedOffset.set(record.kafkaOffset());
this.processedOffset.set(record.kafkaOffset());
}

} catch (JsonProcessingException e) {
@@ -368,18 +369,12 @@ public InsertRowsResponse insertRecord(SinkRecord record) {
kafkaRecordErrorReporter.reportError(record, e);
} catch (TopicPartitionChannelInsertionException ex) {
// Suppressing the exception because other channels might still continue to ingest
LOGGER.warn(
String.format(
"Failure inserting for channel:%s", this.getChannelName()),
ex);
LOGGER.warn(String.format("Failure inserting for channel:%s", this.getChannelName()), ex);
} finally {
this.channelLock.unlock();
}
}




} else {
// TODO @rcheng: update this comment
LOGGER.debug(
@@ -406,8 +401,7 @@ public InsertRowsResponse insertRecord(SinkRecord record) {
* (isOffsetResetInKafka = true)
* @return true if this record can be skipped to add into buffer, false otherwise.
*/
private boolean shouldIgnoreRecord(
SinkRecord kafkaSinkRecord, long currentProcessedOffset) {
private boolean shouldIgnoreRecord(SinkRecord kafkaSinkRecord, long currentProcessedOffset) {
// Don't skip rows if there is no offset reset or there is no offset token information in the
// channel
if (!isOffsetResetInKafka
@@ -508,7 +502,8 @@ private SinkRecord handleNativeRecord(SinkRecord record, boolean isKey) {
*
* @return InsertRowsResponse a response that wraps around InsertValidationResponse
*/
private InsertRowsResponse insertRowWithFallback(SinkRecord sinkRecord, Map<String, Object> record, long offset) {
private InsertRowsResponse insertRowWithFallback(
SinkRecord sinkRecord, Map<String, Object> record, long offset) {
Fallback<Object> reopenChannelFallbackExecutorForInsertRows =
Fallback.builder(
executionAttemptedEvent -> {
@@ -518,8 +513,7 @@ private InsertRowsResponse insertRowWithFallback(SinkRecord sinkRecord, Map<Stri
.onFailedAttempt(
event ->
LOGGER.warn(
String.format(
"Failed Attempt to invoke the insertRows API"),
String.format("Failed Attempt to invoke the insertRows API"),
event.getLastException()))
.onFailure(
event ->
@@ -538,8 +532,7 @@ private InsertRowsResponse insertRowWithFallback(SinkRecord sinkRecord, Map<Stri
}
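Note on the pattern reformatted above: insertRowWithFallback guards the insertRows call with a Failsafe Fallback policy (reopenChannelFallbackExecutorForInsertRows) whose recovery action runs only when the guarded supplier throws; the variable name suggests it reopens the channel. Below is a minimal, self-contained sketch of that wiring only, assuming the dev.failsafe 3.x API the surrounding code appears to use; the class and method names (InsertRowsFallbackSketch, reopenChannel) are illustrative and not part of the connector.

import dev.failsafe.Failsafe;
import dev.failsafe.Fallback;

public class InsertRowsFallbackSketch {
  // Stand-in for the connector's recovery path; illustrative only.
  static void reopenChannel(Throwable cause) {
    System.out.println("fallback: reopening channel after " + cause);
  }

  public static void main(String[] args) {
    // Fallback policy: run the recovery action only when the guarded call throws.
    Fallback<Object> fallback =
        Fallback.builder(event -> reopenChannel(event.getLastException()))
            .onFailedAttempt(e -> System.out.println("insertRows attempt failed"))
            .build();

    // Failsafe executes the supplier; on success the fallback never fires.
    Object result = Failsafe.with(fallback).get(() -> "rows inserted");
    System.out.println(result);
  }
}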

/** Invokes the API given the channel. */
private static class InsertRowApiResponseSupplier
implements CheckedSupplier<InsertRowsResponse> {
private static class InsertRowApiResponseSupplier implements CheckedSupplier<InsertRowsResponse> {

// Reference to the Snowpipe Streaming channel
private final SnowflakeStreamingIngestChannel channel;
@@ -572,28 +565,27 @@ private InsertRowApiResponseSupplier(

@Override
public InsertRowsResponse get() throws Throwable {
LOGGER.debug(
"Invoking insertRows API for channel:{}",
this.channel.getFullyQualifiedName());
LOGGER.debug("Invoking insertRows API for channel:{}", this.channel.getFullyQualifiedName());

InsertValidationResponse response = this.channel.insertRow(this.record, Long.toString(this.offset));
InsertValidationResponse response =
this.channel.insertRow(this.record, Long.toString(this.offset));

if (enableSchemaEvolution) {
if (response.hasErrors()) {
InsertValidationResponse.InsertError insertError = response.getInsertErrors().get(0);
List<String> extraColNames = insertError.getExtraColNames();
List<String> nonNullableColumns = insertError.getMissingNotNullColNames();
if (extraColNames != null || nonNullableColumns != null) {
SchematizationUtils.evolveSchemaIfNeeded(
this.conn,
this.channel.getTableName(),
nonNullableColumns,
extraColNames,
this.sinkRecord);
// Offset reset needed since it's possible that we successfully ingested partial batch
return new InsertRowsResponse(new InsertValidationResponse(), true);
}
if (response.hasErrors()) {
InsertValidationResponse.InsertError insertError = response.getInsertErrors().get(0);
List<String> extraColNames = insertError.getExtraColNames();
List<String> nonNullableColumns = insertError.getMissingNotNullColNames();
if (extraColNames != null || nonNullableColumns != null) {
SchematizationUtils.evolveSchemaIfNeeded(
this.conn,
this.channel.getTableName(),
nonNullableColumns,
extraColNames,
this.sinkRecord);
// Offset reset needed since it's possible that we successfully ingested partial batch
return new InsertRowsResponse(new InsertValidationResponse(), true);
}
}
}
return new InsertRowsResponse(response, false);
}
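The InsertRowsResponse type used throughout this file is not defined in this diff. From its usage above (a constructor taking an InsertValidationResponse plus a reset flag, and the hasErrors(), getInsertErrors(), and needToResetOffset() accessors), a minimal wrapper would look roughly like the sketch below; this is a reconstruction for illustration, not the connector's actual class, and the ingest SDK import path is an assumption.

import java.util.List;
import net.snowflake.ingest.streaming.InsertValidationResponse; // assumed SDK package

class InsertRowsResponseSketch {
  private final InsertValidationResponse response;
  private final boolean needToResetOffset;

  InsertRowsResponseSketch(InsertValidationResponse response, boolean needToResetOffset) {
    this.response = response;
    this.needToResetOffset = needToResetOffset;
  }

  // Delegate validation results to the wrapped SDK response.
  boolean hasErrors() {
    return response.hasErrors();
  }

  List<InsertValidationResponse.InsertError> getInsertErrors() {
    return response.getInsertErrors();
  }

  // True when schema evolution ran and the caller must reopen the channel and reset the offset.
  boolean needToResetOffset() {
    return needToResetOffset;
  }
}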
@@ -652,8 +644,7 @@ private void insertRowsFallbackSupplier(Throwable ex)
* @param insertErrors errors from validation response. (Only if it has errors)
*/
private void handleInsertRowsFailures(
List<InsertValidationResponse.InsertError> insertErrors,
SinkRecord record) {
List<InsertValidationResponse.InsertError> insertErrors, SinkRecord record) {
if (logErrors) {
for (InsertValidationResponse.InsertError insertError : insertErrors) {
LOGGER.error("Insert Row Error message:{}", insertError.getException().getMessage());
@@ -667,9 +658,7 @@ private void handleInsertRowsFailures(
ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG);
} else {
for (InsertValidationResponse.InsertError insertError : insertErrors) {
this.kafkaRecordErrorReporter.reportError(
record,
insertError.getException());
this.kafkaRecordErrorReporter.reportError(record, insertError.getException());
}
}
} else {
@@ -811,9 +800,9 @@ private long streamingApiFallbackSupplier(
}

/**
* Resets the offset in kafka, resets metadata related to offsets. If we
* don't get a valid offset token (because of a table recreation or channel inactivity), we will
* rely on kafka to send us the correct offset
* Resets the offset in kafka, resets metadata related to offsets. If we don't get a valid offset
* token (because of a table recreation or channel inactivity), we will rely on kafka to send us
* the correct offset
*
* <p>Idea behind resetting offset (1 more than what we found in snowflake) is that Kafka should
* send offsets from this offset number so as to not miss any data.
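As a concrete illustration of the reset rule described in the Javadoc above: after a fallback, the connector asks Kafka to resend records starting one past the offset last committed in Snowflake, and relies on Kafka's own committed offsets when no valid offset token is available. The sketch below only restates that rule; the NO_OFFSET_TOKEN sentinel, method name, and class name are assumptions for this example, not the connector's code.

final class OffsetResetSketch {
  // Assumed sentinel meaning "no offset token found in Snowflake".
  static final long NO_OFFSET_TOKEN = -1L;

  // Offset to ask Kafka to resend from: one past the last offset committed in Snowflake,
  // so no record between Snowflake's committed point and Kafka's position is skipped.
  static long offsetToRequestFromKafka(long lastCommittedOffsetInSnowflake) {
    return lastCommittedOffsetInSnowflake == NO_OFFSET_TOKEN
        ? NO_OFFSET_TOKEN
        : lastCommittedOffsetInSnowflake + 1;
  }

  public static void main(String[] args) {
    System.out.println(offsetToRequestFromKafka(41L));             // 42: Kafka resends from 42
    System.out.println(offsetToRequestFromKafka(NO_OFFSET_TOKEN)); // -1: rely on Kafka
  }
}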
