Skip to content

Commit

Permalink
Remove headers and tags.
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Nov 1, 2023
1 parent dd3a372 commit 0c4face
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import com.evolutiongaming.kafka.journal._
import com.evolutiongaming.kafka.journal.eventual.EventualPayloadAndType
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraHelper._
import com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandraConfig.ConsistencyConfig
import com.evolutiongaming.kafka.journal.eventual.cassandra.HeadersHelper._
import com.evolutiongaming.scassandra.syntax._
import com.evolutiongaming.scassandra.{DecodeByName, EncodeByName, TableName}
import scodec.bits.ByteVector
Expand All @@ -31,12 +30,10 @@ object SnapshotStatements {
|timestamp TIMESTAMP,
|origin TEXT,
|version TEXT,
|tags SET<TEXT>,
|metadata TEXT,
|payload_type TEXT,
|payload_txt TEXT,
|payload_bin BLOB,
|headers MAP<TEXT, TEXT>,
|status TEXT
|PRIMARY KEY ((id, topic, segment), buffer_nr))
|""".stripMargin
Expand Down Expand Up @@ -73,14 +70,12 @@ object SnapshotStatements {
|timestamp,
|origin,
|version,
|tags,
|payload_type,
|payload_txt,
|payload_bin,
|metadata,
|headers,
|status)
|VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|""".stripMargin

for {
Expand Down Expand Up @@ -110,12 +105,10 @@ object SnapshotStatements {
.encode("timestamp", record.timestamp)
.encodeSome(record.origin)
.encodeSome(record.version)
.encode("tags", snapshot.tags)
.encodeSome("payload_type", payloadType)
.encodeSome("payload_txt", txt)
.encodeSome("payload_bin", bin)
.encode("metadata", record.metadata)(encodeByNameRecordMetadata)
.encode(record.headers)
.encode(record.status)
.setConsistencyLevel(consistencyConfig.value)
}
Expand Down Expand Up @@ -151,12 +144,10 @@ object SnapshotStatements {
|timestamp,
|origin,
|version,
|tags,
|payload_type,
|payload_txt,
|payload_bin,
|metadata,
|headers,
|status FROM ${ name.toCql }
|WHERE id = ?
|AND topic = ?
Expand Down Expand Up @@ -197,13 +188,10 @@ object SnapshotStatements {
val seqNr = row.decode[SeqNr]
val snapshot = Snapshot(
seqNr = seqNr,
tags = row.decode[Tags]("tags"),
payload = payload)

val metadata = row.decode[Option[RecordMetadata]]("metadata") getOrElse RecordMetadata.empty

val headers = row.decode[Headers]

val status = row.decode[SnapshotStatus]

SnapshotRecord(
Expand All @@ -213,7 +201,6 @@ object SnapshotStatements {
version = row.decode[Option[Version]],
partitionOffset = partitionOffset,
metadata = metadata,
headers = headers,
status = status)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@ package com.evolutiongaming.kafka.journal

final case class Snapshot[A](
seqNr: SeqNr,
tags: Tags = Tags.empty,
payload: Option[A] = None
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,5 @@ final case class SnapshotRecord[A](
origin: Option[Origin],
version: Option[Version],
metadata: RecordMetadata,
headers: Headers,
status: SnapshotStatus
)

0 comments on commit 0c4face

Please sign in to comment.