Skip to content

Commit

Permalink
[NU-1823] Fix for schemaless topics in kafka source/sink (#7232)
Browse files Browse the repository at this point in the history
* IsTopicWithSchema now fetch topics only if flag for schemaless topics is set to true
Added documentation

* TopicSelectionStrategy when set to fetch all topics, it should not block the whole application - will have shorter timeout and if Timeout occurs return empty list from kafka (will still make a call to schema registry)

* Change in record formatter to add namespace to topic in ad-hoc tests (didn't work previously when naming strategy was set)

---------

Co-authored-by: Szymon Bogusz <[email protected]>
  • Loading branch information
ForrestFairy and Szymon Bogusz authored Nov 29, 2024
1 parent 9266517 commit 3006060
Show file tree
Hide file tree
Showing 8 changed files with 23 additions and 21 deletions.
3 changes: 2 additions & 1 deletion docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
* [#7257](https://github.com/TouK/nussknacker/pull/7257) `components-api` module: Replaced wide dependency to `async-http-client-backend-future`
by the narrowest possible dependency to sttp's core
* [#7259](https://github.com/TouK/nussknacker/pull/7259) `flink-executor` and `lite-runtime` modules: Added compile-time
dependency to `http-utils` (which depends on `async-http-client-backend-future` and indirectly on Netty)
dependency to `http-utils` (which depends on `async-http-client-backend-future` and indirectly on Netty)
* [#7066](https://github.com/TouK/nussknacker/pull/7066) Kafka source and sink can now operate with schemaless topics. They accept any json. Data will not be validated with schema.

## 1.18

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ abstract class KafkaUniversalComponentTransformer[T, TN <: TopicName: TopicValid
)(implicit nodeId: NodeId): WithError[ParameterCreatorWithNoDependency with ParameterExtractor[String]] = {
if (schemaRegistryClient.isTopicWithSchema(
preparedTopic.prepared.topicName.toUnspecialized.name,
topicSelectionStrategy,
kafkaConfig
)) {
val versions = schemaRegistryClient.getAllVersions(preparedTopic.prepared.toUnspecialized, isKey = false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package pl.touk.nussknacker.engine.schemedkafka
import cats.data.Validated
import org.apache.kafka.clients.admin.ListTopicsOptions
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.TimeoutException
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, KafkaUtils, UnspecializedTopicName}
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaRegistryClient, SchemaRegistryError}

import java.util.concurrent.ExecutionException
import java.util.regex.Pattern
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -36,11 +38,11 @@ class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy {
kafkaConfig: KafkaConfig
): Validated[SchemaRegistryError, List[UnspecializedTopicName]] = {
val topicsFromSchemaRegistry = schemaRegistryClient.getAllTopics
val validatorConfig = kafkaConfig.topicsExistenceValidationConfig.validatorConfig

val schemaLessTopics: List[UnspecializedTopicName] = {
try {
KafkaUtils.usingAdminClient(kafkaConfig) {
_.listTopics(new ListTopicsOptions().timeoutMs(validatorConfig.adminClientTimeout.toMillis.toInt))
_.listTopics(new ListTopicsOptions().timeoutMs(5000))
.names()
.get()
.asScala
Expand All @@ -51,6 +53,11 @@ class AllNonHiddenTopicsSelectionStrategy extends TopicSelectionStrategy {
}
} catch {
// In some tests we pass dummy kafka address, so when we try to get topics from kafka it fails
case err: ExecutionException =>
err.getCause match {
case _: TimeoutException => List.empty
case _ => throw err
}
case _: KafkaException =>
List.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ package pl.touk.nussknacker.engine.schemedkafka.schemaregistry
import cats.data.Validated
import io.confluent.kafka.schemaregistry.ParsedSchema
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, UnspecializedTopicName}
import pl.touk.nussknacker.engine.schemedkafka.{
TopicSelectionStrategy,
TopicsMatchingPatternWithExistingSubjectsSelectionStrategy,
TopicsWithExistingSubjectSelectionStrategy
}
import pl.touk.nussknacker.engine.schemedkafka.TopicsWithExistingSubjectSelectionStrategy

trait SchemaRegistryClient extends Serializable {

Expand Down Expand Up @@ -44,12 +40,13 @@ trait SchemaRegistryClient extends Serializable {

def getAllVersions(topic: UnspecializedTopicName, isKey: Boolean): Validated[SchemaRegistryError, List[Integer]]

def isTopicWithSchema(topic: String, strategy: TopicSelectionStrategy, kafkaConfig: KafkaConfig): Boolean = {
val topicsWithSchema = strategy match {
case strategy: TopicsMatchingPatternWithExistingSubjectsSelectionStrategy => strategy.getTopics(this, kafkaConfig)
case _ => new TopicsWithExistingSubjectSelectionStrategy().getTopics(this, kafkaConfig)
def isTopicWithSchema(topic: String, kafkaConfig: KafkaConfig): Boolean = {
if (!kafkaConfig.showTopicsWithoutSchema) {
true
} else {
val topicsWithSchema = new TopicsWithExistingSubjectSelectionStrategy().getTopics(this, kafkaConfig)
topicsWithSchema.exists(_.map(_.name).contains(topic))
}
topicsWithSchema.exists(_.map(_.name).contains(topic))
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import pl.touk.nussknacker.engine.api.process.TopicName
import pl.touk.nussknacker.engine.api.test.TestRecord
import pl.touk.nussknacker.engine.kafka.consumerrecord.SerializableConsumerRecord
import pl.touk.nussknacker.engine.kafka.{KafkaConfig, RecordFormatter, serialization}
import pl.touk.nussknacker.engine.schemedkafka.TopicsWithExistingSubjectSelectionStrategy
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
ContentTypes,
ContentTypesSchemas,
Expand Down Expand Up @@ -114,7 +113,6 @@ abstract class AbstractSchemaBasedRecordFormatter[K: ClassTag, V: ClassTag] exte

if (schemaRegistryClient.isTopicWithSchema(
topic.name,
new TopicsWithExistingSubjectSelectionStrategy,
kafkaConfig
)) {
val valueSchemaOpt = record.valueSchemaId.map(schemaRegistryClient.getSchemaById).map(_.schema)
Expand Down Expand Up @@ -144,7 +142,7 @@ abstract class AbstractSchemaBasedRecordFormatter[K: ClassTag, V: ClassTag] exte

}

record.consumerRecord.toKafkaConsumerRecord(topic, serializeKeyValue)
record.consumerRecord.copy(topic = Some(topic.name)).toKafkaConsumerRecord(topic, serializeKeyValue)
}

protected def readRecordKeyMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.serialization.Deserializer
import pl.touk.nussknacker.engine.kafka.KafkaConfig
import pl.touk.nussknacker.engine.schemedkafka.{RuntimeSchemaData, TopicsWithExistingSubjectSelectionStrategy}
import pl.touk.nussknacker.engine.schemedkafka.RuntimeSchemaData
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.serialization.SchemaRegistryBasedDeserializerFactory
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
ChainedSchemaIdFromMessageExtractor,
Expand Down Expand Up @@ -42,7 +42,7 @@ class UniversalKafkaDeserializer[T](
.getOrElse(throw MessageWithoutSchemaIdException)

val schemaWithMetadata = {
if (schemaRegistryClient.isTopicWithSchema(topic, new TopicsWithExistingSubjectSelectionStrategy, kafkaConfig)) {
if (schemaRegistryClient.isTopicWithSchema(topic, kafkaConfig)) {
schemaRegistryClient.getSchemaById(writerSchemaId.value)
} else {
writerSchemaId.value match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Optional
@Network
class AzureSchemaBasedSerdeProviderIntegrationTest extends AnyFunSuite with OptionValues with Matchers {

ignore("serialization round-trip") {
test("serialization round-trip") {
val eventHubsNamespace = Option(System.getenv("AZURE_EVENT_HUBS_NAMESPACE")).getOrElse("nu-cloud")
val config = Map(
"schema.registry.url" -> s"https://$eventHubsNamespace.servicebus.windows.net",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class AzureTestsFromFileIntegrationTest

private val kafkaConfig = KafkaConfig(Some(schemaRegistryConfigMap), None, showTopicsWithoutSchema = false)

ignore("test from file round-trip") {
test("test from file round-trip") {
val schemaRegistryClient = AzureSchemaRegistryClientFactory.create(kafkaConfig.schemaRegistryClientKafkaConfig)
val serdeProvider = UniversalSchemaBasedSerdeProvider.create(UniversalSchemaRegistryClientFactory)
val factory = serdeProvider.deserializationSchemaFactory.create[String, GenericRecord](kafkaConfig, None, None)
Expand Down

0 comments on commit 3006060

Please sign in to comment.