Skip to content

Commit

Permalink
Loader: Fix column names for shredded tables (close #1332)
Browse files Browse the repository at this point in the history
  • Loading branch information
oguzhanunlu committed Jan 2, 2024
1 parent 5857953 commit a4c312a
Showing 1 changed file with 2 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,26 +167,19 @@ object DataDiscovery {

def getShredModels[F[_]: Monad: Iglu](
nonAtomicTypes: List[ShreddedType]
): EitherT[F, LoaderError, Map[SchemaKey, MergeRedshiftSchemasResult]] = {
val maxSchemaKeyPerTableName = getMaxSchemaKeyPerTableName(nonAtomicTypes)
): EitherT[F, LoaderError, Map[SchemaKey, MergeRedshiftSchemasResult]] =
nonAtomicTypes
.traverse { shreddedType =>
EitherT(Iglu[F].getSchemasWithSameModel(shreddedType.info.getSchemaKey)).map { schemas =>
val maxSchemaKey = maxSchemaKeyPerTableName(shreddedType.info.getName)
val filtered = schemas.filter(_.self.schemaKey <= maxSchemaKey).toNel.get
val filtered = schemas.filter(_.self.schemaKey <= shreddedType.info.getSchemaKey).toNel.get
val mergeRedshiftSchemasResult = foldMapMergeRedshiftSchemas(filtered)
(shreddedType.info.getSchemaKey, mergeRedshiftSchemasResult)
}
}
.map(_.toMap)
}

implicit val ord: Ordering[SchemaKey] = ordering

/** Find the maximum SchemaKey for all table names in a given set of shredded types */
def getMaxSchemaKeyPerTableName(shreddedTypes: List[ShreddedType]): Map[String, SchemaKey] =
shreddedTypes.groupBy(_.info.getName).mapValues(_.maxBy(_.info.version).info.getSchemaKey)

def logAndRaise[F[_]: MonadThrow: Logging](error: LoaderError): F[Option[WithOrigin]] =
Logging[F].error(error)("A problem occurred in the loading of SQS message") *> MonadThrow[F].raiseError(error)

Expand Down

0 comments on commit a4c312a

Please sign in to comment.