Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable deserializer and fix build #109

Merged
merged 3 commits into from
Apr 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,48 @@ message Person {
repeated PhoneNumber phones = 4;
}
```

## Configurable example
`io.example.conduktor.custom.deserializers.MyConfigurableDeserializer`

[located here](./src/main/scala/io/example/conduktor/custom/deserializers/MyConfigurableDeserializer.scala)

This example allow to show deserializer configuration to change it's behavior.
To configure the behabor, the Deserializer check for a `output` property in it's configuration.

### Passthrough mode :
With configuration :
```properties
output=passthrough
```
The data on record are not de coded and returned as-is in bytes array form.

### Config mode :
With configuration :
```properties
output=config
```
The configuration is returned on each record deserialization.
For example with configuration
```properties
output=config
other.property=some value
```
Will always return JSON like
```json
{
"output": "config",
"other.property": "some value"
}
```

### Constant mode :

With configuration output defined to something else other than `config` or `passthrough` and not empty like:
```properties
output=some constant output
```
The Deserializer will always return String value like
```json
"some constant output"
```
6 changes: 6 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import org.typelevel.scalacoptions.ScalacOptions

name := "my_custom_deserializers"
version := sys.env.getOrElse("CREATED_TAG", "0.1")
scalaVersion := "2.13.10"
Expand All @@ -8,6 +10,10 @@ libraryDependencies ++= Seq(
"com.thesamet.scalapb.common-protos" %% "proto-google-common-protos-scalapb_0.11" % "2.9.6-0"
)

Compile / tpolecatExcludeOptions ++= Set(
ScalacOptions.warnNonUnitStatement, // for scalaPB gen sources
)

assembly / assemblyJarName := "plugins.jar"

// ## Github Packages publish configs
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2")
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.0")

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.example.conduktor.custom.deserializers

import org.apache.kafka.common.serialization.Deserializer

import java.util
import scala.jdk.CollectionConverters.MapHasAsScala

case object MyConfigurableDeserializerException
extends RuntimeException(
"ConfigurableDeserializer fail when its `::configure` method is called without `output` property"
)

sealed trait Output
final case class Constant(value: String) extends Output
final case class Config(config: util.Map[String, _]) extends Output
final case object Passthrough extends Output
final case object Unconfigured extends Output

final class MyConfigurableDeserializer extends Deserializer[Any] {

var output: Output = Unconfigured

override def deserialize(topic: String, data: Array[Byte]): Any = output match {
case Constant(value) => value
case Config(config) => config
case Passthrough => data
case Unconfigured => throw MyConfigurableDeserializerException
}

override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
configs.asScala.get("output").map(_.asInstanceOf[String]) match {
case Some("config") => output = Config(configs)
case Some("passthrough") => output = Passthrough
case Some(value) => output = Constant(value)
case None => throw MyConfigurableDeserializerException
}
}
Loading