diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index cd5e5184e..8457c31d3 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -488,7 +488,8 @@ public boolean hasSchemaEvolutionPermission(String tableName, String role) { public void appendColumnsToTable(String tableName, Map columnToType) { checkConnection(); InternalUtils.assertNotEmpty("tableName", tableName); - StringBuilder appendColumnQuery = new StringBuilder("alter table identifier(?) add column if not exists "); + StringBuilder appendColumnQuery = + new StringBuilder("alter table identifier(?) add column if not exists "); boolean first = true; StringBuilder logColumn = new StringBuilder("["); for (String columnName : columnToType.keySet()) { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java index 9224d6a05..83c758f03 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java @@ -112,7 +112,9 @@ public class TopicPartitionChannel { *
  • If channel fails to fetch offsetToken from Snowflake, we reopen the channel and try to * fetch offset from Snowflake again *
  • If channel fails to ingest a buffer(Buffer containing rows/offsets), we reopen the - * channel and try to fetch offset from Snowflake again + * channel and try to fetch offset from Snowflake again. Schematization purposefully fails + * the first buffer insert in order to alter the table, and then expects Kafka to resend + * data * * *

    In both cases above, we ask Kafka to send back offsets, strictly from offset number after @@ -124,7 +126,7 @@ public class TopicPartitionChannel { *

    This boolean is used to indicate that we reset offset in kafka and we will only buffer once * we see the offset which is one more than an offset present in Snowflake. */ - private boolean isOffsetResetInKafka; + private boolean isOffsetResetInKafka = false; private final SnowflakeStreamingIngestClient streamingIngestClient; @@ -391,14 +393,13 @@ public void insertRecordToBuffer(SinkRecord kafkaSinkRecord) { * * @param kafkaSinkRecord Record to check for above condition only in case of failures * (isOffsetResetInKafka = true) + * @param currentProcessedOffset The current processed offset * @return true if this record can be skipped to add into buffer, false otherwise. */ private boolean shouldIgnoreAddingRecordToBuffer( - SinkRecord kafkaSinkRecord, long currentProcessedOffset) { - // Don't skip rows if there is no offset reset or there is no offset token information in the - // channel - if (!isOffsetResetInKafka - || currentProcessedOffset == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) { + SinkRecord kafkaSinkRecord, final long currentProcessedOffset) { + // Don't skip rows if there is no offset reset + if (!isOffsetResetInKafka) { return false; } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java index 99bf7f30a..e4176d970 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelIT.java @@ -18,6 +18,9 @@ import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.json.JsonConverter; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.After; import org.junit.Assert; @@ -485,4 +488,78 @@ public void testSimpleInsertRowsFailureWithArrowBDECFormat() throws Exception { service.insert(records); service.closeAll(); } + + @Test + public void testPartialBatchChannelInvalidationIngestion_schematization() throws Exception { + Map config = TestUtils.getConfForStreaming(); + config.put( + SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS, "500"); // we want to flush on record + config.put(SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC, "500000"); + config.put(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES, "500000"); + config.put( + SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG, + "true"); // using schematization to invalidate + + // setup + InMemorySinkTaskContext inMemorySinkTaskContext = + new InMemorySinkTaskContext(Collections.singleton(topicPartition)); + SnowflakeSinkService service = + SnowflakeSinkServiceFactory.builder(conn, IngestionMethodConfig.SNOWPIPE_STREAMING, config) + .setRecordNumber(1) + .setErrorReporter(new InMemoryKafkaRecordErrorReporter()) + .setSinkTaskContext(inMemorySinkTaskContext) + .addTask(testTableName, topicPartition) + .build(); + + final long firstBatchCount = 18; + final long secondBatchCount = 500; + + // create 18 blank records that do not kick off schematization + JsonConverter converter = new JsonConverter(); + HashMap converterConfig = new HashMap<>(); + converterConfig.put("schemas.enable", "false"); + converter.configure(converterConfig, false); + SchemaAndValue schemaInputValue = converter.toConnectData("test", null); + + List firstBatch = new ArrayList<>(); + for (int i = 0; i < firstBatchCount; i++) { + firstBatch.add( + new SinkRecord( + topic, + PARTITION, + Schema.STRING_SCHEMA, + "test", + schemaInputValue.schema(), + schemaInputValue.value(), + i)); + } + + service.insert(firstBatch); + + // send batch with 500, should kick off a record based flush and schematization on record 19, + // which will fail the batches + List secondBatch = + TestUtils.createNativeJsonSinkRecords(firstBatchCount, secondBatchCount, topic, PARTITION); + service.insert(secondBatch); + + // resend batch 1 and 2 because 2 failed for schematization + service.insert(firstBatch); + service.insert(secondBatch); + + // ensure all data was ingested + TestUtils.assertWithRetry( + () -> + service.getOffset(new TopicPartition(topic, PARTITION)) + == firstBatchCount + secondBatchCount, + 20, + 5); + assert TestUtils.tableSize(testTableName) == firstBatchCount + secondBatchCount + : "expected: " + + firstBatchCount + + secondBatchCount + + " actual: " + + TestUtils.tableSize(testTableName); + + service.closeAll(); + } } diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java index 601fb5922..dd32d24f0 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java @@ -397,15 +397,24 @@ public void testFetchOffsetTokenWithRetry_RuntimeException() { /* Only SFExceptions goes into fallback -> reopens channel, fetch offsetToken and throws Appropriate exception */ @Test public void testInsertRows_SuccessAfterReopenChannel() throws Exception { + final int noOfRecords = 5; + int expectedInsertRowsCount = 0; + int expectedOpenChannelCount = 0; + int expectedGetOffsetCount = 0; + + // setup mocks to fail first insert and return two null snowflake offsets (open channel and + // failed insert) before succeeding Mockito.when( mockStreamingChannel.insertRows( ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) - .thenThrow(SF_EXCEPTION); - - // get null from snowflake first time it is called and null for second time too since insert - // rows was failure - Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()).thenReturn(null); + .thenThrow(SF_EXCEPTION) + .thenReturn(new InsertValidationResponse()); + Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()) + .thenReturn(null) + .thenReturn(null) + .thenReturn(Long.toString(noOfRecords - 1)); + // create tpchannel TopicPartitionChannel topicPartitionChannel = new TopicPartitionChannel( mockStreamingClient, @@ -417,37 +426,47 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception { mockKafkaRecordErrorReporter, mockSinkTaskContext, mockTelemetryService); - final int noOfRecords = 5; - // Since record 0 was not able to ingest, all records in this batch will not be added into the - // buffer. + expectedOpenChannelCount++; + expectedGetOffsetCount++; + + // verify initial mock counts after tpchannel creation + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount)) + .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)); + Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount)) + .openChannel(ArgumentMatchers.any()); + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount)) + .getLatestCommittedOffsetToken(); + + // Test inserting record 0, which should fail to ingest so the other records are ignored List records = TestUtils.createJsonStringSinkRecords(0, noOfRecords, TOPIC, PARTITION); - records.forEach(topicPartitionChannel::insertRecordToBuffer); + expectedInsertRowsCount++; + expectedOpenChannelCount++; + expectedGetOffsetCount++; - Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords)) + // verify mocks only tried ingesting once + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount)) .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)); - Mockito.verify(mockStreamingClient, Mockito.times(noOfRecords + 1)) + Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount)) .openChannel(ArgumentMatchers.any()); - Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords + 1)) + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount)) .getLatestCommittedOffsetToken(); - // Now, it should be successful - Mockito.when( - mockStreamingChannel.insertRows( - ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class))) - .thenReturn(new InsertValidationResponse()); - - Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()) - .thenReturn(Long.toString(noOfRecords - 1)); - // Retry the insert again, now everything should be ingested and the offset token should be // noOfRecords-1 records.forEach(topicPartitionChannel::insertRecordToBuffer); - Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(noOfRecords * 2)) - .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)); - Assert.assertEquals(noOfRecords - 1, topicPartitionChannel.fetchOffsetTokenWithRetry()); + expectedInsertRowsCount += noOfRecords; + expectedGetOffsetCount++; + + // verify mocks ingested each record + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedInsertRowsCount)) + .insertRows(ArgumentMatchers.any(Iterable.class), ArgumentMatchers.any(String.class)); + Mockito.verify(mockStreamingClient, Mockito.times(expectedOpenChannelCount)) + .openChannel(ArgumentMatchers.any()); + Mockito.verify(topicPartitionChannel.getChannel(), Mockito.times(expectedGetOffsetCount)) + .getLatestCommittedOffsetToken(); } @Test diff --git a/test/rest_request_template/test_schema_evolution_w_random_row_count.json b/test/rest_request_template/test_schema_evolution_w_random_row_count.json new file mode 100644 index 000000000..a51854566 --- /dev/null +++ b/test/rest_request_template/test_schema_evolution_w_random_row_count.json @@ -0,0 +1,28 @@ +{ + "name": "SNOWFLAKE_CONNECTOR_NAME", + "config": { + "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector", + "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1", + "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME", + "tasks.max": "1", + "buffer.flush.time": "60", + "buffer.count.records": "300", + "buffer.size.bytes": "5000000", + "snowflake.url.name": "SNOWFLAKE_HOST", + "snowflake.user.name": "SNOWFLAKE_USER", + "snowflake.private.key": "SNOWFLAKE_PRIVATE_KEY", + "snowflake.database.name": "SNOWFLAKE_DATABASE", + "snowflake.schema.name": "SNOWFLAKE_SCHEMA", + "snowflake.role.name": "SNOWFLAKE_ROLE", + "snowflake.ingestion.method": "SNOWPIPE_STREAMING", + "key.converter": "org.apache.kafka.connect.storage.StringConverter", + "value.converter": "org.apache.kafka.connect.json.JsonConverter", + "value.converter.schemas.enable": "false", + "jmx": "true", + "errors.tolerance": "all", + "errors.log.enable": true, + "errors.deadletterqueue.topic.name": "DLQ_TOPIC", + "errors.deadletterqueue.topic.replication.factor": 1, + "snowflake.enable.schematization": true + } +} \ No newline at end of file diff --git a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json index 27de6cf6e..ddea359aa 100644 --- a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json +++ b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_avro_sr.json @@ -5,8 +5,8 @@ "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1", "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME", "tasks.max": "1", - "buffer.flush.time": "10", - "buffer.count.records": "100", + "buffer.flush.time": "60", + "buffer.count.records": "300", "buffer.size.bytes": "5000000", "snowflake.url.name": "SNOWFLAKE_HOST", "snowflake.user.name": "SNOWFLAKE_USER", diff --git a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json index 38e50ab1b..a51854566 100644 --- a/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json +++ b/test/rest_request_template/travis_correct_schema_evolution_w_auto_table_creation_json.json @@ -5,8 +5,8 @@ "topics": "SNOWFLAKE_TEST_TOPIC0,SNOWFLAKE_TEST_TOPIC1", "snowflake.topic2table.map": "SNOWFLAKE_TEST_TOPIC0:SNOWFLAKE_CONNECTOR_NAME,SNOWFLAKE_TEST_TOPIC1:SNOWFLAKE_CONNECTOR_NAME", "tasks.max": "1", - "buffer.flush.time": "10", - "buffer.count.records": "100", + "buffer.flush.time": "60", + "buffer.count.records": "300", "buffer.size.bytes": "5000000", "snowflake.url.name": "SNOWFLAKE_HOST", "snowflake.user.name": "SNOWFLAKE_USER", diff --git a/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py b/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py index dd4129064..81257b63c 100644 --- a/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py +++ b/test/test_suit/test_schema_evolution_w_auto_table_creation_avro_sr.py @@ -12,7 +12,11 @@ def __init__(self, driver, nameSalt): self.fileName = "travis_correct_schema_evolution_w_auto_table_creation_avro_sr" self.topics = [] self.table = self.fileName + nameSalt - self.recordNum = 100 + + # records + self.initialRecordCount = 12 + self.flushRecordCount = 300 + self.recordNum = self.initialRecordCount + self.flushRecordCount for i in range(2): self.topics.append(self.table + str(i)) @@ -78,8 +82,15 @@ def getConfigFileName(self): def send(self): for i, topic in enumerate(self.topics): + # send initial batch + value = [] + for _ in range(self.initialRecordCount): + value.append(self.records[i]) + self.driver.sendAvroSRData(topic, value, self.valueSchema[i], key=[], key_schema="", partition=0) + + # send second batch that should flush value = [] - for _ in range(self.recordNum): + for _ in range(self.flushRecordCount): value.append(self.records[i]) self.driver.sendAvroSRData(topic, value, self.valueSchema[i], key=[], key_schema="", partition=0) diff --git a/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py b/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py index 48daea794..103279ea6 100644 --- a/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py +++ b/test/test_suit/test_schema_evolution_w_auto_table_creation_json.py @@ -13,7 +13,11 @@ def __init__(self, driver, nameSalt): self.fileName = "travis_correct_schema_evolution_w_auto_table_creation_json" self.topics = [] self.table = self.fileName + nameSalt - self.recordNum = 100 + + # records + self.initialRecordCount = 12 + self.flushRecordCount = 300 + self.recordNum = self.initialRecordCount + self.flushRecordCount for i in range(2): self.topics.append(self.table + str(i)) @@ -48,9 +52,18 @@ def getConfigFileName(self): def send(self): for i, topic in enumerate(self.topics): + # send initial batch + key = [] + value = [] + for e in range(self.initialRecordCount): + key.append(json.dumps({'number': str(e)}).encode('utf-8')) + value.append(json.dumps(self.records[i]).encode('utf-8')) + self.driver.sendBytesData(topic, value, key) + + # send second batch that should flush key = [] value = [] - for e in range(self.recordNum): + for e in range(self.flushRecordCount): key.append(json.dumps({'number': str(e)}).encode('utf-8')) value.append(json.dumps(self.records[i]).encode('utf-8')) self.driver.sendBytesData(topic, value, key) diff --git a/test/test_suit/test_schema_evolution_w_random_row_count.py b/test/test_suit/test_schema_evolution_w_random_row_count.py new file mode 100644 index 000000000..fbd26b977 --- /dev/null +++ b/test/test_suit/test_schema_evolution_w_random_row_count.py @@ -0,0 +1,97 @@ +import json +import random + +from test_suit.test_utils import NonRetryableError + + +# test if the ingestion works when the schematization alter table invalidation happens +# halfway through a batch +class TestSchemaEvolutionWithRandomRowCount: + def __init__(self, driver, nameSalt): + self.driver = driver + self.fileName = "test_schema_evolution_w_random_row_count" + self.topics = [] + self.table = self.fileName + nameSalt + + # records + self.initialRecordCount = random.randrange(1,300) + self.flushRecordCount = 300 + self.recordNum = self.initialRecordCount + self.flushRecordCount + + for i in range(2): + self.topics.append(self.table + str(i)) + + self.records = [] + + self.records.append({ + 'PERFORMANCE_STRING': 'Excellent', + 'PERFORMANCE_CHAR': 'A', + 'RATING_INT': 100 + }) + + self.records.append({ + 'PERFORMANCE_STRING': 'Excellent', + 'RATING_DOUBLE': 0.99, + 'APPROVAL': True + }) + + self.gold_type = { + 'PERFORMANCE_STRING': 'VARCHAR', + 'PERFORMANCE_CHAR': 'VARCHAR', + 'RATING_INT': 'NUMBER', + 'RATING_DOUBLE': 'FLOAT', + 'APPROVAL': 'BOOLEAN', + 'RECORD_METADATA': 'VARIANT' + } + + self.gold_columns = [columnName for columnName in self.gold_type] + + def getConfigFileName(self): + return self.fileName + ".json" + + def send(self): + print("Got random record count of {}".format(str(self.initialRecordCount))) + + for i, topic in enumerate(self.topics): + # send initial batch + key = [] + value = [] + for e in range(self.initialRecordCount): + key.append(json.dumps({'number': str(e)}).encode('utf-8')) + value.append(json.dumps(self.records[i]).encode('utf-8')) + self.driver.sendBytesData(topic, value, key) + + # send second batch that should flush + key = [] + value = [] + for e in range(self.flushRecordCount): + key.append(json.dumps({'number': str(e)}).encode('utf-8')) + value.append(json.dumps(self.records[i]).encode('utf-8')) + self.driver.sendBytesData(topic, value, key) + + def verify(self, round): + rows = self.driver.snowflake_conn.cursor().execute( + "desc table {}".format(self.table)).fetchall() + res_col = {} + + for index, row in enumerate(rows): + self.gold_columns.remove(row[0]) + if not row[1].startswith(self.gold_type[row[0]]): + raise NonRetryableError("Column {} has the wrong type. got: {}, expected: {}".format(row[0], row[1], + self.gold_type[ + row[0]])) + res_col[row[0]] = index + + print("Columns not in table: ", self.gold_columns) + + for columnName in self.gold_columns: + raise NonRetryableError("Column {} was not created".format(columnName)) + + res = self.driver.snowflake_conn.cursor().execute( + "SELECT count(*) FROM {}".format(self.table)).fetchone()[0] + if res != len(self.topics) * self.recordNum: + print("Number of record expected: {}, got: {}".format(len(self.topics) * self.recordNum, res)) + raise NonRetryableError("Number of record in table is different from number of record sent") + + def clean(self): + self.driver.cleanTableStagePipe(self.table) diff --git a/test/test_suites.py b/test/test_suites.py index a88e1f3e8..95d958d85 100644 --- a/test/test_suites.py +++ b/test/test_suites.py @@ -26,15 +26,15 @@ from test_suit.test_native_string_protobuf import TestNativeStringProtobuf from test_suit.test_schema_evolution_avro_sr import TestSchemaEvolutionAvroSR from test_suit.test_schema_evolution_avro_sr_logical_types import TestSchemaEvolutionAvroSRLogicalTypes -from test_suit.test_schema_evolution_drop_table import TestSchemaEvolutionDropTable from test_suit.test_schema_evolution_json import TestSchemaEvolutionJson from test_suit.test_schema_evolution_json_ignore_tombstone import TestSchemaEvolutionJsonIgnoreTombstone -from test_suit.test_schema_evolution_multi_topic_drop_table import TestSchemaEvolutionMultiTopicDropTable from test_suit.test_schema_evolution_nonnullable_json import TestSchemaEvolutionNonNullableJson from test_suit.test_schema_evolution_w_auto_table_creation_avro_sr import \ TestSchemaEvolutionWithAutoTableCreationAvroSR from test_suit.test_schema_evolution_w_auto_table_creation_json import \ TestSchemaEvolutionWithAutoTableCreationJson +from test_suit.test_schema_evolution_w_random_row_count import \ + TestSchemaEvolutionWithRandomRowCount from test_suit.test_schema_mapping import TestSchemaMapping from test_suit.test_schema_not_supported_converter import TestSchemaNotSupportedConverter from test_suit.test_snowpipe_streaming_schema_mapping_dlq import TestSnowpipeStreamingSchemaMappingDLQ @@ -195,6 +195,10 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS test_instance=TestSchemaEvolutionWithAutoTableCreationAvroSR(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=False )), + ("TestSchemaEvolutionWithRandomRowCount", EndToEndTestSuite( + test_instance=TestSchemaEvolutionWithRandomRowCount(driver, nameSalt), clean=True, + run_in_confluent=True, run_in_apache=True + )), ("TestSchemaEvolutionNonNullableJson", EndToEndTestSuite( test_instance=TestSchemaEvolutionNonNullableJson(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True @@ -203,10 +207,6 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS test_instance=TestSchemaNotSupportedConverter(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), - ("TestSchemaEvolutionDropTable", EndToEndTestSuite( - test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True, - run_in_apache=True - )), ("TestKcDeleteCreate", EndToEndTestSuite( test_instance=TestKcDeleteCreate(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), @@ -244,9 +244,14 @@ def create_end_to_end_test_suites(driver, nameSalt, schemaRegistryAddress, testS ("TestKcRestart", EndToEndTestSuite( test_instance=TestKcRestart(driver, nameSalt), clean=True, run_in_confluent=True, run_in_apache=True )), - ("TestSchemaEvolutionMultiTopicDropTable", EndToEndTestSuite( - test_instance=TestSchemaEvolutionMultiTopicDropTable(driver, nameSalt), clean=True, run_in_confluent=True, - run_in_apache=True - )), + # do not support out of band table drops due to SNOW-943288, keeping these tests for long term solution + # ("TestSchemaEvolutionDropTable", EndToEndTestSuite( + # test_instance=TestSchemaEvolutionDropTable(driver, nameSalt), clean=True, run_in_confluent=True, + # run_in_apache=True + # )), + # ("TestSchemaEvolutionMultiTopicDropTable", EndToEndTestSuite( + # test_instance=TestSchemaEvolutionMultiTopicDropTable(driver, nameSalt), clean=True, run_in_confluent=True, + # run_in_apache=True + # )), ]) return test_suites