diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java index cf53840a9..920056425 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeConnectionServiceV1.java @@ -425,6 +425,7 @@ public void appendMetaColIfNotExist(final String tableName) { */ @Override public boolean hasSchemaEvolutionPermission(String tableName, String role) { + LOGGER.info("Checking schema evolution permission for table {}", tableName); checkConnection(); InternalUtils.assertNotEmpty("tableName", tableName); String query = "show grants on table identifier(?)"; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java index 6f27ba3e3..47d36274c 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java @@ -669,6 +669,10 @@ public InsertRowsResponse get() throws Throwable { // Simply added to the final response if it's not schema related errors finalResponse.addError(insertError); } else { + LOGGER.info( + "Triggering schema evolution. NonNullableColumns={}, extraColumns={}", + String.join(",", nonNullableColumns), + extraColNames == null ? "null" : String.join(",", extraColNames)); SchematizationUtils.evolveSchemaIfNeeded( this.conn, this.channel.getTableName(), diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java index 278de8057..cc58ad072 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java @@ -707,12 +707,19 @@ private void createTableIfNotExists(final String tableName) { private void populateSchemaEvolutionPermissions(String tableName) { if (!tableName2SchemaEvolutionPermission.containsKey(tableName)) { if (enableSchematization) { - tableName2SchemaEvolutionPermission.put( - tableName, + boolean hasSchemaEvolutionPermission = conn != null && conn.hasSchemaEvolutionPermission( - tableName, connectorConfig.get(SNOWFLAKE_ROLE))); + tableName, connectorConfig.get(SNOWFLAKE_ROLE)); + LOGGER.info( + "[SCHEMA_EVOLUTION_CACHE] Setting {} for table {}", + hasSchemaEvolutionPermission, + tableName); + tableName2SchemaEvolutionPermission.put(tableName, hasSchemaEvolutionPermission); } else { + LOGGER.info( + "[SCHEMA_EVOLUTION_CACHE] Schematization disabled. Setting false for table {}", + tableName); tableName2SchemaEvolutionPermission.put(tableName, false); } }