Skip to content

Commit

Permalink
Extract schema migration into a separate class. (#538)
Browse files Browse the repository at this point in the history
The idea is to allow several instances to exist,
i.e. SetupJournalSchema and SetupSnapshotSchema.
  • Loading branch information
rtar authored Dec 12, 2023
1 parent 49e5864 commit a318cbe
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 111 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package com.evolutiongaming.kafka.journal.eventual.cassandra

import cats.MonadThrow
import cats.data.{NonEmptyList => Nel}
import cats.syntax.all._
import com.evolutiongaming.kafka.journal.Settings
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraHelper._

import scala.util.Try

/** Migrates the existing schema to the latest version */
trait MigrateSchema[F[_]] {

/** Run all built-in migrations
*
* @param fresh
* Indicates if the schema was just created from scratch, or some tables were already present. The parameter is
* taken into consideration if there is no schema version information available in the settings. In this case, if
* `true`, then it will be assumed that no migrations are required, and the latest version will be saved into
* settings. If `false` then all migration steps will be attempted (because the schema, likely, was created before
* migration steps were added).
*/
def run(fresh: CreateSchema.Fresh)(implicit session: CassandraSession[F]): F[Unit]

}
object MigrateSchema {

/** Save version of a schema to the settings storage under specific key.
*
* @param cassandraSync
* Locking mechanism to ensure two migrations are not happening in paralell.
* @param settings
* Storage to get / save the schema version from / to.
* @param settingKey
* A key to use in a setting store. It is important to use a different key for different schemas, to ensure there
* is no accidential overwrite if both schemas are located in one keyspace.
* @param migrations
* List of CQL statements to execute. The schema version is equal to the size of this list.
* @return
* The instance of schema migrator.
*/
def forSettingKey[F[_]: MonadThrow](
cassandraSync: CassandraSync[F],
settings: Settings[F],
settingKey: String,
migrations: Nel[String]
): MigrateSchema[F] = new MigrateSchema[F] {

def setVersion(version: Int) =
settings
.set(settingKey, version.toString)
.void

def run(fresh: CreateSchema.Fresh)(implicit session: CassandraSession[F]): F[Unit] = {

def migrate = {

def migrate(version: Int) = {
migrations.toList
.drop(version + 1)
.toNel
.map { migrations =>
migrations
.foldLeftM(version) { (version, migration) =>
val version1 = version + 1
for {
_ <- migration.execute.first.void.voidError
_ <- setVersion(version1)
} yield version1
}
.void
}
}

settings
.get(settingKey)
.map { setting =>
setting
.flatMap { a =>
Try.apply { a.value.toInt }.toOption
}
.fold {
if (fresh) {
val version = migrations.size - 1
if (version >= 0) {
setVersion(version).some
} else {
none[F[Unit]]
}
} else {
migrate(-1)
}
} { version =>
migrate(version)
}
}
}

migrate.flatMap { migrate1 =>
migrate1.foldMapM { _ =>
cassandraSync {
migrate.flatMap { migrate =>
migrate.foldMapM(identity)
}
}
}
}

}

}

}
Original file line number Diff line number Diff line change
@@ -1,129 +1,45 @@
package com.evolutiongaming.kafka.journal.eventual.cassandra

import cats.effect.kernel.Temporal
import cats.data.{NonEmptyList => Nel}
import cats.effect.kernel.Temporal
import cats.syntax.all._
import cats.{MonadThrow, Parallel}
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.journal.eventual.cassandra.CassandraHelper._
import com.evolutiongaming.kafka.journal.eventual.cassandra.EventualCassandraConfig.ConsistencyConfig
import com.evolutiongaming.kafka.journal.{Origin, Settings}
import com.evolutiongaming.scassandra.ToCql.implicits._

import scala.util.Try
/** Creates a new schema, or migrates to the latest schema version, if it already exists */
object SetupSchema {

object SetupSchema { self =>
val SettingKey = "schema-version"

def migrate[F[_]: MonadThrow: CassandraSession](
schema: Schema,
fresh: CreateSchema.Fresh,
settings: Settings[F],
cassandraSync: CassandraSync[F]
): F[Unit] = {
def migrations(schema: Schema): Nel[String] = {

def addHeaders = {
JournalStatements
.addHeaders(schema.journal)
.execute
.first
.void
.handleError { _ => () }
}
def addHeaders =
JournalStatements.addHeaders(schema.journal)

def addVersion = {
JournalStatements
.addVersion(schema.journal)
.execute
.first
.void
.handleError { _ => () }
}
def addVersion =
JournalStatements.addVersion(schema.journal)

def dropMetadata = {
s"DROP TABLE IF EXISTS ${ schema.metadata.toCql }"
.execute
.first
.void
.handleError { _ => () }
}
def dropMetadata =
s"DROP TABLE IF EXISTS ${schema.metadata.toCql}"

def createPointer2 = {
def createPointer2 =
Pointer2Statements.createTable(schema.pointer2)
.execute
.first
.void
.handleError { _ => () }
}

val schemaVersion = "schema-version"

val migrations = Nel.of(
addHeaders,
addVersion,
dropMetadata,
createPointer2)

def setVersion(version: Int) = {
settings
.set("schema-version", version.toString)
.void
}
Nel.of(addHeaders, addVersion, dropMetadata, createPointer2)

def migrate = {

def migrate(version: Int) = {
migrations
.toList
.drop(version + 1)
.toNel
.map { migrations =>
migrations
.foldLeftM(version) { (version, migration) =>
val version1 = version + 1
for {
_ <- migration
_ <- setVersion(version1)
} yield version1
}
.void
}
}

settings
.get(schemaVersion)
.map { setting =>
setting
.flatMap { a =>
Try
.apply { a.value.toInt }
.toOption
}
.fold {
if (fresh) {
val version = migrations.size - 1
if (version >= 0) {
setVersion(version).some
} else {
none[F[Unit]]
}
} else {
migrate(-1)
}
} { version =>
migrate(version)
}
}
}
}

migrate.flatMap { migrate1 =>
migrate1.foldMapM { _ =>
cassandraSync {
migrate.flatMap { migrate =>
migrate.foldMapM(identity)
}
}
}
}
def migrate[F[_]: MonadThrow: CassandraSession](
schema: Schema,
fresh: CreateSchema.Fresh,
settings: Settings[F],
cassandraSync: CassandraSync[F]
): F[Unit] = {
val migrateSchema = MigrateSchema.forSettingKey(cassandraSync, settings, SettingKey, migrations(schema))
migrateSchema.run(fresh)
}

def apply[F[_]: Temporal: Parallel: CassandraCluster: CassandraSession: LogOf](
Expand All @@ -135,11 +51,12 @@ object SetupSchema { self =>
def createSchema(implicit cassandraSync: CassandraSync[F]) = CreateSchema(config)

for {
cassandraSync <- CassandraSync.of[F](config, origin)
ab <- createSchema(cassandraSync)
(schema, fresh) = ab
settings <- SettingsCassandra.of[F](schema, origin, consistencyConfig)
_ <- migrate(schema, fresh, settings, cassandraSync)
cassandraSync <- CassandraSync.of[F](config, origin)
ab <- createSchema(cassandraSync)
(schema, fresh) = ab
settings <- SettingsCassandra.of[F](schema, origin, consistencyConfig)
_ <- migrate(schema, fresh, settings, cassandraSync)
} yield schema
}

}

0 comments on commit a318cbe

Please sign in to comment.