From 0b37917d23dce703b6b6b783080d639be299c23f Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 8 Aug 2024 08:30:36 +0800 Subject: [PATCH] 1 --- .../paimon/spark/commands/MergeIntoPaimonTable.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala index aec6d96668f49..5fec8b99751f4 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala @@ -118,6 +118,8 @@ case class MergeIntoPaimonTable( val (_, unTouchedFileRelation) = createNewRelation(unTouchedFilePaths, dataFilePathToMeta, relation) + // Add FILE_TOUCHED_COL to mark the row as coming from the touched file, if the row has not been + // modified and was from touched file, it should be kept too. val targetDSWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation) .withColumn(FILE_TOUCHED_COL, lit(true)) .union( @@ -258,8 +260,7 @@ object MergeIntoPaimonTable { GeneratePredicate.generate(expr, joinedAttributes) } - // keep row if it is from touched file and not be matched - private def keepRow(row: InternalRow): Boolean = { + private def fromTouchedFile(row: InternalRow): Boolean = { file_touched_col_index != -1 && row.getBoolean(file_touched_col_index) } @@ -285,7 +286,8 @@ object MergeIntoPaimonTable { preds.zip(projs).find { case (predicate, _) => predicate.eval(inputRow) } match { case Some((_, projections)) => projections.apply(inputRow) case None => - if (keepRow(inputRow)) { + // keep the row if it is from touched file and not be matched + if (fromTouchedFile(inputRow)) { keepProj.apply(inputRow) } else { noopCopyProj.apply(inputRow)