From 311b16965255b3d559ef7c0ebf07c6d27c5f22b5 Mon Sep 17 00:00:00 2001 From: Yann Date: Tue, 26 Mar 2024 11:58:10 +0800 Subject: [PATCH 1/3] [spark][core] Support input_file_name UDF --- .../paimon/io/RowDataFileRecordReader.java | 43 ++++- .../operation/AbstractFileStoreRead.java | 34 ++++ .../operation/AppendOnlyFileStoreRead.java | 5 +- .../org/apache/paimon/operation/FileHook.java | 49 ++++++ .../paimon/operation/FileStoreRead.java | 3 + .../operation/KeyValueFileStoreRead.java | 2 +- .../table/AppendOnlyFileStoreTable.java | 2 +- .../table/source/AbstractDataTableRead.java | 10 ++ .../table/source/KeyValueTableRead.java | 4 +- .../spark/PaimonPartitionReaderFactory.scala | 26 ++- .../scala/org/apache/spark/sql/Utils.scala | 9 + .../paimon/spark/sql/PaimonQueryTest.scala | 161 ++++++++++++++++++ .../paimon/spark/sql/WithTableOptions.scala | 2 + 13 files changed, 342 insertions(+), 8 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreRead.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/FileHook.java create mode 100644 paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java index ed891a32b2e1..f15547c8bebd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java @@ -28,6 +28,7 @@ import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.operation.FileHook; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.utils.FileUtils; import org.apache.paimon.utils.ProjectedRow; @@ -35,15 +36,23 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; /** Reads {@link InternalRow} from data files. 
 */
 public class RowDataFileRecordReader implements RecordReader<InternalRow> {
 
+    private final Path path;
     private final RecordReader<InternalRow> reader;
     @Nullable private final int[] indexMapping;
     @Nullable private final PartitionInfo partitionInfo;
     @Nullable private final CastFieldGetter[] castMapping;
 
+    private boolean triggerOpenHooks = false;
+    private final List<Consumer<String>> openFileHooks = new ArrayList<>();
+    private final List<Consumer<String>> closeFileHooks = new ArrayList<>();
+
     public RowDataFileRecordReader(
             FileIO fileIO,
             Path path,
@@ -51,19 +60,33 @@ public RowDataFileRecordReader(
             FormatReaderFactory readerFactory,
             @Nullable int[] indexMapping,
             @Nullable CastFieldGetter[] castMapping,
-            @Nullable PartitionInfo partitionInfo)
+            @Nullable PartitionInfo partitionInfo,
+            List<FileHook> hooks)
             throws IOException {
         FileUtils.checkExists(fileIO, path);
+        this.path = path;
         FormatReaderContext context = new FormatReaderContext(fileIO, path, fileSize);
         this.reader = readerFactory.createReader(context);
         this.indexMapping = indexMapping;
         this.partitionInfo = partitionInfo;
         this.castMapping = castMapping;
+        for (FileHook hook : hooks) {
+            if (hook.getTrigger().equals(FileHook.ReaderTrigger.OPEN_FILE)) {
+                openFileHooks.add(hook.getFunction());
+            } else if (hook.getTrigger().equals(FileHook.ReaderTrigger.CLOSE_FILE)) {
+                closeFileHooks.add(hook.getFunction());
+            } else {
+                throw new UnsupportedOperationException(
+                        hook.getTrigger().name() + " is not supported.");
+            }
+        }
     }
 
     @Nullable
     @Override
     public RecordReader.RecordIterator<InternalRow> readBatch() throws IOException {
+        triggerOpenFileHooks();
+
         RecordIterator<InternalRow> iterator = reader.readBatch();
         if (iterator == null) {
             return null;
@@ -94,5 +117,23 @@ public RecordReader.RecordIterator<InternalRow> readBatch() throws IOException {
     @Override
     public void close() throws IOException {
         reader.close();
+        triggerCloseFileHooks();
+    }
+
+    private void triggerOpenFileHooks() {
+        if (!triggerOpenHooks && !openFileHooks.isEmpty()) {
+            for (Consumer<String> func : openFileHooks) {
+                func.accept(path.toUri().toString());
+            }
+            triggerOpenHooks = true;
+        }
+    }
+
+    private void triggerCloseFileHooks() {
+        if (!closeFileHooks.isEmpty()) {
+            for (Consumer<String> func : closeFileHooks) {
+                func.accept(path.toUri().toString());
+            }
+        }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreRead.java
new file mode 100644
index 000000000000..2f53b6f430b8
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreRead.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Abstract {@link FileStoreRead} that manages the registered {@link FileHook}s. */
+public abstract class AbstractFileStoreRead<T> implements FileStoreRead<T> {
+
+    protected List<FileHook> fileHooks = new ArrayList<>();
+
+    @Override
+    public FileStoreRead<T> withFileHooks(List<FileHook> fileHooks) {
+        this.fileHooks.addAll(fileHooks);
+        return this;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
index b67edaaf3e6b..61150f2787d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java
@@ -56,7 +56,7 @@ import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
 
 /** {@link FileStoreRead} for {@link AppendOnlyFileStore}. */
-public class AppendOnlyFileStoreRead implements FileStoreRead<InternalRow> {
+public class AppendOnlyFileStoreRead extends AbstractFileStoreRead<InternalRow> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyFileStoreRead.class);
 
@@ -182,7 +182,8 @@ public RecordReader<InternalRow> createReader(DataSplit split) throws IOException
                             bulkFormatMapping.getIndexMapping(),
                             bulkFormatMapping.getCastMapping(),
                             PartitionUtils.create(
-                                    bulkFormatMapping.getPartitionPair(), partition)));
+                                    bulkFormatMapping.getPartitionPair(), partition),
+                            fileHooks));
         }
 
         return ConcatRecordReader.create(suppliers);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileHook.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileHook.java
new file mode 100644
index 000000000000..fe2c22278f42
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileHook.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import java.util.function.Consumer;
+
+/** A hook that is invoked when a reader opens or closes a data file. */
+public class FileHook {
+
+    /** The read-lifecycle point at which a {@link FileHook} fires. */
+    public enum ReaderTrigger {
+        OPEN_FILE,
+
+        CLOSE_FILE;
+    }
+
+    private final ReaderTrigger trigger;
+
+    private final Consumer<String> function;
+
+    public FileHook(ReaderTrigger trigger, Consumer<String> function) {
+        this.trigger = trigger;
+        this.function = function;
+    }
+
+    public ReaderTrigger getTrigger() {
+        return this.trigger;
+    }
+
+    public Consumer<String> getFunction() {
+        return this.function;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
index 2d3e121b1c45..b26ff2a5a4a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java
@@ -23,6 +23,7 @@
 import org.apache.paimon.table.source.DataSplit;
 
 import java.io.IOException;
+import java.util.List;
 
 /**
  * Read operation which provides {@link RecordReader} creation.
@@ -33,6 +34,8 @@ public interface FileStoreRead<T> {
 
     FileStoreRead<T> withFilter(Predicate predicate);
 
+    FileStoreRead<T> withFileHooks(List<FileHook> fileHooks);
+
     /** Create a {@link RecordReader} from split. */
     RecordReader<T> createReader(DataSplit split) throws IOException;
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
index b0ab4338f117..d53a1865161d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java
@@ -65,7 +65,7 @@ import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
 
 /** {@link FileStoreRead} implementation for {@link KeyValueFileStore}. */
-public class KeyValueFileStoreRead implements FileStoreRead<KeyValue> {
+public class KeyValueFileStoreRead extends AbstractFileStoreRead<KeyValue> {
 
     private final TableSchema tableSchema;
     private final FileIO fileIO;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 4d91328f24c7..bd9f7d23616a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -122,7 +122,7 @@ public void projection(int[][] projection) {
 
             @Override
             public RecordReader<InternalRow> reader(Split split) throws IOException {
-                return read.createReader((DataSplit) split);
+                return read.withFileHooks(hooks).createReader((DataSplit) split);
             }
         };
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index 930cddcd589d..9116cfb44b55 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -21,6 +21,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.operation.DefaultValueAssigner;
+import org.apache.paimon.operation.FileHook;
 import org.apache.paimon.operation.FileStoreRead;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateProjectionConverter;
@@ -29,6 +30,8 @@
 import org.apache.paimon.utils.Projection;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 
 /** A {@link InnerTableRead} for data table. */
@@ -41,6 +44,8 @@ public abstract class AbstractDataTableRead<T> implements InnerTableRead {
 
     private boolean executeFilter = false;
     private Predicate predicate;
 
+    protected final List<FileHook> hooks = new ArrayList<>();
+
     public AbstractDataTableRead(FileStoreRead<T> fileStoreRead, TableSchema schema) {
         this.fileStoreRead = fileStoreRead;
         this.defaultValueAssigner = schema == null ? null : DefaultValueAssigner.create(schema);
@@ -48,6 +53,11 @@ public AbstractDataTableRead(FileStoreRead<T> fileStoreRead, TableSchema schema)
 
     public abstract void projection(int[][] projection);
 
+    public TableRead withFileHook(FileHook hook) {
+        hooks.add(hook);
+        return this;
+    }
+
     public abstract RecordReader<InternalRow> reader(Split split) throws IOException;
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
index bb0354eee52e..12d51590dccd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java
@@ -52,7 +52,9 @@ public TableRead withIOManager(IOManager ioManager) {
 
     @Override
     public final RecordReader<InternalRow> reader(Split split) throws IOException {
-        return new RowDataRecordReader(read.createReader((DataSplit) split));
+        RecordReader<KeyValue> recordReader =
+                read.withFileHooks(hooks).createReader((DataSplit) split);
+        return new RowDataRecordReader(recordReader);
     }
 
     protected abstract RecordReader.RecordIterator<InternalRow> rowDataRecordIteratorFromKv(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
index 961aad8a26ad..cb87e7a309e5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
@@ -20,10 +20,12 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.data
 import org.apache.paimon.disk.IOManager
+import org.apache.paimon.operation.FileHook
 import org.apache.paimon.reader.RecordReader
 import org.apache.paimon.spark.SparkUtils.createIOManager
-import org.apache.paimon.table.source.{ReadBuilder, Split}
+import org.apache.paimon.table.source.{AbstractDataTableRead, ReadBuilder, Split}
 
+import org.apache.spark.sql.Utils
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 
@@ -38,14 +40,34 @@ case class PaimonPartitionReaderFactory(readBuilder: ReadBuilder) extends PartitionReaderFactory
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     partition match {
       case paimonInputPartition: SparkInputPartition =>
+        val tableRead = readBuilder.newRead().withIOManager(ioManager)
+        tableRead match {
+          case dataTableRead: AbstractDataTableRead[_] =>
+            addFileHook(dataTableRead)
+          case _ =>
+        }
         val readFunc: Split => RecordReader[data.InternalRow] =
-          (split: Split) => readBuilder.newRead().withIOManager(ioManager).createReader(split)
+          (split: Split) => tableRead.createReader(split)
         PaimonPartitionReader(readFunc, paimonInputPartition, row)
       case _ =>
         throw new RuntimeException(s"It's not a Paimon input partition, $partition")
     }
   }
 
+  private def addFileHook(tableRead: AbstractDataTableRead[_]): Unit = {
tableRead.withFileHook( + new FileHook( + FileHook.ReaderTrigger.OPEN_FILE, + (fileName: String) => Utils.setInputFileName(fileName) + )) + + tableRead.withFileHook( + new FileHook( + FileHook.ReaderTrigger.CLOSE_FILE, + (_: String) => Utils.unsetInputFileName() + )) + } + override def equals(obj: Any): Boolean = { obj match { case other: PaimonPartitionReaderFactory => diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala index 8f7e5aaf7c5c..4767dab39468 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql +import org.apache.spark.rdd.InputFileBlockHolder import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} @@ -69,4 +70,12 @@ object Utils { def bytesToString(size: Long): String = { SparkUtils.bytesToString(size) } + + def setInputFileName(inputFileName: String): Unit = { + InputFileBlockHolder.set(inputFileName, 0, -1) + } + + def unsetInputFileName(): Unit = { + InputFileBlockHolder.unset() + } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala new file mode 100644 index 000000000000..ef683366d9d3 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.source.DataSplit
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.junit.jupiter.api.Assertions
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+class PaimonQueryTest extends PaimonSparkTestBase {
+
+  fileFormats.foreach {
+    fileFormat =>
+      bucketModes.foreach {
+        bucketMode =>
+          test(s"Query input_file_name(): file.format=$fileFormat, bucket=$bucketMode") {
+            val _spark: SparkSession = spark
+            import _spark.implicits._
+
+            withTable("T") {
+              spark.sql(s"""
+                           |CREATE TABLE T (id INT, name STRING)
+                           |TBLPROPERTIES ('file.format'='$fileFormat', 'bucket'='$bucketMode')
+                           |""".stripMargin)
+
+              val location = loadTable("T").location().toUri.toString
+
+              spark.sql("INSERT INTO T VALUES (1, 'x1'), (3, 'x3')")
+
+              val res1 = spark.sql(s"""
+                                      |SELECT *,
+                                      |startswith(input_file_name(), '$location') AS start,
+                                      |endswith(input_file_name(), '.$fileFormat') AS end
+                                      |FROM T
+                                      |ORDER BY id
+                                      |""".stripMargin)
+              checkAnswer(res1, Row(1, "x1", true, true) :: Row(3, "x3", true, true) :: Nil)
+
+              spark.sql("INSERT INTO T VALUES (2, 'x2'), (4, 'x4'), (6, 'x6')")
+
+              val res2 =
+                spark.sql("SELECT input_file_name() FROM T").distinct().as[String].collect().sorted
+              val allDataFiles = getAllFiles("T", Seq.empty, null)
+              Assertions.assertTrue(res2.sameElements(allDataFiles))
+            }
+          }
+
+      }
+  }
+
+  fileFormats.foreach {
+    fileFormat =>
+      bucketModes.foreach {
+        bucketMode =>
+          test(
+            s"Query input_file_name() for partitioned table: file.format=$fileFormat, bucket=$bucketMode") {
+            val _spark: SparkSession = spark
+            import _spark.implicits._
+
+            withTable("T") {
+              spark.sql(s"""
+                           |CREATE TABLE T (id INT, name STRING, pt STRING)
+                           |PARTITIONED BY (pt)
+                           |TBLPROPERTIES ('file.format'='$fileFormat', 'bucket'='$bucketMode')
+                           |""".stripMargin)
+
+              val location = loadTable("T").location().toUri.toString
+
+              spark.sql("INSERT INTO T VALUES (1, 'x1', '2024'), (3, 'x3', '2024')")
+
+              val res1 = spark.sql(s"""
+                                      |SELECT id, name, pt,
+                                      |startswith(input_file_name(), '$location') AS start,
+                                      |endswith(input_file_name(), '.$fileFormat') AS end
+                                      |FROM T
+                                      |ORDER BY id
+                                      |""".stripMargin)
+              checkAnswer(
+                res1,
+                Row(1, "x1", "2024", true, true) :: Row(3, "x3", "2024", true, true) :: Nil)
+
+              spark.sql("""
+                          |INSERT INTO T
+                          |VALUES (2, 'x2', '2025'), (4, 'x4', '2025'), (6, 'x6', '2026')
+                          |""".stripMargin)
+
+              val res2 =
+                spark
+                  .sql("SELECT input_file_name() FROM T WHERE pt='2026'")
+                  .distinct()
+                  .as[String]
+                  .collect()
+                  .sorted
+              val partitionFilter = new util.HashMap[String, String]()
+              partitionFilter.put("pt", "2026")
+              val partialDataFiles = getAllFiles("T", Seq("pt"), partitionFilter)
+              Assertions.assertTrue(res2.sameElements(partialDataFiles))
+
+              val res3 =
+                spark.sql("SELECT input_file_name() FROM T").distinct().as[String].collect().sorted
+              val allDataFiles = getAllFiles("T", Seq("pt"), null)
+              Assertions.assertTrue(res3.sameElements(allDataFiles))
+            }
+          }
+
+      }
+  }
+
+  private def getAllFiles(
+      tableName: String,
+      partitions: Seq[String],
+      partitionFilter: java.util.Map[String, String]): Array[String] = {
+    val paimonTable = loadTable(tableName)
+    val location = paimonTable.location()
+
+    val files = paimonTable
+      .newSnapshotReader()
+      .withPartitionFilter(partitionFilter)
+      .read()
+      .splits()
+      .asScala
+      .collect { case ds: DataSplit => ds }
+      .flatMap {
+        ds =>
+          val prefix = if (partitions.isEmpty) {
+            s"$location/bucket-${ds.bucket}"
} else { + val partitionPath = partitions.zipWithIndex + .map { + case (pt, index) => + s"$pt=" + ds.partition().getString(index) + } + .mkString("/") + s"$location/$partitionPath/bucket-${ds.bucket}" + } + ds.dataFiles().asScala.map(f => prefix + "/" + f.fileName) + } + files.sorted.toArray + } +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala index 5b1c65525404..e390058bafab 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/WithTableOptions.scala @@ -23,6 +23,8 @@ trait WithTableOptions { // 3: fixed bucket, -1: dynamic bucket protected val bucketModes: Seq[Int] = Seq(3, -1) + protected val fileFormats: Seq[String] = Seq("orc", "parquet") + protected val withPk: Seq[Boolean] = Seq(true, false) } From 00434f86b60fe8af61464eec3e311c42748e6805 Mon Sep 17 00:00:00 2001 From: Yann Date: Tue, 26 Mar 2024 16:40:37 +0800 Subject: [PATCH 2/3] [followup] define spark PaimonRecordReaderIterator to set inputFileName info --- .../data/columnar/ColumnarRowIterator.java | 17 ++- ...nIterator.java => FileRecordIterator.java} | 28 +++-- .../apache/paimon/reader/RecordReader.java | 4 +- .../ApplyDeletionVectorReader.java | 6 +- .../paimon/io/RowDataFileRecordReader.java | 43 +------- .../apache/paimon/mergetree/LookupLevels.java | 7 +- .../operation/AbstractFileStoreRead.java | 34 ------ .../operation/AppendOnlyFileStoreRead.java | 5 +- .../org/apache/paimon/operation/FileHook.java | 49 --------- .../paimon/operation/FileStoreRead.java | 3 - .../operation/KeyValueFileStoreRead.java | 2 +- .../table/AppendOnlyFileStoreTable.java | 2 +- .../table/source/AbstractDataTableRead.java | 10 -- .../table/source/KeyValueTableRead.java | 4 +- .../paimon/format/orc/OrcReaderFactory.java | 15 ++- .../format/parquet/ParquetReaderFactory.java | 23 ++-- .../paimon/spark/PaimonPartitionReader.scala | 2 +- .../spark/PaimonPartitionReaderFactory.scala | 22 +--- .../spark/PaimonRecordReaderIterator.scala | 104 ++++++++++++++++++ 19 files changed, 179 insertions(+), 201 deletions(-) rename paimon-common/src/main/java/org/apache/paimon/reader/{RecordWithPositionIterator.java => FileRecordIterator.java} (77%) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreRead.java delete mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/FileHook.java create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java index 6de861af019c..13d706cf6b6a 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java @@ -20,8 +20,9 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.PartitionInfo; +import org.apache.paimon.fs.Path; +import org.apache.paimon.reader.FileRecordIterator; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.reader.RecordWithPositionIterator; import org.apache.paimon.utils.RecyclableIterator; import org.apache.paimon.utils.VectorMappingUtils; @@ -32,8 +33,9 
@@ * {@link ColumnarRow#setRowId}. */ public class ColumnarRowIterator extends RecyclableIterator - implements RecordWithPositionIterator { + implements FileRecordIterator { + private final Path filePath; private final ColumnarRow rowData; private final Runnable recycler; @@ -41,8 +43,9 @@ public class ColumnarRowIterator extends RecyclableIterator private int nextPos; private long nextGlobalPos; - public ColumnarRowIterator(ColumnarRow rowData, @Nullable Runnable recycler) { + public ColumnarRowIterator(Path filePath, ColumnarRow rowData, @Nullable Runnable recycler) { super(recycler); + this.filePath = filePath; this.rowData = rowData; this.recycler = recycler; } @@ -74,8 +77,14 @@ public long returnedPosition() { return nextGlobalPos - 1; } + @Override + public Path filePath() { + return this.filePath; + } + public ColumnarRowIterator copy(ColumnVector[] vectors) { - ColumnarRowIterator newIterator = new ColumnarRowIterator(rowData.copy(vectors), recycler); + ColumnarRowIterator newIterator = + new ColumnarRowIterator(filePath, rowData.copy(vectors), recycler); newIterator.reset(num, nextGlobalPos); return newIterator; } diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java similarity index 77% rename from paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java rename to paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java index e4778413a34f..0cef8cc001e6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordWithPositionIterator.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/FileRecordIterator.java @@ -18,6 +18,7 @@ package org.apache.paimon.reader; +import org.apache.paimon.fs.Path; import org.apache.paimon.utils.Filter; import javax.annotation.Nullable; @@ -30,7 +31,7 @@ * * @param The type of the record. */ -public interface RecordWithPositionIterator extends RecordReader.RecordIterator { +public interface FileRecordIterator extends RecordReader.RecordIterator { /** * Get the row position of the row returned by {@link RecordReader.RecordIterator#next}. 
@@ -39,15 +40,23 @@ public interface RecordWithPositionIterator extends RecordReader.RecordIterat */ long returnedPosition(); + /** @return the file path */ + Path filePath(); + @Override - default RecordWithPositionIterator transform(Function function) { - RecordWithPositionIterator thisIterator = this; - return new RecordWithPositionIterator() { + default FileRecordIterator transform(Function function) { + FileRecordIterator thisIterator = this; + return new FileRecordIterator() { @Override public long returnedPosition() { return thisIterator.returnedPosition(); } + @Override + public Path filePath() { + return thisIterator.filePath(); + } + @Nullable @Override public R next() throws IOException { @@ -66,14 +75,19 @@ public void releaseBatch() { } @Override - default RecordWithPositionIterator filter(Filter filter) { - RecordWithPositionIterator thisIterator = this; - return new RecordWithPositionIterator() { + default FileRecordIterator filter(Filter filter) { + FileRecordIterator thisIterator = this; + return new FileRecordIterator() { @Override public long returnedPosition() { return thisIterator.returnedPosition(); } + @Override + public Path filePath() { + return thisIterator.filePath(); + } + @Nullable @Override public T next() throws IOException { diff --git a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java index 276a855719d6..5c7482d9d9c1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java @@ -149,11 +149,11 @@ default void forEachRemaining(Consumer action) throws IOException { */ default void forEachRemainingWithPosition(BiConsumer action) throws IOException { - RecordWithPositionIterator batch; + FileRecordIterator batch; T record; try { - while ((batch = (RecordWithPositionIterator) readBatch()) != null) { + while ((batch = (FileRecordIterator) readBatch()) != null) { while ((record = batch.next()) != null) { action.accept(batch.returnedPosition(), record); } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java index 3bba07506338..dadde99eac5f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/ApplyDeletionVectorReader.java @@ -18,8 +18,8 @@ package org.apache.paimon.deletionvectors; +import org.apache.paimon.reader.FileRecordIterator; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.reader.RecordWithPositionIterator; import javax.annotation.Nullable; @@ -62,10 +62,10 @@ public RecordIterator readBatch() throws IOException { } checkArgument( - batch instanceof RecordWithPositionIterator, + batch instanceof FileRecordIterator, "There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator"); - RecordWithPositionIterator batchWithPosition = (RecordWithPositionIterator) batch; + FileRecordIterator batchWithPosition = (FileRecordIterator) batch; return batchWithPosition.filter( a -> !deletionVector.isDeleted(batchWithPosition.returnedPosition())); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java index f15547c8bebd..ed891a32b2e1 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileRecordReader.java @@ -28,7 +28,6 @@ import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; -import org.apache.paimon.operation.FileHook; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.utils.FileUtils; import org.apache.paimon.utils.ProjectedRow; @@ -36,23 +35,15 @@ import javax.annotation.Nullable; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; /** Reads {@link InternalRow} from data files. */ public class RowDataFileRecordReader implements RecordReader { - private final Path path; private final RecordReader reader; @Nullable private final int[] indexMapping; @Nullable private final PartitionInfo partitionInfo; @Nullable private final CastFieldGetter[] castMapping; - private boolean triggerOpenHooks = false; - private final List> openFileHooks = new ArrayList<>(); - private final List> closeFileHooks = new ArrayList<>(); - public RowDataFileRecordReader( FileIO fileIO, Path path, @@ -60,33 +51,19 @@ public RowDataFileRecordReader( FormatReaderFactory readerFactory, @Nullable int[] indexMapping, @Nullable CastFieldGetter[] castMapping, - @Nullable PartitionInfo partitionInfo, - List hooks) + @Nullable PartitionInfo partitionInfo) throws IOException { FileUtils.checkExists(fileIO, path); - this.path = path; FormatReaderContext context = new FormatReaderContext(fileIO, path, fileSize); this.reader = readerFactory.createReader(context); this.indexMapping = indexMapping; this.partitionInfo = partitionInfo; this.castMapping = castMapping; - for (FileHook hook : hooks) { - if (hook.getTrigger().equals(FileHook.ReaderTrigger.OPEN_FILE)) { - openFileHooks.add(hook.getFunction()); - } else if (hook.getTrigger().equals(FileHook.ReaderTrigger.CLOSE_FILE)) { - closeFileHooks.add(hook.getFunction()); - } else { - throw new UnsupportedOperationException( - hook.getTrigger().name() + " is not supported."); - } - } } @Nullable @Override public RecordReader.RecordIterator readBatch() throws IOException { - triggerOpenFileHooks(); - RecordIterator iterator = reader.readBatch(); if (iterator == null) { return null; @@ -117,23 +94,5 @@ public RecordReader.RecordIterator readBatch() throws IOException { @Override public void close() throws IOException { reader.close(); - triggerCloseFileHooks(); - } - - private void triggerOpenFileHooks() { - if (!triggerOpenHooks && !openFileHooks.isEmpty()) { - for (Consumer func : openFileHooks) { - func.accept(path.toUri().toString()); - } - triggerOpenHooks = true; - } - } - - private void triggerCloseFileHooks() { - if (!closeFileHooks.isEmpty()) { - for (Consumer func : closeFileHooks) { - func.accept(path.toUri().toString()); - } - } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java index dd45e7fc19c8..d6024ebb88cf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java @@ -28,8 +28,8 @@ import org.apache.paimon.lookup.LookupStoreWriter; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.options.MemorySize; +import org.apache.paimon.reader.FileRecordIterator; import org.apache.paimon.reader.RecordReader; -import 
org.apache.paimon.reader.RecordWithPositionIterator; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BloomFilter; @@ -176,9 +176,8 @@ private LookupFile createLookupFile(DataFileMeta file) throws IOException { try (RecordReader reader = fileReaderFactory.apply(file)) { KeyValue kv; if (valueProcessor.withPosition()) { - RecordWithPositionIterator batch; - while ((batch = (RecordWithPositionIterator) reader.readBatch()) - != null) { + FileRecordIterator batch; + while ((batch = (FileRecordIterator) reader.readBatch()) != null) { while ((kv = batch.next()) != null) { byte[] keyBytes = keySerializer.serializeToBytes(kv.key()); byte[] valueBytes = diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreRead.java deleted file mode 100644 index 2f53b6f430b8..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreRead.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.operation; - -import java.util.ArrayList; -import java.util.List; - -/** abstract file store read. */ -public abstract class AbstractFileStoreRead implements FileStoreRead { - - protected List fileHooks = new ArrayList<>(); - - @Override - public FileStoreRead withFileHooks(List fileHooks) { - this.fileHooks.addAll(fileHooks); - return this; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java index 61150f2787d8..b67edaaf3e6b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreRead.java @@ -56,7 +56,7 @@ import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; /** {@link FileStoreRead} for {@link AppendOnlyFileStore}. 
*/ -public class AppendOnlyFileStoreRead extends AbstractFileStoreRead { +public class AppendOnlyFileStoreRead implements FileStoreRead { private static final Logger LOG = LoggerFactory.getLogger(AppendOnlyFileStoreRead.class); @@ -182,8 +182,7 @@ public RecordReader createReader(DataSplit split) throws IOExceptio bulkFormatMapping.getIndexMapping(), bulkFormatMapping.getCastMapping(), PartitionUtils.create( - bulkFormatMapping.getPartitionPair(), partition), - fileHooks)); + bulkFormatMapping.getPartitionPair(), partition))); } return ConcatRecordReader.create(suppliers); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileHook.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileHook.java deleted file mode 100644 index fe2c22278f42..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileHook.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.operation; - -import java.util.function.Consumer; - -/** file hook. */ -public class FileHook { - - /** ReaderTrigger. */ - public enum ReaderTrigger { - OPEN_FILE, - - CLOSE_FILE; - } - - private final ReaderTrigger trigger; - - private final Consumer function; - - public FileHook(ReaderTrigger trigger, Consumer function) { - this.trigger = trigger; - this.function = function; - } - - public ReaderTrigger getTrigger() { - return this.trigger; - } - - public Consumer getFunction() { - return this.function; - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java index b26ff2a5a4a5..2d3e121b1c45 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreRead.java @@ -23,7 +23,6 @@ import org.apache.paimon.table.source.DataSplit; import java.io.IOException; -import java.util.List; /** * Read operation which provides {@link RecordReader} creation. @@ -34,8 +33,6 @@ public interface FileStoreRead { FileStoreRead withFilter(Predicate predicate); - FileStoreRead withFileHooks(List fileHooks); - /** Create a {@link RecordReader} from split. 
*/ RecordReader createReader(DataSplit split) throws IOException; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java index d53a1865161d..b0ab4338f117 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreRead.java @@ -65,7 +65,7 @@ import static org.apache.paimon.predicate.PredicateBuilder.splitAnd; /** {@link FileStoreRead} implementation for {@link KeyValueFileStore}. */ -public class KeyValueFileStoreRead extends AbstractFileStoreRead { +public class KeyValueFileStoreRead implements FileStoreRead { private final TableSchema tableSchema; private final FileIO fileIO; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index bd9f7d23616a..4d91328f24c7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -122,7 +122,7 @@ public void projection(int[][] projection) { @Override public RecordReader reader(Split split) throws IOException { - return read.withFileHooks(hooks).createReader((DataSplit) split); + return read.createReader((DataSplit) split); } }; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java index 9116cfb44b55..930cddcd589d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java @@ -21,7 +21,6 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; import org.apache.paimon.operation.DefaultValueAssigner; -import org.apache.paimon.operation.FileHook; import org.apache.paimon.operation.FileStoreRead; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateProjectionConverter; @@ -30,8 +29,6 @@ import org.apache.paimon.utils.Projection; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Optional; /** A {@link InnerTableRead} for data table. */ @@ -44,8 +41,6 @@ public abstract class AbstractDataTableRead implements InnerTableRead { private boolean executeFilter = false; private Predicate predicate; - protected final List hooks = new ArrayList<>(); - public AbstractDataTableRead(FileStoreRead fileStoreRead, TableSchema schema) { this.fileStoreRead = fileStoreRead; this.defaultValueAssigner = schema == null ? 
null : DefaultValueAssigner.create(schema); @@ -53,11 +48,6 @@ public AbstractDataTableRead(FileStoreRead fileStoreRead, TableSchema schema) public abstract void projection(int[][] projection); - public TableRead withFileHook(FileHook hook) { - hooks.add(hook); - return this; - } - public abstract RecordReader reader(Split split) throws IOException; @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java index 12d51590dccd..bb0354eee52e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/KeyValueTableRead.java @@ -52,9 +52,7 @@ public TableRead withIOManager(IOManager ioManager) { @Override public final RecordReader reader(Split split) throws IOException { - RecordReader recordReader = - read.withFileHooks(hooks).createReader((DataSplit) split); - return new RowDataRecordReader(recordReader); + return new RowDataRecordReader(read.createReader((DataSplit) split)); } protected abstract RecordReader.RecordIterator rowDataRecordIteratorFromKv( diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 55cff92980bf..8cf95fad3290 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -28,6 +28,7 @@ import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem; import org.apache.paimon.format.orc.filter.OrcFilters; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.reader.RecordReader.RecordIterator; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; @@ -94,7 +95,7 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context) context instanceof OrcFormatReaderContext ? ((OrcFormatReaderContext) context).poolSize() : 1; - Pool poolOfBatches = createPoolOfBatches(poolSize); + Pool poolOfBatches = createPoolOfBatches(context.filePath(), poolSize); RecordReader orcReader = createRecordReader( @@ -114,7 +115,7 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context) * conversion from the ORC representation to the result format. 
*/ public OrcReaderBatch createReaderBatch( - VectorizedRowBatch orcBatch, Pool.Recycler recycler) { + Path filePath, VectorizedRowBatch orcBatch, Pool.Recycler recycler) { List tableFieldNames = tableType.getFieldNames(); List tableFieldTypes = tableType.getFieldTypes(); @@ -125,17 +126,17 @@ public OrcReaderBatch createReaderBatch( DataType type = tableFieldTypes.get(i); vectors[i] = createPaimonVector(orcBatch.cols[tableFieldNames.indexOf(name)], type); } - return new OrcReaderBatch(orcBatch, new VectorizedColumnBatch(vectors), recycler); + return new OrcReaderBatch(filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler); } // ------------------------------------------------------------------------ - private Pool createPoolOfBatches(int numBatches) { + private Pool createPoolOfBatches(Path filePath, int numBatches) { final Pool pool = new Pool<>(numBatches); for (int i = 0; i < numBatches; i++) { final VectorizedRowBatch orcBatch = createBatchWrapper(schema, batchSize / numBatches); - final OrcReaderBatch batch = createReaderBatch(orcBatch, pool.recycler()); + final OrcReaderBatch batch = createReaderBatch(filePath, orcBatch, pool.recycler()); pool.add(batch); } @@ -153,6 +154,7 @@ private static class OrcReaderBatch { private final ColumnarRowIterator result; protected OrcReaderBatch( + final Path filePath, final VectorizedRowBatch orcVectorizedRowBatch, final VectorizedColumnBatch paimonColumnBatch, final Pool.Recycler recycler) { @@ -160,7 +162,8 @@ protected OrcReaderBatch( this.recycler = checkNotNull(recycler); this.paimonColumnBatch = paimonColumnBatch; this.result = - new ColumnarRowIterator(new ColumnarRow(paimonColumnBatch), this::recycle); + new ColumnarRowIterator( + filePath, new ColumnarRow(paimonColumnBatch), this::recycle); } /** diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index ed778c0bf018..004a0d655e96 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -28,6 +28,7 @@ import org.apache.paimon.format.parquet.reader.ColumnReader; import org.apache.paimon.format.parquet.reader.ParquetDecimalVector; import org.apache.paimon.format.parquet.reader.ParquetTimestampVector; +import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.reader.RecordReader.RecordIterator; @@ -100,7 +101,8 @@ public ParquetReader createReader(FormatReaderFactory.Context context) throws IO checkSchema(fileSchema, requestedSchema); - Pool poolOfBatches = createPoolOfBatches(requestedSchema); + Pool poolOfBatches = + createPoolOfBatches(context.filePath(), requestedSchema); return new ParquetReader(reader, requestedSchema, reader.getRecordCount(), poolOfBatches); } @@ -174,21 +176,24 @@ private void checkSchema(MessageType fileSchema, MessageType requestedSchema) } } - private Pool createPoolOfBatches(MessageType requestedSchema) { + private Pool createPoolOfBatches( + Path filePath, MessageType requestedSchema) { // In a VectorizedColumnBatch, the dictionary will be lazied deserialized. // If there are multiple batches at the same time, there may be thread safety problems, // because the deserialization of the dictionary depends on some internal structures. // We need set poolCapacity to 1. 
Pool pool = new Pool<>(1); - pool.add(createReaderBatch(requestedSchema, pool.recycler())); + pool.add(createReaderBatch(filePath, requestedSchema, pool.recycler())); return pool; } private ParquetReaderBatch createReaderBatch( - MessageType requestedSchema, Pool.Recycler recycler) { + Path filePath, + MessageType requestedSchema, + Pool.Recycler recycler) { WritableColumnVector[] writableVectors = createWritableVectors(requestedSchema); VectorizedColumnBatch columnarBatch = createVectorizedColumnBatch(writableVectors); - return createReaderBatch(writableVectors, columnarBatch, recycler); + return createReaderBatch(filePath, writableVectors, columnarBatch, recycler); } private WritableColumnVector[] createWritableVectors(MessageType requestedSchema) { @@ -361,10 +366,11 @@ public void close() throws IOException { } private ParquetReaderBatch createReaderBatch( + Path filePath, WritableColumnVector[] writableVectors, VectorizedColumnBatch columnarBatch, Pool.Recycler recycler) { - return new ParquetReaderBatch(writableVectors, columnarBatch, recycler); + return new ParquetReaderBatch(filePath, writableVectors, columnarBatch, recycler); } private static class ParquetReaderBatch { @@ -376,13 +382,16 @@ private static class ParquetReaderBatch { private final ColumnarRowIterator result; protected ParquetReaderBatch( + Path filePath, WritableColumnVector[] writableVectors, VectorizedColumnBatch columnarBatch, Pool.Recycler recycler) { this.writableVectors = writableVectors; this.columnarBatch = columnarBatch; this.recycler = recycler; - this.result = new ColumnarRowIterator(new ColumnarRow(columnarBatch), this::recycle); + this.result = + new ColumnarRowIterator( + filePath, new ColumnarRow(columnarBatch), this::recycle); } public void recycle() { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala index cfb8803b4cee..c4e694814453 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala @@ -40,7 +40,7 @@ case class PaimonPartitionReader( private lazy val iterator = { val reader = readFunc(split) - new RecordReaderIterator[PaimonInternalRow](reader) + PaimonRecordReaderIterator(reader) } override def next(): Boolean = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala index cb87e7a309e5..2314f57c3e16 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala @@ -40,34 +40,14 @@ case class PaimonPartitionReaderFactory(readBuilder: ReadBuilder) extends Partit override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { partition match { case paimonInputPartition: SparkInputPartition => - val tableRead = readBuilder.newRead().withIOManager(ioManager) - tableRead match { - case dataTableRead: AbstractDataTableRead[_] => - addFileHook(dataTableRead) - case _ => - } val readFunc: Split => RecordReader[data.InternalRow] = - (split: Split) => tableRead.createReader(split) + (split: Split) => 
readBuilder.newRead().withIOManager(ioManager).createReader(split) PaimonPartitionReader(readFunc, paimonInputPartition, row) case _ => throw new RuntimeException(s"It's not a Paimon input partition, $partition") } } - private def addFileHook(tableRead: AbstractDataTableRead[_]): Unit = { - tableRead.withFileHook( - new FileHook( - FileHook.ReaderTrigger.OPEN_FILE, - (fileName: String) => Utils.setInputFileName(fileName) - )) - - tableRead.withFileHook( - new FileHook( - FileHook.ReaderTrigger.CLOSE_FILE, - (_: String) => Utils.unsetInputFileName() - )) - } - override def equals(obj: Any): Boolean = { obj match { case other: PaimonPartitionReaderFactory => diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala new file mode 100644 index 000000000000..08fbe5b240ad --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.data.{InternalRow => PaimonInternalRow}
+import org.apache.paimon.fs.Path
+import org.apache.paimon.reader.{FileRecordIterator, RecordReader}
+import org.apache.paimon.utils.CloseableIterator
+
+import org.apache.spark.sql.Utils
+
+import java.io.IOException
+
+case class PaimonRecordReaderIterator(reader: RecordReader[PaimonInternalRow])
+  extends CloseableIterator[PaimonInternalRow] {
+
+  private var lastFilePath: Path = _
+  private var currentIterator: RecordReader.RecordIterator[PaimonInternalRow] = readBatch()
+  private var advanced = false
+  private var currentResult: PaimonInternalRow = _
+
+  override def hasNext: Boolean = {
+    if (currentIterator == null) {
+      false
+    } else {
+      advanceIfNeeded()
+      currentResult != null
+    }
+  }
+
+  override def next(): PaimonInternalRow = {
+    if (!hasNext) {
+      null
+    } else {
+      advanced = false
+      currentResult
+    }
+  }
+
+  override def close(): Unit = {
+    try {
+      if (currentIterator != null) {
+        currentIterator.releaseBatch()
+        currentResult = null
+      }
+    } finally {
+      reader.close()
+    }
+  }
+
+  private def readBatch(): RecordReader.RecordIterator[PaimonInternalRow] = {
+    val iter = reader.readBatch()
+    iter match {
+      case fileRecordIterator: FileRecordIterator[_] =>
+        if (lastFilePath != fileRecordIterator.filePath()) {
+          Utils.setInputFileName(fileRecordIterator.filePath().toUri.toString)
+          lastFilePath = fileRecordIterator.filePath()
+        }
+      case _ =>
+    }
+    iter
+  }
+
+  private def advanceIfNeeded(): Unit = {
+    if (!advanced) {
+      advanced = true
+      try {
+        var stop = false
+        while (!stop) {
+          currentResult = currentIterator.next
+          if (currentResult != null) {
+            stop = true
+          } else {
+            currentIterator.releaseBatch()
+            currentIterator = null
+            currentIterator = readBatch()
+            if (currentIterator == null) {
+              stop = true
+            }
+          }
+        }
+      } catch {
+        case e: IOException =>
+          throw new RuntimeException(e)
+      }
+    }
+  }
+}

From a2a7fa67e937dd09f046df2fa4b0f2982767fdec Mon Sep 17 00:00:00 2001
From: Yann
Date: Tue, 26 Mar 2024 16:43:58 +0800
Subject: [PATCH 3/3] [followup] unset input file name on reader close

---
 .../apache/paimon/spark/PaimonPartitionReaderFactory.scala | 4 +---
 .../org/apache/paimon/spark/PaimonRecordReaderIterator.scala | 1 +
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
index 2314f57c3e16..961aad8a26ad 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala
@@ -20,12 +20,10 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.data
 import org.apache.paimon.disk.IOManager
-import org.apache.paimon.operation.FileHook
 import org.apache.paimon.reader.RecordReader
 import org.apache.paimon.spark.SparkUtils.createIOManager
-import org.apache.paimon.table.source.{AbstractDataTableRead, ReadBuilder, Split}
+import org.apache.paimon.table.source.{ReadBuilder, Split}
 
-import org.apache.spark.sql.Utils
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala index 08fbe5b240ad..3debb5e1891d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala @@ -61,6 +61,7 @@ case class PaimonRecordReaderIterator(reader: RecordReader[PaimonInternalRow]) } } finally { reader.close() + Utils.unsetInputFileName() } }
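
A minimal end-to-end sketch of what this series enables, for quick local verification. It assumes a Spark session with the paimon-spark runtime jar on the classpath; the catalog name ("paimon"), warehouse path, and table name below are illustrative and not part of the patches:

// Sketch only: catalog name, warehouse path, and table name are assumptions.
import org.apache.spark.sql.SparkSession

object InputFileNameCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      // Register Paimon's Spark catalog implementation.
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon-warehouse")
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS paimon.default.T (id INT, name STRING)")
    spark.sql("INSERT INTO paimon.default.T VALUES (1, 'x1'), (2, 'x2')")

    // Before this series, input_file_name() returned an empty string for
    // Paimon scans; with these patches it reflects the data file backing
    // each row, e.g. .../default.db/T/bucket-0/data-<uuid>-0.parquet.
    spark
      .sql("SELECT id, name, input_file_name() AS file FROM paimon.default.T")
      .show(truncate = false)

    spark.stop()
  }
}

Because PaimonRecordReaderIterator refreshes the input file name whenever the underlying FileRecordIterator moves to a new file, input_file_name() stays accurate even when a single Spark partition concatenates several data files.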