Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Nov 27, 2024
1 parent 7cbfc2c commit fe80545
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class StateMetadataPartitionReader(
operatorStateMetadata.version,
v2.operatorPropertiesJson,
-1, // numColsPrefixKey is not available in OperatorStateMetadataV2
Some(stateStoreMetadata.stateSchemaFilePath)
Some(stateStoreMetadata.stateSchemaFilePath(stateStoreMetadata.stateSchemaId))
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,15 @@ class IncrementalExecution(
}
val schemaValidationResult = statefulOp.
validateAndMaybeEvolveStateSchema(hadoopConf, currentBatchId, stateSchemaVersion)
val stateSchemaPaths = schemaValidationResult.map(_.schemaPath)
// write out the state schema paths to the metadata file
statefulOp match {
case ssw: StateStoreWriter =>
val metadata = ssw.operatorStateMetadata(stateSchemaPaths)
// validate metadata
if (isFirstBatch && currentBatchId != 0) {
val oldMetadata = if (isFirstBatch && currentBatchId != 0) {
// If we are restarting from a different checkpoint directory
// there may be a mismatch between the stateful operators in the
// physical plan and the metadata.
val oldMetadata = try {
try {
OperatorStateMetadataReader.createReader(
new Path(checkpointLocation, ssw.getStateInfo.operatorId.toString),
hadoopConf, ssw.operatorStateMetadataVersion, currentBatchId - 1).read()
Expand All @@ -242,10 +240,15 @@ class IncrementalExecution(
log"versions: ${MDC(ERROR, e.getMessage)}")
None
}
oldMetadata match {
case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, metadata)
case None =>
}
} else {
None
}
val stateSchemaMapping = ssw.stateSchemaMapping(schemaValidationResult,
oldMetadata)
val metadata = ssw.operatorStateMetadata(stateSchemaMapping)
oldMetadata match {
case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, metadata)
case None =>
}
val metadataWriter = OperatorStateMetadataWriter.createWriter(
new Path(checkpointLocation, ssw.getStateInfo.operatorId.toString),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ case class StreamingSymmetricHashJoinExec(
SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)

override def operatorStateMetadata(
stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = {
stateSchemaPaths: List[Map[Int, String]] = List.empty): OperatorStateMetadata = {
val info = getStateInfo
val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
val stateStoreInfo =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,9 @@ case class TransformWithStateExec(
case Some(metadata) =>
metadata match {
case v2: OperatorStateMetadataV2 =>
Some(new Path(v2.stateStoreInfo.head.stateSchemaFilePath))
val ssInfo = v2.stateStoreInfo.head
val stateSchemaFilePath = ssInfo.stateSchemaFilePath(ssInfo.stateSchemaId)
Some(new Path(stateSchemaFilePath))
case _ => None
}
case None => None
Expand All @@ -487,15 +489,42 @@ case class TransformWithStateExec(
newSchemaFilePath = Some(newStateSchemaFilePath)))
}

override def stateSchemaMapping(
    stateSchemaValidationResults: List[StateSchemaValidationResult],
    oldMetadata: Option[OperatorStateMetadata]): List[Map[Int, String]] = {
  // This operator uses a single state store, so only the first validation
  // result is relevant.
  val result = stateSchemaValidationResults.head
  // Extract (currentSchemaId, schemaId -> schemaFilePath mapping) from the
  // previous run's metadata, when V2 metadata is available.
  val previous: Option[(Int, Map[Int, String])] = oldMetadata match {
    case Some(v2: OperatorStateMetadataV2) =>
      val storeInfo = v2.stateStoreInfo.head
      Some((storeInfo.stateSchemaId, storeInfo.stateSchemaFilePath))
    case _ => None
  }
  if (result.evolvedSchema) {
    // Schema evolved: bump the schema id and append the newly written schema
    // file, keeping every previously recorded schema file in the mapping.
    // With no prior metadata the id starts from -1 + 1 == 0.
    val (lastSchemaId, knownPaths) = previous.getOrElse((-1, Map.empty[Int, String]))
    List(knownPaths + ((lastSchemaId + 1) -> result.schemaPath))
  } else {
    previous match {
      // No evolution: carry the existing id -> path mapping forward unchanged.
      case Some((_, knownPaths)) => List(knownPaths)
      // First batch, no prior metadata and no evolution: start at schema id 0.
      case None => List(Map(0 -> result.schemaPath))
    }
  }
}

/** Metadata of this stateful operator and its states stores. */
override def operatorStateMetadata(
stateSchemaPaths: List[String]): OperatorStateMetadata = {
stateSchemaPaths: List[Map[Int, String]]): OperatorStateMetadata = {
val info = getStateInfo
val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
// stateSchemaFilePath should be populated at this point
val maxId = stateSchemaPaths.head.keys.max
val stateStoreInfo =
Array(StateStoreMetadataV2(
StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions, stateSchemaPaths.head))
StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions, maxId, stateSchemaPaths.head))

val operatorProperties = TransformWithStateOperatorProperties(
timeMode.toString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ case class StateStoreMetadataV2(
storeName: String,
numColsPrefixKey: Int,
numPartitions: Int,
stateSchemaFilePath: String)
stateSchemaId: Int,
stateSchemaFilePath: Map[Int, String])
extends StateStoreMetadata with Serializable

object StateStoreMetadataV2 {
Expand Down Expand Up @@ -468,7 +469,8 @@ class OperatorStateMetadataV2FileManager(
// find the batchId of the earliest schema file we need to keep
val earliestBatchToKeep = latestMetadata match {
case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) =>
val schemaFilePath = stateStoreInfo.head.stateSchemaFilePath
val ssInfo = stateStoreInfo.head
val schemaFilePath = ssInfo.stateSchemaFilePath.minBy(_._1)._2
new Path(schemaFilePath).getName.split("_").head.toLong
case _ => 0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,14 +319,18 @@ trait StateStoreWriter

/** Metadata of this stateful operator and its states stores. */
def operatorStateMetadata(
stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = {
stateSchemaPaths: List[Map[Int, String]] = List.empty): OperatorStateMetadata = {
val info = getStateInfo
val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
val stateStoreInfo =
Array(StateStoreMetadataV1(StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions))
OperatorStateMetadataV1(operatorInfo, stateStoreInfo)
}

/**
 * Returns the schema-id -> schema-file-path mappings to record in this
 * operator's state metadata, presumably one entry per state store
 * (TODO confirm against OperatorStateMetadataV2 consumers).
 *
 * Default implementation returns no mappings; operators that version their
 * state schemas (e.g. TransformWithStateExec) override this to thread the
 * schema validation results and prior metadata into the new mapping.
 *
 * @param stateSchemaValidationResults results of validating/evolving the
 *                                     state schema for the current batch
 * @param oldMetadata operator metadata read from the previous batch, if any
 */
def stateSchemaMapping(
    stateSchemaValidationResults: List[StateSchemaValidationResult],
    oldMetadata: Option[OperatorStateMetadata]): List[Map[Int, String]] = List.empty

/** Set the operator level metrics */
protected def setOperatorMetrics(numStateStoreInstances: Int = 1): Unit = {
assert(numStateStoreInstances >= 1, s"invalid number of stores: $numStateStoreInstances")
Expand Down Expand Up @@ -1071,7 +1075,7 @@ case class SessionWindowStateStoreSaveExec(
}

override def operatorStateMetadata(
stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = {
stateSchemaPaths: List[Map[Int, String]] = List.empty): OperatorStateMetadata = {
val info = getStateInfo
val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
val stateStoreInfo = Array(StateStoreMetadataV1(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
// Assign some placeholder values to the state store metadata since they are generated
// dynamically by the operator.
val expectedMetadata = OperatorStateMetadataV2(OperatorInfoV1(0, "transformWithStateExec"),
Array(StateStoreMetadataV2("default", 0, numShufflePartitions, checkpointDir.toString)),
Array(StateStoreMetadataV2(
"default", 0, numShufflePartitions, 0, Map.empty)),
"")
checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2)

Expand Down

0 comments on commit fe80545

Please sign in to comment.