From 8eb8036be8991ccc9b09015289030fdcd3f7a82c Mon Sep 17 00:00:00 2001 From: revi cheng Date: Tue, 17 Oct 2023 17:27:52 -0700 Subject: [PATCH 01/18] e2e test passes --- .../streaming/TopicPartitionChannel.java | 16 +++++++------- .../streaming/SnowflakeSinkServiceV2IT.java | 2 ++ ..._evolution_w_auto_table_creation_json.json | 4 ++-- ...ma_evolution_w_auto_table_creation_json.py | 22 +++++++++++++++++-- 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 48e2e0ff9..6a7613253 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -281,6 +281,7 @@ public TopicPartitionChannel( final long lastCommittedOffsetToken = fetchOffsetTokenWithRetry(); this.offsetPersistedInSnowflake.set(lastCommittedOffsetToken); this.processedOffset.set(lastCommittedOffsetToken); + LOGGER.info("[REVI 284] set processedOffset to: {}", this.processedOffset.get()); // setup telemetry and metrics String connectorName = @@ -323,28 +324,26 @@ public TopicPartitionChannel( * @param kafkaSinkRecord input record from Kafka */ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { - final long currentOffsetPersistedInSnowflake = this.offsetPersistedInSnowflake.get(); - final long currentProcessedOffset = this.processedOffset.get(); - // Set the consumer offset to be the first record that Kafka sends us if (latestConsumerOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { this.latestConsumerOffset.set(kafkaSinkRecord.kafkaOffset()); } // Ignore adding to the buffer until we see the expected offset value - if (shouldIgnoreAddingRecordToBuffer(kafkaSinkRecord, currentProcessedOffset)) { + if (shouldIgnoreAddingRecordToBuffer(kafkaSinkRecord, this.processedOffset.get())) { return; } // Accept the incoming record only if we don't have a valid offset token at server side, or the // incoming record offset is 1 + the processed offset - if (currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE - || kafkaSinkRecord.kafkaOffset() >= currentProcessedOffset + 1) { + if (this.processedOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE + || kafkaSinkRecord.kafkaOffset() >= this.processedOffset.get() + 1) { StreamingBuffer copiedStreamingBuffer = null; bufferLock.lock(); try { this.streamingBuffer.insert(kafkaSinkRecord); this.processedOffset.set(kafkaSinkRecord.kafkaOffset()); + LOGGER.info("[REVI 349] set processedOffset to: {}", this.processedOffset.get()); // # of records or size based flushing if (this.streamingBufferThreshold.shouldFlushOnBufferByteSize( streamingBuffer.getBufferSizeBytes()) @@ -376,8 +375,8 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { + " offsetPersistedInSnowflake:{}, processedOffset:{}", kafkaSinkRecord.kafkaOffset(), this.getChannelName(), - currentOffsetPersistedInSnowflake, - currentProcessedOffset); + this.offsetPersistedInSnowflake.get(), + this.processedOffset.get()); } } @@ -958,6 +957,7 @@ private void resetChannelMetadataAfterRecovery( // might get rejected. 
this.offsetPersistedInSnowflake.set(offsetRecoveredFromSnowflake); this.processedOffset.set(offsetRecoveredFromSnowflake); + LOGGER.info("[REVI 963] set processedOffset to: {}", this.processedOffset.get()); // State that there was some exception and only clear that state when we have received offset // starting from offsetRecoveredFromSnowflake diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index 64d70f49b..b97c58a99 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -33,6 +33,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; + +import net.snowflake.ingest.internal.apache.commons.math3.analysis.function.Sin; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; diff --git a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json index 38e50ab1b..b39b87ac5 100644 --- a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json +++ b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json @@ -6,8 +6,8 @@ "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME", "tasks.max": "1", "buffer.flush.time": "10", - "buffer.count.records": "100", - "buffer.size.bytes": "5000000", + "buffer.count.records": "300", + "buffer.size.bytes": "500000000", "snowflake.url.name": "SNOWFLAKE_HOST", "snowflake.user.name": "SNOWFLAKE_USER", "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", diff --git a/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py b/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py index 48daea794..cc0fbf453 100644 --- a/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py +++ b/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py @@ -1,6 +1,7 @@ import json from test_suit.test_utils import NonRetryableError +from time import sleep # test if the table is updated with the correct column @@ -13,7 +14,11 @@ def __init__(self, driver, nameSalt): self.fileName = "travis_correct_schema_evolution_w_auto_table_creation_json" self.topics = [] self.table = self.fileName + nameSalt - self.recordNum = 100 + + # records + self.initialRecordCount = 12 + self.flushRecordCount = 300 + self.recordNum = self.initialRecordCount + self.flushRecordCount for i in range(2): self.topics.append(self.table + str(i)) @@ -48,13 +53,26 @@ def getConfigFileName(self): def send(self): for i, topic in enumerate(self.topics): + # send initial batch key = [] value = [] - for e in range(self.recordNum): + for e in range(self.initialRecordCount): key.append(json.dumps({'number': str(e)}).encode('utf-8')) value.append(json.dumps(self.records[i]).encode('utf-8')) self.driver.sendBytesData(topic, value, key) + sleep(2) + + # send second batch that should flush + key = [] + value = [] + for e in range(self.flushRecordCount): + key.append(json.dumps({'number': str(e)}).encode('utf-8')) + value.append(json.dumps(self.records[i]).encode('utf-8')) + self.driver.sendBytesData(topic, value, key) + + 
sleep(10) # sleep to ensure all data is flushed + def verify(self, round): rows = self.driver.snowflake_conn.cursor().execute( "desc table {}".format(self.table)).fetchall() From 9de546099ad21cb81f16f1883f3428104711b8ee Mon Sep 17 00:00:00 2001 From: revi cheng Date: Tue, 17 Oct 2023 17:56:15 -0700 Subject: [PATCH 02/18] add avro, failing though --- .../streaming/TopicPartitionChannel.java | 4 +--- ...olution_w_auto_table_creation_avro_sr.json | 2 +- ..._evolution_w_auto_table_creation_json.json | 2 +- ...evolution_w_auto_table_creation_avro_sr.py | 20 +++++++++++++++++-- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 6a7613253..1bf1c8770 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -281,7 +281,6 @@ public TopicPartitionChannel( final long lastCommittedOffsetToken = fetchOffsetTokenWithRetry(); this.offsetPersistedInSnowflake.set(lastCommittedOffsetToken); this.processedOffset.set(lastCommittedOffsetToken); - LOGGER.info("[REVI 284] set processedOffset to: {}", this.processedOffset.get()); // setup telemetry and metrics String connectorName = @@ -343,7 +342,7 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { try { this.streamingBuffer.insert(kafkaSinkRecord); this.processedOffset.set(kafkaSinkRecord.kafkaOffset()); - LOGGER.info("[REVI 349] set processedOffset to: {}", this.processedOffset.get()); + // # of records or size based flushing if (this.streamingBufferThreshold.shouldFlushOnBufferByteSize( streamingBuffer.getBufferSizeBytes()) @@ -957,7 +956,6 @@ private void resetChannelMetadataAfterRecovery( // might get rejected. 
this.offsetPersistedInSnowflake.set(offsetRecoveredFromSnowflake); this.processedOffset.set(offsetRecoveredFromSnowflake); - LOGGER.info("[REVI 963] set processedOffset to: {}", this.processedOffset.get()); // State that there was some exception and only clear that state when we have received offset // starting from offsetRecoveredFromSnowflake diff --git a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json index 27de6cf6e..6232e0237 100644 --- a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json +++ b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json @@ -6,7 +6,7 @@ "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME", "tasks.max": "1", "buffer.flush.time": "10", - "buffer.count.records": "100", + "buffer.count.records": "300", "buffer.size.bytes": "5000000", "snowflake.url.name": "SNOWFLAKE_HOST", "snowflake.user.name": "SNOWFLAKE_USER", diff --git a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json index b39b87ac5..e06328050 100644 --- a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json +++ b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json @@ -7,7 +7,7 @@ "tasks.max": "1", "buffer.flush.time": "10", "buffer.count.records": "300", - "buffer.size.bytes": "500000000", + "buffer.size.bytes": "5000000", "snowflake.url.name": "SNOWFLAKE_HOST", "snowflake.user.name": "SNOWFLAKE_USER", "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", diff --git a/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py b/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py index dd4129064..b95192633 100644 --- a/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py +++ b/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py @@ -1,5 +1,6 @@ from confluent_kafka import avro from test_suit.test_utils import NonRetryableError +from time import sleep # test if the table is updated with the correct column @@ -12,7 +13,11 @@ def __init__(self, driver, nameSalt): self.fileName = "travis_correct_schema_evolution_w_auto_table_creation_avro_sr" self.topics = [] self.table = self.fileName + nameSalt - self.recordNum = 100 + + # records + self.initialRecordCount = 12 + self.flushRecordCount = 300 + self.recordNum = self.initialRecordCount + self.flushRecordCount for i in range(2): self.topics.append(self.table + str(i)) @@ -78,11 +83,22 @@ def getConfigFileName(self): def send(self): for i, topic in enumerate(self.topics): + # send initial batch value = [] - for _ in range(self.recordNum): + for _ in range(self.initialRecordCount): value.append(self.records[i]) self.driver.sendAvroSRData(topic, value, self.valueSchema[i], key=[], key_schema="", partition=0) + sleep(2) + + # send second batch that should flush + value = [] + for _ in range(self.flushRecordCount): + value.append(self.records[i]) + self.driver.sendAvroSRData(topic, value, self.valueSchema[i], key=[], key_schema="", partition=0) + + sleep(10) # sleep to ensure all data is flushed + def verify(self, round): rows = self.driver.snowflake_conn.cursor().execute( "desc table 
{}".format(self.table)).fetchall() From 53db4708592755904e500b249039ff3a887014f1 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Wed, 18 Oct 2023 11:48:22 -0700 Subject: [PATCH 03/18] and not or --- .../streaming/TopicPartitionChannel.java | 14 ++-- .../streaming/TopicPartitionChannelTest.java | 71 +++++++++++++------ 2 files changed, 57 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 1bf1c8770..32744f507 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -112,7 +112,7 @@ public class TopicPartitionChannel { *
 *   <li>If channel fails to fetch offsetToken from Snowflake, we reopen the channel and try to
 *       fetch offset from Snowflake again
 *   <li>If channel fails to ingest a buffer(Buffer containing rows/offsets), we reopen the
- *       channel and try to fetch offset from Snowflake again
+ *       channel and try to fetch offset from Snowflake again. Schematization purposefully fails the first buffer insert in order to alter the table, and then expects Kafka to resend data
 *
 *
 * <p>In both cases above, we ask Kafka to send back offsets, strictly from offset number after
@@ -124,7 +124,7 @@ public class TopicPartitionChannel {
 *
    This boolean is used to indicate that we reset offset in kafka and we will only buffer once * we see the offset which is one more than an offset present in Snowflake. */ - private boolean isOffsetResetInKafka; + private boolean isOffsetResetInKafka = false; // TODO @rcheng question: atomic? private final SnowflakeStreamingIngestClient streamingIngestClient; @@ -389,19 +389,21 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { * * @param kafkaSinkRecord Record to check for above condition only in case of failures * (isOffsetResetInKafka = true) + * @param currentProcessedOffset The current processed offset * @return true if this record can be skipped to add into buffer, false otherwise. */ private boolean shouldIgnoreAddingRecordToBuffer( - SinkRecord kafkaSinkRecord, long currentProcessedOffset) { - // Don't skip rows if there is no offset reset or there is no offset token information in the + SinkRecord kafkaSinkRecord, final long currentProcessedOffset) { + // Don't skip rows if there is no offset reset and there is no offset token information in the // channel if (!isOffsetResetInKafka - || currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { + && currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { + LOGGER.debug("No offset registered in Snowflake and offset is not being reset, we can add this offset to buffer for channel:{}", currentProcessedOffset); return false; } // Don't ignore if we see the expected offset; otherwise log and skip - if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) { + if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset ) == 1L) { LOGGER.debug( "Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}", kafkaSinkRecord.kafkaOffset(), diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java index 601fb5922..319b42e60 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java @@ -397,15 +397,23 @@ public void testFetchOffsetTokenWithRetry_RuntimeException() { /* Only SFExceptions goes into fallback -> reopens channel, fetch offsetToken and throws Appropriate exception */ @Test public void testInsertRows_SuccessAfterReopenChannel() throws Exception { + final int noOfRecords = 5; + int expectedInsertRowsCount = 0; + int expectedOpenChannelCount = 0; + int expectedGetOffsetCount = 0; + + // setup mocks to fail first insert and return two null snowflake offsets (open channel and failed insert) before succeeding Mockito.when( mockStreamingChannel.insertRows( ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) - .thenThrow(SF_EXCEPTION); - - // get null from snowflake first time it is called and null for second time too since insert - // rows was failure - Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()).thenReturn(null); + .thenThrow(SF_EXCEPTION) + .thenReturn(new InsertValidationResponse()); + Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()) + .thenReturn(null) + .thenReturn(null) + .thenReturn(Long.toString(noOfRecords - 1)); + // create tpchannel TopicPartitionChannel topicPartitionChannel = new TopicPartitionChannel( mockStreamingClient, @@ -417,37 +425,56 @@ public void testInsertRows_SuccessAfterReopenChannel() 
throws Exception { mockKafkaRecordErrorReporter, mockSinkTaskContext, mockTelemetryService); - final int noOfRecords = 5; - // Since record 0 was not able to ingest, all records in this batch will not be added into the - // buffer. + expectedOpenChannelCount++; + expectedGetOffsetCount++; + + // verify initial mock counts after tpchannel creation + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount)) + .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)); + Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount)) + .openChannel(ArgumentMatchers.any()); + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount)) + .getLatestCommittedOffsetToken(); + + // Test inserting record 0, which should fail to ingest so the other records are ignored List records = TestUtils.createJsonStringSinkRecords(0, noOfRecords, TOPIC, PARTITION); - records.forEach(topicPartitionChannel::insertRecordToBuffer); + expectedInsertRowsCount++; + expectedOpenChannelCount++; + expectedGetOffsetCount++; - Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords)) + // verify mocks only tried ingesting once + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount)) .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)); - Mockito.verify(mockStreamingClient, Mockito.times(noOfRecords + 1)) + Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount)) .openChannel(ArgumentMatchers.any()); - Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords + 1)) + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount)) .getLatestCommittedOffsetToken(); // Now, it should be successful - Mockito.when( - mockStreamingChannel.insertRows( - ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) - .thenReturn(new InsertValidationResponse()); - - Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()) - .thenReturn(Long.toString(noOfRecords - 1)); +// Mockito.when( +// mockStreamingChannel.insertRows( +// ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) +// .thenReturn(new InsertValidationResponse()); +// +// Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()) +// .thenReturn(Long.toString(noOfRecords - 1)); // Retry the insert again, now everything should be ingested and the offset token should be // noOfRecords-1 records.forEach(topicPartitionChannel::insertRecordToBuffer); - Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords * 2)) - .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)); - Assert.assertEquals(noOfRecords - 1, topicPartitionChannel.fetchOffsetTokenWithRetry()); + expectedInsertRowsCount += noOfRecords; + expectedGetOffsetCount++; + + // verify mocks ingested each record + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount)) + .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)); + Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount)) + .openChannel(ArgumentMatchers.any()); + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount)) + .getLatestCommittedOffsetToken(); } @Test From d61d16060696b575f37b112da49b1f1a0f803356 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Wed, 18 Oct 2023 11:48:42 -0700 
Subject: [PATCH 04/18] manual formatting --- .../streaming/TopicPartitionChannel.java | 11 ++++++++--- .../streaming/SnowflakeSinkServiceV2IT.java | 2 -- .../streaming/TopicPartitionChannelTest.java | 17 +++++++++-------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 32744f507..4c38caae7 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -112,7 +112,9 @@ public class TopicPartitionChannel { *
 *   <li>If channel fails to fetch offsetToken from Snowflake, we reopen the channel and try to
 *       fetch offset from Snowflake again
 *   <li>If channel fails to ingest a buffer(Buffer containing rows/offsets), we reopen the
- *       channel and try to fetch offset from Snowflake again. Schematization purposefully fails the first buffer insert in order to alter the table, and then expects Kafka to resend data
+ *       channel and try to fetch offset from Snowflake again. Schematization purposefully fails
+ *       the first buffer insert in order to alter the table, and then expects Kafka to resend
+ *       data
 *
 *
    In both cases above, we ask Kafka to send back offsets, strictly from offset number after @@ -398,12 +400,15 @@ private boolean shouldIgnoreAddingRecordToBuffer( // channel if (!isOffsetResetInKafka && currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { - LOGGER.debug("No offset registered in Snowflake and offset is not being reset, we can add this offset to buffer for channel:{}", currentProcessedOffset); + LOGGER.debug( + "No offset registered in Snowflake and offset is not being reset, we can add this offset" + + " to buffer for channel:{}", + currentProcessedOffset); return false; } // Don't ignore if we see the expected offset; otherwise log and skip - if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset ) == 1L) { + if ((kafkaSinkRecord.kafkaOffset() - currentProcessedOffset) == 1L) { LOGGER.debug( "Got the desired offset:{} from Kafka, we can add this offset to buffer for channel:{}", kafkaSinkRecord.kafkaOffset(), diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java index b97c58a99..64d70f49b 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java @@ -33,8 +33,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - -import net.snowflake.ingest.internal.apache.commons.math3.analysis.function.Sin; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java index 319b42e60..d44d39e7f 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java @@ -402,7 +402,8 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception { int expectedOpenChannelCount = 0; int expectedGetOffsetCount = 0; - // setup mocks to fail first insert and return two null snowflake offsets (open channel and failed insert) before succeeding + // setup mocks to fail first insert and return two null snowflake offsets (open channel and + // failed insert) before succeeding Mockito.when( mockStreamingChannel.insertRows( ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) @@ -453,13 +454,13 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception { .getLatestCommittedOffsetToken(); // Now, it should be successful -// Mockito.when( -// mockStreamingChannel.insertRows( -// ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) -// .thenReturn(new InsertValidationResponse()); -// -// Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()) -// .thenReturn(Long.toString(noOfRecords - 1)); + // Mockito.when( + // mockStreamingChannel.insertRows( + // ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) + // .thenReturn(new InsertValidationResponse()); + // + // Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()) + // .thenReturn(Long.toString(noOfRecords - 1)); // Retry the insert again, now everything should be ingested and the offset 
token should be // noOfRecords-1 From 6996af5d7fd9767ddfaa2d59e66ff1221f8ee265 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Wed, 18 Oct 2023 11:52:13 -0700 Subject: [PATCH 05/18] personal nit --- .../internal/streaming/TopicPartitionChannelTest.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java index d44d39e7f..dd32d24f0 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java @@ -453,15 +453,6 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception { Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount)) .getLatestCommittedOffsetToken(); - // Now, it should be successful - // Mockito.when( - // mockStreamingChannel.insertRows( - // ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) - // .thenReturn(new InsertValidationResponse()); - // - // Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()) - // .thenReturn(Long.toString(noOfRecords - 1)); - // Retry the insert again, now everything should be ingested and the offset token should be // noOfRecords-1 records.forEach(topicPartitionChannel::insertRecordToBuffer); From 09b2d6206d9f47a5f1b2d10794477415a572c05d Mon Sep 17 00:00:00 2001 From: revi cheng Date: Thu, 19 Oct 2023 17:21:46 -0700 Subject: [PATCH 06/18] fix tests to actually repro --- ...rrect_schema_evolution_w_auto_table_creation_avro_sr.json | 2 +- ..._correct_schema_evolution_w_auto_table_creation_json.json | 2 +- .../test_schema_evolution_w_auto_table_creation_avro_sr.py | 4 ---- .../test_schema_evolution_w_auto_table_creation_json.py | 5 +---- 4 files changed, 3 insertions(+), 10 deletions(-) diff --git a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json index 6232e0237..ddea359aa 100644 --- a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json +++ b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json @@ -5,7 +5,7 @@ "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1", "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME", "tasks.max": "1", - "buffer.flush.time": "10", + "buffer.flush.time": "60", "buffer.count.records": "300", "buffer.size.bytes": "5000000", "snowflake.url.name": "SNOWFLAKE_HOST", diff --git a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json index e06328050..a51854566 100644 --- a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json +++ b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json @@ -5,7 +5,7 @@ "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1", "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME", "tasks.max": "1", - "buffer.flush.time": "10", + "buffer.flush.time": "60", "buffer.count.records": "300", "buffer.size.bytes": "5000000", 
"snowflake.url.name": "SNOWFLAKE_HOST", diff --git a/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py b/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py index b95192633..91317160d 100644 --- a/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py +++ b/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py @@ -89,16 +89,12 @@ def send(self): value.append(self.records[i]) self.driver.sendAvroSRData(topic, value, self.valueSchema[i], key=[], key_schema="", partition=0) - sleep(2) - # send second batch that should flush value = [] for _ in range(self.flushRecordCount): value.append(self.records[i]) self.driver.sendAvroSRData(topic, value, self.valueSchema[i], key=[], key_schema="", partition=0) - sleep(10) # sleep to ensure all data is flushed - def verify(self, round): rows = self.driver.snowflake_conn.cursor().execute( "desc table {}".format(self.table)).fetchall() diff --git a/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py b/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py index cc0fbf453..e5d3a7157 100644 --- a/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py +++ b/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py @@ -61,8 +61,6 @@ def send(self): value.append(json.dumps(self.records[i]).encode('utf-8')) self.driver.sendBytesData(topic, value, key) - sleep(2) - # send second batch that should flush key = [] value = [] @@ -71,9 +69,8 @@ def send(self): value.append(json.dumps(self.records[i]).encode('utf-8')) self.driver.sendBytesData(topic, value, key) - sleep(10) # sleep to ensure all data is flushed - def verify(self, round): + sleep(60) rows = self.driver.snowflake_conn.cursor().execute( "desc table {}".format(self.table)).fetchall() res_col = {} From 8639534a57752bc7d863d17e354267c28c2770da Mon Sep 17 00:00:00 2001 From: revi cheng Date: Mon, 23 Oct 2023 14:34:56 -0700 Subject: [PATCH 07/18] remove drop table tests --- test/test_suites.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/test/test_suites.py b/test/test_suites.py index 001fc0a89..35067ba03 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -194,10 +194,10 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS test_instance=TestSchemaNotSupportedConverter(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), - ("TestSchemaEvolutionDropTable", EndToEndTestSuite( - test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True, - run_in_apache=True - )), + # ("TestSchemaEvolutionDropTable", EndToEndTestSuite( + # test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True, + # run_in_apache=True + # )), ("TestKcDeleteCreate", EndToEndTestSuite( test_instance=TestKcDeleteCreate(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), @@ -235,9 +235,9 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS ("TestKcRestart", EndToEndTestSuite( test_instance=TestKcRestart(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), - ("TestSchemaEvolutionMultiTopicDropTable", EndToEndTestSuite( - test_instance=TestSchemaEvolutionMultiTopicDropTable(driver, nameSalt), clean=True, run_in_confluent=True, - run_in_apache=True - )), + # ("TestSchemaEvolutionMultiTopicDropTable", EndToEndTestSuite( + # test_instance=TestSchemaEvolutionMultiTopicDropTable(driver, 
nameSalt), clean=True, run_in_confluent=True, + # run_in_apache=True + # )), ]) return test_suites From f84e0bcb80cd1c6ba717b21f07a491e507e550e4 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Tue, 24 Oct 2023 18:36:19 -0700 Subject: [PATCH 08/18] delete drop table tests and revert insertrecordtobuffer --- .../streaming/TopicPartitionChannel.java | 19 +-- ...s_correct_schema_evolution_drop_table.json | 28 ----- ...hema_evolution_multi_topic_drop_table.json | 28 ----- .../test_schema_evolution_drop_table.py | 88 -------------- ...schema_evolution_multi_topic_drop_table.py | 109 ------------------ test/test_suites.py | 10 -- 6 files changed, 10 insertions(+), 272 deletions(-) delete mode 100644 test/rest_request_template/travis_correct_schema_evolution_drop_table.json delete mode 100644 test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json delete mode 100644 test/test_suit/test_schema_evolution_drop_table.py delete mode 100644 test/test_suit/test_schema_evolution_multi_topic_drop_table.py diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 4c38caae7..7e6059ed9 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -325,31 +325,33 @@ public TopicPartitionChannel( * @param kafkaSinkRecord input record from Kafka */ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { + final long currentOffsetPersistedInSnowflake = this.offsetPersistedInSnowflake.get(); + final long currentProcessedOffset = this.processedOffset.get(); + // Set the consumer offset to be the first record that Kafka sends us if (latestConsumerOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { this.latestConsumerOffset.set(kafkaSinkRecord.kafkaOffset()); } // Ignore adding to the buffer until we see the expected offset value - if (shouldIgnoreAddingRecordToBuffer(kafkaSinkRecord, this.processedOffset.get())) { + if (shouldIgnoreAddingRecordToBuffer(kafkaSinkRecord, currentProcessedOffset)) { return; } // Accept the incoming record only if we don't have a valid offset token at server side, or the // incoming record offset is 1 + the processed offset - if (this.processedOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE - || kafkaSinkRecord.kafkaOffset() >= this.processedOffset.get() + 1) { + if (currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE + || kafkaSinkRecord.kafkaOffset() >= currentProcessedOffset + 1) { StreamingBuffer copiedStreamingBuffer = null; bufferLock.lock(); try { this.streamingBuffer.insert(kafkaSinkRecord); this.processedOffset.set(kafkaSinkRecord.kafkaOffset()); - // # of records or size based flushing if (this.streamingBufferThreshold.shouldFlushOnBufferByteSize( - streamingBuffer.getBufferSizeBytes()) + streamingBuffer.getBufferSizeBytes()) || this.streamingBufferThreshold.shouldFlushOnBufferRecordCount( - streamingBuffer.getNumOfRecords())) { + streamingBuffer.getNumOfRecords())) { copiedStreamingBuffer = streamingBuffer; this.streamingBuffer = new StreamingBuffer(); LOGGER.debug( @@ -364,7 +366,6 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { } finally { bufferLock.unlock(); } - // If we found reaching buffer size threshold or count based threshold, we will immediately // flush (Insert them) if (copiedStreamingBuffer != null) { 
@@ -376,8 +377,8 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { + " offsetPersistedInSnowflake:{}, processedOffset:{}", kafkaSinkRecord.kafkaOffset(), this.getChannelName(), - this.offsetPersistedInSnowflake.get(), - this.processedOffset.get()); + currentOffsetPersistedInSnowflake, + currentProcessedOffset); } } diff --git a/test/rest_request_template/travis_correct_schema_evolution_drop_table.json b/test/rest_request_template/travis_correct_schema_evolution_drop_table.json deleted file mode 100644 index d8a5be570..000000000 --- a/test/rest_request_template/travis_correct_schema_evolution_drop_table.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "name": "SNOWFLAKE_CONNECTOR_NAME", - "config": { - "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", - "topics": "SNOWFLAKE_TEST_TOPIC0", - "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME", - "tasks.max": "1", - "buffer.flush.time": "10", - "buffer.count.records": "100", - "buffer.size.bytes": "5000000", - "snowflake.url.name": "SNOWFLAKE_HOST", - "snowflake.user.name": "SNOWFLAKE_USER", - "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", - "snowflake.database.name": "SNOWFLAKE_DATABASE", - "snowflake.schema.name": "SNOWFLAKE_SCHEMA", - "snowflake.role.name": "SNOWFLAKE_ROLE", - "snowflake.ingestion.method": "SNOWPIPE_STREAMING", - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - "jmx": "true", - "errors.tolerance": "all", - "errors.log.enable": true, - "errors.deadletterqueue.topic.name": "DLQ_TOPIC", - "errors.deadletterqueue.topic.replication.factor": 1, - "snowflake.enable.schematization": true - } -} \ No newline at end of file diff --git a/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json b/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json deleted file mode 100644 index 38e50ab1b..000000000 --- a/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json +++ /dev/null @@ -1,28 +0,0 @@ -{ - "name": "SNOWFLAKE_CONNECTOR_NAME", - "config": { - "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", - "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1", - "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME", - "tasks.max": "1", - "buffer.flush.time": "10", - "buffer.count.records": "100", - "buffer.size.bytes": "5000000", - "snowflake.url.name": "SNOWFLAKE_HOST", - "snowflake.user.name": "SNOWFLAKE_USER", - "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", - "snowflake.database.name": "SNOWFLAKE_DATABASE", - "snowflake.schema.name": "SNOWFLAKE_SCHEMA", - "snowflake.role.name": "SNOWFLAKE_ROLE", - "snowflake.ingestion.method": "SNOWPIPE_STREAMING", - "key.converter": "org.apache.kafka.connect.storage.StringConverter", - "value.converter": "org.apache.kafka.connect.json.JsonConverter", - "value.converter.schemas.enable": "false", - "jmx": "true", - "errors.tolerance": "all", - "errors.log.enable": true, - "errors.deadletterqueue.topic.name": "DLQ_TOPIC", - "errors.deadletterqueue.topic.replication.factor": 1, - "snowflake.enable.schematization": true - } -} \ No newline at end of file diff --git a/test/test_suit/test_schema_evolution_drop_table.py b/test/test_suit/test_schema_evolution_drop_table.py deleted file mode 100644 index 06c2cf49f..000000000 --- 
a/test/test_suit/test_schema_evolution_drop_table.py +++ /dev/null @@ -1,88 +0,0 @@ -import json -from time import sleep - -from test_suit.test_utils import NonRetryableError - - -# test if the table is updated with the correct column, and if the table is -# recreated and updated after it's being dropped -class TestSchemaEvolutionDropTable: - def __init__(self, driver, nameSalt): - self.driver = driver - self.fileName = "travis_correct_schema_evolution_drop_table" - self.table = self.fileName + nameSalt - self.recordNum = 100 - self.topic = self.table + "0" - - self.driver.snowflake_conn.cursor().execute( - "Create or replace table {} (PERFORMANCE_STRING STRING)".format(self.table)) - self.driver.snowflake_conn.cursor().execute( - "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table)) - - self.record = { - 'PERFORMANCE_STRING': 'Excellent', - '"case_sensitive_PERFORMANCE_CHAR"': 'A', - 'RATING_INT': 100 - } - - self.gold_type = { - 'PERFORMANCE_STRING': 'VARCHAR', - 'case_sensitive_PERFORMANCE_CHAR': 'VARCHAR', - 'RATING_INT': 'NUMBER', - 'RECORD_METADATA': 'VARIANT' - } - - self.gold_columns = [columnName for columnName in self.gold_type] - - def getConfigFileName(self): - return self.fileName + ".json" - - def send(self): - key = [] - value = [] - for e in range(self.recordNum): - key.append(json.dumps({'number': str(e)}).encode('utf-8')) - value.append(json.dumps(self.record).encode('utf-8')) - self.driver.sendBytesData(self.topic, value, key) - - # Sleep for some time and then verify the rows are ingested - sleep(120) - self.verify("0") - - # Recreate the table - self.driver.snowflake_conn.cursor().execute( - "Create or replace table {} (PERFORMANCE_STRING STRING, RECORD_METADATA VARIANT)".format(self.table)) - self.driver.snowflake_conn.cursor().execute( - "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table)) - - # Ingest another set of rows - self.driver.sendBytesData(self.topic, value, key) - - def verify(self, round): - rows = self.driver.snowflake_conn.cursor().execute( - "desc table {}".format(self.table)).fetchall() - res_col = {} - - gold_columns_copy = self.gold_columns.copy() - - for index, row in enumerate(rows): - gold_columns_copy.remove(row[0]) - if not row[1].startswith(self.gold_type[row[0]]): - raise NonRetryableError("Column {} has the wrong type. 
got: {}, expected: {}".format(row[0], row[1], - self.gold_type[ - row[0]])) - res_col[row[0]] = index - - print("Columns not in table: ", gold_columns_copy) - - for columnName in gold_columns_copy: - raise NonRetryableError("Column {} was not created".format(columnName)) - - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.table)).fetchone()[0] - if res != self.recordNum: - print("Number of record expected: {}, got: {}".format(self.recordNum, res)) - raise NonRetryableError("Number of record in table is different from number of record sent") - - def clean(self): - self.driver.cleanTableStagePipe(self.table) diff --git a/test/test_suit/test_schema_evolution_multi_topic_drop_table.py b/test/test_suit/test_schema_evolution_multi_topic_drop_table.py deleted file mode 100644 index b72a24408..000000000 --- a/test/test_suit/test_schema_evolution_multi_topic_drop_table.py +++ /dev/null @@ -1,109 +0,0 @@ -import json -from time import sleep - -from test_suit.test_utils import NonRetryableError - - -# test if the table is updated with the correct column, and if the table is -# recreated and updated after it's being dropped -class TestSchemaEvolutionMultiTopicDropTable: - def __init__(self, driver, nameSalt): - self.driver = driver - self.fileName = "travis_correct_schema_evolution_multi_topic_drop_table" - self.topics = [] - self.table = self.fileName + nameSalt - self.recordNum = 100 - - for i in range(2): - self.topics.append(self.table + str(i)) - - self.driver.snowflake_conn.cursor().execute( - "Create or replace table {} (PERFORMANCE_STRING STRING)".format(self.table)) - - self.driver.snowflake_conn.cursor().execute( - "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table)) - - self.records = [] - - self.records.append({ - 'PERFORMANCE_STRING': 'Excellent', - '"case_sensitive_PERFORMANCE_CHAR"': 'A', - 'RATING_INT': 100 - }) - - self.records.append({ - 'PERFORMANCE_STRING': 'Excellent', - 'RATING_DOUBLE': 0.99, - 'APPROVAL': True - }) - - self.gold_type = { - 'PERFORMANCE_STRING': 'VARCHAR', - 'case_sensitive_PERFORMANCE_CHAR': 'VARCHAR', - 'RATING_INT': 'NUMBER', - 'RATING_DOUBLE': 'FLOAT', - 'APPROVAL': 'BOOLEAN', - 'RECORD_METADATA': 'VARIANT' - } - - self.gold_columns = [columnName for columnName in self.gold_type] - - def getConfigFileName(self): - return self.fileName + ".json" - - def send(self): - for i, topic in enumerate(self.topics): - key = [] - value = [] - for e in range(self.recordNum): - key.append(json.dumps({'number': str(e)}).encode('utf-8')) - value.append(json.dumps(self.records[i]).encode('utf-8')) - self.driver.sendBytesData(topic, value, key) - - # Sleep for some time and then verify the rows are ingested - sleep(120) - self.verify("0") - - # Recreate the table - self.driver.snowflake_conn.cursor().execute( - "Create or replace table {} (PERFORMANCE_STRING STRING, RECORD_METADATA VARIANT)".format(self.table)) - self.driver.snowflake_conn.cursor().execute( - "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table)) - - # Ingest another set of rows - for i, topic in enumerate(self.topics): - key = [] - value = [] - for e in range(self.recordNum): - key.append(json.dumps({'number': str(e)}).encode('utf-8')) - value.append(json.dumps(self.records[i]).encode('utf-8')) - self.driver.sendBytesData(topic, value, key) - - def verify(self, round): - rows = self.driver.snowflake_conn.cursor().execute( - "desc table {}".format(self.table)).fetchall() - res_col = {} - - gold_columns_copy = 
self.gold_columns.copy() - - for index, row in enumerate(rows): - gold_columns_copy.remove(row[0]) - if not row[1].startswith(self.gold_type[row[0]]): - raise NonRetryableError("Column {} has the wrong type. got: {}, expected: {}".format(row[0], row[1], - self.gold_type[ - row[0]])) - res_col[row[0]] = index - - print("Columns not in table: ", gold_columns_copy) - - for columnName in gold_columns_copy: - raise NonRetryableError("Column {} was not created".format(columnName)) - - res = self.driver.snowflake_conn.cursor().execute( - "SELECT count(*) FROM {}".format(self.table)).fetchone()[0] - if res != self.recordNum * len(self.topics): - print("Number of record expected: {}, got: {}".format(self.recordNum * len(self.topics), res)) - raise NonRetryableError("Number of record in table is different from number of record sent") - - def clean(self): - self.driver.cleanTableStagePipe(self.table) diff --git a/test/test_suites.py b/test/test_suites.py index 35067ba03..2085808fa 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -25,10 +25,8 @@ from test_suit.test_native_string_json_without_schema import TestNativeStringJsonWithoutSchema from test_suit.test_native_string_protobuf import TestNativeStringProtobuf from test_suit.test_schema_evolution_avro_sr import TestSchemaEvolutionAvroSR -from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson from test_suit.test_schema_evolution_json_ignore_tombstone import TestSchemaEvolutionJsonIgnoreTombstone -from test_suit.test_schema_evolution_multi_topic_drop_table import TestSchemaEvolutionMultiTopicDropTable from test_suit.test_schema_evolution_nonnullable_json import TestSchemaEvolutionNonNullableJson from test_suit.test_schema_evolution_w_auto_table_creation_avro_sr import \ TestSchemaEvolutionWithAutoTableCreationAvroSR @@ -194,10 +192,6 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS test_instance=TestSchemaNotSupportedConverter(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), - # ("TestSchemaEvolutionDropTable", EndToEndTestSuite( - # test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True, - # run_in_apache=True - # )), ("TestKcDeleteCreate", EndToEndTestSuite( test_instance=TestKcDeleteCreate(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), @@ -235,9 +229,5 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS ("TestKcRestart", EndToEndTestSuite( test_instance=TestKcRestart(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), - # ("TestSchemaEvolutionMultiTopicDropTable", EndToEndTestSuite( - # test_instance=TestSchemaEvolutionMultiTopicDropTable(driver, nameSalt), clean=True, run_in_confluent=True, - # run_in_apache=True - # )), ]) return test_suites From fbd1df254e5aaa55a9d45eb57b7a82b910dc58ae Mon Sep 17 00:00:00 2001 From: revi cheng Date: Tue, 24 Oct 2023 18:36:22 -0700 Subject: [PATCH 09/18] autoformatting --- .../connector/internal/streaming/TopicPartitionChannel.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 7e6059ed9..b79309f9e 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ 
b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -349,9 +349,9 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { this.processedOffset.set(kafkaSinkRecord.kafkaOffset()); // # of records or size based flushing if (this.streamingBufferThreshold.shouldFlushOnBufferByteSize( - streamingBuffer.getBufferSizeBytes()) + streamingBuffer.getBufferSizeBytes()) || this.streamingBufferThreshold.shouldFlushOnBufferRecordCount( - streamingBuffer.getNumOfRecords())) { + streamingBuffer.getNumOfRecords())) { copiedStreamingBuffer = streamingBuffer; this.streamingBuffer = new StreamingBuffer(); LOGGER.debug( From 669e182a74fa8c9153ec6794af731625d327d4b9 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Wed, 25 Oct 2023 12:30:10 -0700 Subject: [PATCH 10/18] add random row count and small pr comments --- .../streaming/TopicPartitionChannel.java | 9 +- ...t_schema_evolution_w_random_row_count.json | 28 ++++++ ...est_schema_evolution_w_random_row_count.py | 99 +++++++++++++++++++ test/test_suites.py | 6 ++ 4 files changed, 135 insertions(+), 7 deletions(-) create mode 100644 test/rest_request_template/test_schema_evolution_w_random_row_count.json create mode 100644 test/test_suit/test_schema_evolution_w_random_row_count.py diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index b79309f9e..3b6a46a2f 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -126,7 +126,7 @@ public class TopicPartitionChannel { *
    This boolean is used to indicate that we reset offset in kafka and we will only buffer once * we see the offset which is one more than an offset present in Snowflake. */ - private boolean isOffsetResetInKafka = false; // TODO @rcheng question: atomic? + private boolean isOffsetResetInKafka = false; private final SnowflakeStreamingIngestClient streamingIngestClient; @@ -399,12 +399,7 @@ private boolean shouldIgnoreAddingRecordToBuffer( SinkRecord kafkaSinkRecord, final long currentProcessedOffset) { // Don't skip rows if there is no offset reset and there is no offset token information in the // channel - if (!isOffsetResetInKafka - && currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { - LOGGER.debug( - "No offset registered in Snowflake and offset is not being reset, we can add this offset" - + " to buffer for channel:{}", - currentProcessedOffset); + if (!isOffsetResetInKafka) { return false; } diff --git a/test/rest_request_template/test_schema_evolution_w_random_row_count.json b/test/rest_request_template/test_schema_evolution_w_random_row_count.json new file mode 100644 index 000000000..a51854566 --- /dev/null +++ b/test/rest_request_template/test_schema_evolution_w_random_row_count.json @@ -0,0 +1,28 @@ +{ + "name": "SNOWFLAKE_CONNECTOR_NAME", + "config": { + "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", + "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1", + "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME", + "tasks.max": "1", + "buffer.flush.time": "60", + "buffer.count.records": "300", + "buffer.size.bytes": "5000000", + "snowflake.url.name": "SNOWFLAKE_HOST", + "snowflake.user.name": "SNOWFLAKE_USER", + "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", + "snowflake.database.name": "SNOWFLAKE_DATABASE", + "snowflake.schema.name": "SNOWFLAKE_SCHEMA", + "snowflake.role.name": "SNOWFLAKE_ROLE", + "snowflake.ingestion.method": "SNOWPIPE_STREAMING", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "jmx": "true", + "errors.tolerance": "all", + "errors.log.enable": true, + "errors.deadletterqueue.topic.name": "DLQ_TOPIC", + "errors.deadletterqueue.topic.replication.factor": 1, + "snowflake.enable.schematization": true + } +} \ No newline at end of file diff --git a/test/test_suit/test_schema_evolution_w_random_row_count.py b/test/test_suit/test_schema_evolution_w_random_row_count.py new file mode 100644 index 000000000..b2dd6b01f --- /dev/null +++ b/test/test_suit/test_schema_evolution_w_random_row_count.py @@ -0,0 +1,99 @@ +import json +import random + +from test_suit.test_utils import NonRetryableError +from time import sleep + + +# test if the ingestion works when the schematization alter table invalidation happens +# halfway through a batch +class TestSchemaEvolutionWithRandomRowCount: + def __init__(self, driver, nameSalt): + self.driver = driver + self.fileName = "test_schema_evolution_w_random_row_count" + self.topics = [] + self.table = self.fileName + nameSalt + + # records + self.initialRecordCount = random.randrange(1,300) + self.flushRecordCount = 300 + self.recordNum = self.initialRecordCount + self.flushRecordCount + + for i in range(2): + self.topics.append(self.table + str(i)) + + self.records = [] + + self.records.append({ + 'PERFORMANCE_STRING': 'Excellent', + 'PERFORMANCE_CHAR': 'A', + 'RATING_INT': 100 + 
}) + + self.records.append({ + 'PERFORMANCE_STRING': 'Excellent', + 'RATING_DOUBLE': 0.99, + 'APPROVAL': True + }) + + self.gold_type = { + 'PERFORMANCE_STRING': 'VARCHAR', + 'PERFORMANCE_CHAR': 'VARCHAR', + 'RATING_INT': 'NUMBER', + 'RATING_DOUBLE': 'FLOAT', + 'APPROVAL': 'BOOLEAN', + 'RECORD_METADATA': 'VARIANT' + } + + self.gold_columns = [columnName for columnName in self.gold_type] + + def getConfigFileName(self): + return self.fileName + ".json" + + def send(self): + print("Got random record count of {}".format(str(self.initialRecordCount))) + + for i, topic in enumerate(self.topics): + # send initial batch + key = [] + value = [] + for e in range(self.initialRecordCount): + key.append(json.dumps({'number': str(e)}).encode('utf-8')) + value.append(json.dumps(self.records[i]).encode('utf-8')) + self.driver.sendBytesData(topic, value, key) + + # send second batch that should flush + key = [] + value = [] + for e in range(self.flushRecordCount): + key.append(json.dumps({'number': str(e)}).encode('utf-8')) + value.append(json.dumps(self.records[i]).encode('utf-8')) + self.driver.sendBytesData(topic, value, key) + + def verify(self, round): + sleep(60) + rows = self.driver.snowflake_conn.cursor().execute( + "desc table {}".format(self.table)).fetchall() + res_col = {} + + for index, row in enumerate(rows): + self.gold_columns.remove(row[0]) + if not row[1].startswith(self.gold_type[row[0]]): + raise NonRetryableError("Column {} has the wrong type. got: {}, expected: {}".format(row[0], row[1], + self.gold_type[ + row[0]])) + res_col[row[0]] = index + + print("Columns not in table: ", self.gold_columns) + + for columnName in self.gold_columns: + raise NonRetryableError("Column {} was not created".format(columnName)) + + res = self.driver.snowflake_conn.cursor().execute( + "SELECT count(*) FROM {}".format(self.table)).fetchone()[0] + if res != len(self.topics) * self.recordNum: + print("Number of record expected: {}, got: {}".format(len(self.topics) * self.recordNum, res)) + raise NonRetryableError("Number of record in table is different from number of record sent") + + def clean(self): + self.driver.cleanTableStagePipe(self.table) diff --git a/test/test_suites.py b/test/test_suites.py index a5dbed080..7430a86ef 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -32,6 +32,8 @@ TestSchemaEvolutionWithAutoTableCreationAvroSR from test_suit.test_schema_evolution_w_auto_table_creation_json import \ TestSchemaEvolutionWithAutoTableCreationJson +from test_suit.test_schema_evolution_w_random_row_count import \ + TestSchemaEvolutionWithRandomRowCount from test_suit.test_schema_mapping import TestSchemaMapping from test_suit.test_schema_not_supported_converter import TestSchemaNotSupportedConverter from test_suit.test_snowpipe_streaming_schema_mapping_dlq import TestSnowpipeStreamingSchemaMappingDLQ @@ -185,6 +187,10 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS test_instance=TestSchemaEvolutionWithAutoTableCreationAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False )), + ("TestSchemaEvolutionWithRandomRowCount", EndToEndTestSuite( + test_instance=TestSchemaEvolutionWithRandomRowCount(driver, nameSalt), clean=True, + run_in_confluent=True, run_in_apache=True + )), ("TestSchemaEvolutionNonNullableJson", EndToEndTestSuite( test_instance=TestSchemaEvolutionNonNullableJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True From 8fe850a82fa752c0deb5b3e837a1717c2e9ae529 Mon Sep 17 00:00:00 2001 From: revi 
cheng Date: Tue, 31 Oct 2023 11:25:12 -0700 Subject: [PATCH 11/18] edit comment --- .../connector/internal/streaming/TopicPartitionChannel.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 3b6a46a2f..b46f4f3ce 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -397,8 +397,7 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { */ private boolean shouldIgnoreAddingRecordToBuffer( SinkRecord kafkaSinkRecord, final long currentProcessedOffset) { - // Don't skip rows if there is no offset reset and there is no offset token information in the - // channel + // Don't skip rows if there is no offset reset if (!isOffsetResetInKafka) { return false; } From abc31215a8f1f4a64e64699a1c9ed0cc51c48d5a Mon Sep 17 00:00:00 2001 From: revi cheng Date: Tue, 31 Oct 2023 11:31:34 -0700 Subject: [PATCH 12/18] remove droptable from testsuites --- test/test_suites.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/test/test_suites.py b/test/test_suites.py index 8616babdd..83cea14b0 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -198,10 +198,6 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS test_instance=TestSchemaNotSupportedConverter(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), - ("TestSchemaEvolutionDropTable", EndToEndTestSuite( - test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True, - run_in_apache=True - )), ("TestKcDeleteCreate", EndToEndTestSuite( test_instance=TestKcDeleteCreate(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), From bde023332be013c37bf3a985ee9866cec0c54b2d Mon Sep 17 00:00:00 2001 From: revi cheng Date: Wed, 1 Nov 2023 09:50:13 -0700 Subject: [PATCH 13/18] remove sleeps --- .../test_schema_evolution_w_auto_table_creation_avro_sr.py | 1 - .../test_schema_evolution_w_auto_table_creation_json.py | 2 -- test/test_suit/test_schema_evolution_w_random_row_count.py | 2 -- 3 files changed, 5 deletions(-) diff --git a/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py b/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py index 91317160d..81257b63c 100644 --- a/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py +++ b/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py @@ -1,6 +1,5 @@ from confluent_kafka import avro from test_suit.test_utils import NonRetryableError -from time import sleep # test if the table is updated with the correct column diff --git a/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py b/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py index e5d3a7157..103279ea6 100644 --- a/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py +++ b/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py @@ -1,7 +1,6 @@ import json from test_suit.test_utils import NonRetryableError -from time import sleep # test if the table is updated with the correct column @@ -70,7 +69,6 @@ def send(self): self.driver.sendBytesData(topic, value, key) def verify(self, round): - sleep(60) rows = self.driver.snowflake_conn.cursor().execute( "desc table 
{}".format(self.table)).fetchall() res_col = {} diff --git a/test/test_suit/test_schema_evolution_w_random_row_count.py b/test/test_suit/test_schema_evolution_w_random_row_count.py index b2dd6b01f..fbd26b977 100644 --- a/test/test_suit/test_schema_evolution_w_random_row_count.py +++ b/test/test_suit/test_schema_evolution_w_random_row_count.py @@ -2,7 +2,6 @@ import random from test_suit.test_utils import NonRetryableError -from time import sleep # test if the ingestion works when the schematization alter table invalidation happens @@ -71,7 +70,6 @@ def send(self): self.driver.sendBytesData(topic, value, key) def verify(self, round): - sleep(60) rows = self.driver.snowflake_conn.cursor().execute( "desc table {}".format(self.table)).fetchall() res_col = {} From 6583b55c575c911754d2437c77d8edc95980232a Mon Sep 17 00:00:00 2001 From: revi cheng Date: Wed, 1 Nov 2023 09:50:16 -0700 Subject: [PATCH 14/18] autoformatting --- .../kafka/connector/internal/SnowflakeConnectionServiceV1.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index cd5e5184e..8457c31d3 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -488,7 +488,8 @@ public boolean hasSchemaEvolutionPermission(String tableName, String role) { public void appendColumnsToTable(String tableName, Map columnToType) { checkConnection(); InternalUtils.assertNotEmpty("tableName", tableName); - StringBuilder appendColumnQuery = new StringBuilder("alter table identifier(?) add column if not exists "); + StringBuilder appendColumnQuery = + new StringBuilder("alter table identifier(?) 
add column if not exists "); boolean first = true; StringBuilder logColumn = new StringBuilder("["); for (String columnName : columnToType.keySet()) { From 604ae6b70cf02f21371abf4b0afbcebb0827e24d Mon Sep 17 00:00:00 2001 From: revi cheng Date: Tue, 7 Nov 2023 14:35:45 -0800 Subject: [PATCH 15/18] add it --- .../streaming/TopicPartitionChannelIT.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java index 99bf7f30a..f2c6cf0b1 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java @@ -18,6 +18,9 @@ import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.After; import org.junit.Assert; @@ -485,4 +488,65 @@ public void testSimpleInsertRowsFailureWithArrowBDECFormat() throws Exception { service.insert(records); service.closeAll(); } + + @Test + public void testPartialBatchChannelInvalidationIngestion_schematization() throws Exception { + Map config = TestUtils.getConfForStreaming(); + config.put(SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, "500"); // we want to flush on record + config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "500000"); + config.put(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES, "500000"); + config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true"); // using schematization to invalidate + + // setup + InMemorySinkTaskContext inMemorySinkTaskContext = + new InMemorySinkTaskContext(Collections.singleton(topicPartition)); + SnowflakeSinkService service = + SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) + .setRecordNumber(1) + .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) + .setSinkTaskContext(inMemorySinkTaskContext) + .addTask(testTableName, topicPartition) + .build(); + + final long firstBatchCount = 18; + final long secondBatchCount = 500; + + // create 18 blank records that do not kick off schematization + JsonConverter converter = new JsonConverter(); + HashMap converterConfig = new HashMap<>(); + converterConfig.put("schemas.enable", "false"); + converter.configure(converterConfig, false); + SchemaAndValue schemaInputValue = converter.toConnectData("test", null); + + List firstBatch = new ArrayList<>(); + for (int i = 0; i < firstBatchCount; i++) { + firstBatch.add(new SinkRecord( + topic, + PARTITION, + Schema.STRING_SCHEMA, + "test", + schemaInputValue.schema(), + schemaInputValue.value(), + i)); + } + + service.insert(firstBatch); + + // send batch with 500, should kick off a record based flush and schematization on record 19, which will fail the batches + List secondBatch = + TestUtils.createNativeJsonSinkRecords(firstBatchCount, secondBatchCount, topic, PARTITION); + service.insert(secondBatch); + + // resend batch 1 and 2 because 2 failed for schematization + service.insert(firstBatch); + service.insert(secondBatch); + + // ensure all data was ingested + TestUtils.assertWithRetry( + () -> 
service.getOffset(new TopicPartition(topic, PARTITION)) == firstBatchCount + secondBatchCount, 20, 5); + assert TestUtils.tableSize(testTableName) == firstBatchCount + secondBatchCount + : "expected: " + firstBatchCount + secondBatchCount + " actual: " + TestUtils.tableSize(testTableName); + + service.closeAll(); + } } From 152b28a6b829f16a79b67e664a4965087d31f69a Mon Sep 17 00:00:00 2001 From: revi cheng Date: Tue, 7 Nov 2023 14:35:57 -0800 Subject: [PATCH 16/18] autoformatting --- .../streaming/TopicPartitionChannelIT.java | 39 ++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java index f2c6cf0b1..e4176d970 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java @@ -492,10 +492,13 @@ public void testSimpleInsertRowsFailureWithArrowBDECFormat() throws Exception { @Test public void testPartialBatchChannelInvalidationIngestion_schematization() throws Exception { Map config = TestUtils.getConfForStreaming(); - config.put(SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, "500"); // we want to flush on record + config.put( + SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, "500"); // we want to flush on record config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "500000"); config.put(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES, "500000"); - config.put(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, "true"); // using schematization to invalidate + config.put( + SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, + "true"); // using schematization to invalidate // setup InMemorySinkTaskContext inMemorySinkTaskContext = @@ -520,19 +523,21 @@ public void testPartialBatchChannelInvalidationIngestion_schematization() throws List firstBatch = new ArrayList<>(); for (int i = 0; i < firstBatchCount; i++) { - firstBatch.add(new SinkRecord( - topic, - PARTITION, - Schema.STRING_SCHEMA, - "test", - schemaInputValue.schema(), - schemaInputValue.value(), - i)); + firstBatch.add( + new SinkRecord( + topic, + PARTITION, + Schema.STRING_SCHEMA, + "test", + schemaInputValue.schema(), + schemaInputValue.value(), + i)); } service.insert(firstBatch); - // send batch with 500, should kick off a record based flush and schematization on record 19, which will fail the batches + // send batch with 500, should kick off a record based flush and schematization on record 19, + // which will fail the batches List secondBatch = TestUtils.createNativeJsonSinkRecords(firstBatchCount, secondBatchCount, topic, PARTITION); service.insert(secondBatch); @@ -543,9 +548,17 @@ public void testPartialBatchChannelInvalidationIngestion_schematization() throws // ensure all data was ingested TestUtils.assertWithRetry( - () -> service.getOffset(new TopicPartition(topic, PARTITION)) == firstBatchCount + secondBatchCount, 20, 5); + () -> + service.getOffset(new TopicPartition(topic, PARTITION)) + == firstBatchCount + secondBatchCount, + 20, + 5); assert TestUtils.tableSize(testTableName) == firstBatchCount + secondBatchCount - : "expected: " + firstBatchCount + secondBatchCount + " actual: " + TestUtils.tableSize(testTableName); + : "expected: " + + firstBatchCount + + secondBatchCount + + " actual: " + + TestUtils.tableSize(testTableName); 
service.closeAll(); } From 9e32250401877688a9297ae07131fb02caa8c770 Mon Sep 17 00:00:00 2001 From: revi cheng Date: Tue, 7 Nov 2023 14:44:42 -0800 Subject: [PATCH 17/18] add test back disabled --- .../streaming/TopicPartitionChannel.java | 1 + ...s_correct_schema_evolution_drop_table.json | 28 +++++ ...hema_evolution_multi_topic_drop_table.json | 28 +++++ .../test_schema_evolution_drop_table.py | 88 ++++++++++++++ ...schema_evolution_multi_topic_drop_table.py | 109 ++++++++++++++++++ test/test_suites.py | 9 ++ 6 files changed, 263 insertions(+) create mode 100644 test/rest_request_template/travis_correct_schema_evolution_drop_table.json create mode 100644 test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json create mode 100644 test/test_suit/test_schema_evolution_drop_table.py create mode 100644 test/test_suit/test_schema_evolution_multi_topic_drop_table.py diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 96dc55277..778f740dd 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -366,6 +366,7 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { } finally { bufferLock.unlock(); } + // If we found reaching buffer size threshold or count based threshold, we will immediately // flush (Insert them) if (copiedStreamingBuffer != null) { diff --git a/test/rest_request_template/travis_correct_schema_evolution_drop_table.json b/test/rest_request_template/travis_correct_schema_evolution_drop_table.json new file mode 100644 index 000000000..d8a5be570 --- /dev/null +++ b/test/rest_request_template/travis_correct_schema_evolution_drop_table.json @@ -0,0 +1,28 @@ +{ + "name": "SNOWFLAKE_CONNECTOR_NAME", + "config": { + "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", + "topics": "SNOWFLAKE_TEST_TOPIC0", + "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME", + "tasks.max": "1", + "buffer.flush.time": "10", + "buffer.count.records": "100", + "buffer.size.bytes": "5000000", + "snowflake.url.name": "SNOWFLAKE_HOST", + "snowflake.user.name": "SNOWFLAKE_USER", + "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", + "snowflake.database.name": "SNOWFLAKE_DATABASE", + "snowflake.schema.name": "SNOWFLAKE_SCHEMA", + "snowflake.role.name": "SNOWFLAKE_ROLE", + "snowflake.ingestion.method": "SNOWPIPE_STREAMING", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "jmx": "true", + "errors.tolerance": "all", + "errors.log.enable": true, + "errors.deadletterqueue.topic.name": "DLQ_TOPIC", + "errors.deadletterqueue.topic.replication.factor": 1, + "snowflake.enable.schematization": true + } +} \ No newline at end of file diff --git a/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json b/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json new file mode 100644 index 000000000..38e50ab1b --- /dev/null +++ b/test/rest_request_template/travis_correct_schema_evolution_multi_topic_drop_table.json @@ -0,0 +1,28 @@ +{ + "name": "SNOWFLAKE_CONNECTOR_NAME", + "config": { + "connector.class": 
"com.snowflake.kafka.connector.SnowflakeSinkConnector", + "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1", + "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME", + "tasks.max": "1", + "buffer.flush.time": "10", + "buffer.count.records": "100", + "buffer.size.bytes": "5000000", + "snowflake.url.name": "SNOWFLAKE_HOST", + "snowflake.user.name": "SNOWFLAKE_USER", + "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", + "snowflake.database.name": "SNOWFLAKE_DATABASE", + "snowflake.schema.name": "SNOWFLAKE_SCHEMA", + "snowflake.role.name": "SNOWFLAKE_ROLE", + "snowflake.ingestion.method": "SNOWPIPE_STREAMING", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "jmx": "true", + "errors.tolerance": "all", + "errors.log.enable": true, + "errors.deadletterqueue.topic.name": "DLQ_TOPIC", + "errors.deadletterqueue.topic.replication.factor": 1, + "snowflake.enable.schematization": true + } +} \ No newline at end of file diff --git a/test/test_suit/test_schema_evolution_drop_table.py b/test/test_suit/test_schema_evolution_drop_table.py new file mode 100644 index 000000000..06c2cf49f --- /dev/null +++ b/test/test_suit/test_schema_evolution_drop_table.py @@ -0,0 +1,88 @@ +import json +from time import sleep + +from test_suit.test_utils import NonRetryableError + + +# test if the table is updated with the correct column, and if the table is +# recreated and updated after it's being dropped +class TestSchemaEvolutionDropTable: + def __init__(self, driver, nameSalt): + self.driver = driver + self.fileName = "travis_correct_schema_evolution_drop_table" + self.table = self.fileName + nameSalt + self.recordNum = 100 + self.topic = self.table + "0" + + self.driver.snowflake_conn.cursor().execute( + "Create or replace table {} (PERFORMANCE_STRING STRING)".format(self.table)) + self.driver.snowflake_conn.cursor().execute( + "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table)) + + self.record = { + 'PERFORMANCE_STRING': 'Excellent', + '"case_sensitive_PERFORMANCE_CHAR"': 'A', + 'RATING_INT': 100 + } + + self.gold_type = { + 'PERFORMANCE_STRING': 'VARCHAR', + 'case_sensitive_PERFORMANCE_CHAR': 'VARCHAR', + 'RATING_INT': 'NUMBER', + 'RECORD_METADATA': 'VARIANT' + } + + self.gold_columns = [columnName for columnName in self.gold_type] + + def getConfigFileName(self): + return self.fileName + ".json" + + def send(self): + key = [] + value = [] + for e in range(self.recordNum): + key.append(json.dumps({'number': str(e)}).encode('utf-8')) + value.append(json.dumps(self.record).encode('utf-8')) + self.driver.sendBytesData(self.topic, value, key) + + # Sleep for some time and then verify the rows are ingested + sleep(120) + self.verify("0") + + # Recreate the table + self.driver.snowflake_conn.cursor().execute( + "Create or replace table {} (PERFORMANCE_STRING STRING, RECORD_METADATA VARIANT)".format(self.table)) + self.driver.snowflake_conn.cursor().execute( + "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table)) + + # Ingest another set of rows + self.driver.sendBytesData(self.topic, value, key) + + def verify(self, round): + rows = self.driver.snowflake_conn.cursor().execute( + "desc table {}".format(self.table)).fetchall() + res_col = {} + + gold_columns_copy = self.gold_columns.copy() + + for index, row in enumerate(rows): + gold_columns_copy.remove(row[0]) + if not 
row[1].startswith(self.gold_type[row[0]]): + raise NonRetryableError("Column {} has the wrong type. got: {}, expected: {}".format(row[0], row[1], + self.gold_type[ + row[0]])) + res_col[row[0]] = index + + print("Columns not in table: ", gold_columns_copy) + + for columnName in gold_columns_copy: + raise NonRetryableError("Column {} was not created".format(columnName)) + + res = self.driver.snowflake_conn.cursor().execute( + "SELECT count(*) FROM {}".format(self.table)).fetchone()[0] + if res != self.recordNum: + print("Number of record expected: {}, got: {}".format(self.recordNum, res)) + raise NonRetryableError("Number of record in table is different from number of record sent") + + def clean(self): + self.driver.cleanTableStagePipe(self.table) diff --git a/test/test_suit/test_schema_evolution_multi_topic_drop_table.py b/test/test_suit/test_schema_evolution_multi_topic_drop_table.py new file mode 100644 index 000000000..b72a24408 --- /dev/null +++ b/test/test_suit/test_schema_evolution_multi_topic_drop_table.py @@ -0,0 +1,109 @@ +import json +from time import sleep + +from test_suit.test_utils import NonRetryableError + + +# test if the table is updated with the correct column, and if the table is +# recreated and updated after it's being dropped +class TestSchemaEvolutionMultiTopicDropTable: + def __init__(self, driver, nameSalt): + self.driver = driver + self.fileName = "travis_correct_schema_evolution_multi_topic_drop_table" + self.topics = [] + self.table = self.fileName + nameSalt + self.recordNum = 100 + + for i in range(2): + self.topics.append(self.table + str(i)) + + self.driver.snowflake_conn.cursor().execute( + "Create or replace table {} (PERFORMANCE_STRING STRING)".format(self.table)) + + self.driver.snowflake_conn.cursor().execute( + "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table)) + + self.records = [] + + self.records.append({ + 'PERFORMANCE_STRING': 'Excellent', + '"case_sensitive_PERFORMANCE_CHAR"': 'A', + 'RATING_INT': 100 + }) + + self.records.append({ + 'PERFORMANCE_STRING': 'Excellent', + 'RATING_DOUBLE': 0.99, + 'APPROVAL': True + }) + + self.gold_type = { + 'PERFORMANCE_STRING': 'VARCHAR', + 'case_sensitive_PERFORMANCE_CHAR': 'VARCHAR', + 'RATING_INT': 'NUMBER', + 'RATING_DOUBLE': 'FLOAT', + 'APPROVAL': 'BOOLEAN', + 'RECORD_METADATA': 'VARIANT' + } + + self.gold_columns = [columnName for columnName in self.gold_type] + + def getConfigFileName(self): + return self.fileName + ".json" + + def send(self): + for i, topic in enumerate(self.topics): + key = [] + value = [] + for e in range(self.recordNum): + key.append(json.dumps({'number': str(e)}).encode('utf-8')) + value.append(json.dumps(self.records[i]).encode('utf-8')) + self.driver.sendBytesData(topic, value, key) + + # Sleep for some time and then verify the rows are ingested + sleep(120) + self.verify("0") + + # Recreate the table + self.driver.snowflake_conn.cursor().execute( + "Create or replace table {} (PERFORMANCE_STRING STRING, RECORD_METADATA VARIANT)".format(self.table)) + self.driver.snowflake_conn.cursor().execute( + "alter table {} set ENABLE_SCHEMA_EVOLUTION = true".format(self.table)) + + # Ingest another set of rows + for i, topic in enumerate(self.topics): + key = [] + value = [] + for e in range(self.recordNum): + key.append(json.dumps({'number': str(e)}).encode('utf-8')) + value.append(json.dumps(self.records[i]).encode('utf-8')) + self.driver.sendBytesData(topic, value, key) + + def verify(self, round): + rows = self.driver.snowflake_conn.cursor().execute( + "desc table 
{}".format(self.table)).fetchall() + res_col = {} + + gold_columns_copy = self.gold_columns.copy() + + for index, row in enumerate(rows): + gold_columns_copy.remove(row[0]) + if not row[1].startswith(self.gold_type[row[0]]): + raise NonRetryableError("Column {} has the wrong type. got: {}, expected: {}".format(row[0], row[1], + self.gold_type[ + row[0]])) + res_col[row[0]] = index + + print("Columns not in table: ", gold_columns_copy) + + for columnName in gold_columns_copy: + raise NonRetryableError("Column {} was not created".format(columnName)) + + res = self.driver.snowflake_conn.cursor().execute( + "SELECT count(*) FROM {}".format(self.table)).fetchone()[0] + if res != self.recordNum * len(self.topics): + print("Number of record expected: {}, got: {}".format(self.recordNum * len(self.topics), res)) + raise NonRetryableError("Number of record in table is different from number of record sent") + + def clean(self): + self.driver.cleanTableStagePipe(self.table) diff --git a/test/test_suites.py b/test/test_suites.py index 2badfd728..95d958d85 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -244,5 +244,14 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS ("TestKcRestart", EndToEndTestSuite( test_instance=TestKcRestart(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), + # do not support out of band table drops due to SNOW-943288, keeping these tests for long term solution + # ("TestSchemaEvolutionDropTable", EndToEndTestSuite( + # test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True, + # run_in_apache=True + # )), + # ("TestSchemaEvolutionMultiTopicDropTable", EndToEndTestSuite( + # test_instance=TestSchemaEvolutionMultiTopicDropTable(driver, nameSalt), clean=True, run_in_confluent=True, + # run_in_apache=True + # )), ]) return test_suites From 14d9de341c6e245e8718336c87d3f8bf12880e8b Mon Sep 17 00:00:00 2001 From: revi cheng Date: Tue, 7 Nov 2023 14:44:45 -0800 Subject: [PATCH 18/18] autoformatting --- .../connector/internal/streaming/TopicPartitionChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 778f740dd..83c758f03 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -366,7 +366,7 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { } finally { bufferLock.unlock(); } - + // If we found reaching buffer size threshold or count based threshold, we will immediately // flush (Insert them) if (copiedStreamingBuffer != null) {