Skip to content

Commit

Permalink
add channel open
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng committed Sep 12, 2023
1 parent 2ffa3bd commit 3388e25
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public class TopicPartitionChannel {
private final AtomicLong latestConsumerOffset =
new AtomicLong(NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE);

private final AtomicLong channelTryOpenCount = new AtomicLong(0);

/**
* Offsets are reset in kafka when one of following cases arises in which we rely on source of
* truth (Which is Snowflake's committed offsetToken)
Expand Down Expand Up @@ -289,7 +291,8 @@ public TopicPartitionChannel(
metricsJmxReporter,
this.offsetPersistedInSnowflake,
this.processedOffset,
this.latestConsumerOffset);
this.latestConsumerOffset,
this.channelTryOpenCount);
this.telemetryServiceV2.reportKafkaPartitionStart(
new SnowflakeTelemetryChannelCreation(this.tableName, this.channelName));

Expand Down Expand Up @@ -1037,6 +1040,7 @@ private SnowflakeStreamingIngestChannel openChannelForTable() {
.build();
LOGGER.info(
"Opening a channel with name:{} for table name:{}", this.channelName, this.tableName);
this.channelTryOpenCount.incrementAndGet();
return streamingIngestClient.openChannel(channelRequest);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants;

import java.util.concurrent.atomic.AtomicLong;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;

Expand All @@ -38,7 +39,7 @@
* <p>Most of the data sent to Snowflake is an aggregated data.
*/
public class SnowflakeTelemetryChannelStatus extends SnowflakeTelemetryBasicInfo {
public static final long NUM_METRICS = 3; // update when new metrics are added
public static final long NUM_METRICS = 4; // update when new metrics are added

// channel properties
private final String channelName;
Expand All @@ -50,6 +51,9 @@ public class SnowflakeTelemetryChannelStatus extends SnowflakeTelemetryBasicInfo
private final AtomicLong processedOffset;
private final AtomicLong latestConsumerOffset;

// channel metrics
private final AtomicLong channelTryOpenCount;

/**
* Creates a new object tracking {@link
* com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} metrics with JMX and
Expand All @@ -65,9 +69,10 @@ public SnowflakeTelemetryChannelStatus(
final String channelName,
final boolean enableCustomJMXConfig,
final MetricsJmxReporter metricsJmxReporter,
AtomicLong offsetPersistedInSnowflake,
AtomicLong processedOffset,
AtomicLong latestConsumerOffset) {
final AtomicLong offsetPersistedInSnowflake,
final AtomicLong processedOffset,
final AtomicLong latestConsumerOffset,
final AtomicLong channelTryOpenCount) {
super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_CHANNEL_USAGE);

this.startTime = System.currentTimeMillis();
Expand All @@ -77,6 +82,7 @@ public SnowflakeTelemetryChannelStatus(
this.offsetPersistedInSnowflake = offsetPersistedInSnowflake;
this.processedOffset = processedOffset;
this.latestConsumerOffset = latestConsumerOffset;
this.channelTryOpenCount = channelTryOpenCount;

if (enableCustomJMXConfig) {
if (metricsJmxReporter == null) {
Expand All @@ -92,17 +98,19 @@ public boolean isEmpty() {
// Check that all properties are still at the default value.
return this.offsetPersistedInSnowflake.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
&& this.processedOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
&& this.latestConsumerOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE;
&& this.latestConsumerOffset.get() == NO_OFFSET_TOKEN_REGISTERED_IN_SNOWFLAKE
&& this.channelTryOpenCount.get() == 0;
}

@Override
public void dumpTo(ObjectNode msg) {
msg.put(TelemetryConstants.TABLE_NAME, this.tableName);
msg.put(TelemetryConstants.CHANNEL_NAME, this.channelName);

msg.put(TelemetryConstants.OFFSET_PERSISTED_IN_SNOWFLAKE, offsetPersistedInSnowflake.get());
msg.put(TelemetryConstants.PROCESSED_OFFSET, processedOffset.get());
msg.put(TelemetryConstants.LATEST_CONSUMER_OFFSET, latestConsumerOffset.get());
msg.put(TelemetryConstants.OFFSET_PERSISTED_IN_SNOWFLAKE, this.offsetPersistedInSnowflake.get());
msg.put(TelemetryConstants.PROCESSED_OFFSET, this.processedOffset.get());
msg.put(TelemetryConstants.LATEST_CONSUMER_OFFSET, this.latestConsumerOffset.get());
msg.put(TelemetryConstants.CHANNEL_TRY_OPEN_COUNT, this.channelTryOpenCount.get());

final long currTime = System.currentTimeMillis();
msg.put(TelemetryConstants.START_TIME, this.startTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,6 @@ public final class TelemetryConstants {
public static final String IS_REUSE_PIPE = "is_reuse_pipe";
public static final String FILE_COUNT_RESTART = "file_count_restart";
public static final String FILE_COUNT_REPROCESS_PURGE = "file_count_reprocess_purge";

public static final String CHANNEL_TRY_OPEN_COUNT = "channel_try_open_count";
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public void testRegisterAndUnregisterJmxMetrics() {
metricsJmxReporter,
new AtomicLong(-1),
new AtomicLong(-1),
new AtomicLong(-1));
new AtomicLong(-1),
new AtomicLong(0));
verify(metricsJmxReporter, times(1)).start();
verify(metricRegistry, times(3)).register(Mockito.anyString(), Mockito.any());
verify(metricsJmxReporter, times(1)).removeMetricsFromRegistry(channelName);
Expand All @@ -56,7 +57,8 @@ public void testDisabledJmx() {
metricsJmxReporter,
new AtomicLong(-1),
new AtomicLong(-1),
new AtomicLong(-1));
new AtomicLong(-1),
new AtomicLong(0));
verify(metricsJmxReporter, times(0)).start();
verify(metricRegistry, times(0)).register(Mockito.anyString(), Mockito.any());
verify(metricsJmxReporter, times(0)).removeMetricsFromRegistry(channelName);
Expand All @@ -77,7 +79,8 @@ public void testInvalidJmxReporter() {
null,
new AtomicLong(-1),
new AtomicLong(-1),
new AtomicLong(-1));
new AtomicLong(-1),
new AtomicLong(0));
snowflakeTelemetryChannelStatus.tryUnregisterChannelJMXMetrics();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ public void testReportKafkaPartitionUsage() {
null,
new AtomicLong(expectedOffsetPersistedInSnowflake),
new AtomicLong(expectedProcessedOffset),
new AtomicLong(expectedLatestConsumerOffset));
new AtomicLong(expectedLatestConsumerOffset),
new AtomicLong(0));

partitionUsage = channelStatus;
}
Expand Down

0 comments on commit 3388e25

Please sign in to comment.