Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Aug 8, 2024
1 parent 48394ce commit 0b37917
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ case class MergeIntoPaimonTable(
val (_, unTouchedFileRelation) =
createNewRelation(unTouchedFilePaths, dataFilePathToMeta, relation)

// Add FILE_TOUCHED_COL to mark the row as coming from the touched file, if the row has not been
// modified and was from touched file, it should be kept too.
val targetDSWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
.withColumn(FILE_TOUCHED_COL, lit(true))
.union(
Expand Down Expand Up @@ -258,8 +260,7 @@ object MergeIntoPaimonTable {
GeneratePredicate.generate(expr, joinedAttributes)
}

// keep row if it is from touched file and not be matched
private def keepRow(row: InternalRow): Boolean = {
private def fromTouchedFile(row: InternalRow): Boolean = {
file_touched_col_index != -1 && row.getBoolean(file_touched_col_index)
}

Expand All @@ -285,7 +286,8 @@ object MergeIntoPaimonTable {
preds.zip(projs).find { case (predicate, _) => predicate.eval(inputRow) } match {
case Some((_, projections)) => projections.apply(inputRow)
case None =>
if (keepRow(inputRow)) {
// keep the row if it is from touched file and not be matched
if (fromTouchedFile(inputRow)) {
keepProj.apply(inputRow)
} else {
noopCopyProj.apply(inputRow)
Expand Down

0 comments on commit 0b37917

Please sign in to comment.