From fe8054524043034a6643ab864cba7163fcd1b763 Mon Sep 17 00:00:00 2001
From: Eric Marnadi
Date: Tue, 26 Nov 2024 17:18:08 -0800
Subject: [PATCH] Track state schemas by schema ID in operator state metadata

StateStoreMetadataV2 now carries a stateSchemaId and a Map[Int, String]
from schema ID to state schema file path instead of a single path, so
every schema a state store has used stays reachable. A new
StateStoreWriter.stateSchemaMapping hook (implemented by
TransformWithStateExec) advances the schema ID whenever validation
reports an evolved schema, and IncrementalExecution reads the previous
operator metadata before writing the new one so the existing mapping can
be carried forward.
---
 .../state/metadata/StateMetadataSource.scala  |  2 +-
 .../streaming/IncrementalExecution.scala      | 19 +++++-----
 .../StreamingSymmetricHashJoinExec.scala      |  2 +-
 .../streaming/TransformWithStateExec.scala    | 35 +++++++++++++++++--
 .../state/OperatorStateMetadata.scala         |  6 ++--
 .../streaming/statefulOperators.scala         |  8 +++--
 .../state/OperatorStateMetadataSuite.scala    |  3 +-
 7 files changed, 57 insertions(+), 18 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
index 64fdfb7997623..de66865dd639b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala
@@ -270,7 +270,7 @@ class StateMetadataPartitionReader(
         operatorStateMetadata.version,
         v2.operatorPropertiesJson,
         -1, // numColsPrefixKey is not available in OperatorStateMetadataV2
-        Some(stateStoreMetadata.stateSchemaFilePath)
+        Some(stateStoreMetadata.stateSchemaFilePath(stateStoreMetadata.stateSchemaId))
       )
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
index 2a7e9818aedd9..4a7b3a37f0a5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
@@ -221,17 +221,15 @@ class IncrementalExecution(
         }
         val schemaValidationResult = statefulOp.
           validateAndMaybeEvolveStateSchema(hadoopConf, currentBatchId, stateSchemaVersion)
-        val stateSchemaPaths = schemaValidationResult.map(_.schemaPath)
         // write out the state schema paths to the metadata file
         statefulOp match {
           case ssw: StateStoreWriter =>
-            val metadata = ssw.operatorStateMetadata(stateSchemaPaths)
             // validate metadata
-            if (isFirstBatch && currentBatchId != 0) {
+            val oldMetadata = if (isFirstBatch && currentBatchId != 0) {
               // If we are restarting from a different checkpoint directory
               // there may be a mismatch between the stateful operators in the
               // physical plan and the metadata. 
- val oldMetadata = try { + try { OperatorStateMetadataReader.createReader( new Path(checkpointLocation, ssw.getStateInfo.operatorId.toString), hadoopConf, ssw.operatorStateMetadataVersion, currentBatchId - 1).read() @@ -242,10 +240,15 @@ class IncrementalExecution( log"versions: ${MDC(ERROR, e.getMessage)}") None } - oldMetadata match { - case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, metadata) - case None => - } + } else { + None + } + val stateSchemaMapping = ssw.stateSchemaMapping(schemaValidationResult, + oldMetadata) + val metadata = ssw.operatorStateMetadata(stateSchemaMapping) + oldMetadata match { + case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, metadata) + case None => } val metadataWriter = OperatorStateMetadataWriter.createWriter( new Path(checkpointLocation, ssw.getStateInfo.operatorId.toString), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala index c90c87899c73f..de5bd6a97236a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala @@ -228,7 +228,7 @@ case class StreamingSymmetricHashJoinExec( SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide) override def operatorStateMetadata( - stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = { + stateSchemaPaths: List[Map[Int, String]] = List.empty): OperatorStateMetadata = { val info = getStateInfo val operatorInfo = OperatorInfoV1(info.operatorId, shortName) val stateStoreInfo = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala index f4705b89d5a87..d1cdc25e9151d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala @@ -474,7 +474,9 @@ case class TransformWithStateExec( case Some(metadata) => metadata match { case v2: OperatorStateMetadataV2 => - Some(new Path(v2.stateStoreInfo.head.stateSchemaFilePath)) + val ssInfo = v2.stateStoreInfo.head + val stateSchemaFilePath = ssInfo.stateSchemaFilePath(ssInfo.stateSchemaId) + Some(new Path(stateSchemaFilePath)) case _ => None } case None => None @@ -487,15 +489,42 @@ case class TransformWithStateExec( newSchemaFilePath = Some(newStateSchemaFilePath))) } + override def stateSchemaMapping( + stateSchemaValidationResults: List[StateSchemaValidationResult], + oldMetadata: Option[OperatorStateMetadata]): List[Map[Int, String]] = { + val validationResult = stateSchemaValidationResults.head + val evolvedSchema = validationResult.evolvedSchema + if (evolvedSchema) { + val (oldSchemaId, oldSchemaPaths): (Int, Map[Int, String]) = oldMetadata match { + case Some(v2: OperatorStateMetadataV2) => + val ssInfo = v2.stateStoreInfo.head + (ssInfo.stateSchemaId, ssInfo.stateSchemaFilePath) + case _ => (-1, Map.empty) + } + List(oldSchemaPaths + (oldSchemaId + 1 -> validationResult.schemaPath)) + } else { + oldMetadata match { + case Some(v2: OperatorStateMetadataV2) => + // If schema hasn't evolved, keep existing mappings + val ssInfo = v2.stateStoreInfo.head + List(ssInfo.stateSchemaFilePath) + case _ => + // 
If no previous metadata and no evolution, start with schema ID 0
+          List(Map(0 -> validationResult.schemaPath))
+      }
+    }
+  }
+
   /** Metadata of this stateful operator and its state stores. */
   override def operatorStateMetadata(
-      stateSchemaPaths: List[String]): OperatorStateMetadata = {
+      stateSchemaPaths: List[Map[Int, String]]): OperatorStateMetadata = {
     val info = getStateInfo
     val operatorInfo = OperatorInfoV1(info.operatorId, shortName)
     // stateSchemaFilePath should be populated at this point
+    val maxId = stateSchemaPaths.head.keys.max
     val stateStoreInfo =
       Array(StateStoreMetadataV2(
-        StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions, stateSchemaPaths.head))
+        StateStoreId.DEFAULT_STORE_NAME, 0, info.numPartitions, maxId, stateSchemaPaths.head))
 
     val operatorProperties = TransformWithStateOperatorProperties(
       timeMode.toString,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
index aa2f332afeff4..348d52333f191 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala
@@ -51,7 +51,8 @@ case class StateStoreMetadataV2(
     storeName: String,
     numColsPrefixKey: Int,
     numPartitions: Int,
-    stateSchemaFilePath: String)
+    stateSchemaId: Int,
+    stateSchemaFilePath: Map[Int, String])
   extends StateStoreMetadata with Serializable
 
 object StateStoreMetadataV2 {
@@ -468,7 +469,8 @@ class OperatorStateMetadataV2FileManager(
     // find the batchId of the earliest schema file we need to keep
     val earliestBatchToKeep = latestMetadata match {
       case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) =>
-        val schemaFilePath = stateStoreInfo.head.stateSchemaFilePath
+        val ssInfo = stateStoreInfo.head
+        val schemaFilePath = ssInfo.stateSchemaFilePath.minBy(_._1)._2
         new Path(schemaFilePath).getName.split("_").head.toLong
       case _ => 0
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 20d8302acd495..f5b5cd109117c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -319,7 +319,7 @@ trait StateStoreWriter
   /** Metadata of this stateful operator and its state stores. 
*/ def operatorStateMetadata( - stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = { + stateSchemaPaths: List[Map[Int, String]] = List.empty): OperatorStateMetadata = { val info = getStateInfo val operatorInfo = OperatorInfoV1(info.operatorId, shortName) val stateStoreInfo = @@ -327,6 +327,10 @@ trait StateStoreWriter OperatorStateMetadataV1(operatorInfo, stateStoreInfo) } + def stateSchemaMapping( + stateSchemaValidationResults: List[StateSchemaValidationResult], + oldMetadata: Option[OperatorStateMetadata]): List[Map[Int, String]] = List.empty + /** Set the operator level metrics */ protected def setOperatorMetrics(numStateStoreInstances: Int = 1): Unit = { assert(numStateStoreInstances >= 1, s"invalid number of stores: $numStateStoreInstances") @@ -1071,7 +1075,7 @@ case class SessionWindowStateStoreSaveExec( } override def operatorStateMetadata( - stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = { + stateSchemaPaths: List[Map[Int, String]] = List.empty): OperatorStateMetadata = { val info = getStateInfo val operatorInfo = OperatorInfoV1(info.operatorId, shortName) val stateStoreInfo = Array(StateStoreMetadataV1( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala index 45786fdc73d0e..695c9bc7fa7cd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala @@ -151,7 +151,8 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession { // Assign some placeholder values to the state store metadata since they are generated // dynamically by the operator. val expectedMetadata = OperatorStateMetadataV2(OperatorInfoV1(0, "transformWithStateExec"), - Array(StateStoreMetadataV2("default", 0, numShufflePartitions, checkpointDir.toString)), + Array(StateStoreMetadataV2( + "default", 0, numShufflePartitions, 0, Map.empty)), "") checkOperatorStateMetadata(checkpointDir.toString, 0, expectedMetadata, 2)
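
The schema-ID bookkeeping above is easiest to follow end to end, so here is a
minimal, self-contained sketch of what stateSchemaMapping maintains for a
single state store. SchemaInfo, evolve, and the sample file names are
illustrative stand-ins for this note, not APIs added by the patch: on
evolution the map gains an entry keyed one past the previous schema ID, an
unchanged schema carries the existing mapping forward, and a first run with
no prior metadata starts at schema ID 0.

// Illustrative stand-in for the (stateSchemaId, stateSchemaFilePath) pair
// that StateStoreMetadataV2 now carries; not a Spark API.
final case class SchemaInfo(stateSchemaId: Int, stateSchemaFilePath: Map[Int, String])

object SchemaMappingSketch {
  // Mirrors the three branches of TransformWithStateExec.stateSchemaMapping:
  // evolved schema    -> add the new file under (previous schema ID + 1)
  // unchanged schema  -> keep the existing mapping
  // no prior metadata -> start the mapping at schema ID 0
  def evolve(old: Option[SchemaInfo], evolvedSchema: Boolean, newSchemaPath: String): SchemaInfo =
    old match {
      case Some(info) if evolvedSchema =>
        val newId = info.stateSchemaId + 1
        SchemaInfo(newId, info.stateSchemaFilePath + (newId -> newSchemaPath))
      case Some(info) =>
        info
      case None =>
        // The patch reaches the same result via oldSchemaId = -1, so -1 + 1 = 0.
        SchemaInfo(0, Map(0 -> newSchemaPath))
    }

  def main(args: Array[String]): Unit = {
    // Schema file names begin with the batch that wrote them ("<batchId>_<uuid>"),
    // matching the split("_").head parsing in OperatorStateMetadataV2FileManager.
    val batch0 = evolve(None, evolvedSchema = false, "0_a1b2")
    val batch5 = evolve(Some(batch0), evolvedSchema = true, "5_c3d4")
    val batch6 = evolve(Some(batch5), evolvedSchema = false, "unused")
    println(batch0) // SchemaInfo(0,Map(0 -> 0_a1b2))
    println(batch5) // SchemaInfo(1,Map(0 -> 0_a1b2, 1 -> 5_c3d4))
    println(batch6) // identical to batch5: nothing evolved
  }
}

operatorStateMetadata then records stateSchemaPaths.head.keys.max as the
store's current stateSchemaId, which is why the evolved branch must always
key the new entry past the previous maximum.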
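
The purge change in OperatorStateMetadataV2FileManager falls out of the same
structure: the earliest schema file worth keeping is now derived from the
smallest schema ID still referenced by the latest metadata, not from a single
path. A small sketch of that cutoff, again with stand-in values and a plain
string split standing in for Hadoop's Path.getName:

object PurgeCutoffSketch {
  // Given the schema-ID -> file map from the latest StateStoreMetadataV2,
  // find the batch that wrote the oldest still-referenced schema file.
  // Mirrors ssInfo.stateSchemaFilePath.minBy(_._1)._2 followed by
  // new Path(schemaFilePath).getName.split("_").head.toLong in the patch.
  def earliestBatchToKeep(stateSchemaFilePath: Map[Int, String]): Long =
    if (stateSchemaFilePath.isEmpty) 0L
    else {
      val oldestPath = stateSchemaFilePath.minBy(_._1)._2
      oldestPath.split('/').last.split("_").head.toLong
    }

  def main(args: Array[String]): Unit = {
    val mapping = Map(
      0 -> "/ckpt/0/_stateSchema/default/0_a1b2",
      1 -> "/ckpt/0/_stateSchema/default/5_c3d4")
    // Schema ID 0 is still referenced, so schema files from batch 0 on are kept.
    println(earliestBatchToKeep(mapping)) // 0
  }
}

Because stateSchemaMapping never drops entries in this patch, the cutoff
stays pinned at the oldest schema file for as long as its ID remains in the
map.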