
Commit

stuff
ericm-db committed Dec 13, 2024
1 parent bb36184 commit 06acd48
Showing 7 changed files with 54 additions and 17 deletions.
@@ -270,7 +270,7 @@ class StateMetadataPartitionReader(
         operatorStateMetadata.version,
         v2.operatorPropertiesJson,
         -1, // numColsPrefixKey is not available in OperatorStateMetadataV2
-        Some(stateStoreMetadata.stateSchemaFilePath)
+        Some(stateStoreMetadata.stateSchemaFilePaths(stateStoreMetadata.stateSchemaId))
       )
     }
   }
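
Note: the change above swaps a single stateSchemaFilePath for a map keyed by schema ID, so older schema files stay addressable after an upgrade. A minimal, self-contained sketch of the assumed shape — the stand-in case class below carries only the fields visible in this diff, not everything the real StateStoreMetadataV2 holds:

    // Stand-in for the relevant slice of StateStoreMetadataV2; field names
    // follow the diff, everything else here is illustrative.
    case class StoreMetadataSketch(
        storeName: String,
        stateSchemaId: Int,                     // schema ID currently in use
        stateSchemaFilePaths: Map[Int, String]) // schema ID -> schema file path

    object SchemaLookup extends App {
      val meta = StoreMetadataSketch(
        storeName = "default",
        stateSchemaId = 1,
        stateSchemaFilePaths = Map(
          0 -> "state/0/_metadata/schema/v0",   // hypothetical paths
          1 -> "state/0/_metadata/schema/v1"))
      // The reader resolves the current schema file through the ID.
      println(meta.stateSchemaFilePaths(meta.stateSchemaId)) // .../v1
    }
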
@@ -225,7 +225,6 @@ class IncrementalExecution(
         // write out the state schema paths to the metadata file
         statefulOp match {
           case ssw: StateStoreWriter =>
-            val metadata = ssw.operatorStateMetadata(stateSchemaPaths)
             // validate metadata
             if (isFirstBatch && currentBatchId != 0) {
               // If we are restarting from a different checkpoint directory
@@ -242,17 +241,20 @@ class IncrementalExecution(
                     log"versions: ${MDC(ERROR, e.getMessage)}")
                 None
             }
+            val stateSchemaMapping = ssw.stateSchemaMapping(schemaValidationResult,
+              oldMetadata)
+            val metadata = ssw.operatorStateMetadata(stateSchemaMapping)
             oldMetadata match {
               case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, metadata)
               case None =>
             }
           }
-          val metadataWriter = OperatorStateMetadataWriter.createWriter(
+          val metadataWriter = OperatorStateMetadataWriter.createWriter(
             new Path(checkpointLocation, ssw.getStateInfo.operatorId.toString),
             hadoopConf,
             ssw.operatorStateMetadataVersion,
             Some(currentBatchId))
-          metadataWriter.write(metadata)
+          metadataWriter.write(metadata)
         case _ =>
       }
       statefulOp
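
Note: the key behavioral change in this hunk is ordering — the operator metadata is now constructed from the schema mapping that validation produces, instead of before validation runs, so the metadata written for a batch always reflects that batch's schema check. A condensed sketch of the dataflow, with stand-in types in place of Spark's StateSchemaValidationResult and OperatorStateMetadataWriter:

    object CommitOrdering extends App {
      case class Validation(evolvedSchema: Boolean, schemaPath: String)

      // validate (upstream) -> derive mapping -> build metadata -> write.
      def runBatch(validation: Validation, oldMapping: Map[Int, String]): String = {
        val mapping =
          if (validation.evolvedSchema) {
            val nextId = if (oldMapping.isEmpty) 0 else oldMapping.keys.max + 1
            oldMapping + (nextId -> validation.schemaPath)
          } else if (oldMapping.isEmpty) Map(0 -> validation.schemaPath)
          else oldMapping
        // The string stands in for the metadata handed to the batch's writer.
        s"OperatorStateMetadata(schemaFiles = $mapping)"
      }

      println(runBatch(Validation(evolvedSchema = false, "schema-v0"), Map.empty))
    }
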
@@ -228,7 +228,8 @@ case class StreamingSymmetricHashJoinExec(
     SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)

   override def operatorStateMetadata(
-      stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = {
+      stateSchemaPaths: List[Map[Int, String]] = List.empty
+  ): OperatorStateMetadata = {
     val info = getStateInfo
     val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
     val stateStoreInfo =
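
Note: the widened signature is shared by every StateStoreWriter implementation: one list element per state store the operator uses, each a map from schema ID to schema file path. For a stream-stream join, which keeps several stores per side, the value would be shaped roughly like this (store names follow the join's key-to-num-values / key-with-index-to-value layout; paths are made up):

    object JoinSchemaPathsShape extends App {
      // One Map per state store; within each, schema ID -> schema file path.
      val stateSchemaPaths: List[Map[Int, String]] = List(
        Map(0 -> "state/0/left-keyToNumValues/schema-v0"),
        Map(0 -> "state/0/left-keyWithIndexToValue/schema-v0"),
        Map(0 -> "state/0/right-keyToNumValues/schema-v0"),
        Map(0 -> "state/0/right-keyWithIndexToValue/schema-v0"))
      stateSchemaPaths.foreach(println)
    }
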
@@ -478,7 +478,10 @@ case class TransformWithStateExec(
       case Some(metadata) =>
         metadata match {
           case v2: OperatorStateMetadataV2 =>
-            Some(new Path(v2.stateStoreInfo.head.stateSchemaFilePath))
+            val stateSchemaFilePaths = v2.stateStoreInfo.head.stateSchemaFilePaths
+            val stateSchemaId = stateSchemaFilePaths.keys.max
+            val stateSchemaFilePath = stateSchemaFilePaths(stateSchemaId)
+            Some(new Path(stateSchemaFilePath))
           case _ => None
         }
       case None => None
@@ -493,13 +496,16 @@ case class TransformWithStateExec(

   /** Metadata of this stateful operator and its states stores. */
   override def operatorStateMetadata(
-      stateSchemaPaths: List[String]): OperatorStateMetadata = {
+      stateSchemaPaths: List[Map[Int, String]]): OperatorStateMetadata = {
     val info = getStateInfo
     val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
     // stateSchemaFilePath should be populated at this point
+
     val stateStoreInfo =
-      Array(StateStoreMetadataV2(
-        StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions, stateSchemaPaths.head))
+      Array(
+        StateStoreMetadataV2(
+          StateStoreId.DEFAULT_STORE_NAME,
+          0, info.numPartitions, stateSchemaPaths.head.keys.max, stateSchemaPaths.head))

     val operatorProperties = TransformWithStateOperatorProperties(
       timeMode.toString,
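
Note: two conventions are visible above: the "current" schema file is resolved as the entry with the highest schema ID (keys.max), and StateStoreMetadataV2 now records that ID alongside the full map. A runnable sketch of the lookup, assuming IDs grow monotonically as the mapping logic further down guarantees:

    object LatestSchemaFile extends App {
      // Mirrors the keys.max lookup in TransformWithStateExec; paths made up.
      def latest(paths: Map[Int, String]): Option[String] =
        if (paths.isEmpty) None else Some(paths(paths.keys.max))

      println(latest(Map(0 -> "schema-v0", 1 -> "schema-v1"))) // Some(schema-v1)
      println(latest(Map.empty))                               // None
    }
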
@@ -288,7 +288,7 @@ class OperatorStateMetadataV1Reader(
 class OperatorStateMetadataV2Writer(
     stateCheckpointPath: Path,
     hadoopConf: Configuration,
-    currentBatchId: Long) extends OperatorStateMetadataWriter {
+    currentBatchId: Long) extends OperatorStateMetadataWriter with Logging {

   private val metadataFilePath = OperatorStateMetadataV2.metadataFilePath(
     stateCheckpointPath, currentBatchId)
@@ -301,6 +301,7 @@ class OperatorStateMetadataV2Writer(
     if (fm.exists(metadataFilePath)) return

     fm.mkdirs(metadataFilePath.getParent)
+    logError(s"### metadataFilePath: $metadataFilePath")
     val outputStream = fm.createAtomic(metadataFilePath, overwriteIfPossible = false)
     OperatorStateMetadataUtils.writeMetadata(outputStream, operatorMetadata, metadataFilePath)
   }
@@ -309,7 +310,7 @@ class OperatorStateMetadataV2Writer(
 class OperatorStateMetadataV2Reader(
     stateCheckpointPath: Path,
     hadoopConf: Configuration,
-    batchId: Long) extends OperatorStateMetadataReader {
+    batchId: Long) extends OperatorStateMetadataReader with Logging {

   // Check that the requested batchId is available in the checkpoint directory
   val baseCheckpointDir = stateCheckpointPath.getParent.getParent
@@ -347,6 +348,7 @@ class OperatorStateMetadataV2Reader(

   override def read(): Option[OperatorStateMetadata] = {
     val batches = listOperatorMetadataBatches()
+    logError(s"### batches: ${batches.mkString("Array(", ", ", ")")}")
     val lastBatchId = batches.filter(_ <= batchId).lastOption
     if (lastBatchId.isEmpty) {
       throw StateDataSourceErrors.failedToReadOperatorMetadata(stateCheckpointPath.toString,
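
Note: read() resolves metadata by taking the newest metadata file written at or before the requested batch. A tiny rehearsal of that selection, assuming listOperatorMetadataBatches returns batch IDs in ascending order (which the filter/lastOption idiom above requires):

    object LastBatchSelection extends App {
      // Mirrors `batches.filter(_ <= batchId).lastOption` from the reader.
      def resolve(batches: Array[Long], batchId: Long): Option[Long] =
        batches.filter(_ <= batchId).lastOption

      val batches = Array(0L, 3L, 7L) // batches that wrote metadata, ascending
      println(resolve(batches, 5L))   // Some(3): newest metadata at/before 5
      println(resolve(batches, -1L))  // None: triggers the error path above
    }
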
@@ -319,14 +319,40 @@ trait StateStoreWriter

   /** Metadata of this stateful operator and its states stores. */
   def operatorStateMetadata(
-      stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = {
+      stateSchemaPaths: List[Map[Int, String]] = List.empty): OperatorStateMetadata = {
     val info = getStateInfo
     val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
     val stateStoreInfo =
       Array(StateStoreMetadataV1(StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions))
     OperatorStateMetadataV1(operatorInfo, stateStoreInfo)
   }

+  def stateSchemaMapping(
+      stateSchemaValidationResults: List[StateSchemaValidationResult],
+      oldMetadata: Option[OperatorStateMetadata]): List[Map[Int, String]] = {
+    val validationResult = stateSchemaValidationResults.head
+    val evolvedSchema = validationResult.evolvedSchema
+    if (evolvedSchema) {
+      val (oldSchemaId, oldSchemaPaths): (Int, Map[Int, String]) = oldMetadata match {
+        case Some(v2: OperatorStateMetadataV2) =>
+          val ssInfo = v2.stateStoreInfo.head
+          (ssInfo.stateSchemaId, ssInfo.stateSchemaFilePaths)
+        case _ => (-1, Map.empty)
+      }
+      List(oldSchemaPaths + (oldSchemaId + 1 -> validationResult.schemaPath))
+    } else {
+      oldMetadata match {
+        case Some(v2: OperatorStateMetadataV2) =>
+          // If schema hasn't evolved, keep existing mappings
+          val ssInfo = v2.stateStoreInfo.head
+          List(ssInfo.stateSchemaFilePaths)
+        case _ =>
+          // If no previous metadata and no evolution, start with schema ID 0
+          List(Map(0 -> validationResult.schemaPath))
+      }
+    }
+  }
+
   /** Set the operator level metrics */
   protected def setOperatorMetrics(numStateStoreInstances: Int = 1): Unit = {
     assert(numStateStoreInstances >= 1, s"invalid number of stores: $numStateStoreInstances")
@@ -1071,7 +1097,7 @@ case class SessionWindowStateStoreSaveExec(
   }

   override def operatorStateMetadata(
-      stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = {
+      stateSchemaPaths: List[Map[Int, String]] = List.empty): OperatorStateMetadata = {
     val info = getStateInfo
     val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
     val stateStoreInfo = Array(StateStoreMetadataV1(
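
Note: the two branches of the new stateSchemaMapping above are easiest to see with concrete inputs. A self-contained rehearsal of the same logic, reduced to a single store so it runs without a Spark session (types stand in for Spark's; paths are hypothetical):

    object StateSchemaMappingExample extends App {
      case class Validation(evolvedSchema: Boolean, schemaPath: String)

      def mapping(v: Validation, old: Option[(Int, Map[Int, String])]): Map[Int, String] =
        if (v.evolvedSchema) {
          val (oldId, oldPaths) = old.getOrElse((-1, Map.empty[Int, String]))
          oldPaths + ((oldId + 1) -> v.schemaPath) // evolved: next ID gets the new file
        } else {
          old.map(_._2).getOrElse(Map(0 -> v.schemaPath)) // unchanged: keep, or seed ID 0
        }

      val first = mapping(Validation(evolvedSchema = false, "schema-v0"), None)
      println(first) // Map(0 -> schema-v0)
      val evolved = mapping(Validation(evolvedSchema = true, "schema-v1"), Some((0, first)))
      println(evolved) // Map(0 -> schema-v0, 1 -> schema-v1)
    }
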
@@ -62,7 +62,7 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
     assert(operatorMetadataV2.operatorPropertiesJson.nonEmpty)
     val stateStoreInfo = operatorMetadataV2.stateStoreInfo.head
     val expectedStateStoreInfo = expectedMetadataV2.stateStoreInfo.head
-    assert(stateStoreInfo.stateSchemaFilePath.nonEmpty)
+    assert(stateStoreInfo.stateSchemaFilePaths.nonEmpty)
     assert(stateStoreInfo.storeName == expectedStateStoreInfo.storeName)
     assert(stateStoreInfo.numPartitions == expectedStateStoreInfo.numPartitions)
   }
@@ -151,8 +151,8 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
     // Assign some placeholder values to the state store metadata since they are generated
     // dynamically by the operator.
     val expectedMetadata = OperatorStateMetadataV2(OperatorInfoV1(0, "transformWithStateExec"),
-      Array(StateStoreMetadataV2("default", 0, numShufflePartitions, checkpointDir.toString)),
-      "")
+      Array(StateStoreMetadataV2(
+        "default", 0, numShufflePartitions, 0, Map(0 -> checkpointDir.toString))), "")
     checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2)

     // Verify that the state store metadata is not available for invalid batches.
