Commit

autoformatting
sfc-gh-rcheng committed Nov 7, 2023
1 parent 604ae6b commit 152b28a
Showing 1 changed file with 26 additions and 13 deletions.
@@ -492,10 +492,13 @@ public void testSimpleInsertRowsFailureWithArrowBDECFormat() throws Exception {
   @Test
   public void testPartialBatchChannelInvalidationIngestion_schematization() throws Exception {
     Map<String, String> config = TestUtils.getConfForStreaming();
-    config.put(SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, "500"); // we want to flush on record
+    config.put(
+        SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, "500"); // we want to flush on record
     config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "500000");
     config.put(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES, "500000");
-    config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true"); // using schematization to invalidate
+    config.put(
+        SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG,
+        "true"); // using schematization to invalidate
 
     // setup
     InMemorySinkTaskContext inMemorySinkTaskContext =
@@ -520,19 +523,21 @@ public void testPartialBatchChannelInvalidationIngestion_schematization() throws
 
     List<SinkRecord> firstBatch = new ArrayList<>();
     for (int i = 0; i < firstBatchCount; i++) {
-      firstBatch.add(new SinkRecord(
-          topic,
-          PARTITION,
-          Schema.STRING_SCHEMA,
-          "test",
-          schemaInputValue.schema(),
-          schemaInputValue.value(),
-          i));
+      firstBatch.add(
+          new SinkRecord(
+              topic,
+              PARTITION,
+              Schema.STRING_SCHEMA,
+              "test",
+              schemaInputValue.schema(),
+              schemaInputValue.value(),
+              i));
     }
 
     service.insert(firstBatch);
 
-    // send batch with 500, should kick off a record based flush and schematization on record 19, which will fail the batches
+    // send batch with 500, should kick off a record based flush and schematization on record 19,
+    // which will fail the batches
     List<SinkRecord> secondBatch =
         TestUtils.createNativeJsonSinkRecords(firstBatchCount, secondBatchCount, topic, PARTITION);
     service.insert(secondBatch);
@@ -543,9 +548,17 @@ public void testPartialBatchChannelInvalidationIngestion_schematization() throws
 
     // ensure all data was ingested
     TestUtils.assertWithRetry(
-        () -> service.getOffset(new TopicPartition(topic, PARTITION)) == firstBatchCount + secondBatchCount, 20, 5);
+        () ->
+            service.getOffset(new TopicPartition(topic, PARTITION))
+                == firstBatchCount + secondBatchCount,
+        20,
+        5);
     assert TestUtils.tableSize(testTableName) == firstBatchCount + secondBatchCount
-        : "expected: " + firstBatchCount + secondBatchCount + " actual: " + TestUtils.tableSize(testTableName);
+        : "expected: "
+            + firstBatchCount
+            + secondBatchCount
+            + " actual: "
+            + TestUtils.tableSize(testTableName);
 
     service.closeAll();
   }
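Note on the config block in the first hunk: the test pins BUFFER_FLUSH_TIME_SEC and BUFFER_SIZE_BYTES at 500000 while setting BUFFER_COUNT_RECORDS to 500, so in practice only the record-count threshold can fire and the flush happens deterministically on record count, as the "we want to flush on record" comment says. Below is a minimal sketch of that flush-on-first-threshold policy, assuming a flush triggers when any one of the three limits is exceeded; RecordBuffer and its members are hypothetical names for illustration, not the connector's actual internals.

    // Sketch with hypothetical names -- not the connector's actual internals.
    import java.util.ArrayList;
    import java.util.List;

    class RecordBuffer {
      private final long maxRecords; // BUFFER_COUNT_RECORDS ("500" in the test)
      private final long maxBytes; // BUFFER_SIZE_BYTES ("500000")
      private final long maxAgeMs; // BUFFER_FLUSH_TIME_SEC ("500000"), converted to ms
      private final List<String> records = new ArrayList<>();
      private long bufferedBytes = 0;
      private long firstInsertMs = -1;

      RecordBuffer(long maxRecords, long maxBytes, long maxAgeSec) {
        this.maxRecords = maxRecords;
        this.maxBytes = maxBytes;
        this.maxAgeMs = maxAgeSec * 1000L;
      }

      void insert(String record) {
        if (firstInsertMs < 0) {
          firstInsertMs = System.currentTimeMillis();
        }
        records.add(record);
        bufferedBytes += record.length(); // byte size approximated by string length
        if (shouldFlush()) {
          flush();
        }
      }

      // With the test's settings (500 / 500000 / 500000), the record-count check
      // fires long before the byte or time limits can be reached.
      private boolean shouldFlush() {
        return records.size() >= maxRecords
            || bufferedBytes >= maxBytes
            || System.currentTimeMillis() - firstInsertMs >= maxAgeMs;
      }

      private void flush() {
        // The real connector would hand the batch to a streaming ingest channel here.
        records.clear();
        bufferedBytes = 0;
        firstInsertMs = -1;
      }
    }

With new RecordBuffer(500, 500000, 500000), a flush occurs exactly when the 500th record is buffered, mirroring the deterministic record-count flush the test relies on.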
