Skip to content

Commit

Permalink
pr reviews
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-rcheng committed Sep 11, 2023
1 parent 25b2220 commit d2c71e5
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class SnowflakeTelemetryChannelStatus extends SnowflakeTelemetryBasicInfo
// channel properties
private final String channelName;
private final MetricsJmxReporter metricsJmxReporter;
private final AtomicLong startTime;
private final long startTime;

// offsets
private final AtomicLong offsetPersistedInSnowflake;
Expand All @@ -52,8 +52,7 @@ public class SnowflakeTelemetryChannelStatus extends SnowflakeTelemetryBasicInfo

/**
* Creates a new object tracking {@link
* com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} metrics with JMX
* TODO @rcheng: update comment when extends telemetryBasicInfo
* com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} metrics with JMX and send telemetry data to snowflake
*
* @param tableName the table the channel is ingesting to
* @param channelName the name of the TopicPartitionChannel to track
Expand All @@ -70,7 +69,7 @@ public SnowflakeTelemetryChannelStatus(
AtomicLong latestConsumerOffset) {
super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_CHANNEL_USAGE);

this.startTime = new AtomicLong(System.currentTimeMillis());
this.startTime = System.currentTimeMillis();
this.channelName = channelName;
this.metricsJmxReporter = metricsJmxReporter;

Expand Down Expand Up @@ -105,7 +104,7 @@ public void dumpTo(ObjectNode msg) {
msg.put(TelemetryConstants.LATEST_CONSUMER_OFFSET, latestConsumerOffset.get());

final long currTime = System.currentTimeMillis();
msg.put(TelemetryConstants.START_TIME, startTime.getAndSet(currTime));
msg.put(TelemetryConstants.START_TIME, this.startTime);
msg.put(TelemetryConstants.END_TIME, currTime);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,6 @@ public void reportKafkaConnectFatalError(final String errorDetail) {
*/
public void reportKafkaPartitionUsage(
final SnowflakeTelemetryBasicInfo partitionStatus, boolean isClosing) {
if (partitionStatus.isEmpty()) {
return;
}
ObjectNode msg = getObjectNode();

partitionStatus.dumpTo(msg);
Expand Down

0 comments on commit d2c71e5

Please sign in to comment.