From c2ef3174bc57ad445ef437ce65b9ae08ed7aa5a8 Mon Sep 17 00:00:00 2001
From: Sudesh
Date: Sat, 21 Dec 2024 16:00:49 +0530
Subject: [PATCH] Fix IndexOutOfBoundsException

---
 .../BufferedTopicPartitionChannel.java        | 27 ++++---
 .../kafka/connector/internal/TestUtils.java   | 34 +++++++--
 .../streaming/TopicPartitionChannelIT.java    | 73 +++++--------
 3 files changed, 61 insertions(+), 73 deletions(-)

diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java
index f092ebd1d..8f5304af8 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java
@@ -641,10 +641,10 @@ public InsertRowsResponse get() throws Throwable {
           "Invoking insertRows API for channel:{}, streamingBuffer:{}",
           this.channel.getFullyQualifiedName(),
           this.insertRowsStreamingBuffer);
-      Pair<List<Map<String, Object>>, List<Long>> recordsAndOffsets =
+      Pair<List<Map<String, Object>>, List<SinkRecord>> recordsAndOriginalSinkRecords =
           this.insertRowsStreamingBuffer.getData();
-      List<Map<String, Object>> records = recordsAndOffsets.getKey();
-      List<Long> offsets = recordsAndOffsets.getValue();
+      List<Map<String, Object>> records = recordsAndOriginalSinkRecords.getKey();
+      List<SinkRecord> originalSinkRecords = recordsAndOriginalSinkRecords.getValue();
       InsertValidationResponse finalResponse = new InsertValidationResponse();
       boolean needToResetOffset = false;
       if (!enableSchemaEvolution) {
@@ -658,16 +658,19 @@ public InsertRowsResponse get() throws Throwable {
           // For schema evolution, we need to call the insertRows API row by row in order to
           // preserve the original order, for anything after the first schema mismatch error we will
           // retry after the evolution
-          InsertValidationResponse response =
-              this.channel.insertRow(records.get(idx), Long.toString(offsets.get(idx)));
+          SinkRecord originalSinkRecord = originalSinkRecords.get(idx);
+          InsertValidationResponse response = this.channel.insertRow(
+              records.get(idx), Long.toString(originalSinkRecord.kafkaOffset())
+          );
           if (response.hasErrors()) {
             InsertValidationResponse.InsertError insertError =
                 response.getInsertErrors().get(0);
             SchemaEvolutionTargetItems schemaEvolutionTargetItems =
                 insertErrorMapper.mapToSchemaEvolutionItems(
                     insertError, this.channel.getTableName());
+            // TODO: originalSinkRecordIdx can be replaced by idx
             long originalSinkRecordIdx =
-                offsets.get(idx) - this.insertRowsStreamingBuffer.getFirstOffset();
+                originalSinkRecord.kafkaOffset() - this.insertRowsStreamingBuffer.getFirstOffset();
 
             if (!schemaEvolutionTargetItems.hasDataForSchemaEvolution()) {
               InsertValidationResponse.InsertError newInsertError =
@@ -684,7 +687,7 @@ public InsertRowsResponse get() throws Throwable {
               LOGGER.info("Triggering schema evolution. Items: {}", schemaEvolutionTargetItems);
               schemaEvolutionService.evolveSchemaIfNeeded(
                   schemaEvolutionTargetItems,
-                  this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx),
+                  originalSinkRecord,
                   channel.getTableSchema());
               // Offset reset needed since it's possible that we successfully ingested partial batch
               needToResetOffset = true;
@@ -1282,7 +1285,7 @@ protected long getApproxSizeOfRecordInBytes(SinkRecord kafkaSinkRecord) {
    * before calling insertRows API.
    */
   @VisibleForTesting
-  class StreamingBuffer extends PartitionBuffer<Pair<List<Map<String, Object>>, List<Long>>> {
+  class StreamingBuffer extends PartitionBuffer<Pair<List<Map<String, Object>>, List<SinkRecord>>> {
     // Records coming from Kafka
     private final List<SinkRecord> sinkRecords;
 
@@ -1316,9 +1319,9 @@ public void insert(SinkRecord kafkaSinkRecord) {
      * @return A pair that contains the records and their corresponding offsets
      */
     @Override
-    public Pair<List<Map<String, Object>>, List<Long>> getData() {
+    public Pair<List<Map<String, Object>>, List<SinkRecord>> getData() {
       final List<Map<String, Object>> records = new ArrayList<>();
-      final List<Long> offsets = new ArrayList<>();
+      final List<SinkRecord> filteredOriginalSinkRecords = new ArrayList<>();
 
       for (SinkRecord kafkaSinkRecord : sinkRecords) {
         SinkRecord snowflakeRecord = getSnowflakeSinkRecordFromKafkaRecord(kafkaSinkRecord);
@@ -1345,7 +1348,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
           Map<String, Object> tableRow =
              recordService.getProcessedRecordForStreamingIngest(snowflakeRecord);
           records.add(tableRow);
-          offsets.add(snowflakeRecord.kafkaOffset());
+          filteredOriginalSinkRecords.add(kafkaSinkRecord);
         } catch (JsonProcessingException e) {
           LOGGER.warn(
               "Record has JsonProcessingException offset:{}, topic:{}",
@@ -1371,7 +1374,7 @@ public Pair<List<Map<String, Object>>, List<Long>> getData() {
           getBufferSizeBytes(),
           getFirstOffset(),
           getLastOffset());
-      return new Pair<>(records, offsets);
+      return new Pair<>(records, filteredOriginalSinkRecords);
     }
 
     @Override
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
index 6e4607329..f8eef5856 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java
@@ -716,22 +716,44 @@ public static List<SinkRecord> createJsonStringSinkRecords(
     return records;
   }
 
+  /* Generate (noOfRecords - startOffset) blank records for a given topic and partition. */
+  public static List<SinkRecord> createBlankJsonSinkRecords(
+      final long startOffset,
+      final long noOfRecords,
+      final String topicName,
+      final int partitionNo) {
+    return createJsonRecords(
+        startOffset, noOfRecords, topicName, partitionNo, null,
+        Collections.singletonMap("schemas.enable", Boolean.toString(false))
+    );
+  }
+
   /* Generate (noOfRecords - startOffset) for a given topic and partition. */
   public static List<SinkRecord> createNativeJsonSinkRecords(
       final long startOffset,
       final long noOfRecords,
       final String topicName,
       final int partitionNo) {
-    ArrayList<SinkRecord> records = new ArrayList<>();
+    return createJsonRecords(
+        startOffset, noOfRecords, topicName, partitionNo,
+        TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8),
+        Collections.singletonMap("schemas.enable", Boolean.toString(true))
+    );
+  }
 
+  private static List<SinkRecord> createJsonRecords(
+      final long startOffset,
+      final long noOfRecords,
+      final String topicName,
+      final int partitionNo,
+      byte[] value,
+      Map<String, String> converterConfig
+  ) {
     JsonConverter converter = new JsonConverter();
-    HashMap<String, String> converterConfig = new HashMap<>();
-    converterConfig.put("schemas.enable", "true");
     converter.configure(converterConfig, false);
-    SchemaAndValue schemaInputValue =
-        converter.toConnectData(
-            "test", TestUtils.JSON_WITH_SCHEMA.getBytes(StandardCharsets.UTF_8));
+    SchemaAndValue schemaInputValue = converter.toConnectData("test", value);
+    ArrayList<SinkRecord> records = new ArrayList<>();
 
     for (long i = startOffset; i < startOffset + noOfRecords; ++i) {
       records.add(
           new SinkRecord(
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
index 85b2d5442..13ab2c09b 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
@@ -1,9 +1,5 @@
 package com.snowflake.kafka.connector.internal.streaming;
 
-import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.SUCCESS;
-import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful;
-import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
-
 import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
 import com.snowflake.kafka.connector.Utils;
 import com.snowflake.kafka.connector.dlq.InMemoryKafkaRecordErrorReporter;
@@ -11,7 +7,10 @@
 import com.snowflake.kafka.connector.internal.SnowflakeSinkService;
 import com.snowflake.kafka.connector.internal.SnowflakeSinkServiceFactory;
 import com.snowflake.kafka.connector.internal.TestUtils;
+import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.SUCCESS;
+import static com.snowflake.kafka.connector.internal.streaming.ChannelMigrationResponseCode.isChannelMigrationResponseSuccessful;
 import com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel;
+import static com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
 import com.snowflake.kafka.connector.internal.streaming.schemaevolution.InsertErrorMapper;
 import com.snowflake.kafka.connector.internal.streaming.schemaevolution.snowflake.SnowflakeSchemaEvolutionService;
 import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryServiceV2;
@@ -23,9 +22,6 @@
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaAndValue;
-import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -510,25 +506,7 @@ public void testPartialBatchChannelInvalidationIngestion_schematization(boolean
     final long secondBatchCount = 500;
 
     // create 18 blank records that do not kick off schematization
-    JsonConverter converter = new JsonConverter();
-    HashMap<String, String> converterConfig = new HashMap<>();
-    converterConfig.put("schemas.enable", "false");
-    converter.configure(converterConfig, false);
-    SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
-
-    List<SinkRecord> firstBatch = new ArrayList<>();
-    for (int i = 0; i < firstBatchCount; i++) {
-      firstBatch.add(
-          new SinkRecord(
-              topic,
-              PARTITION,
-              Schema.STRING_SCHEMA,
-              "test",
-              schemaInputValue.schema(),
-              schemaInputValue.value(),
-              i));
-    }
-
+    List<SinkRecord> firstBatch = TestUtils.createBlankJsonSinkRecords(0, firstBatchCount, topic, PARTITION);
     service.insert(firstBatch);
 
     // send batch with 500, should kick off a record based flush and schematization on record 19,
@@ -759,53 +737,38 @@ private void testInsertRowsWithGaps(boolean withSchematization, boolean useSingl
     SnowflakeSinkConnectorConfig.setDefaultValues(config);
     config.put(
         SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
-        Boolean.toString(withSchematization));
+        Boolean.toString(withSchematization)
+    );
 
     // create tpChannel
     SnowflakeSinkService service =
         SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
-            .setRecordNumber(1)
+            .setRecordNumber(4)
             .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
             .setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
             .addTask(testTableName, topicPartition)
             .build();
 
     // insert blank records that do not evolve schema: 0, 1
-    JsonConverter converter = new JsonConverter();
-    HashMap<String, String> converterConfig = new HashMap<>();
-    converterConfig.put("schemas.enable", "false");
-    converter.configure(converterConfig, false);
-    SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
-    List<SinkRecord> blankRecords = new ArrayList<>();
-    for (int i = 0; i < 2; i++) {
-      blankRecords.add(
-          new SinkRecord(
-              topic,
-              PARTITION,
-              Schema.STRING_SCHEMA,
-              "test",
-              schemaInputValue.schema(),
-              schemaInputValue.value(),
-              i));
-    }
-
-    service.insert(blankRecords);
-    TestUtils.assertWithRetry(
-        () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 2, 20, 5);
+    List<SinkRecord> blankRecords = TestUtils.createBlankJsonSinkRecords(0, 2, topic, PARTITION);
 
-    // Insert another two records with offset gap that requires evolution: 3, 4
-    List<SinkRecord> gapRecords = TestUtils.createNativeJsonSinkRecords(2, 3, topic, PARTITION);
-    gapRecords.remove(0);
-    service.insert(gapRecords);
+    // Insert another two records with offset gap that requires evolution: 300, 301
+    List<SinkRecord> gapRecords = TestUtils.createNativeJsonSinkRecords(300, 2, topic, PARTITION);
+    List<SinkRecord> mergedList = new ArrayList<>(blankRecords);
+    mergedList.addAll(gapRecords);
+    // mergedList's offsets -> [0, 1, 300, 301]
+    service.insert(mergedList);
 
     // With schematization, we need to resend a new batch should succeed even if there is an offset
     // gap from the previous committed offset
     if (withSchematization) {
-      service.insert(gapRecords);
+      service.insert(mergedList);
     }
 
     TestUtils.assertWithRetry(
-        () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 5, 20, 5);
+        () -> service.getOffset(new TopicPartition(topic, PARTITION)) == 302,
+        20, 5
+    );
 
     assert TestUtils.tableSize(testTableName) == 4
+ " actual: " + TestUtils.tableSize(testTableName);