[Streaming Telemetry 2][SNOW-899866] Enables reportKafkaPartitionStart/Usage telemetry for Streaming #694
Conversation
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java (resolved review threads)
@@ -38,10 +40,9 @@ public SnowflakeTelemetryServiceV2(Connection conn) {
    this.telemetry = TelemetryClient.createTelemetry(conn);
  }

  @Override

Moved reportKafkaPartitionUsage to the parent SnowflakeTelemetryService.
  /**
   * Base Constructor. Accepts a tableName and StageName.
   *
   * @param tableName Checks for Nullability
   */
- public SnowflakeTelemetryBasicInfo(final String tableName) {
+ public SnowflakeTelemetryBasicInfo(
+     final String tableName, SnowflakeTelemetryService.TelemetryType telemetryType) {

Pass the telemetry type here to simplify the reportKafkaPartitionUsage/Creation code paths across Snowpipe and Streaming.
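To illustrate the pattern being discussed, here is a minimal, hypothetical sketch (the class and enum names are illustrative, not the connector's actual API): fixing the telemetry type in the base constructor lets usage/creation reporting share one code path for Snowpipe and Streaming.

```java
// Hypothetical sketch of passing the telemetry type to the base constructor.
// Names are illustrative and do not match the connector's real classes.
public class TelemetryTypeSketch {
  enum TelemetryType { KAFKA_PIPE_USAGE, KAFKA_CHANNEL_USAGE }

  static class BasicInfo {
    final String tableName;
    final TelemetryType telemetryType;

    BasicInfo(final String tableName, final TelemetryType telemetryType) {
      if (tableName == null || tableName.isEmpty()) {
        // mirrors the "Checks for Nullability" contract in the javadoc
        throw new IllegalArgumentException("tableName is required");
      }
      this.tableName = tableName;
      this.telemetryType = telemetryType;
    }
  }

  public static void main(String[] args) {
    BasicInfo streaming = new BasicInfo("MY_TABLE", TelemetryType.KAFKA_CHANNEL_USAGE);
    // the reporting code can branch on the stored type instead of duplicating paths
    System.out.println(streaming.telemetryType);
  }
}
```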
Left some comments, PTAL. This is super helpful for us to debug issues without asking for client logs, thanks!
@@ -289,6 +290,8 @@ public TopicPartitionChannel(
        this.offsetPersistedInSnowflake,
        this.processedOffset,
        this.latestConsumerOffset);
+   this.telemetryServiceV2.reportKafkaPartitionStart(
+       new SnowflakeTelemetryChannelCreation(this.tableName, this.channelName));

Is this the only place where we create a channel? If not, we need to either cover all cases or update the metric name to something like "partition start".
Yes, this is the only place we create a channel. There are two constructors for TopicPartitionChannel; the test constructor calls into this constructor, so all TopicPartitionChannel object creations go through this code path.
I thought we also reopen/create the channel when the channel gets invalidated? Not sure if that is the same code path, though.
The TopicPartitionChannel object does not get recreated when the channel is invalidated, so the current reportKafkaPartitionStart tracks when a new TopicPartitionChannel is created.
Alternatively, we could emit a reportKafkaPartitionStart on each channel open, but I don't think that is a good idea: we can track this server-side, and we already have multiple channel open/close issues that could overload the telemetry service with events.
There are two code paths for opening channels: the TopicPartitionChannel constructor, and when we recover the offset from Snowflake (channel invalidation).
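The behavior described above can be sketched as follows. This is a hypothetical illustration (the class and method names are invented for clarity, not the connector's real API): the creation event fires exactly once in the constructor, while reopening an invalidated channel reuses the same object and intentionally does not re-emit it.

```java
// Hypothetical sketch: one creation event per TopicPartitionChannel object,
// no extra events on channel reopen after invalidation. Names are illustrative.
import java.util.concurrent.atomic.AtomicInteger;

public class ChannelCreationSketch {
  static final AtomicInteger creationEvents = new AtomicInteger();

  static class TopicPartitionChannelSketch {
    TopicPartitionChannelSketch(String tableName, String channelName) {
      // reportKafkaPartitionStart equivalent: emitted once per object creation
      creationEvents.incrementAndGet();
    }

    void reopenAfterInvalidation() {
      // recovers the offset from Snowflake; deliberately no creation event,
      // to avoid flooding telemetry during repeated invalidations
    }
  }

  public static void main(String[] args) {
    TopicPartitionChannelSketch ch = new TopicPartitionChannelSketch("MY_TABLE", "topic_0");
    ch.reopenAfterInvalidation();
    ch.reopenAfterInvalidation();
    System.out.println(creationEvents.get()); // 1: reopens do not count as creations
  }
}
```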
.../snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java (resolved review threads)
@@ -1044,6 +1047,9 @@ private SnowflakeStreamingIngestChannel openChannelForTable() {
  public void closeChannel() {
    try {
      this.channel.close().get();
+
+     // telemetry and metrics
+     this.telemetryServiceV2.reportKafkaPartitionUsage(this.snowflakeTelemetryChannelStatus, true);
Should we send this somewhere periodically instead of just during close?
That would be ideal, but currently we don't have anything running periodically in the background. InsertRows is an option, but I'm worried about overloading the telemetry service, since we have no control over how often the customer flushes their buffer.
Is it something we plan to add in the future? I think we send the usage for Snowpipe periodically. Otherwise I feel this is not very useful, since there's no usage reported if there is no open/close on the channel.
We do not have any plans to add that in the future; it could potentially become part of the flush service.
Currently we do not send Snowpipe usage periodically: reportKafkaPartition() is called on close and when the cleaner runs, which only runs on the first insert to initialize a pipe.
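The close-time-only reporting discussed above can be sketched as follows. This is a hypothetical illustration under the stated assumption that no periodic background reporter exists; the names are invented for clarity and do not match the connector's real API.

```java
// Hypothetical sketch: a usage snapshot is reported only when the channel
// closes, flagged as a closing report. Names are illustrative.
public class UsageOnCloseSketch {
  static class ChannelStatus {
    long processedOffset = -1; // last offset the channel has processed
  }

  interface TelemetryService {
    String reportKafkaPartitionUsage(ChannelStatus status, boolean isClosing);
  }

  static String closeChannel(ChannelStatus status, TelemetryService svc) {
    // on close: send the final snapshot, flagging that the channel is closing
    return svc.reportKafkaPartitionUsage(status, true);
  }

  public static void main(String[] args) {
    TelemetryService svc = (status, closing) ->
        "usage offset=" + status.processedOffset + " closing=" + closing;
    ChannelStatus status = new ChannelStatus();
    status.processedOffset = 42;
    System.out.println(closeChannel(status, svc)); // usage offset=42 closing=true
  }
}
```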
...nowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelCreation.java (resolved review thread)
Mostly looks good; left some questions and suggestions. PTAL and let me know what you think. Thanks!
Let's also verify the data we send to Snowflake and confirm we don't change anything in Snowpipe telemetry. You can pick any of the above GH Action runs and verify the data in the telemetry view in Snowhouse. (I don't think anything has changed, but just to be doubly sure.)
public class Streaming {
  public static final String OFFSET_PERSISTED_IN_SNOWFLAKE = "persisted-in-snowflake-offset";
- Remove the inner class. The previous recommendation was to have a separate section/heading in this class that distinguishes between Snowpipe and Streaming; apologies if I was not clear before.
- Let's clarify what "tp" means here. It is not intuitive, though hopefully readers will check the code and understand what it means.
- Discussed offline; added as a comment section.
- I'll expand TP -> TopicPartition.
 * See {@link com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel} for offset
 * description
 */
public static final String OFFSET_PERSISTED_IN_SNOWFLAKE = "persisted-in-snowflake-offset";
👍
Follow-up:
- Merge gates are failing; please ensure they are passing before pushing.
- Doc changes: please create a Jira for @sfc-gh-lema (for JMX).

LGTM otherwise! Good stuff! Thank you.

Jira for documentation changes: @sfc-gh-lema
…t/Usage telemetry for Streaming (snowflakedb#694)
https://snowflakecomputing.atlassian.net/browse/SNOW-899866