[Snowpipe Streaming] Fix IndexOutOfBoundsException thrown when offsets are not continuous during schema evolution #1037

Open · wants to merge 1 commit into base: master
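The change replaces the streaming buffer's List<Long> of offsets with the original SinkRecords, so the schema-evolution path no longer derives a buffer index from an offset delta. A minimal, self-contained sketch of the old failure mode (the class name and offset values are illustrative; the offsets mirror the reworked gap test below, and this is not code from the PR):

import java.util.Arrays;
import java.util.List;

public class OffsetGapSketch {
  public static void main(String[] args) {
    // Buffered rows whose Kafka offsets are not contiguous.
    List<Long> bufferedOffsets = Arrays.asList(0L, 1L, 300L, 301L);
    long firstOffset = bufferedOffsets.get(0); // 0
    int idx = 2;                               // third buffered row, Kafka offset 300
    long derivedIdx = bufferedOffsets.get(idx) - firstOffset; // 300
    // The buffer only holds 4 records, so looking up element 300 throws
    // IndexOutOfBoundsException; carrying the SinkRecord next to each row,
    // as this change does, removes the need for this arithmetic.
    System.out.println("derived index = " + derivedIdx
        + ", buffer size = " + bufferedOffsets.size());
  }
}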
@@ -641,10 +641,10 @@ public InsertRowsResponse get() throws Throwable {
"Invoking insertRows API for channel:{}, streamingBuffer:{}",
this.channel.getFullyQualifiedName(),
this.insertRowsStreamingBuffer);
Pair<List<Map<String, Object>>, List<Long>> recordsAndOffsets =
Pair<List<Map<String, Object>>, List<SinkRecord>> recordsAndOriginalSinkRecords =
this.insertRowsStreamingBuffer.getData();
List<Map<String, Object>> records = recordsAndOffsets.getKey();
List<Long> offsets = recordsAndOffsets.getValue();
List<Map<String, Object>> records = recordsAndOriginalSinkRecords.getKey();
List<SinkRecord> originalSinkRecords = recordsAndOriginalSinkRecords.getValue();
InsertValidationResponse finalResponse = new InsertValidationResponse();
boolean needToResetOffset = false;
if (!enableSchemaEvolution) {
@@ -658,16 +658,19 @@ public InsertRowsResponse get() throws Throwable {
// For schema evolution, we need to call the insertRows API row by row in order to
// preserve the original order; anything after the first schema mismatch error will be
// retried after the evolution
InsertValidationResponse response =
this.channel.insertRow(records.get(idx), Long.toString(offsets.get(idx)));
SinkRecord originalSinkRecord = originalSinkRecords.get(idx);
InsertValidationResponse response = this.channel.insertRow(
records.get(idx), Long.toString(originalSinkRecord.kafkaOffset())
);
if (response.hasErrors()) {
InsertValidationResponse.InsertError insertError = response.getInsertErrors().get(0);
SchemaEvolutionTargetItems schemaEvolutionTargetItems =
insertErrorMapper.mapToSchemaEvolutionItems(
insertError, this.channel.getTableName());

// TODO: originalSinkRecordIdx can be replaced by idx
long originalSinkRecordIdx =
offsets.get(idx) - this.insertRowsStreamingBuffer.getFirstOffset();
originalSinkRecord.kafkaOffset() - this.insertRowsStreamingBuffer.getFirstOffset();

if (!schemaEvolutionTargetItems.hasDataForSchemaEvolution()) {
InsertValidationResponse.InsertError newInsertError =
@@ -684,7 +687,7 @@
LOGGER.info("Triggering schema evolution. Items: {}", schemaEvolutionTargetItems);
schemaEvolutionService.evolveSchemaIfNeeded(
schemaEvolutionTargetItems,
this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx),
originalSinkRecord,
channel.getTableSchema());
// Offset reset needed since it's possible that we successfully ingested partial batch
needToResetOffset = true;
@@ -1282,7 +1285,7 @@ protected long getApproxSizeOfRecordInBytes(SinkRecord kafkaSinkRecord) {
* before calling insertRows API.
*/
@VisibleForTesting
class StreamingBuffer extends PartitionBuffer<Pair<List<Map<String, Object>>, List<Long>>> {
class StreamingBuffer extends PartitionBuffer<Pair<List<Map<String, Object>>, List<SinkRecord>>> {
// Records coming from Kafka
private final List<SinkRecord> sinkRecords;

@@ -1316,9 +1319,9 @@ public void insert(SinkRecord kafkaSinkRecord) {
* @return A pair that contains the records and their corresponding original SinkRecords
*/
@Override
public Pair<List<Map<String, Object>>, List<Long>> getData() {
public Pair<List<Map<String, Object>>, List<SinkRecord>> getData() {
final List<Map<String, Object>> records = new ArrayList<>();
final List<Long> offsets = new ArrayList<>();
final List<SinkRecord> filteredOriginalSinkRecords = new ArrayList<>();

for (SinkRecord kafkaSinkRecord : sinkRecords) {
SinkRecord snowflakeRecord = getSnowflakeSinkRecordFromKafkaRecord(kafkaSinkRecord);
@@ -1345,7 +1348,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
Map<String, Object> tableRow =
recordService.getProcessedRecordForStreamingIngest(snowflakeRecord);
records.add(tableRow);
offsets.add(snowflakeRecord.kafkaOffset());
filteredOriginalSinkRecords.add(kafkaSinkRecord);
} catch (JsonProcessingException e) {
LOGGER.warn(
"Record has JsonProcessingException offset:{}, topic:{}",
@@ -1371,7 +1374,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
getBufferSizeBytes(),
getFirstOffset(),
getLastOffset());
return new Pair<>(records, offsets);
return new Pair<>(records, filteredOriginalSinkRecords);
}

@Override
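With the new return type, getData() keeps the converted rows and their source SinkRecords index-aligned: a record skipped on JsonProcessingException is absent from both lists, and the offset token passed to insertRow comes from the record's own kafkaOffset() rather than from index arithmetic. A brief sketch under that assumption (topic, key, and values are placeholders, not code from the PR):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class GetDataContractSketch {
  public static void main(String[] args) {
    List<String> rows = new ArrayList<>();
    List<SinkRecord> sources = new ArrayList<>();
    long[] offsets = {0L, 1L, 300L, 301L}; // non-contiguous, as in the gap test
    for (long offset : offsets) {
      rows.add("row@" + offset);
      sources.add(new SinkRecord("topic", 0, Schema.STRING_SCHEMA, "key",
          Schema.STRING_SCHEMA, "value", offset));
    }
    for (int idx = 0; idx < rows.size(); idx++) {
      // The offset token is taken from the aligned record, never from idx arithmetic.
      String offsetToken = Long.toString(sources.get(idx).kafkaOffset());
      System.out.println(rows.get(idx) + " -> offset token " + offsetToken);
    }
  }
}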
@@ -716,22 +716,44 @@ public static List<SinkRecord> createJsonStringSinkRecords(
return records;
}

/* Generate noOfRecords blank records for a given topic and partition, starting at startOffset. */
public static List<SinkRecord> createBlankJsonSinkRecords(
final long startOffset,
final long noOfRecords,
final String topicName,
final int partitionNo) {
return createJsonRecords(
startOffset, noOfRecords, topicName, partitionNo, null,
Collections.singletonMap("schemas.enable", Boolean.toString(false))
);
}

/* Generate noOfRecords records for a given topic and partition, starting at startOffset. */
public static List<SinkRecord> createNativeJsonSinkRecords(
final long startOffset,
final long noOfRecords,
final String topicName,
final int partitionNo) {
ArrayList<SinkRecord> records = new ArrayList<>();
return createJsonRecords(
startOffset, noOfRecords, topicName, partitionNo,
TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8),
Collections.singletonMap("schemas.enable", Boolean.toString(true))
);
}

private static List<SinkRecord> createJsonRecords(
final long startOffset,
final long noOfRecords,
final String topicName,
final int partitionNo,
byte[] value,
Map<String, String> converterConfig
) {
JsonConverter converter = new JsonConverter();
HashMap<String, String> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "true");
converter.configure(converterConfig, false);
SchemaAndValue schemaInputValue =
converter.toConnectData(
"test", TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8));
SchemaAndValue schemaInputValue = converter.toConnectData("test", value);

ArrayList<SinkRecord> records = new ArrayList<>();
for (long i = startOffset; i < startOffset + noOfRecords; ++i) {
records.add(
new SinkRecord(
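A short usage sketch of the refactored helpers, mirroring how the reworked gap test below builds its batch (topic name and partition number are placeholders; assumes the connector's TestUtils is on the test classpath):

import com.snowflake.kafka.connector.internal.TestUtils;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.sink.SinkRecord;

public class GapBatchSketch {
  public static void main(String[] args) {
    // Two schemaless null-value records at offsets 0..1, then two schema-bearing
    // JSON records at offsets 300..301, leaving a deliberate offset gap.
    List<SinkRecord> blanks = TestUtils.createBlankJsonSinkRecords(0, 2, "gap_topic", 0);
    List<SinkRecord> schematized = TestUtils.createNativeJsonSinkRecords(300, 2, "gap_topic", 0);
    List<SinkRecord> merged = new ArrayList<>(blanks);
    merged.addAll(schematized); // offsets [0, 1, 300, 301]
    System.out.println("batch size = " + merged.size());
  }
}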
@@ -1,17 +1,16 @@
package com.snowflake.kafka.connector.internal.streaming;

import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.SUCCESS;
import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful;
import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
import com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory;
import com.snowflake.kafka.connector.internal.TestUtils;
import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.SUCCESS;
import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful;
import com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel;
import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.InsertErrorMapper;
import com.snowflake.kafka.connector.internal.streaming.schemaevolution.snowflake.SnowflakeSchemaEvolutionService;
import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryServiceV2;
@@ -23,9 +22,6 @@
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -510,25 +506,7 @@ public void testPartialBatchChannelInvalidationIngestion_schematization(boolean
final long secondBatchCount = 500;

// create 18 blank records that do not kick off schematization
JsonConverter converter = new JsonConverter();
HashMap<String, String> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "false");
converter.configure(converterConfig, false);
SchemaAndValue schemaInputValue = converter.toConnectData("test", null);

List<SinkRecord> firstBatch = new ArrayList<>();
for (int i = 0; i < firstBatchCount; i++) {
firstBatch.add(
new SinkRecord(
topic,
PARTITION,
Schema.STRING_SCHEMA,
"test",
schemaInputValue.schema(),
schemaInputValue.value(),
i));
}

List<SinkRecord> firstBatch = TestUtils.createBlankJsonSinkRecords(0, firstBatchCount, topic, PARTITION);
service.insert(firstBatch);

// send batch with 500, should kick off a record based flush and schematization on record 19,
@@ -759,53 +737,38 @@ private void testInsertRowsWithGaps(boolean withSchematization, boolean useSingl
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(
SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
Boolean.toString(withSchematization));
Boolean.toString(withSchematization)
);

// create tpChannel
SnowflakeSinkService service =
SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
.setRecordNumber(1)
.setRecordNumber(4)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
.addTask(testTableName, topicPartition)
.build();

// insert blank records that do not evolve schema: 0, 1
JsonConverter converter = new JsonConverter();
HashMap<String, String> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "false");
converter.configure(converterConfig, false);
SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
List<SinkRecord> blankRecords = new ArrayList<>();
for (int i = 0; i < 2; i++) {
blankRecords.add(
new SinkRecord(
topic,
PARTITION,
Schema.STRING_SCHEMA,
"test",
schemaInputValue.schema(),
schemaInputValue.value(),
i));
}

service.insert(blankRecords);
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5);
List<SinkRecord> blankRecords = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);

// Insert another two records with offset gap that requires evolution: 3, 4
List<SinkRecord> gapRecords = TestUtils.createNativeJsonSinkRecords(2, 3, topic, PARTITION);
gapRecords.remove(0);
service.insert(gapRecords);
// Insert another two records with an offset gap; these require schema evolution: 300, 301
List<SinkRecord> gapRecords = TestUtils.createNativeJsonSinkRecords(300, 2, topic, PARTITION);

List<SinkRecord> mergedList = new ArrayList<>(blankRecords);
mergedList.addAll(gapRecords);
// mergedList's offsets -> [0, 1, 300, 301]
service.insert(mergedList);
// With schematization, we need to resend the batch; the resend should succeed even if there is
// an offset gap from the previous committed offset
if (withSchematization) {
service.insert(gapRecords);
service.insert(mergedList);
}

TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == 5, 20, 5);
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == 302,
20, 5
);

assert TestUtils.tableSize(testTableName) == 4
: "expected: " + 4 + " actual: " + TestUtils.tableSize(testTableName);