From 48579aa30a298982b5ef00577d4b9311b7f08fbe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Z=C4=85bek?= Date: Tue, 12 Nov 2024 13:22:01 +0100 Subject: [PATCH] 2nd approach use jdbc.internal.fasterxml only in telemetry services --- .../SnowflakeTelemetryChannelCreation.java | 2 +- .../SnowflakeTelemetryChannelStatus.java | 2 +- .../SnowflakeTelemetryServiceV2.java | 2 +- .../SnowflakeTelemetryBasicInfo.java | 2 +- .../SnowflakeTelemetryPipeCreation.java | 2 +- .../SnowflakeTelemetryPipeStatus.java | 2 +- .../telemetry/SnowflakeTelemetryService.java | 28 ++++--------------- .../SnowflakeTelemetryServiceV1.java | 2 +- .../internal/StageFilesProcessorTest.java | 2 +- 9 files changed, 13 insertions(+), 31 deletions(-) diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java index 68c76018a..9de9ef32a 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java @@ -22,9 +22,9 @@ import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TOPIC_PARTITION_CHANNEL_CREATION_TIME; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TOPIC_PARTITION_CHANNEL_NAME; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** * This object is sent only once when a channel starts. No concurrent modification is made on this diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java index 71adffb70..2c8018c0f 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java @@ -22,7 +22,6 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter; import com.snowflake.kafka.connector.internal.metrics.MetricsUtil; @@ -30,6 +29,7 @@ import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants; import java.util.concurrent.atomic.AtomicLong; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** * Extension of {@link SnowflakeTelemetryBasicInfo} class used to send data to snowflake when the diff --git a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java index ede3292e1..e866eb4f4 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java @@ -17,11 +17,11 @@ package com.snowflake.kafka.connector.internal.streaming.telemetry; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService; import java.sql.Connection; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.telemetry.Telemetry; import net.snowflake.client.jdbc.telemetry.TelemetryClient; diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java index 65fc7edc7..33686fa35 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryBasicInfo.java @@ -1,9 +1,9 @@ package com.snowflake.kafka.connector.internal.telemetry; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.snowflake.kafka.connector.internal.KCLogger; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** Minimum information needed to sent to Snowflake through Telemetry API */ public abstract class SnowflakeTelemetryBasicInfo { diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java index 3e41701de..a0ef9b3af 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeCreation.java @@ -10,7 +10,7 @@ import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.START_TIME; import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TABLE_NAME; -import com.fasterxml.jackson.databind.node.ObjectNode; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** * This object is send only once when pipe starts No concurrent modification is made on this object, diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java index 052b4b774..65f29ff1f 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java @@ -36,7 +36,6 @@ import com.codahale.metrics.Metric; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Maps; import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter; @@ -49,6 +48,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.LongUnaryOperator; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; /** * Extension of {@link SnowflakeTelemetryBasicInfo} class used to send data to snowflake diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java index 7cde1030f..2ad46e4f9 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryService.java @@ -2,14 +2,14 @@ import static com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig.*; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig; import com.snowflake.kafka.connector.Utils; import com.snowflake.kafka.connector.internal.KCLogger; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import java.util.Map; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.JsonNode; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.telemetry.Telemetry; import net.snowflake.client.jdbc.telemetry.TelemetryUtil; import org.apache.kafka.common.utils.AppInfoParser; @@ -197,32 +197,14 @@ protected ObjectNode getDefaultObjectNode(IngestionMethodConfig ingestionMethodC * * * - *

NOTE! com.fasterxml is mapped to net.snowflake.client.jdbc.internal.fasterxml. The reasoning - * is to unbound rest of the connector from jdbc driver type. - * * @param type type of Data * @param data JsonData to wrap in a json field called data */ protected void send(SnowflakeTelemetryService.TelemetryType type, JsonNode data) { - net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode telemetryData; - net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper objectMapper = - new net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper(); - try { - telemetryData = - objectMapper.readValue( - data.toString(), - net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode.class); - } catch (net.snowflake.client.jdbc.internal.fasterxml.jackson.core.JsonProcessingException e) { - LOGGER.error( - "Failed to parse telemetry data: {}, Error: {}", data.toString(), e.getMessage()); - telemetryData = objectMapper.createObjectNode(); - } - net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode msg = - new net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper() - .createObjectNode(); + ObjectNode msg = MAPPER.createObjectNode(); msg.put(SOURCE, KAFKA_CONNECTOR); msg.put(TYPE, type.toString()); - msg.set(DATA, telemetryData); + msg.set(DATA, data); msg.put(VERSION, Utils.VERSION); // version number try { telemetry.addLogToBatch(TelemetryUtil.buildJobData(msg)); diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java index 2343e6e31..bba06b7e1 100644 --- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java +++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceV1.java @@ -1,9 +1,9 @@ package com.snowflake.kafka.connector.internal.telemetry; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import java.sql.Connection; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.telemetry.Telemetry; import net.snowflake.client.jdbc.telemetry.TelemetryClient; diff --git a/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java b/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java index 1ae805269..7a655d81c 100644 --- a/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java +++ b/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java @@ -15,7 +15,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo; import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryPipeCreation; @@ -36,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.stream.Collectors; +import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode; import net.snowflake.client.jdbc.internal.joda.time.DateTime; import net.snowflake.client.jdbc.internal.joda.time.DateTimeZone; import org.junit.jupiter.api.BeforeEach;