Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Streaming Telemetry 2][SNOW-899866] Enables reportKafkaPartitionStart/Usage telemetry for Streaming #694

Merged
merged 35 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0775e19
initial imp
sfc-gh-rcheng Aug 25, 2023
273c8d7
autoformatting
sfc-gh-rcheng Aug 25, 2023
340d0d0
actually send telem
sfc-gh-rcheng Aug 26, 2023
ad0a854
autoformatting
sfc-gh-rcheng Aug 26, 2023
ec5352e
refactor test
sfc-gh-rcheng Aug 28, 2023
59af12c
added creation
sfc-gh-rcheng Aug 28, 2023
038aa7f
autoformatting
sfc-gh-rcheng Aug 28, 2023
ad9b223
make telem nonnull required
sfc-gh-rcheng Aug 28, 2023
f576c0f
autoformatting
sfc-gh-rcheng Aug 28, 2023
92bf65f
personal nits
sfc-gh-rcheng Aug 30, 2023
16833c6
refactoring
sfc-gh-rcheng Aug 30, 2023
59786f0
autoformatting
sfc-gh-rcheng Aug 30, 2023
1de191c
merge part 1
sfc-gh-rcheng Sep 7, 2023
1bc3dfb
push for comparison
sfc-gh-rcheng Sep 7, 2023
9a281a7
autoformatting
sfc-gh-rcheng Sep 7, 2023
ba46a21
fix tests
sfc-gh-rcheng Sep 7, 2023
25b2220
autoformatting
sfc-gh-rcheng Sep 7, 2023
d2c71e5
pr reviews
sfc-gh-rcheng Sep 11, 2023
2ffa3bd
autoformatting
sfc-gh-rcheng Sep 11, 2023
3388e25
add channel open
sfc-gh-rcheng Sep 12, 2023
ffa0b7b
add to metrics
sfc-gh-rcheng Sep 12, 2023
fd9e692
autoformatting
sfc-gh-rcheng Sep 12, 2023
fbf7719
pr reviews
sfc-gh-rcheng Sep 13, 2023
262ecc6
autoformatting
sfc-gh-rcheng Sep 13, 2023
766b770
remove channel open count
sfc-gh-rcheng Sep 13, 2023
0faac75
start time -> channel creation time
sfc-gh-rcheng Sep 13, 2023
9d5943d
autoformatting
sfc-gh-rcheng Sep 13, 2023
a47cbcf
clarify to tpchannel
sfc-gh-rcheng Sep 13, 2023
09855b2
autoformatting
sfc-gh-rcheng Sep 13, 2023
56dffbd
no streaming subclass
sfc-gh-rcheng Sep 13, 2023
b2c036d
autoformatting
sfc-gh-rcheng Sep 13, 2023
a951535
tp -> topicpartition and comment
sfc-gh-rcheng Sep 13, 2023
947b091
autoformatting
sfc-gh-rcheng Sep 13, 2023
d8a8226
personal nits
sfc-gh-rcheng Sep 13, 2023
6b23cdc
fix npe
sfc-gh-rcheng Sep 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@

/** All metrics related constants. Mainly for JMX */
public class MetricsUtil {
public class Streaming {
/**
* See {@link com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} for offset
* description
*/
public static final String OFFSET_PERSISTED_IN_SNOWFLAKE = "persisted-in-snowflake-offset";

public static final String LATEST_CONSUMER_OFFSET = "latest-consumer-offset";
}

public static final String JMX_METRIC_PREFIX = "snowflake.kafka.connector";

// Offset related constants
Expand Down Expand Up @@ -70,14 +80,6 @@ public class MetricsUtil {
*/
public static final String PURGED_OFFSET = "purged-offset";

/**
* See {@link com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} for offset
* description
*/
public static final String OFFSET_PERSISTED_IN_SNOWFLAKE = "persisted-in-snowflake-offset";

public static final String LATEST_CONSUMER_OFFSET = "latest-consumer-offset";

// Buffer related constants
public static final String BUFFER_SUB_DOMAIN = "buffer";

Expand All @@ -91,6 +93,9 @@ public class MetricsUtil {

public static final String LATENCY_SUB_DOMAIN = "latencies";

// Partition related constants
public static final String PARTITION_SUB_DOMAIN = "partition";

public enum EventType {
/**
* Time difference between the record put into kafka to record fetched into Kafka Connector Can
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.snowflake.kafka.connector.internal.PartitionBuffer;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelCreation;
import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelStatus;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
Expand Down Expand Up @@ -243,6 +244,8 @@ public TopicPartitionChannel(
SnowflakeTelemetryService telemetryService,
boolean enableCustomJMXMonitoring,
MetricsJmxReporter metricsJmxReporter) {
final long startTime = System.currentTimeMillis();

this.streamingIngestClient = Preconditions.checkNotNull(streamingIngestClient);
Preconditions.checkState(!streamingIngestClient.isClosed());
this.topicPartition = Preconditions.checkNotNull(topicPartition);
Expand Down Expand Up @@ -280,15 +283,23 @@ public TopicPartitionChannel(
this.processedOffset.set(lastCommittedOffsetToken);

// setup telemetry and metrics
String connectorName =
conn.getConnectorName() == null || conn.getConnectorName().isEmpty()
? "default_connector_name"
: conn.getConnectorName();
this.snowflakeTelemetryChannelStatus =
new SnowflakeTelemetryChannelStatus(
tableName,
connectorName,
channelName,
startTime,
enableCustomJMXMonitoring,
metricsJmxReporter,
this.offsetPersistedInSnowflake,
this.processedOffset,
this.latestConsumerOffset);
this.telemetryServiceV2.reportKafkaPartitionStart(
new SnowflakeTelemetryChannelCreation(this.tableName, this.channelName, startTime));

if (lastCommittedOffsetToken != NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
this.sinkTaskContext.offset(this.topicPartition, lastCommittedOffsetToken + 1L);
Expand Down Expand Up @@ -1044,6 +1055,9 @@ private SnowflakeStreamingIngestChannel openChannelForTable() {
public void closeChannel() {
try {
this.channel.close().get();

// telemetry and metrics
this.telemetryServiceV2.reportKafkaPartitionUsage(this.snowflakeTelemetryChannelStatus, true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we send this somewhere periodically instead of just during close?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be ideal, but currently we don't have anything running periodically in the background. InsertRows is an option, but I'm worried about overloading the telemetry service since we have no control over how often the customer flushes their buffer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it something we plan to add in the future? I think we send the usage for Snowpipe periodically. Otherwise I feel like this is not very useful, since there is no usage reported if there is no open/close on the channel

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not have any plans to add that in the future. It could potentially be a part of the flush service.

Currently we do not periodically send Snowpipe usage. reportKafkaPartition() is called on close and when the cleaner runs, which only runs on the first insert to initialize a pipe

this.snowflakeTelemetryChannelStatus.tryUnregisterChannelJMXMetrics();
} catch (InterruptedException | ExecutionException e) {
final String errMsg =
Expand Down Expand Up @@ -1089,6 +1103,16 @@ protected long getOffsetPersistedInSnowflake() {
return this.offsetPersistedInSnowflake.get();
}

@VisibleForTesting
protected long getProcessedOffset() {
return this.processedOffset.get();
}

@VisibleForTesting
protected long getLatestConsumerOffset() {
return this.latestConsumerOffset.get();
}

@VisibleForTesting
protected boolean isPartitionBufferEmpty() {
return streamingBuffer.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2023 Snowflake Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.snowflake.kafka.connector.internal.streaming.telemetry;

import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.IS_REUSE_TABLE;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.Streaming.TP_CHANNEL_CREATION_TIME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.Streaming.TP_CHANNEL_NAME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TABLE_NAME;

import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;

/**
* This object is sent only once when a channel starts. No concurrent modification is made on this
* object, thus no lock is required.
*/
public class SnowflakeTelemetryChannelCreation extends SnowflakeTelemetryBasicInfo {
private final long tpChannelCreationTime; // start time of the channel
private final String tpChannelName;
private boolean isReuseTable = false; // is the channel reusing existing table
sfc-gh-rcheng marked this conversation as resolved.
Show resolved Hide resolved

public SnowflakeTelemetryChannelCreation(
final String tableName, final String channelName, final long startTime) {
super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_CHANNEL_START);
this.tpChannelName = channelName;
this.tpChannelCreationTime = startTime;
}

@Override
public void dumpTo(ObjectNode msg) {
msg.put(TABLE_NAME, this.tableName);
msg.put(TP_CHANNEL_NAME, this.tpChannelName);

msg.put(IS_REUSE_TABLE, this.isReuseTable);
msg.put(TP_CHANNEL_CREATION_TIME, tpChannelCreationTime);
}

@Override
public boolean isEmpty() {
throw new IllegalStateException(
"Empty function doesnt apply to:" + this.getClass().getSimpleName());
}

public void setReuseTable(boolean reuseTable) {
isReuseTable = reuseTable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,33 @@
package com.snowflake.kafka.connector.internal.streaming.telemetry;

import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.constructMetricName;
import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.snowflake.kafka.connector.internal.KCLogger;
import com.google.common.annotations.VisibleForTesting;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants;
import java.util.concurrent.atomic.AtomicLong;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;

public class SnowflakeTelemetryChannelStatus {
private static final KCLogger LOGGER =
new KCLogger(SnowflakeTelemetryChannelStatus.class.toString());

/**
* Extension of {@link SnowflakeTelemetryBasicInfo} class used to send data to snowflake when the
* TopicPartitionChannel closes. Also creates and registers various metrics with JMX
*
* <p>Most of the data sent to Snowflake is an aggregated data.
*/
public class SnowflakeTelemetryChannelStatus extends SnowflakeTelemetryBasicInfo {
public static final long NUM_METRICS = 3; // update when new metrics are added

// channel properties
private final String connectorName;
private final String channelName;
private final MetricsJmxReporter metricsJmxReporter;
private final long channelCreationTime;

// offsets
private final AtomicLong offsetPersistedInSnowflake;
Expand All @@ -43,8 +53,8 @@ public class SnowflakeTelemetryChannelStatus {

/**
* Creates a new object tracking {@link
* com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} metrics with JMX
* TODO @rcheng: update comment when extends telemetryBasicInfo
* com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} metrics with JMX and
* send telemetry data to snowflake
*
* @param tableName the table the channel is ingesting to
* @param channelName the name of the TopicPartitionChannel to track
Expand All @@ -53,12 +63,18 @@ public class SnowflakeTelemetryChannelStatus {
*/
public SnowflakeTelemetryChannelStatus(
final String tableName,
final String connectorName,
final String channelName,
final long startTime,
final boolean enableCustomJMXConfig,
final MetricsJmxReporter metricsJmxReporter,
AtomicLong offsetPersistedInSnowflake,
AtomicLong processedOffset,
AtomicLong latestConsumerOffset) {
final AtomicLong offsetPersistedInSnowflake,
final AtomicLong processedOffset,
final AtomicLong latestConsumerOffset) {
super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_CHANNEL_USAGE);

this.channelCreationTime = startTime;
this.connectorName = connectorName;
this.channelName = channelName;
this.metricsJmxReporter = metricsJmxReporter;

Expand All @@ -75,6 +91,30 @@ public SnowflakeTelemetryChannelStatus(
}
}

@Override
public boolean isEmpty() {
sfc-gh-rcheng marked this conversation as resolved.
Show resolved Hide resolved
// Check that all properties are still at the default value.
return this.offsetPersistedInSnowflake.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
&& this.processedOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
&& this.latestConsumerOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
}

@Override
public void dumpTo(ObjectNode msg) {
msg.put(TelemetryConstants.TABLE_NAME, this.tableName);
msg.put(TelemetryConstants.CONNECTOR_NAME, this.connectorName);
msg.put(TelemetryConstants.Streaming.TP_CHANNEL_NAME, this.channelName);

msg.put(
TelemetryConstants.Streaming.OFFSET_PERSISTED_IN_SNOWFLAKE,
this.offsetPersistedInSnowflake.get());
msg.put(TelemetryConstants.PROCESSED_OFFSET, this.processedOffset.get());
msg.put(TelemetryConstants.Streaming.LATEST_CONSUMER_OFFSET, this.latestConsumerOffset.get());

msg.put(TelemetryConstants.Streaming.TP_CHANNEL_CREATION_TIME, this.channelCreationTime);
msg.put(TelemetryConstants.Streaming.TP_CHANNEL_CLOSE_TIME, System.currentTimeMillis());
}

/** Registers all the Metrics inside the metricRegistry. */
private void registerChannelJMXMetrics() {
LOGGER.debug(
Expand All @@ -91,7 +131,7 @@ private void registerChannelJMXMetrics() {
constructMetricName(
this.channelName,
MetricsUtil.OFFSET_SUB_DOMAIN,
MetricsUtil.OFFSET_PERSISTED_IN_SNOWFLAKE),
MetricsUtil.Streaming.OFFSET_PERSISTED_IN_SNOWFLAKE),
(Gauge<Long>) this.offsetPersistedInSnowflake::get);

currentMetricRegistry.register(
Expand All @@ -101,7 +141,9 @@ private void registerChannelJMXMetrics() {

currentMetricRegistry.register(
constructMetricName(
this.channelName, MetricsUtil.OFFSET_SUB_DOMAIN, MetricsUtil.LATEST_CONSUMER_OFFSET),
this.channelName,
MetricsUtil.OFFSET_SUB_DOMAIN,
MetricsUtil.Streaming.LATEST_CONSUMER_OFFSET),
(Gauge<Long>) this.latestConsumerOffset::get);
} catch (IllegalArgumentException ex) {
LOGGER.warn("Metrics already present:{}", ex.getMessage());
Expand Down Expand Up @@ -129,4 +171,19 @@ public void tryUnregisterChannelJMXMetrics() {
public MetricsJmxReporter getMetricsJmxReporter() {
return this.metricsJmxReporter;
}

@VisibleForTesting
public long getOffsetPersistedInSnowflake() {
return this.offsetPersistedInSnowflake.get();
}

@VisibleForTesting
public long getProcessedOffset() {
return this.processedOffset.get();
}

@VisibleForTesting
public long getLatestConsumerOffset() {
return this.latestConsumerOffset.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
* under the License.
*/

package com.snowflake.kafka.connector.internal.streaming;
package com.snowflake.kafka.connector.internal.streaming.telemetry;

import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo;
import com.google.common.annotations.VisibleForTesting;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import java.sql.Connection;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;
import net.snowflake.client.jdbc.telemetry.Telemetry;
import net.snowflake.client.jdbc.telemetry.TelemetryClient;

/**
Expand All @@ -38,10 +40,9 @@ public SnowflakeTelemetryServiceV2(Connection conn) {
this.telemetry = TelemetryClient.createTelemetry(conn);
}

@Override
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved reportKafkaPartitionUsage to the parent SnowflakeTelemetryService

public void reportKafkaPartitionUsage(
SnowflakeTelemetryBasicInfo partitionStatus, boolean isClosing) {
throw new IllegalStateException("Snowpipe Streaming Doesnt Have Pipe Usage");
@VisibleForTesting
public SnowflakeTelemetryServiceV2(Telemetry telemetry) {
this.telemetry = telemetry;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@

/** Minimum information needed to send to Snowflake through Telemetry API */
public abstract class SnowflakeTelemetryBasicInfo {
final String tableName;
public final String tableName;
public final SnowflakeTelemetryService.TelemetryType telemetryType;

static final KCLogger LOGGER = new KCLogger(SnowflakeTelemetryBasicInfo.class.getName());
public static final KCLogger LOGGER = new KCLogger(SnowflakeTelemetryBasicInfo.class.getName());

/**
* Base Constructor. Accepts a tableName and StageName.
*
* @param tableName Checks for Nullability
*/
public SnowflakeTelemetryBasicInfo(final String tableName) {
public SnowflakeTelemetryBasicInfo(
final String tableName, SnowflakeTelemetryService.TelemetryType telemetryType) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass telemetry type here to simplify reportKafkaPartitionUsage/Creation code paths across snowpipe and streaming

Preconditions.checkArgument(
!Strings.isNullOrEmpty(tableName), "tableName cannot be null or empty");
this.tableName = tableName;
this.telemetryType = telemetryType;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class SnowflakeTelemetryPipeCreation extends SnowflakeTelemetryBasicInfo

public SnowflakeTelemetryPipeCreation(
final String tableName, final String stageName, final String pipeName) {
super(tableName);
super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_PIPE_START);
this.stageName = stageName;
this.pipeName = pipeName;
this.startTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public SnowflakeTelemetryPipeStatus(
final String pipeName,
final boolean enableCustomJMXConfig,
final MetricsJmxReporter metricsJmxReporter) {
super(tableName);
super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_PIPE_USAGE);
this.stageName = stageName;
this.pipeName = pipeName;

Expand Down
Loading
Loading