[NU-1823] Fix for schemaless topics in kafka source/sink #7232
Conversation
📝 Walkthrough

The pull request introduces updates across multiple components related to Kafka integration within the software. Key changes include enhancements to the handling of Kafka sources and sinks, specifically allowing them to operate with schemaless topics, which broadens their usability with JSON data.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (4)
utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureSchemaBasedSerdeProviderIntegrationTest.scala (2)
Line range hint 25-89: Add test coverage for schemaless topics

While this test verifies schema-based serialization, it doesn't cover the PR's main objective of handling schemaless topics. Consider adding another test case that verifies the behavior when `showTopicsWithoutSchema` is set to true. Here's a suggested additional test case:
```scala
test("should handle schemaless topics when enabled") {
  val kafkaConfig = KafkaConfig(
    Some(config),
    None,
    avroKryoGenericRecordSchemaIdSerialization = Some(false),
    showTopicsWithoutSchema = true // Enable schemaless topics
  )
  // Test logic for schemaless topic scenario
}
```
Line range hint 47-48: Address TODO comment about schema name validation

The TODO comment indicates missing schema name validation in the serializer. This should be addressed to ensure the topic-schema name matching convention is properly enforced.
Would you like me to help implement the schema name validation check or create a GitHub issue to track this task?
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala (1)
Line range hint 45-59: LGTM with suggestions for improved error handling.

The changes correctly implement the schemaless topic handling by checking the content type and providing appropriate schemas. However, there are a few suggestions for improvement.
Consider these enhancements:
- Make the content type comparison case-insensitive for better robustness:

```diff
- if (value.equals(ContentTypes.PLAIN.toString)) {
+ if (value.equalsIgnoreCase(ContentTypes.PLAIN.toString)) {
```
- Provide a more descriptive error message that includes the actual content type:

```diff
- throw new IllegalStateException("Topic without schema should have ContentType Json or Plain, was neither")
+ throw new IllegalStateException(s"Topic without schema should have ContentType Json or Plain, got: $value")
```
- Consider using pattern matching for better Scala idioms:

```scala
writerSchemaId.value match {
  case StringSchemaId(value) =>
    value.toLowerCase match {
      case plain if plain == ContentTypes.PLAIN.toString.toLowerCase =>
        SchemaWithMetadata(ContentTypesSchemas.schemaForPlain, SchemaId.fromString(ContentTypes.PLAIN.toString))
      case json if json == ContentTypes.JSON.toString.toLowerCase =>
        SchemaWithMetadata(ContentTypesSchemas.schemaForJson, SchemaId.fromString(ContentTypes.JSON.toString))
      case other =>
        throw new IllegalStateException(s"Topic without schema should have ContentType Json or Plain, got: $other")
    }
  case other =>
    throw new IllegalStateException(s"Topic without schema has invalid schema ID type: ${other.getClass.getSimpleName}")
}
```

utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureTestsFromFileIntegrationTest.scala (1)
Line range hint 44-89: Consider adding negative test cases.

While the test covers the happy path for schema-based serialization, consider adding test cases for:
- Schema evolution scenarios
- Invalid schema IDs
- Malformed records
- Schema validation failures
Would you like me to help generate additional test cases?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (7)
- docs/Changelog.md (1 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala (0 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala (1 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala (0 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala (1 hunks)
- utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureSchemaBasedSerdeProviderIntegrationTest.scala (1 hunks)
- utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureTestsFromFileIntegrationTest.scala (1 hunks)
💤 Files with no reviewable changes (2)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala
🔇 Additional comments (6)
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala (2)
47-52: Review the logic flow of schema checking
The current implementation has some concerns that need to be addressed:
- The logic seems counterintuitive: when `showTopicsWithoutSchema` is false, the method always returns true without actually checking if the topic has a schema. This could lead to unexpected behavior where topics without schemas are treated as if they have schemas.
- A new instance of `TopicsWithExistingSubjectSelectionStrategy` is created on each method call, which could be inefficient if this method is called frequently.
- The error handling could be improved: using `exists` on the `Validated` result means we're silently handling potential errors from `getTopics`.
Consider refactoring the method to:
```diff
 def isTopicWithSchema(topic: String, kafkaConfig: KafkaConfig): Boolean = {
-  if (!kafkaConfig.showTopicsWithoutSchema) {
-    true
-  } else {
-    val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy().getTopics(this, kafkaConfig)
-    topicsWithSchema.exists(_.map(_.name).contains(topic))
+  if (kafkaConfig.showTopicsWithoutSchema) {
+    TopicsWithExistingSubjectSelectionStrategy
+      .getTopics(this, kafkaConfig)
+      .fold(
+        error => throw new RuntimeException(s"Failed to check schema for topic $topic: $error"),
+        _.exists(_.name == topic)
+      )
+  } else {
+    // When showTopicsWithoutSchema is false, we should still verify schema existence
+    getAllVersions(topic, isKey = false).isValid || getAllVersions(topic, isKey = true).isValid
   }
 }
```
Let's verify the usage of this method across the codebase:
Line range hint 1-46: Verify implementations of SchemaRegistryClient trait

The changes to `isTopicWithSchema` might require updates in implementing classes, especially around error handling and the usage of `showTopicsWithoutSchema`.

Let's verify the implementations:
Also applies to: 53-70
utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureSchemaBasedSerdeProviderIntegrationTest.scala (1)
25-25: Verify test stability in CI environment

The test activation is good, but consider the following improvements for CI stability:
- The Event Hubs namespace fallback to "nu-cloud" might cause issues in CI
- The hard-coded "test-group" could lead to conflicts in parallel test runs
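One way to mitigate the second point is to derive a unique consumer-group name per run. This is a sketch under the assumption that the group id is freely configurable in the test setup; the `uniqueGroupId` helper is hypothetical, not part of the codebase:

```scala
import java.util.UUID

// Hypothetical helper: append a random suffix to the hard-coded
// "test-group" prefix so parallel CI runs never share a consumer group.
def uniqueGroupId(prefix: String = "test-group"): String =
  s"$prefix-${UUID.randomUUID()}"
```

Each invocation yields a distinct group name, so concurrently running test suites no longer compete for the same consumer group.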
utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureTestsFromFileIntegrationTest.scala (2)
44-44: Enabling previously ignored test case.
The test activation is appropriate as it verifies critical functionality for schema-based serialization and deserialization with Azure Schema Registry. This aligns well with the PR's focus on improving schema handling in Kafka components.
Let's verify if there are any other ignored tests that should be activated:
Line range hint 44-89: Verify test environment configuration.

The test uses environment-dependent configuration (`AZURE_EVENT_HUBS_NAMESPACE`) and has a `@Network` annotation, indicating external dependencies. Ensure CI/CD pipelines have proper Azure credentials and network access.
✅ Verification successful
Tests are properly configured with fallback values for Azure credentials.
The verification shows that the test environment is well-designed with proper fallback mechanisms:
- All Azure-related tests have default fallback values for environment variables:
  - `AZURE_EVENT_HUBS_NAMESPACE` defaults to "nu-cloud"
  - `AZURE_EVENT_HUBS_SHARED_ACCESS_KEY_NAME` defaults to "unknown"
  - `AZURE_EVENT_HUBS_SHARED_ACCESS_KEY` defaults to "unknown"
This means the tests can run in both CI/CD environments (with proper credentials) and local development environments (with default values), making the test suite more robust and developer-friendly.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Check for environment variable usage in CI configuration
rg "AZURE_EVENT_HUBS_NAMESPACE" --type yaml --glob "**/ci/**"
```
Length of output: 210
Script:
```shell
#!/bin/bash
# Let's check for Azure configuration in all YAML files and GitHub workflows
rg "AZURE_EVENT_HUBS_NAMESPACE" --type yaml
# Also check for Network annotation usage and test configuration
rg "@Network" --type scala
```
Length of output: 967
Script:
```shell
#!/bin/bash
# Check for Azure-related environment variables in any configuration files
rg -g "*.{yaml,yml,conf,properties,xml}" "AZURE.*NAMESPACE|AZURE.*CONNECTION"
```
Length of output: 79
Script:
```shell
#!/bin/bash
# Look for test configuration files
fd -e conf -e yaml -e properties "test" --exec cat {}
```
Length of output: 55
Script:
```shell
#!/bin/bash
# Let's check how the Azure configuration is handled in the test files
rg -A 5 "AZURE_EVENT_HUBS_NAMESPACE|AzureSchemaRegistryClientFactory" --type scala
# Also check for any test configuration setup
rg -A 10 "kafkaConfig.*schemaRegistryClientKafkaConfig" --type scala
```
Length of output: 19567
Script:
```shell
#!/bin/bash
# Look for environment variable setup in test files
fd -e scala "Azure.*Test" --exec grep -A 5 -B 5 "override def beforeAll"
```
Length of output: 74
docs/Changelog.md (1)
18-18: LGTM! Documentation accurately reflects the changes.
The changelog entry clearly describes the new feature that allows Kafka source and sink to operate with schemaless topics and handle any JSON data without schema validation.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala (2)
44-44: Consider making the timeout configurable

The hard-coded timeout of 5000 ms might not be suitable for all environments. Consider making this configurable through `kafkaConfig` to allow for environment-specific tuning.

```diff
- _.listTopics(new ListTopicsOptions().timeoutMs(5000))
+ _.listTopics(new ListTopicsOptions().timeoutMs(kafkaConfig.listTopicsTimeoutMs))
```
54-59: Improve error handling documentation

The current comment could be more professional and descriptive. Consider updating it to better explain the error handling strategy.

```diff
- // In some tests we pass dummy kafka address, so when we try to get topics from kafka it fails
+ // Handle Kafka connection errors gracefully:
+ // - Return empty list on timeout to support environments with temporary connectivity issues
+ // - Propagate other exceptions for proper error handling upstream
```
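The strategy described here (swallow timeouts by returning an empty list, propagate everything else) can be sketched as a stand-alone helper. This is illustrative, not the actual Nussknacker code; in the real implementation the timeout surfaces as Kafka's `org.apache.kafka.common.errors.TimeoutException` wrapped in an `ExecutionException`, for which the JDK's `java.util.concurrent.TimeoutException` stands in below:

```scala
import java.util.concurrent.{ExecutionException, TimeoutException}

// Return an empty topic list when the broker call times out,
// but let any other failure propagate upstream.
def fetchTopicsOrEmpty(fetch: => List[String]): List[String] =
  try fetch
  catch {
    case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] =>
      List.empty
  }
```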
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala (3 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala (2 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala (1 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala
🔇 Additional comments (5)
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala (3)
6-6: LGTM: Import statement is correctly placed and necessary.

The added import for `TopicsWithExistingSubjectSelectionStrategy` is required for the new implementation.
43-43: LGTM: Method signature simplification improves API design.

The removal of the strategy parameter makes the API cleaner and follows the single responsibility principle by encapsulating the strategy decision within the method.
44-49: Verify the default behavior and document the change.

The new implementation returns `true` by default when `showTopicsWithoutSchema` is false. While this aligns with the PR objective to support schemaless topics, please ensure:
- This default behavior is documented in the codebase
- Existing code that previously used different strategies has been updated

Let's verify the usage and documentation:
✅ Verification successful
Let me check the documentation and implementation details with additional scripts:
Based on the gathered information, I can now generate the final response:
The default behavior is documented and test coverage exists
The default behavior when `showTopicsWithoutSchema` is false is documented in the Kafka integration documentation:

`showTopicsWithoutSchema` (Low importance, boolean, default: true) - Determines whether all Kafka topics should be displayed, or only topics with a matching schema in the schema registry.

The implementation is also covered by tests in the Azure Schema Registry integration tests, which verify the behavior when `showTopicsWithoutSchema = false`.
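Condensed into a small illustrative function (simplified names and signature, not the actual trait method), the documented behavior reads:

```scala
// Simplified model of the reviewed logic: when showTopicsWithoutSchema
// is false every topic is treated as having a schema; when true, the
// schema registry's topic list decides.
def isTopicWithSchema(
    topic: String,
    showTopicsWithoutSchema: Boolean,
    topicsWithSchema: => Set[String]
): Boolean =
  if (showTopicsWithoutSchema) topicsWithSchema.contains(topic)
  else true
```

Note that the schema registry is only consulted when the flag is true, which matches the optimization described in this PR.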
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Check for documentation of the showTopicsWithoutSchema flag
rg -l "showTopicsWithoutSchema" --type md
# Find previous usages of TopicSelectionStrategy to ensure they're updated
rg "TopicSelectionStrategy" --type scala
# Look for tests covering this behavior
rg -l "showTopicsWithoutSchema.*true|false" --type scala | grep "test"
```
Length of output: 25799
Script:
```shell
#!/bin/bash
# Check documentation content
cat docs/integration/KafkaIntegration.md
# Check test coverage for this behavior
ast-grep --pattern 'def showTopicsWithoutSchema = $_'
# Look for test cases
rg -A 10 "showTopicsWithoutSchema.*=.*false" --type scala
```
Length of output: 48761
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala (2)
5-5: LGTM: Import changes align with new error handling strategy

The replacement of `KafkaException` with `TimeoutException` and the addition of `ExecutionException` properly support the enhanced error handling implementation.

Also applies to: 9-9
Line range hint 40-63: Verify test coverage for schemaless topics handling

The implementation looks good and aligns with the PR objectives. Please ensure that the following scenarios are covered in tests:
- Successful retrieval of both schema registry and schemaless topics
- Timeout handling when Kafka is unreachable
- Distinct topic list generation when topics exist in both sources
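The third scenario reduces to a de-duplicating union of the two sources; a minimal sketch with made-up topic names:

```scala
// Hypothetical topic lists; "clicks" exists in both sources.
val schemaRegistryTopics = List("transactions", "clicks")
val plainKafkaTopics     = List("clicks", "raw-events")

// distinct keeps the first occurrence of each name and preserves order.
val allTopics = (schemaRegistryTopics ++ plainKafkaTopics).distinct
// allTopics == List("transactions", "clicks", "raw-events")
```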
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala (1)
43-43: Consider adding scaladoc for the configuration behavior

The method would benefit from documentation explaining:
- The meaning and impact of `showTopicsWithoutSchema`
- The default behavior when the config flag is false
- Example usage scenarios

Add scaladoc like this:

```diff
+ /**
+  * Determines if a topic has an associated schema based on configuration.
+  * @param topic The topic name to check
+  * @param kafkaConfig Configuration containing showTopicsWithoutSchema flag
+  * @return true if topic should be treated as having a schema:
+  *         - Always returns true when showTopicsWithoutSchema=false
+  *         - When showTopicsWithoutSchema=true, checks actual schema existence
+  */
  def isTopicWithSchema(topic: String, kafkaConfig: KafkaConfig): Boolean = {
```

docs/Changelog.md (1)
22-23: LGTM! Consider adding more details about validation behavior.

The changelog entry correctly documents the new schemaless topic support. To make it even more helpful for users, consider adding a note about how data validation behaves in schemaless mode.
You could expand the entry like this:
```diff
-* [#7066](https://github.com/TouK/nussknacker/pull/7066) Kafka source and sink can now operate with schemaless topics. They accept any json. Data will not be validated with schema.
+* [#7066](https://github.com/TouK/nussknacker/pull/7066) Kafka source and sink can now operate with schemaless topics. They accept any JSON data without schema validation. Note: When operating in schemaless mode, no schema-based validation is performed on the data, allowing for flexible JSON structures.
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
- docs/Changelog.md (1 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala (0 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala (3 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala (2 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala (1 hunks)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala (2 hunks)
- utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureSchemaBasedSerdeProviderIntegrationTest.scala (1 hunks)
- utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureTestsFromFileIntegrationTest.scala (1 hunks)
💤 Files with no reviewable changes (1)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/KafkaUniversalComponentTransformer.scala
🚧 Files skipped from review as they are similar to previous changes (5)
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/TopicSelectionStrategy.scala
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/formatter/AbstractSchemaBasedRecordFormatter.scala
- utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalKafkaDeserializer.scala
- utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureSchemaBasedSerdeProviderIntegrationTest.scala
- utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureTestsFromFileIntegrationTest.scala
🔇 Additional comments (2)
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala (2)
6-6: LGTM: Import aligns with implementation changes

The import of `TopicsWithExistingSubjectSelectionStrategy` is appropriate for the new implementation.
43-48: LGTM: Simplified implementation with better control

The new implementation provides a cleaner approach by:
- Using configuration to control schema validation behavior
- Optimizing topic fetching (only when `showTopicsWithoutSchema=true`)
- Simplifying the logic by using a single strategy
Let's verify that all callers have been updated to the new signature:
✅ Verification successful
Based on the search results, I can now generate the final response:
LGTM: All callers are using the new signature correctly

The search results confirm that all calls to `isTopicWithSchema` use the new two-parameter signature (topic and kafkaConfig), and there are no remaining calls using the old three-parameter version. The method is used consistently across:
- KafkaUniversalComponentTransformer
- AbstractSchemaBasedRecordFormatter
- UniversalKafkaDeserializer
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Search for any remaining calls to isTopicWithSchema with 3 parameters
ast-grep --pattern 'isTopicWithSchema($topic, $strategy, $config)'
```
Length of output: 68
Script:
```shell
#!/bin/bash
# Let's try a more comprehensive search for method calls
rg "isTopicWithSchema" -A 2
# Also search for the class/trait definition to understand the old signature
rg "trait.*SchemaRegistryClient|class.*SchemaRegistryClient" -A 5
```
Length of output: 26633
… is set to true - should resolve problem with azure tests Added documentation
…ock the whole application - will have shorter timeout and if Timeout occurs return empty list from kafka (will still make a call to schema registry)
…(didn't work previously when naming strategy was set)
Describe your changes
The `isTopicWithSchema` function now fetches topics only if the flag for schemaless topics is set to true.
Decreased the timeout when fetching topics from Kafka and fixed error catching.
Fixed the ad-hoc test when a naming strategy is used for the source.
Checklist before merge