diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
index 99bf7f30a..f2c6cf0b1 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java
@@ -18,6 +18,9 @@
 import net.snowflake.ingest.streaming.OpenChannelRequest;
 import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.After;
 import org.junit.Assert;
@@ -485,4 +488,65 @@ public void testSimpleInsertRowsFailureWithArrowBDECFormat() throws Exception {
     service.insert(records);
     service.closeAll();
   }
+
+  @Test
+  public void testPartialBatchChannelInvalidationIngestion_schematization() throws Exception {
+    Map<String, String> config = TestUtils.getConfForStreaming();
+    config.put(SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, "500"); // flush on record count, not on time or size
+    config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "500000");
+    config.put(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES, "500000");
+    config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true"); // schematization is used to invalidate the channel
+
+    // setup
+    InMemorySinkTaskContext inMemorySinkTaskContext =
+        new InMemorySinkTaskContext(Collections.singleton(topicPartition));
+    SnowflakeSinkService service =
+        SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
+            .setRecordNumber(1)
+            .setErrorReporter(new InMemoryKafkaRecordErrorReporter())
+            .setSinkTaskContext(inMemorySinkTaskContext)
+            .addTask(testTableName, topicPartition)
+            .build();
+
+    final long firstBatchCount = 18;
+    final long secondBatchCount = 500;
+
+    // create 18 blank (null-value) records that do not kick off schematization
+    JsonConverter converter = new JsonConverter();
+    Map<String, String> converterConfig = new HashMap<>();
+    converterConfig.put("schemas.enable", "false");
+    converter.configure(converterConfig, false);
+    SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
+
+    List<SinkRecord> firstBatch = new ArrayList<>();
+    for (int i = 0; i < firstBatchCount; i++) {
+      firstBatch.add(new SinkRecord(
+          topic,
+          PARTITION,
+          Schema.STRING_SCHEMA,
+          "test",
+          schemaInputValue.schema(),
+          schemaInputValue.value(),
+          i));
+    }
+
+    service.insert(firstBatch);
+
+    // send a batch of 500 records; this triggers a record-count-based flush, and schematization on record 19 fails both batches
+    List<SinkRecord> secondBatch =
+        TestUtils.createNativeJsonSinkRecords(firstBatchCount, secondBatchCount, topic, PARTITION);
+    service.insert(secondBatch);
+
+    // resend batches 1 and 2 because batch 2 failed on schematization
+    service.insert(firstBatch);
+    service.insert(secondBatch);
+
+    // ensure all data was ingested
+    TestUtils.assertWithRetry(
+        () -> service.getOffset(new TopicPartition(topic, PARTITION)) == firstBatchCount + secondBatchCount, 20, 5);
+    assert TestUtils.tableSize(testTableName) == firstBatchCount + secondBatchCount
+        : "expected: " + (firstBatchCount + secondBatchCount) + " actual: " + TestUtils.tableSize(testTableName);
+
+    service.closeAll();
+  }
 }
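
The "blank records" trick above hinges on how Kafka Connect's `JsonConverter` treats a null payload: with `schemas.enable=false`, `toConnectData` yields a `SchemaAndValue` whose schema and value are both null, so the resulting `SinkRecord`s carry no columns and never reach the schematization path. Below is a minimal standalone sketch of that behavior; the class name and topic string are made up for illustration, and only the Kafka Connect JSON converter artifact is assumed on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class NullPayloadSketch {
  public static void main(String[] args) {
    JsonConverter converter = new JsonConverter();
    Map<String, String> config = new HashMap<>();
    config.put("schemas.enable", "false"); // plain JSON, no schema envelope
    converter.configure(config, false);    // false = configure as a value converter

    // A null byte[] payload converts to a null schema and a null value --
    // exactly the "blank" records the test builds: they carry no fields,
    // so they cannot trigger schema evolution (schematization).
    SchemaAndValue blank = converter.toConnectData("any-topic", null);
    System.out.println(blank.schema()); // null
    System.out.println(blank.value());  // null
  }
}
```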