From 20160fff55f848ca64c552158905fa72a88b6eb2 Mon Sep 17 00:00:00 2001 From: Yann Date: Tue, 20 Aug 2024 12:47:47 +0800 Subject: [PATCH] update --- .../apache/paimon/spark/commands/PaimonCommand.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala index 88fd205db3df3..aad4b82bd5b68 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala @@ -185,7 +185,16 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper { sparkSession: SparkSession): Dataset[SparkDeletionVectors] = { import sparkSession.implicits._ - val dataFileToPartitionAndBucket: Array[(String, (BinaryRow, Int))] = + val resolver = sparkSession.sessionState.conf.resolver + Seq(FILE_PATH_COLUMN, ROW_INDEX_COLUMN).foreach { + metadata => + dataWithMetadataColumns.schema + .find(field => resolver(field.name, metadata)) + .orElse(throw new RuntimeException( + "This input dataset doesn't contains the required metadata columns: __paimon_file_path and __paimon_row_index.")) + } + + val dataFileToPartitionAndBucket = dataFilePathToMeta.mapValues(meta => (meta.partition, meta.bucket)).toArray val my_table = table