-
Notifications
You must be signed in to change notification settings - Fork 100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SNOW-954150] Use map of clients with different configurations instead of one client for multiple connectors configurations #744
Conversation
// note this test relies on testrole_kafka and testrole_kafka_1 roles being granted to test_kafka | ||
// user | ||
@Test | ||
public void testStreamingIngest_multipleChannel_distinctClients() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
confirmed this test fails with the same error as jira when run on master, passes with this pr's changes
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientHandler.java
Outdated
Show resolved
Hide resolved
Is the role issue related to this? |
Unrelated, but it could factor into why this wasn't caught until now |
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java
Outdated
Show resolved
Hide resolved
src/test/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2IT.java
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java
Outdated
Show resolved
Hide resolved
Overall lgtm, will wait for one for revision. Also, it might be worth looking into LoadingCache too since we are using map which has loading, eviction technique etc. Not saying we should use either since both has pros and cons. |
Could you update the description to explain how the map is done? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments, PTAL!
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientHandler.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientHandler.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientHandler.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientHandler.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientHandler.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingUtils.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/StreamingClientProvider.java
Outdated
Show resolved
Hide resolved
|
Updated description with changes, rerunning release testing framework for e2e test with multiple connectors now |
LOGGER.info("Initializing Streaming Client..."); | ||
|
||
// get streaming properties from config | ||
Properties streamingClientProps = new Properties(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this and getNewClientName is moved into StreamingClientProperties.java
@@ -47,72 +52,137 @@ public static StreamingClientProvider getStreamingClientProviderInstance() { | |||
return StreamingClientProviderSingleton.streamingClientProvider; | |||
} | |||
|
|||
/** ONLY FOR TESTING - to get a provider with injected properties */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved these methods to bottom of class for readability
|
||
@After | ||
public void tearDown() { | ||
this.streamingClientHandler.closeClient(this.client1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is UT it should not actually create the client, so I refactored this test to use mocks and pulled most of the complexity into the new IT. caught by @sfc-gh-japatel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some minor comments, PTAL, otherwise LGTM, thanks!
* @return A formatted string with the loggable properties | ||
*/ | ||
public String getLoggableClientProperties() { | ||
return this.clientProperties == null | this.clientProperties.isEmpty() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can clientProperties
be null or empty? I thought we have check to make sure some of the configurations are required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it shouldn't ever be null or empty, but I added the check just in case
"Streaming client optimization is enabled per worker node. Reusing valid clients when" | ||
+ " possible"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we move this log after the client creation is succeeded? Similar to the logic below when one client optimization is not enabled, then you can combine two log lines together.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point. Moved line to end of method so that we only have one log and added a warn log for when the registered client is invalid because ideally it should always be valid
// invalidations are processed on the next get or in the background, so we still need to close | ||
// the client here | ||
this.registeredClients.invalidate(clientProperties); | ||
this.streamingClientHandler.closeClient(registeredClient); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
duplicate as line 162?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also do you need to check whether the client is still valid before closing it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We call close on the given client and the client registered in the cache. Technically they should be the same client, however I prefer to call close on both just in case the given client is different or somehow the registered cache was corrupted
The streamingClientHandler will check if the client is valid before calling close, so the extra close call will no-op if it is invalid
@@ -35,9 +28,6 @@ | |||
/** This class handles all calls to manage the streaming ingestion client */ | |||
public class StreamingClientHandler { | |||
private static final KCLogger LOGGER = new KCLogger(StreamingClientHandler.class.getName()); | |||
private static final String STREAMING_CLIENT_PREFIX_NAME = "KC_CLIENT_"; | |||
private static final String TEST_CLIENT_NAME = "TEST_CLIENT"; | |||
|
|||
private AtomicInteger createdClientId = new AtomicInteger(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this back (was removed in previous PR iterations) in case removing it causes concurrency issues with client naming
…d of one client for multiple connectors configurations (#744)
…d of one client for multiple connectors configurations (#744)
…d of one client for multiple connectors configurations (#744)
…d of one client for multiple connectors configurations (snowflakedb#744)
…d of one client for multiple connectors configurations (snowflakedb#744)
Summary
If two sink connectors with the one client optimization and different connection properties such as the user's role, ingestion is stopped on the connector that was last added. The fix is to map client properties to existing client in the one client optimization.
Changes
Initial Issue Investigation
ConnectorA with the admin role and ConnectorB with the public role
a. If the one client optimization is enabled, ConnectorB(public) tries using ConnectorA(admin)'s client which has the admin role. As a result ConnectorB(public) fails with 400 unauthorized and does not begin ingestion.
Testing
Caused by this jira: https://snowflakecomputing.atlassian.net/browse/SNOW-954150