diff --git a/.gitignore b/.gitignore index c65aaba..4886f51 100644 --- a/.gitignore +++ b/.gitignore @@ -76,3 +76,4 @@ Thumbs.db # Metals - Scala language server .metals/ .bloop/ +.bsp \ No newline at end of file diff --git a/src/main/scala/com/cqrs/demo/stream/Event.scala b/src/main/scala/com/cqrs/demo/stream/Event.scala index 74caa83..787d3e4 100644 --- a/src/main/scala/com/cqrs/demo/stream/Event.scala +++ b/src/main/scala/com/cqrs/demo/stream/Event.scala @@ -1,7 +1,6 @@ package com.cqrs.demo.stream import play.api.libs.json._ -import org.apache.kafka.common.serialization.Serde case class Event( followerID: String, @@ -14,5 +13,4 @@ object Event { implicit val eventReads: Reads[Event] = Json.reads[Event] implicit val eventWrites: Writes[Event] = Json.writes[Event] - val jsonSerde: Serde[Event] = SerDes.jsonSerde[Event] } diff --git a/src/main/scala/com/cqrs/demo/stream/EventsStreamer.scala b/src/main/scala/com/cqrs/demo/stream/EventsStreamer.scala index 0ee76fc..121fcce 100644 --- a/src/main/scala/com/cqrs/demo/stream/EventsStreamer.scala +++ b/src/main/scala/com/cqrs/demo/stream/EventsStreamer.scala @@ -3,34 +3,35 @@ package com.cqrs.demo.stream import org.apache.kafka.streams.scala.StreamsBuilder import org.apache.kafka.streams.scala.kstream._ import org.apache.kafka.streams.{KafkaStreams, StreamsConfig} -import org.apache.kafka.common.serialization.Serdes - import java.util.Properties +import org.apache.kafka.streams.scala.ImplicitConversions._ +import org.apache.kafka.streams.scala.serialization.Serdes._ +import com.cqrs.demo.stream.implicits._ object EventsStreamer { + def main(args: Array[String]): Unit = { val props = new Properties() props.load(getClass.getClassLoader.getResourceAsStream("kafka-streams.properties")) val builder = new StreamsBuilder() - val sourceStream: KStream[String, Option[Event]] = + val sourceStream: KStream[String, Event] = builder - .stream[String, Event]("events")(Consumed.`with`(Serdes.String, Event.jsonSerde)) - .mapValues(event => Option(event)) + .stream[String, Event]("events") // Process and send to 'jdbc' topic sourceStream - .flatMap { (_, maybeEvent) => maybeEvent.map(event => Person.forwards(event)).getOrElse(Seq.empty) } - .to("people")(Produced.`with`(Serdes.String, Person.jsonSerde)) + .flatMap((_, event) => Person.forwards(event)) + .to("people") // Process and send to 'graph' topic sourceStream - .flatMap { (_, maybeEvent) => maybeEvent.map(event => Node.forwards(event)).getOrElse(Seq.empty) } - .to("graph")(Produced.`with`(Serdes.String, Node.jsonSerde)) + .flatMap((_, event) => Node.forwards(event)) + .to("graph") sourceStream - .flatMap { (_, maybeEvent) => maybeEvent.map(event => Relationship.forwards(event)).getOrElse(Seq.empty) } - .to("graph")(Produced.`with`(Serdes.String, Relationship.jsonSerde)) + .flatMap((_, event) => Relationship.forwards(event)) + .to("graph") // Build and start the Kafka Streams application val streams = new KafkaStreams(builder.build(), new StreamsConfig(props)) diff --git a/src/main/scala/com/cqrs/demo/stream/Node.scala b/src/main/scala/com/cqrs/demo/stream/Node.scala index 23e072e..66068e2 100644 --- a/src/main/scala/com/cqrs/demo/stream/Node.scala +++ b/src/main/scala/com/cqrs/demo/stream/Node.scala @@ -1,7 +1,6 @@ package com.cqrs.demo.stream import play.api.libs.json._ -import org.apache.kafka.common.serialization.Serde case class Node( op: String, @@ -21,8 +20,6 @@ object Node { implicit val nodePropertiesWrites: Writes[NodeProperties] = Json.writes[NodeProperties] implicit val nodeWrites: Writes[Node] = Json.writes[Node] - val jsonSerde: Serde[Node] = SerDes.jsonSerde[Node] - def forwards(event: Event): Seq[(String, Node)] = { val node1 = Node( op = "merge", diff --git a/src/main/scala/com/cqrs/demo/stream/Person.scala b/src/main/scala/com/cqrs/demo/stream/Person.scala index 4b422bb..07e566d 100644 --- a/src/main/scala/com/cqrs/demo/stream/Person.scala +++ b/src/main/scala/com/cqrs/demo/stream/Person.scala @@ -1,13 +1,20 @@ package com.cqrs.demo.stream -import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.node.ObjectNode -import com.fasterxml.jackson.databind.ObjectMapper import play.api.libs.json._ -import org.apache.kafka.common.serialization._ -case class Person(id: Int, name: String) { - val schema: JsValue = Json.obj( +case class Person(id: Int, name: String) + +object Person { + implicit val personReads: Reads[Person] = Json.reads[Person] + + implicit val personWrites: Writes[Person] = OWrites(person => + Json.obj( + "schema" -> schema, + "payload" -> Json.toJson(person)(Json.writes[Person]) + ) + ) + + private val schema: JsValue = Json.obj( "type" -> "struct", "name" -> "User", "fields" -> Json.arr( @@ -16,19 +23,6 @@ case class Person(id: Int, name: String) { ) ) - def withSchema: JsValue = { - Json.obj( - "schema" -> schema, - "payload" -> Json.toJson(this) - ) - } -} - -object Person { - implicit val personReads: Reads[Person] = Json.reads[Person] - implicit val personWrites: Writes[Person] = Json.writes[Person] - - val jsonSerde: Serde[Person] = Serdes.serdeFrom(new PersonJsonSerializer(), new SerDes.JsonDeserializer()) def forwards(event: Event): Seq[(String, Person)] = { Seq( @@ -38,10 +32,3 @@ object Person { } } -class PersonJsonSerializer extends Serializer[Person] { - private val objectMapper = new ObjectMapper() - override def serialize(topic: String, data: Person): Array[Byte] = { - data.withSchema.toString().getBytes - } - override def close(): Unit = {} -} diff --git a/src/main/scala/com/cqrs/demo/stream/Relationship.scala b/src/main/scala/com/cqrs/demo/stream/Relationship.scala index 6ec6eb1..9ac2bab 100644 --- a/src/main/scala/com/cqrs/demo/stream/Relationship.scala +++ b/src/main/scala/com/cqrs/demo/stream/Relationship.scala @@ -24,8 +24,6 @@ object Relationship { implicit val relationshipNodeWrites: Writes[RelationshipNode] = Json.writes[RelationshipNode] implicit val relationshipWrites: Writes[Relationship] = Json.writes[Relationship] - val jsonSerde: Serde[Relationship] = SerDes.jsonSerde[Relationship] - def forwards(event: Event): Seq[(String, Relationship)] = { val relationship = Relationship( op = "merge", diff --git a/src/main/scala/com/cqrs/demo/stream/Serdes.scala b/src/main/scala/com/cqrs/demo/stream/Serdes.scala deleted file mode 100644 index f5f2dba..0000000 --- a/src/main/scala/com/cqrs/demo/stream/Serdes.scala +++ /dev/null @@ -1,21 +0,0 @@ -package com.cqrs.demo.stream - -import play.api.libs.json._ -import org.apache.kafka.common.serialization._ - -object SerDes { - def jsonSerde[T](implicit reads: Reads[T], writes: Writes[T]): Serde[T] = - Serdes.serdeFrom(new JsonSerializer[T], new JsonDeserializer[T]) - - class JsonSerializer[T](implicit writes: Writes[T]) extends Serializer[T] { - override def serialize(topic: String, data: T): Array[Byte] = - Json.toJson(data).toString().getBytes - override def close(): Unit = {} - } - - class JsonDeserializer[T](implicit reads: Reads[T]) extends Deserializer[T] { - override def deserialize(topic: String, data: Array[Byte]): T = - Json.parse(new String(data)).validate[T].getOrElse(null.asInstanceOf[T]) - override def close(): Unit = {} - } -} diff --git a/src/main/scala/com/cqrs/demo/stream/implicits.scala b/src/main/scala/com/cqrs/demo/stream/implicits.scala new file mode 100644 index 0000000..b727f66 --- /dev/null +++ b/src/main/scala/com/cqrs/demo/stream/implicits.scala @@ -0,0 +1,16 @@ +package com.cqrs.demo.stream + +import org.apache.kafka.common.serialization.Serde +import org.apache.kafka.streams.scala.serialization.Serdes +import play.api.libs.json.{Json, Reads, Writes} + +object implicits { + implicit def jsonSerde[T >: Null]( + implicit reads: Reads[T], + writes: Writes[T] + ): Serde[T] = + Serdes.fromFn[T]( + (data: T) => Json.toJson(data).toString().getBytes, + (data: Array[Byte]) => Json.parse(new String(data)).validate[T].asOpt + ) +}