Commit
Zouxxyy committed Dec 16, 2024
1 parent 9179d65 commit 6c63fda
Showing 6 changed files with 60 additions and 16 deletions.
@@ -130,20 +130,19 @@ public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) {
     }
 
     public Path bucketPath(BinaryRow partition, int bucket) {
-        Path dataFileRoot = this.root;
-        if (dataFilePathDirectory != null) {
-            dataFileRoot = new Path(dataFileRoot, dataFilePathDirectory);
-        }
-        return new Path(dataFileRoot + "/" + relativePartitionAndBucketPath(partition, bucket));
+        return new Path(root, relativeBucketPath(partition, bucket));
     }
 
-    public Path relativePartitionAndBucketPath(BinaryRow partition, int bucket) {
+    public Path relativeBucketPath(BinaryRow partition, int bucket) {
+        Path relativeBucketPath = new Path(BUCKET_PATH_PREFIX + bucket);
         String partitionPath = getPartitionString(partition);
-        String fullPath =
-                partitionPath.isEmpty()
-                        ? BUCKET_PATH_PREFIX + bucket
-                        : partitionPath + "/" + BUCKET_PATH_PREFIX + bucket;
-        return new Path(fullPath);
+        if (!partitionPath.isEmpty()) {
+            relativeBucketPath = new Path(partitionPath, relativeBucketPath);
+        }
+        if (dataFilePathDirectory != null) {
+            relativeBucketPath = new Path(dataFilePathDirectory, relativeBucketPath);
+        }
+        return relativeBucketPath;
     }
 
     /** IMPORTANT: This method is NOT THREAD SAFE. */
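Taken together, this change moves the optional 'data-file.path-directory' prefix out of bucketPath and into the relative path returned by the renamed relativeBucketPath. Before the change the prefix was applied only inside bucketPath, so relative paths built from the factory omitted it whenever the option was set; the Spark callers below now pick it up automatically. The following is a minimal sketch of the new composition order, using plain strings instead of Paimon's Path type — the warehouse root, partition string, and directory values are illustrative assumptions, not taken from the diff:

// Sketch only: plain strings stand in for Paimon's Path, and the names mirror
// the diff above rather than the real FileStorePathFactory API.
public class RelativeBucketPathSketch {

    static final String BUCKET_PATH_PREFIX = "bucket-";

    // Mirrors the new relativeBucketPath: bucket, then partition, then the
    // optional data-file.path-directory, innermost to outermost.
    static String relativeBucketPath(String partitionPath, int bucket, String dataFileDir) {
        String relative = BUCKET_PATH_PREFIX + bucket;
        if (!partitionPath.isEmpty()) {
            relative = partitionPath + "/" + relative;
        }
        if (dataFileDir != null) {
            relative = dataFileDir + "/" + relative;
        }
        return relative;
    }

    // Mirrors the new bucketPath: the table root is prepended exactly once.
    static String bucketPath(String root, String partitionPath, int bucket, String dataFileDir) {
        return root + "/" + relativeBucketPath(partitionPath, bucket, dataFileDir);
    }

    public static void main(String[] args) {
        // 'data-file.path-directory' = 'data', partitioned table:
        System.out.println(relativeBucketPath("dt=20241216", 0, "data"));
        // -> data/dt=20241216/bucket-0
        System.out.println(bucketPath("/warehouse/db.db/T", "dt=20241216", 0, "data"));
        // -> /warehouse/db.db/T/data/dt=20241216/bucket-0
        // Unpartitioned table, option unset:
        System.out.println(relativeBucketPath("", 3, null));
        // -> bucket-3
    }
}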
@@ -245,12 +245,12 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLConfHelper
         val relativeFilePath = location.toUri.relativize(new URI(filePath)).toString
         val (partition, bucket) = dataFileToPartitionAndBucket.toMap.apply(relativeFilePath)
         val pathFactory = my_table.store().pathFactory()
-        val partitionAndBucket = pathFactory
-          .relativePartitionAndBucketPath(partition, bucket)
+        val relativeBucketPath = pathFactory
+          .relativeBucketPath(partition, bucket)
           .toString
 
         SparkDeletionVectors(
-          partitionAndBucket,
+          relativeBucketPath,
           SerializationUtils.serializeBinaryRow(partition),
           bucket,
           Seq((new Path(filePath).getName, dv.serializeToBytes()))
@@ -35,7 +35,7 @@ case class SparkDataFileMeta(
 
   def relativePath(fileStorePathFactory: FileStorePathFactory): String = {
     fileStorePathFactory
-      .relativePartitionAndBucketPath(partition, bucket)
+      .relativeBucketPath(partition, bucket)
       .toUri
       .toString + "/" + dataFileMeta.fileName()
   }
@@ -36,7 +36,7 @@ case class SparkDeletionVectors(
 ) {
   def relativePaths(fileStorePathFactory: FileStorePathFactory): Seq[String] = {
     val prefix = fileStorePathFactory
-      .relativePartitionAndBucketPath(SerializationUtils.deserializeBinaryRow(partition), bucket)
+      .relativeBucketPath(SerializationUtils.deserializeBinaryRow(partition), bucket)
       .toUri
       .toString + "/"
     dataFileAndDeletionVector.map(prefix + _._1)
@@ -652,6 +652,22 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelper
     assert(dvMeta.cardinality() == 334)
   }
 
+  test("Paimon deletionVector: delete from non-pk table with data file path") {
+    sql(s"""
+           |CREATE TABLE T (id INT)
+           |TBLPROPERTIES (
+           | 'deletion-vectors.enabled' = 'true',
+           | 'bucket-key' = 'id',
+           | 'bucket' = '1',
+           | 'data-file.path-directory' = 'data'
+           |)
+           |""".stripMargin)
+
+    sql("INSERT INTO T SELECT /*+ REPARTITION(1) */ id FROM range (1, 50000)")
+    sql("DELETE FROM T WHERE id >= 111 and id <= 444")
+    checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
+  }
+
   private def getPathName(path: String): String = {
     new Path(path).getName
   }
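For reference, range (1, 50000) produces 49,999 rows and the DELETE removes ids 111 through 444 inclusive (334 rows), so the expected remaining count is 49,999 - 334 = 49,665, matching the Row(49665) assertion.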
@@ -113,6 +113,35 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
     }
   }
 
+  test(s"Paimon MergeInto: update + insert with data file path") {
+    withTable("source", "target") {
+
+      Seq((1, 100, "c11"), (3, 300, "c33")).toDF("a", "b", "c").createOrReplaceTempView("source")
+
+      createTable(
+        "target",
+        "a INT, b INT, c STRING",
+        Seq("a"),
+        Seq(),
+        Map("data-file.path-directory" -> "data"))
+      spark.sql("INSERT INTO target values (1, 10, 'c1'), (2, 20, 'c2')")
+
+      spark.sql(s"""
+                   |MERGE INTO target
+                   |USING source
+                   |ON target.a = source.a
+                   |WHEN MATCHED THEN
+                   |UPDATE SET a = source.a, b = source.b, c = source.c
+                   |WHEN NOT MATCHED
+                   |THEN INSERT (a, b, c) values (a, b, c)
+                   |""".stripMargin)
+
+      checkAnswer(
+        spark.sql("SELECT * FROM target ORDER BY a, b"),
+        Seq(Row(1, 100, "c11"), Row(2, 20, "c2"), Row(3, 300, "c33")))
+    }
+  }
+
   test(s"Paimon MergeInto: delete + insert") {
     withTable("source", "target") {
 