Skip to content

Commit

Permalink
[Streaming Telemetry 2][SNOW-899866] Enables reportKafkaPartitionStar…
Browse files Browse the repository at this point in the history
…t/Usage telemetry for Streaming (snowflakedb#694)
  • Loading branch information
sfc-gh-rcheng authored and khsoneji committed Oct 12, 2023
1 parent 94f04e3 commit e3dddca
Show file tree
Hide file tree
Showing 19 changed files with 782 additions and 295 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,6 @@ public class MetricsUtil {
*/
public static final String PURGED_OFFSET = "purged-offset";

/**
* See {@link com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} for offset
* description
*/
public static final String OFFSET_PERSISTED_IN_SNOWFLAKE = "persisted-in-snowflake-offset";

public static final String LATEST_CONSUMER_OFFSET = "latest-consumer-offset";

// Buffer related constants
public static final String BUFFER_SUB_DOMAIN = "buffer";

Expand All @@ -91,6 +83,16 @@ public class MetricsUtil {

public static final String LATENCY_SUB_DOMAIN = "latencies";

// ************ Streaming Constants ************//
/**
* See {@link com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} for offset
* description
*/
public static final String OFFSET_PERSISTED_IN_SNOWFLAKE = "persisted-in-snowflake-offset";

public static final String LATEST_CONSUMER_OFFSET = "latest-consumer-offset";
// ********** ^ Streaming Constants ^ **********//

public enum EventType {
/**
* Time difference between the record put into kafka to record fetched into Kafka Connector Can
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.snowflake.kafka.connector.internal.PartitionBuffer;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelCreation;
import com.snowflake.kafka.connector.internal.streaming.telemetry.SnowflakeTelemetryChannelStatus;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
Expand Down Expand Up @@ -243,6 +244,8 @@ public TopicPartitionChannel(
SnowflakeTelemetryService telemetryService,
boolean enableCustomJMXMonitoring,
MetricsJmxReporter metricsJmxReporter) {
final long startTime = System.currentTimeMillis();

this.streamingIngestClient = Preconditions.checkNotNull(streamingIngestClient);
Preconditions.checkState(!streamingIngestClient.isClosed());
this.topicPartition = Preconditions.checkNotNull(topicPartition);
Expand Down Expand Up @@ -280,15 +283,23 @@ public TopicPartitionChannel(
this.processedOffset.set(lastCommittedOffsetToken);

// setup telemetry and metrics
String connectorName =
conn == null || conn.getConnectorName() == null || conn.getConnectorName().isEmpty()
? "default_connector_name"
: conn.getConnectorName();
this.snowflakeTelemetryChannelStatus =
new SnowflakeTelemetryChannelStatus(
tableName,
connectorName,
channelName,
startTime,
enableCustomJMXMonitoring,
metricsJmxReporter,
this.offsetPersistedInSnowflake,
this.processedOffset,
this.latestConsumerOffset);
this.telemetryServiceV2.reportKafkaPartitionStart(
new SnowflakeTelemetryChannelCreation(this.tableName, this.channelName, startTime));

if (lastCommittedOffsetToken != NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
this.sinkTaskContext.offset(this.topicPartition, lastCommittedOffsetToken + 1L);
Expand Down Expand Up @@ -1044,6 +1055,9 @@ private SnowflakeStreamingIngestChannel openChannelForTable() {
public void closeChannel() {
try {
this.channel.close().get();

// telemetry and metrics
this.telemetryServiceV2.reportKafkaPartitionUsage(this.snowflakeTelemetryChannelStatus, true);
this.snowflakeTelemetryChannelStatus.tryUnregisterChannelJMXMetrics();
} catch (InterruptedException | ExecutionException e) {
final String errMsg =
Expand Down Expand Up @@ -1089,6 +1103,16 @@ protected long getOffsetPersistedInSnowflake() {
return this.offsetPersistedInSnowflake.get();
}

/**
 * Test-only accessor for the channel's processed offset.
 *
 * @return the value currently held by the {@code processedOffset} counter
 */
@VisibleForTesting
protected long getProcessedOffset() {
  final long currentProcessedOffset = this.processedOffset.get();
  return currentProcessedOffset;
}

/**
 * Test-only accessor for the channel's latest consumer offset.
 *
 * @return the value currently held by the {@code latestConsumerOffset} counter
 */
@VisibleForTesting
protected long getLatestConsumerOffset() {
  final long currentConsumerOffset = this.latestConsumerOffset.get();
  return currentConsumerOffset;
}

@VisibleForTesting
protected boolean isPartitionBufferEmpty() {
return streamingBuffer.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2023 Snowflake Inc. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.snowflake.kafka.connector.internal.streaming.telemetry;

import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.IS_REUSE_TABLE;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TABLE_NAME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TOPIC_PARTITION_CHANNEL_CREATION_TIME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.TOPIC_PARTITION_CHANNEL_NAME;

import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;

/**
 * Telemetry record describing the creation of a topic-partition channel.
 *
 * <p>This object is sent only once, when a channel starts. Nothing modifies it concurrently, so it
 * needs no locking.
 */
public class SnowflakeTelemetryChannelCreation extends SnowflakeTelemetryBasicInfo {
  /** Name of the channel this record describes. */
  private final String tpChannelName;

  /** Start time of the channel, exactly as supplied to the constructor. */
  private final long tpChannelCreationTime;

  /** Whether the channel is reusing an existing table; false until {@link #setReuseTable}. */
  private boolean isReuseTable = false;

  /**
   * Builds a creation record tagged with the {@code KAFKA_CHANNEL_START} telemetry type.
   *
   * @param tableName table name, forwarded to the base class
   * @param channelName name of the channel being created
   * @param startTime start time of the channel
   */
  public SnowflakeTelemetryChannelCreation(
      final String tableName, final String channelName, final long startTime) {
    super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_CHANNEL_START);
    this.tpChannelCreationTime = startTime;
    this.tpChannelName = channelName;
  }

  /**
   * Records whether the channel reuses an existing table.
   *
   * @param reuseTable value later written under {@code IS_REUSE_TABLE} by {@link #dumpTo}
   */
  public void setReuseTable(boolean reuseTable) {
    this.isReuseTable = reuseTable;
  }

  /**
   * Not supported for this telemetry type.
   *
   * @throws IllegalStateException always
   */
  @Override
  public boolean isEmpty() {
    final String simpleClassName = this.getClass().getSimpleName();
    throw new IllegalStateException("Empty function doesnt apply to:" + simpleClassName);
  }

  /**
   * Writes the table name, channel name, reuse flag and creation time into the given node.
   *
   * @param msg node that receives the fields
   */
  @Override
  public void dumpTo(ObjectNode msg) {
    // Which table and channel this record is about.
    msg.put(TABLE_NAME, this.tableName);
    msg.put(TOPIC_PARTITION_CHANNEL_NAME, this.tpChannelName);

    // Creation details.
    msg.put(IS_REUSE_TABLE, this.isReuseTable);
    msg.put(TOPIC_PARTITION_CHANNEL_CREATION_TIME, this.tpChannelCreationTime);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,33 @@
package com.snowflake.kafka.connector.internal.streaming.telemetry;

import static com.snowflake.kafka.connector.internal.metrics.MetricsUtil.constructMetricName;
import static com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.snowflake.kafka.connector.internal.KCLogger;
import com.google.common.annotations.VisibleForTesting;
import com.snowflake.kafka.connector.internal.metrics.MetricsJmxReporter;
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants;
import java.util.concurrent.atomic.AtomicLong;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;

public class SnowflakeTelemetryChannelStatus {
private static final KCLogger LOGGER =
new KCLogger(SnowflakeTelemetryChannelStatus.class.toString());

/**
* Extension of {@link SnowflakeTelemetryBasicInfo} class used to send data to snowflake when the
* TopicPartitionChannel closes. Also creates and registers various metrics with JMX
*
* <p>Most of the data sent to Snowflake is aggregated data.
*/
public class SnowflakeTelemetryChannelStatus extends SnowflakeTelemetryBasicInfo {
public static final long NUM_METRICS = 3; // update when new metrics are added

// channel properties
private final String connectorName;
private final String channelName;
private final MetricsJmxReporter metricsJmxReporter;
private final long channelCreationTime;

// offsets
private final AtomicLong offsetPersistedInSnowflake;
Expand All @@ -43,8 +53,8 @@ public class SnowflakeTelemetryChannelStatus {

/**
* Creates a new object tracking {@link
* com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} metrics with JMX
* TODO @rcheng: update comment when extends telemetryBasicInfo
* com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} metrics with JMX and
* send telemetry data to snowflake
*
* @param tableName the table the channel is ingesting to
* @param channelName the name of the TopicPartitionChannel to track
Expand All @@ -53,12 +63,18 @@ public class SnowflakeTelemetryChannelStatus {
*/
public SnowflakeTelemetryChannelStatus(
final String tableName,
final String connectorName,
final String channelName,
final long startTime,
final boolean enableCustomJMXConfig,
final MetricsJmxReporter metricsJmxReporter,
AtomicLong offsetPersistedInSnowflake,
AtomicLong processedOffset,
AtomicLong latestConsumerOffset) {
final AtomicLong offsetPersistedInSnowflake,
final AtomicLong processedOffset,
final AtomicLong latestConsumerOffset) {
super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_CHANNEL_USAGE);

this.channelCreationTime = startTime;
this.connectorName = connectorName;
this.channelName = channelName;
this.metricsJmxReporter = metricsJmxReporter;

Expand All @@ -75,6 +91,29 @@ public SnowflakeTelemetryChannelStatus(
}
}

/**
 * Tells whether every tracked offset is still at its default value.
 *
 * @return {@code true} only if the persisted, processed and latest consumer offsets all equal
 *     {@code NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE}
 */
@Override
public boolean isEmpty() {
  // Any offset that moved away from the sentinel means there is something to report.
  if (this.offsetPersistedInSnowflake.get() != NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
    return false;
  }
  if (this.processedOffset.get() != NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE) {
    return false;
  }
  return this.latestConsumerOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
}

/**
 * Writes this channel's telemetry fields into the given node.
 *
 * <p>The three offsets are read from their counters at call time, and the close time is taken
 * from {@link System#currentTimeMillis()} at call time.
 *
 * @param msg node that receives the fields
 */
@Override
public void dumpTo(ObjectNode msg) {
  // Identity of the channel.
  msg.put(TelemetryConstants.TABLE_NAME, this.tableName);
  msg.put(TelemetryConstants.CONNECTOR_NAME, this.connectorName);
  msg.put(TelemetryConstants.TOPIC_PARTITION_CHANNEL_NAME, this.channelName);

  // Offsets: each counter is read immediately before it is written out.
  final long persistedOffset = this.offsetPersistedInSnowflake.get();
  msg.put(TelemetryConstants.OFFSET_PERSISTED_IN_SNOWFLAKE, persistedOffset);
  final long processed = this.processedOffset.get();
  msg.put(TelemetryConstants.PROCESSED_OFFSET, processed);
  final long latestConsumed = this.latestConsumerOffset.get();
  msg.put(TelemetryConstants.LATEST_CONSUMER_OFFSET, latestConsumed);

  // Lifetime of the channel.
  msg.put(TelemetryConstants.TOPIC_PARTITION_CHANNEL_CREATION_TIME, this.channelCreationTime);
  final long closeTime = System.currentTimeMillis();
  msg.put(TelemetryConstants.TOPIC_PARTITION_CHANNEL_CLOSE_TIME, closeTime);
}

/** Registers all the Metrics inside the metricRegistry. */
private void registerChannelJMXMetrics() {
LOGGER.debug(
Expand Down Expand Up @@ -129,4 +168,19 @@ public void tryUnregisterChannelJMXMetrics() {
/**
 * Exposes the JMX reporter held by this status object.
 *
 * @return the {@code metricsJmxReporter} field
 */
public MetricsJmxReporter getMetricsJmxReporter() {
  final MetricsJmxReporter reporter = this.metricsJmxReporter;
  return reporter;
}

/**
 * Test-only accessor for the offset persisted in Snowflake.
 *
 * @return the value currently held by the {@code offsetPersistedInSnowflake} counter
 */
@VisibleForTesting
public long getOffsetPersistedInSnowflake() {
  final long persistedOffset = this.offsetPersistedInSnowflake.get();
  return persistedOffset;
}

/**
 * Test-only accessor for the processed offset.
 *
 * @return the value currently held by the {@code processedOffset} counter
 */
@VisibleForTesting
public long getProcessedOffset() {
  final long processed = this.processedOffset.get();
  return processed;
}

/**
 * Test-only accessor for the latest consumer offset.
 *
 * @return the value currently held by the {@code latestConsumerOffset} counter
 */
@VisibleForTesting
public long getLatestConsumerOffset() {
  final long latestConsumed = this.latestConsumerOffset.get();
  return latestConsumed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
* under the License.
*/

package com.snowflake.kafka.connector.internal.streaming;
package com.snowflake.kafka.connector.internal.streaming.telemetry;

import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo;
import com.google.common.annotations.VisibleForTesting;
import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import java.sql.Connection;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;
import net.snowflake.client.jdbc.telemetry.Telemetry;
import net.snowflake.client.jdbc.telemetry.TelemetryClient;

/**
Expand All @@ -38,10 +40,9 @@ public SnowflakeTelemetryServiceV2(Connection conn) {
this.telemetry = TelemetryClient.createTelemetry(conn);
}

@Override
public void reportKafkaPartitionUsage(
SnowflakeTelemetryBasicInfo partitionStatus, boolean isClosing) {
throw new IllegalStateException("Snowpipe Streaming Doesnt Have Pipe Usage");
@VisibleForTesting
public SnowflakeTelemetryServiceV2(Telemetry telemetry) {
this.telemetry = telemetry;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,22 @@

/** Minimum information that needs to be sent to Snowflake through the Telemetry API */
public abstract class SnowflakeTelemetryBasicInfo {
final String tableName;
public final String tableName;
public final SnowflakeTelemetryService.TelemetryType telemetryType;

static final KCLogger LOGGER = new KCLogger(SnowflakeTelemetryBasicInfo.class.getName());
public static final KCLogger LOGGER = new KCLogger(SnowflakeTelemetryBasicInfo.class.getName());

/**
* Base constructor. Accepts a tableName and a telemetryType.
*
* @param tableName name of the table; must not be null or empty
* @param telemetryType telemetry type stored on this object
*/
public SnowflakeTelemetryBasicInfo(final String tableName) {
public SnowflakeTelemetryBasicInfo(
final String tableName, SnowflakeTelemetryService.TelemetryType telemetryType) {
Preconditions.checkArgument(
!Strings.isNullOrEmpty(tableName), "tableName cannot be null or empty");
this.tableName = tableName;
this.telemetryType = telemetryType;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class SnowflakeTelemetryPipeCreation extends SnowflakeTelemetryBasicInfo

public SnowflakeTelemetryPipeCreation(
final String tableName, final String stageName, final String pipeName) {
super(tableName);
super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_PIPE_START);
this.stageName = stageName;
this.pipeName = pipeName;
this.startTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public SnowflakeTelemetryPipeStatus(
final String pipeName,
final boolean enableCustomJMXConfig,
final MetricsJmxReporter metricsJmxReporter) {
super(tableName);
super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_PIPE_USAGE);
this.stageName = stageName;
this.pipeName = pipeName;

Expand Down
Loading

0 comments on commit e3dddca

Please sign in to comment.