From 5a4f0cb8002e3a1083305f9b297f3456498668dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Tue, 12 Nov 2024 13:02:39 +0100 Subject: [PATCH 1/2] NO-SNOW use com.fasterxml instead of net.snowflake.client.jdbc.internal.fasterxml --- .../BufferedTopicPartitionChannel.java | 2 +- .../DirectTopicPartitionChannel.java | 2 +- .../schemaevolution/ColumnTypeMapper.java | 2 +- .../schemaevolution/TableSchemaResolver.java | 2 +- .../iceberg/IcebergColumnTypeMapper.java | 2 +- .../snowflake/SnowflakeColumnTypeMapper.java | 2 +- .../SnowflakeTelemetryChannelCreation.java | 2 +- .../SnowflakeTelemetryChannelStatus.java | 2 +- .../SnowflakeTelemetryServiceV2.java | 2 +- .../SnowflakeTelemetryBasicInfo.java | 2 +- .../SnowflakeTelemetryPipeCreation.java | 2 +- .../SnowflakeTelemetryPipeStatus.java | 2 +- .../telemetry/SnowflakeTelemetryService.java | 28 +++++++++++++++---- .../SnowflakeTelemetryServiceV1.java | 2 +- .../IcebergTableStreamingRecordMapper.java | 8 +++--- .../connector/records/RecordService.java | 12 ++++---- .../records/RecordServiceFactory.java | 2 +- .../records/SnowflakeAvroConverter.java | 2 +- ...akeAvroConverterWithoutSchemaRegistry.java | 2 +- .../connector/records/SnowflakeConverter.java | 2 +- .../records/SnowflakeRecordContent.java | 4 +-- .../SnowflakeTableStreamingRecordMapper.java | 6 ++-- .../records/StreamingRecordMapper.java | 8 +++--- .../snowflake/kafka/connector/SinkTaskIT.java | 2 +- .../SnowflakeSinkTaskForStreamingIT.java | 6 ++-- .../connector/internal/MetaColumnIT.java | 4 +-- .../connector/internal/SinkServiceIT.java | 2 +- ...SnowflakeTelemetryPipeStatusMetricsIT.java | 2 +- .../internal/StageFilesProcessorTest.java | 2 +- .../kafka/connector/internal/TestUtils.java | 4 +-- .../iceberg/IcebergColumnTypeMapperTest.java | 4 +-- .../SnowflakeColumnTypeMapperTest.java | 4 +-- .../SnowflakeTableSchemaResolverTest.java | 4 +-- .../records/AbstractMetaColumnTest.java | 4 +-- .../connector/records/ConverterTest.java | 8 +++--- .../kafka/connector/records/HeaderTest.java | 4 +-- ...IcebergTableStreamingRecordMapperTest.java | 4 +-- .../connector/records/ProcessRecordTest.java | 4 +-- .../connector/records/RecordContentTest.java | 8 +++--- .../records/SnowpipeMetaColumnTest.java | 6 ++-- .../SnowpipeStreamingMetaColumnTest.java | 6 ++-- .../iceberg/sql/ComplexJsonRecord.java | 8 +++--- .../streaming/iceberg/sql/MetadataRecord.java | 6 ++-- .../iceberg/sql/PrimitiveJsonRecord.java | 8 +++--- 44 files changed, 109 insertions(+), 91 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java index 1099c5ddc..7f37878a2 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/BufferedTopicPartitionChannel.java @@ -9,6 +9,7 @@ import static java.time.temporal.ChronoUnit.SECONDS; import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -50,7 +51,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java index 28a56fe93..595d413d8 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/DirectTopicPartitionChannel.java @@ -9,6 +9,7 @@ import static java.time.temporal.ChronoUnit.SECONDS; import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -50,7 +51,6 @@ import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; import net.snowflake.ingest.streaming.InsertValidationResponse; import net.snowflake.ingest.streaming.OpenChannelRequest; import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/ColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/ColumnTypeMapper.java index e7d3d196b..f9fcbe579 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/ColumnTypeMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/ColumnTypeMapper.java @@ -1,6 +1,6 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.connect.data.Schema; public abstract class ColumnTypeMapper { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/TableSchemaResolver.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/TableSchemaResolver.java index e5ac01f97..ffce5aa3c 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/TableSchemaResolver.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/TableSchemaResolver.java @@ -1,5 +1,6 @@ package com.snowflake.kafka.connector.internal.streaming.schemaevolution; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Streams; @@ -12,7 +13,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.sink.SinkRecord; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java index 206c21b39..42c5ee1b9 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapper.java @@ -9,8 +9,8 @@ import static org.apache.kafka.connect.data.Schema.Type.STRING; import static org.apache.kafka.connect.data.Schema.Type.STRUCT; +import com.fasterxml.jackson.databind.JsonNode; import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnTypeMapper; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapper.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapper.java index 129ce3c40..c501acabe 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapper.java @@ -11,8 +11,8 @@ import static org.apache.kafka.connect.data.Schema.Type.STRING; import static org.apache.kafka.connect.data.Schema.Type.STRUCT; +import com.fasterxml.jackson.databind.JsonNode; import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnTypeMapper; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java index 9de9ef32a..68c76018a 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java @@ -22,9 +22,9 @@ import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TOPIC_PARTITION_CHANNEL_CREATION_TIME; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TOPIC_PARTITION_CHANNEL_NAME; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** * This object is sent only once when a channel starts. No concurrent modification is made on this diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java index 2c8018c0f..71adffb70 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java @@ -22,6 +22,7 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter; import com.snowflake.kafka.connector.internal.metrics.MetricsUtil; @@ -29,7 +30,6 @@ import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants; import java.util.concurrent.atomic.AtomicLong; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** * Extension of {@link SnowflakeTelemetryBasicInfo} class used to send data to snowflake when the diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java index e866eb4f4..ede3292e1 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java @@ -17,11 +17,11 @@ package com.snowflake.kafka.connector.internal.streaming.telemetry; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import java.sql.Connection; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.telemetry.Telemetry; import net.snowflake.client.jdbc.telemetry.TelemetryClient; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java index 33686fa35..65fc7edc7 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java @@ -1,9 +1,9 @@ package com.snowflake.kafka.connector.internal.telemetry; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.snowflake.kafka.connector.internal.KCLogger; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** Minimum information needed to sent to Snowflake through Telemetry API */ public abstract class SnowflakeTelemetryBasicInfo { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java index a0ef9b3af..3e41701de 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java @@ -10,7 +10,7 @@ import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.START_TIME; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TABLE_NAME; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.ObjectNode; /** * This object is send only once when pipe starts No concurrent modification is made on this object, diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java index 65f29ff1f..052b4b774 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java @@ -36,6 +36,7 @@ import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter; @@ -48,7 +49,6 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.LongUnaryOperator; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** * Extension of {@link SnowflakeTelemetryBasicInfo} class used to send data to snowflake diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java index 2ad46e4f9..7cde1030f 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java @@ -2,14 +2,14 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.KCLogger; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import java.util.Map; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.telemetry.Telemetry; import net.snowflake.client.jdbc.telemetry.TelemetryUtil; import org.apache.kafka.common.utils.AppInfoParser; @@ -197,14 +197,32 @@ protected ObjectNode getDefaultObjectNode(IngestionMethodConfig ingestionMethodC * * * + *

NOTE! com.fasterxml is mapped to net.snowflake.client.jdbc.internal.fasterxml. The reasoning + * is to unbound rest of the connector from jdbc driver type. + * * @param type type of Data * @param data JsonData to wrap in a json field called data */ protected void send(SnowflakeTelemetryService.TelemetryType type, JsonNode data) { - ObjectNode msg = MAPPER.createObjectNode(); + net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode telemetryData; + net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper objectMapper = + new net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper(); + try { + telemetryData = + objectMapper.readValue( + data.toString(), + net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode.class); + } catch (net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException e) { + LOGGER.error( + "Failed to parse telemetry data: {}, Error: {}", data.toString(), e.getMessage()); + telemetryData = objectMapper.createObjectNode(); + } + net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode msg = + new net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper() + .createObjectNode(); msg.put(SOURCE, KAFKA_CONNECTOR); msg.put(TYPE, type.toString()); - msg.set(DATA, data); + msg.set(DATA, telemetryData); msg.put(VERSION, Utils.VERSION); // version number try { telemetry.addLogToBatch(TelemetryUtil.buildJobData(msg)); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java index bba06b7e1..2343e6e31 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java @@ -1,9 +1,9 @@ package com.snowflake.kafka.connector.internal.telemetry; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import java.sql.Connection; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.telemetry.Telemetry; import net.snowflake.client.jdbc.telemetry.TelemetryClient; diff --git a/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java b/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java index a368c4e44..99b566cf0 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapper.java @@ -4,16 +4,16 @@ import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA; import static com.snowflake.kafka.connector.records.RecordService.*; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.Utils; import java.util.AbstractMap; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.stream.Collectors; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.type.TypeReference; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; class IcebergTableStreamingRecordMapper extends StreamingRecordMapper { private static final TypeReference> OBJECTS_MAP_TYPE_REFERENCE = diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java index 1a92238e9..95c1cfc5d 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordService.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordService.java @@ -16,6 +16,12 @@ */ package com.snowflake.kafka.connector.records; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.internal.KCLogger; import com.snowflake.kafka.connector.internal.SnowflakeErrors; @@ -29,12 +35,6 @@ import java.util.Map; import java.util.TimeZone; import javax.annotation.Nullable; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ArrayNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.JsonNodeFactory; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Date; diff --git a/src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java b/src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java index ab8d3deac..b187b3077 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java +++ b/src/main/java/com/snowflake/kafka/connector/records/RecordServiceFactory.java @@ -1,6 +1,6 @@ package com.snowflake.kafka.connector.records; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectMapper; public class RecordServiceFactory { public static RecordService createRecordService( diff --git a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeAvroConverter.java b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeAvroConverter.java index 0d7d353f2..93c1ed8eb 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeAvroConverter.java +++ b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeAvroConverter.java @@ -16,6 +16,7 @@ */ package com.snowflake.kafka.connector.records; +import com.fasterxml.jackson.databind.JsonNode; import com.snowflake.kafka.connector.internal.SnowflakeErrors; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; @@ -24,7 +25,6 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Map; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; import org.apache.avro.Conversions; import org.apache.avro.Schema; import org.apache.avro.SchemaParseException; diff --git a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeAvroConverterWithoutSchemaRegistry.java b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeAvroConverterWithoutSchemaRegistry.java index e00d8a976..139e75c4e 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeAvroConverterWithoutSchemaRegistry.java +++ b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeAvroConverterWithoutSchemaRegistry.java @@ -16,10 +16,10 @@ */ package com.snowflake.kafka.connector.records; +import com.fasterxml.jackson.databind.JsonNode; import com.snowflake.kafka.connector.internal.SnowflakeErrors; import java.io.IOException; import java.util.ArrayList; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; import org.apache.avro.file.DataFileReader; import org.apache.avro.file.SeekableByteArrayInput; import org.apache.avro.generic.GenericDatumReader; diff --git a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeConverter.java b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeConverter.java index 20acf4eed..d53ce3496 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeConverter.java +++ b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeConverter.java @@ -16,10 +16,10 @@ */ package com.snowflake.kafka.connector.records; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.internal.KCLogger; import com.snowflake.kafka.connector.internal.SnowflakeErrors; import java.util.Map; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.storage.Converter; diff --git a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeRecordContent.java b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeRecordContent.java index c88d7f3aa..bef5cc939 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeRecordContent.java +++ b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeRecordContent.java @@ -1,8 +1,8 @@ package com.snowflake.kafka.connector.records; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.internal.SnowflakeErrors; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.data.Schema; public class SnowflakeRecordContent { diff --git a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java index 06e83c1c9..d666057d3 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/records/SnowflakeTableStreamingRecordMapper.java @@ -3,14 +3,14 @@ import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_CONTENT; import static com.snowflake.kafka.connector.Utils.TABLE_COLUMN_METADATA; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.SnowflakeErrors; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; class SnowflakeTableStreamingRecordMapper extends StreamingRecordMapper { diff --git a/src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java b/src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java index f58b751bf..f6a5c34c7 100644 --- a/src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java +++ b/src/main/java/com/snowflake/kafka/connector/records/StreamingRecordMapper.java @@ -1,11 +1,11 @@ package com.snowflake.kafka.connector.records; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.NumericNode; import com.snowflake.kafka.connector.records.RecordService.SnowflakeTableRow; import java.util.Map; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.NumericNode; abstract class StreamingRecordMapper { diff --git a/src/test/java/com/snowflake/kafka/connector/SinkTaskIT.java b/src/test/java/com/snowflake/kafka/connector/SinkTaskIT.java index a6eb5c41c..84264e980 100644 --- a/src/test/java/com/snowflake/kafka/connector/SinkTaskIT.java +++ b/src/test/java/com/snowflake/kafka/connector/SinkTaskIT.java @@ -3,6 +3,7 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS_DEFAULT; import static com.snowflake.kafka.connector.internal.TestUtils.TEST_CONNECTOR_NAME; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.internal.KCLogger; import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; import com.snowflake.kafka.connector.internal.TestUtils; @@ -11,7 +12,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; diff --git a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java index 8a4fa1b5d..7cf891baf 100644 --- a/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java +++ b/src/test/java/com/snowflake/kafka/connector/SnowflakeSinkTaskForStreamingIT.java @@ -3,6 +3,9 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS; import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.INGESTION_METHOD_OPT; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.internal.KCLogger; import com.snowflake.kafka.connector.internal.SnowflakeConnectionService; import com.snowflake.kafka.connector.internal.SnowflakeErrors; @@ -21,9 +24,6 @@ import java.util.Map; import java.util.Set; import java.util.stream.Stream; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.sink.SinkRecord; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java b/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java index bbb0a3478..dbc86932f 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/MetaColumnIT.java @@ -1,12 +1,12 @@ package com.snowflake.kafka.connector.internal; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.records.SnowflakeConverter; import com.snowflake.kafka.connector.records.SnowflakeJsonConverter; import java.nio.charset.StandardCharsets; import java.sql.ResultSet; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java b/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java index 20c2f23db..ab3cd0e9a 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/SinkServiceIT.java @@ -4,6 +4,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.records.SnowflakeConverter; @@ -14,7 +15,6 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.*; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/SnowflakeTelemetryPipeStatusMetricsIT.java b/src/test/java/com/snowflake/kafka/connector/internal/SnowflakeTelemetryPipeStatusMetricsIT.java index ea0969292..b49be9e2f 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/SnowflakeTelemetryPipeStatusMetricsIT.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/SnowflakeTelemetryPipeStatusMetricsIT.java @@ -4,13 +4,13 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.metrics.MetricsUtil; import com.snowflake.kafka.connector.records.SnowflakeConverter; import com.snowflake.kafka.connector.records.SnowflakeJsonConverter; import java.nio.charset.StandardCharsets; import java.util.Map; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java b/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java index 7a655d81c..1ae805269 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryPipeCreation; @@ -35,7 +36,6 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.stream.Collectors; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.internal.joda.time.DateTime; import net.snowflake.client.jdbc.internal.joda.time.DateTimeZone; import org.junit.jupiter.api.BeforeEach; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java index 6865a79ff..6440239c9 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/TestUtils.java @@ -29,6 +29,8 @@ import static com.snowflake.kafka.connector.Utils.buildOAuthHttpPostRequest; import static com.snowflake.kafka.connector.Utils.getSnowflakeOAuthToken; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; @@ -70,8 +72,6 @@ import net.snowflake.client.jdbc.internal.apache.http.impl.client.CloseableHttpClient; import net.snowflake.client.jdbc.internal.apache.http.impl.client.HttpClientBuilder; import net.snowflake.client.jdbc.internal.apache.http.util.EntityUtils; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import net.snowflake.client.jdbc.internal.google.gson.JsonObject; import net.snowflake.client.jdbc.internal.google.gson.JsonParser; import org.apache.kafka.common.record.TimestampType; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java index 6adb0a1f3..f049bf606 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/iceberg/IcebergColumnTypeMapperTest.java @@ -3,9 +3,9 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; import java.util.stream.Stream; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.JsonNodeFactory; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapperTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapperTest.java index be7abb4d1..a454f03a3 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapperTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeColumnTypeMapperTest.java @@ -2,9 +2,9 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; import java.util.stream.Stream; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.JsonNodeFactory; import org.apache.kafka.connect.data.Date; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeTableSchemaResolverTest.java b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeTableSchemaResolverTest.java index d5d11f781..dbe557a02 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeTableSchemaResolverTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/streaming/schemaevolution/snowflake/SnowflakeTableSchemaResolverTest.java @@ -2,6 +2,8 @@ import static org.assertj.core.api.Assertions.*; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.TestUtils; import com.snowflake.kafka.connector.internal.streaming.schemaevolution.ColumnInfos; @@ -11,8 +13,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.json.JsonConverter; diff --git a/src/test/java/com/snowflake/kafka/connector/records/AbstractMetaColumnTest.java b/src/test/java/com/snowflake/kafka/connector/records/AbstractMetaColumnTest.java index c23d53b5d..4e1851adb 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/AbstractMetaColumnTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/AbstractMetaColumnTest.java @@ -17,6 +17,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.snowflake.kafka.connector.builder.SinkRecordBuilder; @@ -30,8 +32,6 @@ import java.util.Set; import java.util.stream.Stream; import javax.annotation.Nullable; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; diff --git a/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java b/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java index 651b2eda6..d5e17782d 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/ConverterTest.java @@ -24,6 +24,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; import com.snowflake.kafka.connector.mock.MockSchemaRegistryClient; import io.confluent.connect.avro.AvroConverter; @@ -53,10 +57,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import net.snowflake.client.jdbc.internal.apache.commons.codec.binary.Hex; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import org.apache.avro.LogicalTypes; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; diff --git a/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java b/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java index 4d3591866..68fc481b2 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/HeaderTest.java @@ -5,6 +5,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.math.BigDecimal; import java.text.SimpleDateFormat; @@ -15,8 +17,6 @@ import java.util.List; import java.util.Map; import java.util.TimeZone; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; diff --git a/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java b/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java index 226d04824..c037df25c 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/IcebergTableStreamingRecordMapperTest.java @@ -3,6 +3,8 @@ import static com.snowflake.kafka.connector.streaming.iceberg.sql.PrimitiveJsonRecord.primitiveJsonExample; import static org.assertj.core.api.Assertions.assertThat; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.snowflake.kafka.connector.Utils; @@ -10,8 +12,6 @@ import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; diff --git a/src/test/java/com/snowflake/kafka/connector/records/ProcessRecordTest.java b/src/test/java/com/snowflake/kafka/connector/records/ProcessRecordTest.java index c8cb098b7..1a7f553f8 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/ProcessRecordTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/ProcessRecordTest.java @@ -2,6 +2,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.mock.MockSchemaRegistryClient; import java.io.IOException; import java.net.URL; @@ -9,8 +11,6 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Arrays; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.sink.SinkRecord; diff --git a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java index e1796018b..1e43f9660 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/RecordContentTest.java @@ -5,6 +5,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.builder.SinkRecordBuilder; import com.snowflake.kafka.connector.internal.SnowflakeErrors; import com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException; @@ -17,10 +21,6 @@ import java.util.Collections; import java.util.Map; import java.util.stream.Stream; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.type.TypeReference; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.SchemaBuilder; diff --git a/src/test/java/com/snowflake/kafka/connector/records/SnowpipeMetaColumnTest.java b/src/test/java/com/snowflake/kafka/connector/records/SnowpipeMetaColumnTest.java index f3b308820..ea694c1ee 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/SnowpipeMetaColumnTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/SnowpipeMetaColumnTest.java @@ -6,13 +6,13 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.snowflake.kafka.connector.builder.SinkRecordBuilder; import java.util.Map; import javax.annotation.Nullable; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.Test; diff --git a/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java b/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java index 895d0bacc..b3a6a3421 100644 --- a/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java +++ b/src/test/java/com/snowflake/kafka/connector/records/SnowpipeStreamingMetaColumnTest.java @@ -7,6 +7,9 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.builder.SinkRecordBuilder; @@ -16,9 +19,6 @@ import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.sink.SinkRecord; import org.junit.jupiter.api.Test; diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java index b8d88fa70..92adbb66d 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/ComplexJsonRecord.java @@ -1,7 +1,10 @@ package com.snowflake.kafka.connector.streaming.iceberg.sql; -import static net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata; @@ -11,9 +14,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonCreator; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonProperty; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.assertj.core.api.Assertions; public class ComplexJsonRecord { diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java index c0f7de137..947893b6b 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/MetadataRecord.java @@ -1,14 +1,14 @@ package com.snowflake.kafka.connector.streaming.iceberg.sql; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.Utils; import java.io.IOException; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Map; import java.util.Objects; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonCreator; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonProperty; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.assertj.core.api.Assertions; public class MetadataRecord { diff --git a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java index a195e21ed..9058a0ae7 100644 --- a/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java +++ b/src/test/java/com/snowflake/kafka/connector/streaming/iceberg/sql/PrimitiveJsonRecord.java @@ -1,7 +1,10 @@ package com.snowflake.kafka.connector.streaming.iceberg.sql; -import static net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.streaming.iceberg.sql.MetadataRecord.RecordWithMetadata; import java.io.IOException; @@ -10,9 +13,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Objects; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonCreator; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.annotation.JsonProperty; -import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; import org.assertj.core.api.Assertions; public class PrimitiveJsonRecord { From 48579aa30a298982b5ef00577d4b9311b7f08fbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Tue, 12 Nov 2024 13:22:01 +0100 Subject: [PATCH 2/2] 2nd approach use jdbc.internal.fasterxml only in telemetry services --- .../SnowflakeTelemetryChannelCreation.java | 2 +- .../SnowflakeTelemetryChannelStatus.java | 2 +- .../SnowflakeTelemetryServiceV2.java | 2 +- .../SnowflakeTelemetryBasicInfo.java | 2 +- .../SnowflakeTelemetryPipeCreation.java | 2 +- .../SnowflakeTelemetryPipeStatus.java | 2 +- .../telemetry/SnowflakeTelemetryService.java | 28 ++++--------------- .../SnowflakeTelemetryServiceV1.java | 2 +- .../internal/StageFilesProcessorTest.java | 2 +- 9 files changed, 13 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java index 68c76018a..9de9ef32a 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java @@ -22,9 +22,9 @@ import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TOPIC_PARTITION_CHANNEL_CREATION_TIME; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TOPIC_PARTITION_CHANNEL_NAME; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** * This object is sent only once when a channel starts. No concurrent modification is made on this diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java index 71adffb70..2c8018c0f 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java @@ -22,7 +22,6 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter; import com.snowflake.kafka.connector.internal.metrics.MetricsUtil; @@ -30,6 +29,7 @@ import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants; import java.util.concurrent.atomic.AtomicLong; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** * Extension of {@link SnowflakeTelemetryBasicInfo} class used to send data to snowflake when the diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java index ede3292e1..e866eb4f4 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java @@ -17,11 +17,11 @@ package com.snowflake.kafka.connector.internal.streaming.telemetry; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import java.sql.Connection; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.telemetry.Telemetry; import net.snowflake.client.jdbc.telemetry.TelemetryClient; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java index 65fc7edc7..33686fa35 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java @@ -1,9 +1,9 @@ package com.snowflake.kafka.connector.internal.telemetry; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.snowflake.kafka.connector.internal.KCLogger; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** Minimum information needed to sent to Snowflake through Telemetry API */ public abstract class SnowflakeTelemetryBasicInfo { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java index 3e41701de..a0ef9b3af 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java @@ -10,7 +10,7 @@ import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.START_TIME; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TABLE_NAME; -import com.fasterxml.jackson.databind.node.ObjectNode; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** * This object is send only once when pipe starts No concurrent modification is made on this object, diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java index 052b4b774..65f29ff1f 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java @@ -36,7 +36,6 @@ import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter; @@ -49,6 +48,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.LongUnaryOperator; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** * Extension of {@link SnowflakeTelemetryBasicInfo} class used to send data to snowflake diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java index 7cde1030f..2ad46e4f9 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java @@ -2,14 +2,14 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.KCLogger; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import java.util.Map; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.telemetry.Telemetry; import net.snowflake.client.jdbc.telemetry.TelemetryUtil; import org.apache.kafka.common.utils.AppInfoParser; @@ -197,32 +197,14 @@ protected ObjectNode getDefaultObjectNode(IngestionMethodConfig ingestionMethodC * * * - *

NOTE! com.fasterxml is mapped to net.snowflake.client.jdbc.internal.fasterxml. The reasoning - * is to unbound rest of the connector from jdbc driver type. - * * @param type type of Data * @param data JsonData to wrap in a json field called data */ protected void send(SnowflakeTelemetryService.TelemetryType type, JsonNode data) { - net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode telemetryData; - net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper objectMapper = - new net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper(); - try { - telemetryData = - objectMapper.readValue( - data.toString(), - net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode.class); - } catch (net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException e) { - LOGGER.error( - "Failed to parse telemetry data: {}, Error: {}", data.toString(), e.getMessage()); - telemetryData = objectMapper.createObjectNode(); - } - net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode msg = - new net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper() - .createObjectNode(); + ObjectNode msg = MAPPER.createObjectNode(); msg.put(SOURCE, KAFKA_CONNECTOR); msg.put(TYPE, type.toString()); - msg.set(DATA, telemetryData); + msg.set(DATA, data); msg.put(VERSION, Utils.VERSION); // version number try { telemetry.addLogToBatch(TelemetryUtil.buildJobData(msg)); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java index 2343e6e31..bba06b7e1 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java @@ -1,9 +1,9 @@ package com.snowflake.kafka.connector.internal.telemetry; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import java.sql.Connection; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.telemetry.Telemetry; import net.snowflake.client.jdbc.telemetry.TelemetryClient; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java b/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java index 1ae805269..7a655d81c 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java @@ -15,7 +15,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryPipeCreation; @@ -36,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.stream.Collectors; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.internal.joda.time.DateTime; import net.snowflake.client.jdbc.internal.joda.time.DateTimeZone; import org.junit.jupiter.api.BeforeEach;