Create migration only for the max schema key of each schema model
After starting to use lookupSchemasUntil in fetchSchemasWithSameModel, we fetch only the schemas up to the given schema key for each schema key, whereas previously we fetched all the schemas belonging to the same schema model.

This changed the loader's behavior when a message contains multiple schema keys for the same schema model: RDB Loader now tries to create the same table multiple times (for example, versions 1-0-0, 1-0-1, and 1-0-2 of one context would each trigger a CREATE TABLE for the same table). To fix this, this commit creates the migration only for the maximum schema key of each schema model.
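The fix hinges on collapsing all discovered schema keys of one schema model (equivalently, one destination table) down to the highest version before migrations are planned. The sketch below illustrates that idea; it is a self-contained approximation using simplified stand-in types, not the loader's real iglu-core SchemaKey or DataDiscovery.getMaxSchemaKeyPerTableName, whose exact signature is only partially visible in this diff.

// Minimal sketch of "keep only the max schema key per table name".
// SchemaVer and SchemaKey are simplified stand-ins for the iglu-core types.
final case class SchemaVer(model: Int, revision: Int, addition: Int)
final case class SchemaKey(vendor: String, name: String, version: SchemaVer) {
  // com.acme/some_context at model 1 maps to table com_acme_some_context_1
  def tableName: String = s"${vendor.replace('.', '_')}_${name}_${version.model}"
}

object MaxKeyPerTable {
  implicit val verOrdering: Ordering[SchemaVer] =
    Ordering.by(v => (v.model, v.revision, v.addition))

  // Group the discovered keys by destination table, keep the highest version
  // in each group, and drop everything else before building migrations.
  def filterMax(keys: List[SchemaKey]): List[SchemaKey] = {
    val maxPerTable = keys.groupBy(_.tableName).values.map(_.maxBy(_.version)).toSet
    keys.filter(maxPerTable.contains)
  }
}

With the versions used in the new spec below (x-0-0, x-0-1, x-0-2 of com.acme/some_context for each model x), this filtering keeps only x-0-2 per model, which is why the test expects exactly one CREATE TABLE and one COMMENT ON statement per schema model.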
spenes committed Oct 2, 2024
1 parent b2dbafb commit 23c1feb
Showing 2 changed files with 117 additions and 1 deletion.
@@ -164,8 +164,11 @@ object Migration {
     target: Target[I],
     disableRecovery: List[SchemaCriterion]
   ): F[Migration[C]] = {
+    val nonAtomicTypes = discovery.shreddedTypes.filterNot(_.isAtomic)
+    val maxSchemaKeysPerTableName = DataDiscovery.getMaxSchemaKeyPerTableName(nonAtomicTypes).values.toList
+    val filteredTypes = nonAtomicTypes.filter(s => maxSchemaKeysPerTableName.contains(s.info.getSchemaKey))
     val descriptions: LoaderAction[F, List[Description]] =
-      discovery.shreddedTypes.filterNot(_.isAtomic).traverse {
+      filteredTypes.traverse {
         case s: ShreddedType.Tabular =>
           if (!disableRecovery.contains(s.info.toCriterion))
             EitherT.rightT[F, LoaderError](Description.Table(discovery.shredModels(s.info.getSchemaKey).mergeRedshiftSchemasResult))
@@ -123,6 +123,119 @@ class MigrationSpec extends Specification {
       }
     }
 
+    "build Migration for only max schema keys" in {
+      implicit val transaction: Transaction[Pure, Pure] = PureTransaction.interpreter
+      implicit val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init)
+      implicit val iglu: Iglu[Pure] = PureIglu.interpreter
+      implicit val logging: Logging[Pure] = PureLogging.interpreter()
+      def createTestData(testNum: Int) = {
+        val s1 = ShreddedType.Tabular(
+          ShreddedType.Info(
+            BlobStorage.Folder.coerce("s3://shredded/archive"),
+            "com.acme",
+            s"some_context",
+            SchemaVer.Full(testNum, 0, 0),
+            SnowplowEntity.Context
+          )
+        )
+        val s2 = s1.copy(info = s1.info.copy(version = SchemaVer.Full(testNum, 0, 1)))
+        val s3 = s1.copy(info = s1.info.copy(version = SchemaVer.Full(testNum, 0, 2)))
+        val types = List(s1, s2, s3)
+        val schema1 = SelfDescribingSchema(SchemaMap(s1.info.getSchemaKey), Schema())
+        val schema2 = SelfDescribingSchema(SchemaMap(s2.info.getSchemaKey), Schema())
+        val schema3 = SelfDescribingSchema(SchemaMap(s3.info.getSchemaKey), Schema())
+        val shredModels = Map(
+          schema1.self.schemaKey -> DiscoveredShredModels(
+            foldMapRedshiftSchemas(NonEmptyList.of(schema1))(
+              schema1.self.schemaKey
+            ),
+            foldMapMergeRedshiftSchemas(NonEmptyList.of(schema1))
+          ),
+          schema2.self.schemaKey -> DiscoveredShredModels(
+            foldMapRedshiftSchemas(NonEmptyList.of(schema1, schema2))(
+              schema2.self.schemaKey
+            ),
+            foldMapMergeRedshiftSchemas(NonEmptyList.of(schema1, schema2))
+          ),
+          schema3.self.schemaKey -> DiscoveredShredModels(
+            foldMapRedshiftSchemas(NonEmptyList.of(schema1, schema2, schema3))(
+              schema3.self.schemaKey
+            ),
+            foldMapMergeRedshiftSchemas(NonEmptyList.of(schema1, schema2, schema3))
+          )
+        )
+
+        val createToDdl =
+          s"""CREATE TABLE IF NOT EXISTS public.com_acme_some_context_$testNum (
+             | "schema_vendor" VARCHAR(128) ENCODE ZSTD NOT NULL,
+             | "schema_name" VARCHAR(128) ENCODE ZSTD NOT NULL,
+             | "schema_format" VARCHAR(128) ENCODE ZSTD NOT NULL,
+             | "schema_version" VARCHAR(128) ENCODE ZSTD NOT NULL,
+             | "root_id" CHAR(36) ENCODE RAW NOT NULL,
+             | "root_tstamp" TIMESTAMP ENCODE ZSTD NOT NULL,
+             | "ref_root" VARCHAR(255) ENCODE ZSTD NOT NULL,
+             | "ref_tree" VARCHAR(1500) ENCODE ZSTD NOT NULL,
+             | "ref_parent" VARCHAR(255) ENCODE ZSTD NOT NULL,
+             | FOREIGN KEY (root_id) REFERENCES public.events(event_id)
+             |)
+             |DISTSTYLE KEY
+             |DISTKEY (root_id)
+             |SORTKEY (root_tstamp);
+             |
+             |COMMENT ON TABLE public.com_acme_some_context_$testNum IS 'iglu:com.acme/some_context/jsonschema/$testNum-0-2';
+             |""".stripMargin
+
+        val expectedSql = LogEntry.Sql(Statement.TableExists(s"com_acme_some_context_$testNum"))
+        val expectedMigrations = List(
+          LogEntry.Message(s"Creating public.com_acme_some_context_$testNum table for iglu:com.acme/some_context/jsonschema/$testNum-0-2"),
+          LogEntry.Sql(Statement.CreateTable(Fragment.const0(createToDdl))),
+          LogEntry.Sql(
+            Statement.CommentOn(s"public.com_acme_some_context_$testNum", s"iglu:com.acme/some_context/jsonschema/$testNum-0-2")
+          ),
+          LogEntry.Message(s"Table public.com_acme_some_context_$testNum created")
+        )
+
+        (types, shredModels, expectedSql, expectedMigrations)
+      }
+
+      val (types, shredModels, expectedSqls, expectedMigration) = (1 to 3)
+        .map(createTestData)
+        .foldLeft(
+          (List.empty[ShreddedType.Tabular], Map.empty[SchemaKey, DiscoveredShredModels], List.empty[LogEntry], List.empty[LogEntry])
+        ) {
+          case (
+                (accTypes, accShredModels, accExpectedSqls, accExpectedMigrations),
+                (types, shredModels, expectedSql, expectedMigrations)
+              ) =>
+            (
+              accTypes ::: types,
+              accShredModels ++ shredModels,
+              accExpectedSqls :+ expectedSql,
+              accExpectedMigrations ::: expectedMigrations
+            )
+        }
+
+      val input =
+        DataDiscovery(
+          BlobStorage.Folder.coerce("s3://shredded/archive"),
+          types,
+          Compression.Gzip,
+          TypesInfo.Shredded(List.empty),
+          Nil,
+          shredModels
+        )
+
+      val expected = PureTransaction.NoTransactionMessage :: expectedSqls
+
+      val (state, value) = Migration.build[Pure, Pure, Unit](input, PureDAO.DummyTarget, Nil).run
+
+      state.getLog must beEqualTo(expected)
+      value must beRight.like { case Migration(preTransaction, inTransaction) =>
+        preTransaction mustEqual Nil
+        inTransaction.runS.getLog must beEqualTo(expectedMigration)
+      }
+    }
+
     "ignore atomic schema" in {
       implicit val transaction: Transaction[Pure, Pure] = PureTransaction.interpreter
       implicit val dao: DAO[Pure] = PureDAO.interpreter(PureDAO.init)
