made breaking test
sfc-gh-rcheng committed Nov 1, 2023
1 parent d805bf3 commit 01a9e3d
Showing 3 changed files with 68 additions and 1 deletion.
File 1 of 3

@@ -403,7 +403,7 @@ private boolean shouldIgnoreAddingRecordToBuffer(
   }

   // Don't ignore if we see the expected offset; otherwise log and skip
-  if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) {
+  if (kafkaSinkRecord.kafkaOffset() == (currentProcessedOffset - 1)) {
     LOGGER.debug(
         "Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}",
         kafkaSinkRecord.kafkaOffset(),
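For context, here is a minimal sketch (a hypothetical standalone class with invented values, names mirroring the diff) of how the removed and added checks behave for a normal in-order record; given the commit message, the new check looks like a deliberate break for the test added below.

// Hypothetical illustration of the old vs. new condition from the hunk above.
public class OffsetCheckSketch {
  public static void main(String[] args) {
    long currentProcessedOffset = 5L; // last offset added to the buffer
    long incomingKafkaOffset = 6L;    // the next in-order record from Kafka

    // Old check: accept the record immediately following the last processed offset.
    boolean oldCheck = (incomingKafkaOffset - currentProcessedOffset) == 1L;
    System.out.println("old check accepts in-order record: " + oldCheck); // true

    // New check: matches only an offset one *behind* the last processed offset,
    // so a normal in-order record no longer passes.
    boolean newCheck = incomingKafkaOffset == (currentProcessedOffset - 1);
    System.out.println("new check accepts in-order record: " + newCheck); // false
  }
}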
File 2 of 3

@@ -18,10 +18,13 @@
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TopicPartitionChannelIT {
@@ -485,4 +488,66 @@ public void testSimpleInsertRowsFailureWithArrowBDECFormat() throws Exception {
service.insert(records);
service.closeAll();
}


// Sometimes the customer's converter sends a NULL record, which Kafka Connect drops, resulting in missing offsets
@Test
public void testMissingOffsetRecordIngestionWithSchematization() throws Exception {
// setup
Map<String, String> config = TestUtils.getConfForStreaming();
SnowflakeSinkConnectorConfig.setDefaultValues(config);
config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true");

// This will automatically create a channel for topicPartition.
SnowflakeSinkService service =
SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config)
.setRecordNumber(1)
.setErrorReporter(new InMemoryKafkaRecordErrorReporter())
.setSinkTaskContext(new InMemorySinkTaskContext(Collections.singleton(topicPartition)))
.addTask(testTableName, topicPartition)
.build();

// insert a null-value record first so that the committed offset is not -1
JsonConverter converter = new JsonConverter();
HashMap<String, String> converterConfig = new HashMap<>();
converterConfig.put("schemas.enable", "false");
converter.configure(converterConfig, false);
SchemaAndValue schemaInputValue = converter.toConnectData("test", null);
service.insert(
new SinkRecord(
topic,
PARTITION,
Schema.STRING_SCHEMA,
"test",
schemaInputValue.schema(),
schemaInputValue.value(),
0));

TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == 1, 20, 5);

// create records with offsets 2, 3, 4, 5, 6, 8, 9 (offsets 1 and 7 are missing)
final int expectedNumRecords = 7;
final int expectedOffset = 9;

List<SinkRecord> records =
TestUtils.createNativeJsonSinkRecords(0, expectedOffset + 1, topic, PARTITION);
records.remove(7); // drop offset 7
records.remove(1); // drop offset 1
records.remove(0); // offset 0 was already ingested above
assert expectedNumRecords == records.size();
assert expectedOffset == records.get(expectedNumRecords - 1).kafkaOffset();

// ingest the records; the first insert fails due to schema evolution, so insert twice
service.insert(records);
service.insert(records);

// verify offset and table size
TestUtils.assertWithRetry(
() -> service.getOffset(new TopicPartition(topic, PARTITION)) == expectedOffset, 20, 5);
assert expectedNumRecords == TestUtils.tableSize(testTableName)
    : Utils.formatString(
        "expected: {}, actual: {}", expectedNumRecords, TestUtils.tableSize(testTableName));
service.closeAll();
}
}
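As background for this test, a minimal standalone sketch (using only stock Kafka Connect classes; the class name is invented) of the null-record behavior it simulates: JsonConverter maps a null payload to a null SchemaAndValue, and a sink that drops such records leaves a gap in the committed offset sequence.

import java.util.Collections;

import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class NullRecordSketch {
  public static void main(String[] args) {
    JsonConverter converter = new JsonConverter();
    // Same configuration as the test: plain JSON without embedded schemas.
    converter.configure(Collections.singletonMap("schemas.enable", "false"), false);

    // A null payload converts to a null schema and a null value.
    SchemaAndValue schemaAndValue = converter.toConnectData("test", null);
    System.out.println(schemaAndValue.schema()); // null
    System.out.println(schemaAndValue.value());  // null
  }
}

The repeated service.insert(records) call mirrors the test's own comment: with schematization enabled, the first insert fails while the destination table evolves to add the new columns, and the retry then lands the rows.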
File 3 of 3

@@ -965,4 +965,6 @@ public void testTopicPartitionChannelInvalidJmxReporter() throws Exception {
topicPartitionChannel.closeChannel();
assert resultStatus.getMetricsJmxReporter() == null;
}


}
