Commit

add it
sfc-gh-rcheng committed Nov 7, 2023
1 parent 645d703 commit 604ae6b
Showing 1 changed file with 64 additions and 0 deletions.
@@ -18,6 +18,9 @@
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Assert;
@@ -485,4 +488,65 @@ public void testSimpleInsertRowsFailureWithArrowBDECFormat() throws Exception {
service.insert(records);
service.closeAll();
}

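/**
 * Verifies that a schematization error partway through ingestion invalidates the channel and fails
 * the in-flight batches, and that resending those batches results in all records being ingested.
 */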
@Test
public void testPartialBatchChannelInvalidationIngestion_schematization() throws Exception {
Map<String, String> config = TestUtils.getConfForStreaming();
config.put(SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, "500"); // we want to flush on record count, not time or size
config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "500000");
config.put(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES, "500000");
config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true"); // use schematization to invalidate the channel

// setup
InMemorySinkTaskContext inMemorySinkTaskContext =
new InMemorySinkTaskContext(Collections.singleton(topicPartition));
SnowflakeSinkService service =
SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
.setRecordNumber(1)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setSinkTaskContext(inMemorySinkTaskContext)
.addTask(testTableName, topicPartition)
.build();

final long firstBatchCount = 18;
final long secondBatchCount = 500;

// create 18 blank records that do not kick off schematization
JsonConverter converter = new JsonConverter();
HashMap<String, String> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "false");
converter.configure(converterConfig, false);
SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
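// with schemas disabled, a null payload converts to a null schema/value pair, i.e. a blank record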

List<SinkRecord> firstBatch = new ArrayList<>();
for (int i = 0; i < firstBatchCount; i++) {
firstBatch.add(new SinkRecord(
topic,
PARTITION,
Schema.STRING_SCHEMA,
"test",
schemaInputValue.schema(),
schemaInputValue.value(),
i));
}

service.insert(firstBatch);

// send a batch of 500 records; this should kick off a record-count based flush and trigger
// schematization on record 19, which fails both batches
List<SinkRecord> secondBatch =
TestUtils.createNativeJsonSinkRecords(firstBatchCount, secondBatchCount, topic, PARTITION);
service.insert(secondBatch);

// resend batches 1 and 2 because batch 2 failed due to schematization
service.insert(firstBatch);
service.insert(secondBatch);

// ensure all data was ingested
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == firstBatchCount + secondBatchCount, 20, 5);
assert TestUtils.tableSize(testTableName) == firstBatchCount + secondBatchCount
: "expected: " + (firstBatchCount + secondBatchCount) + " actual: " + TestUtils.tableSize(testTableName);

service.closeAll();
}
}
