Skip to content

Commit

Permalink
feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ericm-db committed Jan 25, 2024
1 parent 0e2ab50 commit 36e9fa4
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -893,7 +893,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
hasInitialState, planLater(initialState), planLater(child)
) :: Nil
case logical.TransformWithState(keyDeserializer, valueDeserializer, groupingAttributes,
dataAttributes, statefulProcessor, timeoutMode, outputMode, outputObjAttr, child) =>
dataAttributes, statefulProcessor, timeoutMode, outputMode, outputObjAttr, child) =>
TransformWithStateExec.generateSparkPlanForBatchQueries(keyDeserializer, valueDeserializer,
groupingAttributes, dataAttributes, statefulProcessor, timeoutMode, outputMode,
outputObjAttr, planLater(child)) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ case class TransformWithStateExec(
override protected def doExecute(): RDD[InternalRow] = {
metrics // force lazy init at driver

// populate stateInfo if this is a streaming query
child.execute().mapPartitionsWithStateStore[InternalRow](
getStateInfo,
schemaForKeyRow,
Expand All @@ -181,15 +180,15 @@ object TransformWithStateExec {

// Plan logical transformWithState for batch queries
def generateSparkPlanForBatchQueries(
keyDeserializer: Expression,
valueDeserializer: Expression,
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
statefulProcessor: StatefulProcessor[Any, Any, Any],
timeoutMode: TimeoutMode,
outputMode: OutputMode,
outputObjAttr: Attribute,
child: SparkPlan): SparkPlan = {
keyDeserializer: Expression,
valueDeserializer: Expression,
groupingAttributes: Seq[Attribute],
dataAttributes: Seq[Attribute],
statefulProcessor: StatefulProcessor[Any, Any, Any],
timeoutMode: TimeoutMode,
outputMode: OutputMode,
outputObjAttr: Attribute,
child: SparkPlan): SparkPlan = {
val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
val statefulOperatorStateInfo = StatefulOperatorStateInfo(
Utils.createTempDir().getAbsolutePath,
Expand Down

0 comments on commit 36e9fa4

Please sign in to comment.