Skip to content

Commit

Permalink
Merge pull request #106 from SwissBorg/compact-metadata-jsonb
Browse files Browse the repository at this point in the history
Make journal event and snapshot metadata jsonb more compact.
  • Loading branch information
mkubala authored Oct 15, 2020
2 parents 7525310 + 8700a7d commit 350c175
Show file tree
Hide file tree
Showing 7 changed files with 271 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ class ByteArrayJournalSerializer(serialization: Serialization, tagConverter: Tag
val serId = serializer.identifier
val serManifest = Serializers.manifestFor(serializer, payload)
val meta =
Metadata(serId, serManifest, persistentRepr.manifest, persistentRepr.writerUuid, persistentRepr.timestamp)
Metadata(
serId,
Option(serManifest).filterNot(_.trim.isEmpty),
Option(persistentRepr.manifest).filterNot(_.trim.isEmpty),
persistentRepr.writerUuid,
persistentRepr.timestamp)
JournalRow(
Long.MinValue,
persistentRepr.deleted,
Expand All @@ -52,14 +57,14 @@ class ByteArrayJournalSerializer(serialization: Serialization, tagConverter: Tag
override def deserialize(journalRow: JournalRow): Try[(PersistentRepr, Long)] =
for {
metadata <- journalRow.metadata.as[Metadata].toTry
e <- serialization.deserialize(journalRow.message, metadata.serId, metadata.serManifest)
e <- serialization.deserialize(journalRow.message, metadata.serId, metadata.serManifest.getOrElse(""))
} yield {
(
PersistentRepr(
e,
journalRow.sequenceNumber,
journalRow.persistenceId,
metadata.eventManifest,
metadata.eventManifest.getOrElse(""),
// not used, marked as deprecated (https://github.com/akka/akka/issues/27278)
deleted = false,
// not used, marked as deprecated (https://github.com/akka/akka/issues/27278
Expand All @@ -71,13 +76,23 @@ class ByteArrayJournalSerializer(serialization: Serialization, tagConverter: Tag
}

object ByteArrayJournalSerializer {
case class Metadata(serId: Int, serManifest: String, eventManifest: String, writerUuid: String, timestamp: Long)
case class Metadata(
serId: Int,
serManifest: Option[String],
eventManifest: Option[String],
writerUuid: String,
timestamp: Long)

object Metadata {
implicit val encoder: Encoder[Metadata] =
Encoder.forProduct5("serId", "serManifest", "eventManifest", "writerUuid", "timestamp")(e =>
(e.serId, e.serManifest, e.eventManifest, e.writerUuid, e.timestamp))
implicit val encoder: Encoder[Metadata] = Encoder
.forProduct5[Metadata, Int, Option[String], Option[String], String, Long]("sid", "sm", "em", "wid", "t") { e =>
(e.serId, e.serManifest, e.eventManifest, e.writerUuid, e.timestamp)
}
.mapJson(_.dropNullValues)

implicit val decoder: Decoder[Metadata] =
Decoder.forProduct5("serId", "serManifest", "eventManifest", "writerUuid", "timestamp")(Metadata.apply)
Decoder.forProduct5("sid", "sm", "em", "wid", "t")(Metadata.apply).or {
Decoder.forProduct5("serId", "serManifest", "eventManifest", "writerUuid", "timestamp")(Metadata.apply)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ByteArraySnapshotSerializer(serialization: Serialization) extends Snapshot
ser <- Try(serialization.findSerializerFor(payload))
serializedSnapshot <- serialization.serialize(payload)
} yield {
val metadataJson = Metadata(ser.identifier, Serializers.manifestFor(ser, payload))
val metadataJson = Metadata(ser.identifier, Option(Serializers.manifestFor(ser, payload)).filterNot(_.trim.isEmpty))
SnapshotRow(
metadata.persistenceId,
metadata.sequenceNr,
Expand All @@ -36,7 +36,7 @@ class ByteArraySnapshotSerializer(serialization: Serialization) extends Snapshot
def deserialize(snapshotRow: SnapshotRow): Try[(SnapshotMetadata, Any)] = {
for {
metadata <- snapshotRow.metadata.as[Metadata].toTry
snapshot <- serialization.deserialize(snapshotRow.snapshot, metadata.serId, metadata.serManifest)
snapshot <- serialization.deserialize(snapshotRow.snapshot, metadata.serId, metadata.serManifest.getOrElse(""))
} yield {
val snapshotMetadata =
SnapshotMetadata(snapshotRow.persistenceId, snapshotRow.sequenceNumber, snapshotRow.created)
Expand All @@ -46,10 +46,13 @@ class ByteArraySnapshotSerializer(serialization: Serialization) extends Snapshot
}

object ByteArraySnapshotSerializer {
case class Metadata(serId: Int, serManifest: String)
case class Metadata(serId: Int, serManifest: Option[String])

object Metadata {
implicit val encoder: Encoder[Metadata] = Encoder.forProduct2("serId", "serManifest")(m => (m.serId, m.serManifest))
implicit val decoder: Decoder[Metadata] = Decoder.forProduct2("serId", "serManifest")(Metadata.apply)
implicit val encoder: Encoder[Metadata] = Encoder
.forProduct2[Metadata, Int, Option[String]]("sid", "sm")(m => (m.serId, m.serManifest))
.mapJson(_.dropNullValues)
implicit val decoder: Decoder[Metadata] =
Decoder.forProduct2("sid", "sm")(Metadata.apply).or(Decoder.forProduct2("serId", "serManifest")(Metadata.apply))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ package akka.persistence.postgres

import akka.actor.ActorSystem
import akka.persistence.postgres.config.{ JournalConfig, ReadJournalConfig }
import akka.persistence.postgres.db.SlickExtension
import akka.persistence.postgres.query.javadsl.PostgresReadJournal
import akka.persistence.postgres.util.DropCreate
import akka.persistence.postgres.db.SlickExtension
import akka.serialization.SerializationExtension
import akka.stream.{ ActorMaterializer, Materializer }
import akka.stream.{ Materializer, SystemMaterializer }
import akka.util.Timeout
import com.typesafe.config.{ Config, ConfigFactory, ConfigValue }
import org.scalatest.BeforeAndAfterAll
Expand All @@ -26,7 +26,7 @@ abstract class SharedActorSystemTestSpec(val config: Config) extends SimpleSpec
})

implicit lazy val system: ActorSystem = ActorSystem("test", config)
implicit lazy val mat: Materializer = ActorMaterializer()
implicit lazy val mat: Materializer = SystemMaterializer(system).materializer

implicit lazy val ec: ExecutionContext = system.dispatcher
implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute)
Expand All @@ -40,6 +40,7 @@ abstract class SharedActorSystemTestSpec(val config: Config) extends SimpleSpec
val readJournalConfig = new ReadJournalConfig(config.getConfig(PostgresReadJournal.Identifier))

override protected def afterAll(): Unit = {
super.afterAll()
db.close()
system.terminate().futureValue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,18 @@ import java.time.{ LocalDateTime, ZoneOffset }
import java.util.UUID

import akka.persistence.journal.Tagged
import akka.persistence.postgres.journal.dao.ByteArrayJournalSerializer.Metadata
import akka.persistence.postgres.journal.dao.FakeTagIdResolver.unwanted1
import akka.persistence.postgres.tag.TagIdResolver
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.serialization.Serializers
import akka.serialization.{ Serializer, Serializers }
import io.circe.Json
import org.scalatest.EitherValues
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.must.Matchers

import scala.collection.immutable._
import scala.concurrent.Future

class ByteArrayJournalSerializerTest extends SharedActorSystemTestSpec with ScalaFutures with EitherValues {
class ByteArrayJournalSerializerTest extends SharedActorSystemTestSpec with ScalaFutures {
it should "serialize a serializable message and indicate whether or not the serialization succeeded" in {
val serializer = new ByteArrayJournalSerializer(serialization, new FakeTagIdResolver())
val result = serializer.serialize(Seq(AtomicWrite(PersistentRepr("foo"))))
Expand Down Expand Up @@ -67,54 +65,156 @@ class ByteArrayJournalSerializerTest extends SharedActorSystemTestSpec with Scal
result.head.failed.futureValue shouldBe a[Throwable]
}

it should "serialize metadata" in {
val serializer = new ByteArrayJournalSerializer(serialization, new FakeTagIdResolver())
val payload = "foo"
val repr = PersistentRepr(payload)
val result = serializer.serialize(Seq(AtomicWrite(repr)))
val serializedRows = result.head.futureValue
serializedRows should have size 1
{

val meta = serializedRows.head.metadata.as[Metadata].right.value
val payloadSer = serialization.serializerFor(payload.getClass)
meta should equal {
Metadata(
serId = payloadSer.identifier,
serManifest = Serializers.manifestFor(payloadSer, payload),
eventManifest = repr.manifest,
writerUuid = repr.writerUuid,
timestamp = repr.timestamp)
def serialized(repr: PersistentRepr): (Serializer, Json) = {
val serializer = new ByteArrayJournalSerializer(serialization, new FakeTagIdResolver())
val result = serializer.serialize(Seq(AtomicWrite(repr)))
val serializedRows = result.head.futureValue
serializedRows should have size 1

val payloadSer = serialization.serializerFor(repr.payload.getClass)
val metaJson = serializedRows.head.metadata
(payloadSer, metaJson)
}
}

it should "deserialize metadata" in {
val serializer = new ByteArrayJournalSerializer(serialization, new FakeTagIdResolver())
it should "serialize metadata" in {
val repr = PersistentRepr("foo", manifest = "customManifest")
val (payloadSer, metaJson) = serialized(repr)
metaJson should equal {
Json.obj(
"sid" -> Json.fromInt(payloadSer.identifier),
"wid" -> Json.fromString(repr.writerUuid),
"t" -> Json.fromLong(repr.timestamp),
"em" -> Json.fromString("customManifest"))
}
}

it should "serialize metadata and skip empty fields" in {
val repr = PersistentRepr("foo")
val (payloadSer, metaJson) = serialized(repr)
metaJson should equal {
Json.obj(
"sid" -> Json.fromInt(payloadSer.identifier),
"wid" -> Json.fromString(repr.writerUuid),
"t" -> Json.fromLong(repr.timestamp))
}
}

it should "serialize metadata and skip blank fields" in {
val repr = PersistentRepr("foo", manifest = "")
val (payloadSer, metaJson) = serialized(repr)

metaJson should equal {
Json.obj(
"sid" -> Json.fromInt(payloadSer.identifier),
"wid" -> Json.fromString(repr.writerUuid),
"t" -> Json.fromLong(repr.timestamp))
}
}
}

{
val payload = "foo"
val payloadSer = serialization.serializerFor(payload.getClass)
val serId = payloadSer.identifier
val serManifest = Serializers.manifestFor(payloadSer, payload)
val eventManifest = "event manifest"
val writerUuid = UUID.randomUUID().toString
val timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)

val meta = Json.fromFields(
List(
"serId" -> Json.fromLong(serId),
"serManifest" -> Json.fromString(serManifest),
"eventManifest" -> Json.fromString(eventManifest),
"writerUuid" -> Json.fromString(writerUuid),
"timestamp" -> Json.fromLong(timestamp)))
val row = JournalRow(1L, false, "my-7", 2137L, payload.getBytes(Charset.forName("UTF-8")), Nil, meta)

val result = serializer.deserialize(row)

val (repr, ordering) = result.success.value

ordering should equal(1L)
repr should equal {
PersistentRepr(payload, 2137L, "my-7", eventManifest, false, null, writerUuid)

def deserialized(meta: Json): PersistentRepr = {
val serializer = new ByteArrayJournalSerializer(serialization, new FakeTagIdResolver())
val row = JournalRow(1L, false, "my-7", 2137L, payload.getBytes(Charset.forName("UTF-8")), Nil, meta)

val result = serializer.deserialize(row)

val (repr, ordering) = result.success.value

ordering should equal(1L)
repr
}

it should "deserialize metadata with long keys" in {
val eventManifest = "event manifest"
val writerUuid = UUID.randomUUID().toString
val timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)

val meta = Json.fromFields(
List(
"serId" -> Json.fromLong(serId),
"serManifest" -> Json.fromString(serManifest),
"eventManifest" -> Json.fromString(eventManifest),
"writerUuid" -> Json.fromString(writerUuid),
"timestamp" -> Json.fromLong(timestamp)))
val repr = deserialized(meta)
repr should equal {
PersistentRepr(payload, 2137L, "my-7", eventManifest, false, null, writerUuid)
}
}

it should "deserialize metadata with short keys" in {
val eventManifest = "event manifest"
val writerUuid = UUID.randomUUID().toString
val timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)

val meta = Json.fromFields(
List(
"sid" -> Json.fromLong(serId),
"sm" -> Json.fromString(serManifest),
"em" -> Json.fromString(eventManifest),
"wid" -> Json.fromString(writerUuid),
"t" -> Json.fromLong(timestamp)))
val repr = deserialized(meta)
repr should equal {
PersistentRepr(payload, 2137L, "my-7", eventManifest, false, null, writerUuid)
}
}

it should "deserialize metadata with missing keys" in {
val writerUuid = UUID.randomUUID().toString
val timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)

val meta = Json.fromFields(
List("sid" -> Json.fromLong(serId), "wid" -> Json.fromString(writerUuid), "t" -> Json.fromLong(timestamp)))
val repr = deserialized(meta)
repr should equal {
PersistentRepr(payload, 2137L, "my-7", "", false, null, writerUuid)
}
}

it should "deserialize metadata with empty keys" in {
val writerUuid = UUID.randomUUID().toString
val timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)

val meta = Json.fromFields(
List(
"sid" -> Json.fromLong(serId),
"wid" -> Json.fromString(writerUuid),
"em" -> Json.fromString(""),
"t" -> Json.fromLong(timestamp)))
val repr = deserialized(meta)
repr should equal {
PersistentRepr(payload, 2137L, "my-7", "", false, null, writerUuid)
}
}

it should "deserialize metadata with mixed legacy long and new short keys - short ones taking precedence" in {
val writerUuid = UUID.randomUUID().toString
val timestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)

val meta = Json.fromFields(
List(
"sid" -> Json.fromLong(serId),
"serId" -> Json.fromLong(-1),
"sm" -> Json.fromString(serManifest),
"serManifest" -> Json.fromString("broken"),
"eventManifest" -> Json.fromString("this should be skipped"),
"wid" -> Json.fromString(writerUuid),
"t" -> Json.fromLong(timestamp)))
val repr = deserialized(meta)
repr should equal {
PersistentRepr(payload, 2137L, "my-7", "", false, null, writerUuid)
}
}

}

}
Expand Down
Loading

0 comments on commit 350c175

Please sign in to comment.