diff --git a/pom.xml b/pom.xml index 44a01f937..47bd67c86 100644 --- a/pom.xml +++ b/pom.xml @@ -332,7 +332,7 @@ net.snowflake snowflake-jdbc - 3.18.0 + 3.20.0 diff --git a/pom_confluent.xml b/pom_confluent.xml index 969ff531a..14801e5a2 100644 --- a/pom_confluent.xml +++ b/pom_confluent.xml @@ -462,7 +462,7 @@ net.snowflake snowflake-jdbc - 3.18.0 + 3.20.0 diff --git a/src/test/java/com/snowflake/kafka/connector/ConnectClusterBaseIT.java b/src/test/java/com/snowflake/kafka/connector/ConnectClusterBaseIT.java index 789e4a30c..b0370d939 100644 --- a/src/test/java/com/snowflake/kafka/connector/ConnectClusterBaseIT.java +++ b/src/test/java/com/snowflake/kafka/connector/ConnectClusterBaseIT.java @@ -1,9 +1,6 @@ package com.snowflake.kafka.connector; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_FLUSH_TIME_SEC; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.NAME; -import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL; +import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; @@ -74,7 +71,8 @@ protected final Map defaultProperties(String topicName, String c config.put(TOPICS_CONFIG, topicName); config.put(INGESTION_METHOD_OPT, IngestionMethodConfig.SNOWPIPE_STREAMING.toString()); config.put(Utils.SF_ROLE, "testrole_kafka"); - config.put(BUFFER_FLUSH_TIME_SEC, "1"); + config.put(SNOWPIPE_STREAMING_MAX_CLIENT_LAG, "1"); + config.put(SNOWPIPE_STREAMING_ENABLE_SINGLE_BUFFER, "true"); config.put(TASKS_MAX_CONFIG, TASK_NUMBER.toString()); config.put(SNOWPIPE_STREAMING_CLOSE_CHANNELS_IN_PARALLEL, "true"); config.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName()); diff --git a/src/test/java/com/snowflake/kafka/connector/SmtIT.java b/src/test/java/com/snowflake/kafka/connector/SmtIT.java index cc77d7858..1235dc8d1 100644 --- a/src/test/java/com/snowflake/kafka/connector/SmtIT.java +++ b/src/test/java/com/snowflake/kafka/connector/SmtIT.java @@ -66,7 +66,8 @@ void testIfSmtReturningNullsIngestDataCorrectly(String behaviorOnNull, int expec // then await() - .timeout(Duration.ofSeconds(60)) + .timeout(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .untilAsserted( () -> { assertThat(fakeStreamingClientHandler.ingestedRows()).hasSize(expectedRecordNumber);