Skip to content

Commit

Permalink
Scalize stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-albert committed Jan 21, 2024
1 parent b578bbd commit 1e038c5
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 65 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,4 @@ Thumbs.db
# Metals - Scala language server
.metals/
.bloop/
.bsp
2 changes: 0 additions & 2 deletions src/main/scala/com/cqrs/demo/stream/Event.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.cqrs.demo.stream

import play.api.libs.json._
import org.apache.kafka.common.serialization.Serde

case class Event(
followerID: String,
Expand All @@ -14,5 +13,4 @@ object Event {
implicit val eventReads: Reads[Event] = Json.reads[Event]
implicit val eventWrites: Writes[Event] = Json.writes[Event]

val jsonSerde: Serde[Event] = SerDes.jsonSerde[Event]
}
23 changes: 12 additions & 11 deletions src/main/scala/com/cqrs/demo/stream/EventsStreamer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,34 +3,35 @@ package com.cqrs.demo.stream
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.common.serialization.Serdes

import java.util.Properties
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import com.cqrs.demo.stream.implicits._

object EventsStreamer {

def main(args: Array[String]): Unit = {
val props = new Properties()
props.load(getClass.getClassLoader.getResourceAsStream("kafka-streams.properties"))

val builder = new StreamsBuilder()

val sourceStream: KStream[String, Option[Event]] =
val sourceStream: KStream[String, Event] =
builder
.stream[String, Event]("events")(Consumed.`with`(Serdes.String, Event.jsonSerde))
.mapValues(event => Option(event))
.stream[String, Event]("events")

// Process and send to 'jdbc' topic
sourceStream
.flatMap { (_, maybeEvent) => maybeEvent.map(event => Person.forwards(event)).getOrElse(Seq.empty) }
.to("people")(Produced.`with`(Serdes.String, Person.jsonSerde))
.flatMap((_, event) => Person.forwards(event))
.to("people")

// Process and send to 'graph' topic
sourceStream
.flatMap { (_, maybeEvent) => maybeEvent.map(event => Node.forwards(event)).getOrElse(Seq.empty) }
.to("graph")(Produced.`with`(Serdes.String, Node.jsonSerde))
.flatMap((_, event) => Node.forwards(event))
.to("graph")
sourceStream
.flatMap { (_, maybeEvent) => maybeEvent.map(event => Relationship.forwards(event)).getOrElse(Seq.empty) }
.to("graph")(Produced.`with`(Serdes.String, Relationship.jsonSerde))
.flatMap((_, event) => Relationship.forwards(event))
.to("graph")

// Build and start the Kafka Streams application
val streams = new KafkaStreams(builder.build(), new StreamsConfig(props))
Expand Down
3 changes: 0 additions & 3 deletions src/main/scala/com/cqrs/demo/stream/Node.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.cqrs.demo.stream

import play.api.libs.json._
import org.apache.kafka.common.serialization.Serde

case class Node(
op: String,
Expand All @@ -21,8 +20,6 @@ object Node {
implicit val nodePropertiesWrites: Writes[NodeProperties] = Json.writes[NodeProperties]
implicit val nodeWrites: Writes[Node] = Json.writes[Node]

val jsonSerde: Serde[Node] = SerDes.jsonSerde[Node]

def forwards(event: Event): Seq[(String, Node)] = {
val node1 = Node(
op = "merge",
Expand Down
39 changes: 13 additions & 26 deletions src/main/scala/com/cqrs/demo/stream/Person.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package com.cqrs.demo.stream

import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.databind.ObjectMapper
import play.api.libs.json._
import org.apache.kafka.common.serialization._

case class Person(id: Int, name: String) {
val schema: JsValue = Json.obj(
case class Person(id: Int, name: String)

object Person {
implicit val personReads: Reads[Person] = Json.reads[Person]

implicit val personWrites: Writes[Person] = OWrites(person =>
Json.obj(
"schema" -> schema,
"payload" -> Json.toJson(person)(Json.writes[Person])
)
)

private val schema: JsValue = Json.obj(
"type" -> "struct",
"name" -> "User",
"fields" -> Json.arr(
Expand All @@ -16,19 +23,6 @@ case class Person(id: Int, name: String) {
)
)

def withSchema: JsValue = {
Json.obj(
"schema" -> schema,
"payload" -> Json.toJson(this)
)
}
}

object Person {
implicit val personReads: Reads[Person] = Json.reads[Person]
implicit val personWrites: Writes[Person] = Json.writes[Person]

val jsonSerde: Serde[Person] = Serdes.serdeFrom(new PersonJsonSerializer(), new SerDes.JsonDeserializer())

def forwards(event: Event): Seq[(String, Person)] = {
Seq(
Expand All @@ -38,10 +32,3 @@ object Person {
}
}

class PersonJsonSerializer extends Serializer[Person] {
private val objectMapper = new ObjectMapper()
override def serialize(topic: String, data: Person): Array[Byte] = {
data.withSchema.toString().getBytes
}
override def close(): Unit = {}
}
2 changes: 0 additions & 2 deletions src/main/scala/com/cqrs/demo/stream/Relationship.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ object Relationship {
implicit val relationshipNodeWrites: Writes[RelationshipNode] = Json.writes[RelationshipNode]
implicit val relationshipWrites: Writes[Relationship] = Json.writes[Relationship]

val jsonSerde: Serde[Relationship] = SerDes.jsonSerde[Relationship]

def forwards(event: Event): Seq[(String, Relationship)] = {
val relationship = Relationship(
op = "merge",
Expand Down
21 changes: 0 additions & 21 deletions src/main/scala/com/cqrs/demo/stream/Serdes.scala

This file was deleted.

16 changes: 16 additions & 0 deletions src/main/scala/com/cqrs/demo/stream/implicits.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.cqrs.demo.stream

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.serialization.Serdes
import play.api.libs.json.{Json, Reads, Writes}

object implicits {
implicit def jsonSerde[T >: Null](
implicit reads: Reads[T],
writes: Writes[T]
): Serde[T] =
Serdes.fromFn[T](
(data: T) => Json.toJson(data).toString().getBytes,
(data: Array[Byte]) => Json.parse(new String(data)).validate[T].asOpt
)
}

0 comments on commit 1e038c5

Please sign in to comment.