Skip to content

Commit

Permalink
stateSchemaFilePath -> stateSchemaFilePaths
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Dec 2, 2024
1 parent fe80545 commit f161db3
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class StateMetadataPartitionReader(
operatorStateMetadata.version,
v2.operatorPropertiesJson,
-1, // numColsPrefixKey is not available in OperatorStateMetadataV2
Some(stateStoreMetadata.stateSchemaFilePath(stateStoreMetadata.stateSchemaId))
Some(stateStoreMetadata.stateSchemaFilePaths(stateStoreMetadata.stateSchemaId))
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ case class TransformWithStateExec(
metadata match {
case v2: OperatorStateMetadataV2 =>
val ssInfo = v2.stateStoreInfo.head
val stateSchemaFilePath = ssInfo.stateSchemaFilePath(ssInfo.stateSchemaId)
val stateSchemaFilePath = ssInfo.stateSchemaFilePaths(ssInfo.stateSchemaId)
Some(new Path(stateSchemaFilePath))
case _ => None
}
Expand All @@ -498,7 +498,7 @@ case class TransformWithStateExec(
val (oldSchemaId, oldSchemaPaths): (Int, Map[Int, String]) = oldMetadata match {
case Some(v2: OperatorStateMetadataV2) =>
val ssInfo = v2.stateStoreInfo.head
(ssInfo.stateSchemaId, ssInfo.stateSchemaFilePath)
(ssInfo.stateSchemaId, ssInfo.stateSchemaFilePaths)
case _ => (-1, Map.empty)
}
List(oldSchemaPaths + (oldSchemaId + 1 -> validationResult.schemaPath))
Expand All @@ -507,7 +507,7 @@ case class TransformWithStateExec(
case Some(v2: OperatorStateMetadataV2) =>
// If schema hasn't evolved, keep existing mappings
val ssInfo = v2.stateStoreInfo.head
List(ssInfo.stateSchemaFilePath)
List(ssInfo.stateSchemaFilePaths)
case _ =>
// If no previous metadata and no evolution, start with schema ID 0
List(Map(0 -> validationResult.schemaPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ case class StateStoreMetadataV2(
numColsPrefixKey: Int,
numPartitions: Int,
stateSchemaId: Int,
stateSchemaFilePath: Map[Int, String])
stateSchemaFilePaths: Map[Int, String])
extends StateStoreMetadata with Serializable

object StateStoreMetadataV2 {
Expand Down Expand Up @@ -470,7 +470,7 @@ class OperatorStateMetadataV2FileManager(
val earliestBatchToKeep = latestMetadata match {
case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) =>
val ssInfo = stateStoreInfo.head
val schemaFilePath = ssInfo.stateSchemaFilePath.minBy(_._1)._2
val schemaFilePath = ssInfo.stateSchemaFilePaths.minBy(_._1)._2
new Path(schemaFilePath).getName.split("_").head.toLong
case _ => 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
assert(operatorMetadataV2.operatorPropertiesJson.nonEmpty)
val stateStoreInfo = operatorMetadataV2.stateStoreInfo.head
val expectedStateStoreInfo = expectedMetadataV2.stateStoreInfo.head
assert(stateStoreInfo.stateSchemaFilePath.nonEmpty)
assert(stateStoreInfo.stateSchemaFilePaths.nonEmpty)
assert(stateStoreInfo.storeName == expectedStateStoreInfo.storeName)
assert(stateStoreInfo.numPartitions == expectedStateStoreInfo.numPartitions)
}
Expand Down

0 comments on commit f161db3

Please sign in to comment.