diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartitionReader.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartitionReader.java deleted file mode 100644 index 0c299cabeb48..000000000000 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInputPartitionReader.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.spark; - -import org.apache.paimon.disk.IOManager; -import org.apache.paimon.reader.RecordReaderIterator; - -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.connector.read.PartitionReader; -import org.apache.spark.sql.connector.read.PartitionReaderFactory; - -import java.io.IOException; - -/** A spark 3 {@link PartitionReader} for paimon, created by {@link PartitionReaderFactory}. */ -public class SparkInputPartitionReader implements PartitionReader { - - private final IOManager ioManager; - private final RecordReaderIterator iterator; - private final SparkInternalRow row; - - public SparkInputPartitionReader( - IOManager ioManager, - RecordReaderIterator iterator, - SparkInternalRow row) { - this.ioManager = ioManager; - this.iterator = iterator; - this.row = row; - } - - @Override - public boolean next() { - if (iterator.hasNext()) { - row.replace(iterator.next()); - return true; - } - return false; - } - - @Override - public InternalRow get() { - return row; - } - - @Override - public void close() throws IOException { - try { - iterator.close(); - } catch (Exception e) { - throw new IOException(e); - } - try { - ioManager.close(); - } catch (Exception e) { - throw new IOException(e); - } - } -} diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java deleted file mode 100644 index 69a0297151e6..000000000000 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkReaderFactory.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.spark; - -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.disk.IOManager; -import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.reader.RecordReaderIterator; -import org.apache.paimon.table.source.ReadBuilder; - -import org.apache.spark.sql.connector.read.InputPartition; -import org.apache.spark.sql.connector.read.PartitionReader; -import org.apache.spark.sql.connector.read.PartitionReaderFactory; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Objects; - -import static org.apache.paimon.spark.SparkUtils.createIOManager; - -/** A Spark {@link PartitionReaderFactory} for paimon. */ -public class SparkReaderFactory implements PartitionReaderFactory { - - private static final long serialVersionUID = 1L; - - private final ReadBuilder readBuilder; - - public SparkReaderFactory(ReadBuilder readBuilder) { - this.readBuilder = readBuilder; - } - - @Override - public PartitionReader createReader( - InputPartition partition) { - RecordReader reader; - IOManager ioManager = createIOManager(); - try { - reader = - readBuilder - .newRead() - .withIOManager(ioManager) - .createReader(((SparkInputPartition) partition).split()); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - RecordReaderIterator iterator = new RecordReaderIterator<>(reader); - SparkInternalRow row = new SparkInternalRow(readBuilder.readType()); - return new SparkInputPartitionReader(ioManager, iterator, row); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - - if (o == null || getClass() != o.getClass()) { - return false; - } - - SparkReaderFactory that = (SparkReaderFactory) o; - return this.readBuilder.equals(that.readBuilder); - } - - @Override - public int hashCode() { - return Objects.hash(readBuilder); - } -} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala index d954c9e14363..4258df4831e0 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala @@ -18,15 +18,14 @@ package org.apache.paimon.spark import org.apache.paimon.spark.sources.PaimonMicroBatchStream -import org.apache.paimon.table.{DataTable, Table} +import org.apache.paimon.table.{DataTable, FileStoreTable, Table} import org.apache.paimon.table.source.{ReadBuilder, Split} +import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics} import org.apache.spark.sql.connector.read.streaming.MicroBatchStream import org.apache.spark.sql.types.StructType -import java.util.OptionalLong - import scala.collection.JavaConverters._ abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder, desc: String) @@ -60,4 +59,17 @@ abstract class PaimonBaseScan(table: Table, readBuilder: ReadBuilder, desc: Stri splits } + override def supportedCustomMetrics: Array[CustomMetric] = { + val paimonMetrics: Array[CustomMetric] = table match { + case _: FileStoreTable => + Array( + PaimonNumSplitMetric(), + PaimonSplitSizeMetric(), + PaimonAvgSplitSizeMetric() + ) + case _ => + Array.empty[CustomMetric] + } + super.supportedCustomMetrics() ++ paimonMetrics + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala index f1aa4a353014..3cecf9698193 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBatch.scala @@ -29,7 +29,8 @@ case class PaimonBatch(splits: Array[Split], readBuilder: ReadBuilder) extends B override def planInputPartitions(): Array[InputPartition] = splits.map(new SparkInputPartition(_).asInstanceOf[InputPartition]) - override def createReaderFactory(): PartitionReaderFactory = new SparkReaderFactory(readBuilder) + override def createReaderFactory(): PartitionReaderFactory = new PaimonPartitionReaderFactory( + readBuilder) override def equals(obj: Any): Boolean = { obj match { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala new file mode 100644 index 000000000000..7940af22457e --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonMetrics.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark + +import org.apache.spark.sql.Utils +import org.apache.spark.sql.connector.metric.{CustomAvgMetric, CustomSumMetric, CustomTaskMetric} + +import java.text.DecimalFormat + +object PaimonMetrics { + + val NUM_SPLITS = "numSplits" + + val SPLIT_SIZE = "splitSize" + + val AVG_SPLIT_SIZE = "avgSplitSize" +} + +// paimon's task metric +sealed trait PaimonTaskMetric extends CustomTaskMetric + +case class PaimonNumSplitsTaskMetric(override val value: Long) extends PaimonTaskMetric { + + override def name(): String = PaimonMetrics.NUM_SPLITS + +} + +case class PaimonSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric { + + override def name(): String = PaimonMetrics.SPLIT_SIZE + +} + +case class PaimonAvgSplitSizeTaskMetric(override val value: Long) extends PaimonTaskMetric { + + override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE + +} + +// paimon's sum metric +sealed trait PaimonSumMetric extends CustomSumMetric { + + protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Long = { + var sum: Long = 0L + for (taskMetric <- taskMetrics) { + sum += taskMetric + } + sum + } + + override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { + String.valueOf(aggregateTaskMetrics0(taskMetrics)) + } + +} + +case class PaimonNumSplitMetric() extends PaimonSumMetric { + + override def name(): String = PaimonMetrics.NUM_SPLITS + + override def description(): String = "number of splits read" + +} + +case class PaimonSplitSizeMetric() extends PaimonSumMetric { + + override def name(): String = PaimonMetrics.SPLIT_SIZE + + override def description(): String = "size of splits read" + + override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { + Utils.bytesToString(aggregateTaskMetrics0(taskMetrics)) + } +} + +// paimon's avg metric +sealed trait PaimonAvgMetric extends CustomAvgMetric { + + protected def aggregateTaskMetrics0(taskMetrics: Array[Long]): Double = { + if (taskMetrics.length > 0) { + var sum = 0L + for (taskMetric <- taskMetrics) { + sum += taskMetric + } + sum.toDouble / taskMetrics.length + } else { + 0d + } + } + + override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { + val average = aggregateTaskMetrics0(taskMetrics) + new DecimalFormat("#0.000").format(average) + } + +} + +case class PaimonAvgSplitSizeMetric() extends PaimonAvgMetric { + + override def name(): String = PaimonMetrics.AVG_SPLIT_SIZE + + override def description(): String = "avg size of splits read" + + override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = { + val average = aggregateTaskMetrics0(taskMetrics).round + Utils.bytesToString(average) + } + +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala new file mode 100644 index 000000000000..d0564c1270bb --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark + +import org.apache.paimon.data.{InternalRow => PaimonInternalRow} +import org.apache.paimon.reader.{RecordReader, RecordReaderIterator} +import org.apache.paimon.table.source.{DataSplit, Split} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.metric.CustomTaskMetric +import org.apache.spark.sql.connector.read.PartitionReader + +import java.io.IOException + +import scala.collection.JavaConverters._ + +case class PaimonPartitionReader( + readFunc: Split => RecordReader[PaimonInternalRow], + partition: SparkInputPartition, + row: SparkInternalRow +) extends PartitionReader[InternalRow] { + + private lazy val split: Split = partition.split + + private lazy val iterator = { + val reader = readFunc(split) + new RecordReaderIterator[PaimonInternalRow](reader) + } + + override def next(): Boolean = { + if (iterator.hasNext) { + row.replace(iterator.next()) + true + } else { + false + } + } + + override def get(): InternalRow = { + row + } + + override def currentMetricsValues(): Array[CustomTaskMetric] = { + val paimonMetricsValues: Array[CustomTaskMetric] = split match { + case dataSplit: DataSplit => + val splitSize = dataSplit.dataFiles().asScala.map(_.fileSize).sum + Array( + PaimonNumSplitsTaskMetric(1L), + PaimonSplitSizeTaskMetric(splitSize), + PaimonAvgSplitSizeTaskMetric(splitSize) + ) + + case _ => + Array.empty[CustomTaskMetric] + } + super.currentMetricsValues() ++ paimonMetricsValues + } + + override def close(): Unit = { + try { + iterator.close() + } catch { + case e: Exception => + throw new IOException(e) + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala new file mode 100644 index 000000000000..ae3bff2b2280 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReaderFactory.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.paimon.spark + +import org.apache.paimon.data +import org.apache.paimon.disk.IOManager +import org.apache.paimon.reader.RecordReader +import org.apache.paimon.spark.SparkUtils.createIOManager +import org.apache.paimon.table.source.{ReadBuilder, Split} + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} + +import java.util.Objects + +case class PaimonPartitionReaderFactory(readBuilder: ReadBuilder) extends PartitionReaderFactory { + + private lazy val ioManager: IOManager = createIOManager() + + private lazy val row: SparkInternalRow = new SparkInternalRow(readBuilder.readType()) + + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + partition match { + case paimonInputPartition: SparkInputPartition => + val readFunc: Split => RecordReader[data.InternalRow] = + (split: Split) => readBuilder.newRead().withIOManager(ioManager).createReader(split) + PaimonPartitionReader(readFunc, paimonInputPartition, row) + case _ => + throw new RuntimeException(s"It's not a Paimon input partition, $partition") + } + } + + override def equals(obj: Any): Boolean = { + obj match { + case other: PaimonPartitionReaderFactory => + this.readBuilder.equals(other.readBuilder) + + case _ => false + } + } + + override def hashCode(): Int = { + Objects.hashCode(readBuilder) + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala index 702065a134e0..0777be4ae747 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonMicroBatchStream.scala @@ -18,7 +18,7 @@ package org.apache.paimon.spark.sources import org.apache.paimon.options.Options -import org.apache.paimon.spark.{PaimonImplicits, SparkConnectorOptions, SparkInputPartition, SparkReaderFactory} +import org.apache.paimon.spark.{PaimonImplicits, PaimonPartitionReaderFactory, SparkConnectorOptions, SparkInputPartition} import org.apache.paimon.table.DataTable import org.apache.paimon.table.source.ReadBuilder @@ -126,7 +126,7 @@ class PaimonMicroBatchStream( } override def createReaderFactory(): PartitionReaderFactory = { - new SparkReaderFactory(readBuilder) + new PaimonPartitionReaderFactory(readBuilder) } override def initialOffset(): Offset = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala index 058664270ead..e8f1b418aa49 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/Utils.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference} import org.apache.spark.sql.execution.datasources.DataSourceStrategy import org.apache.spark.sql.sources.Filter +import org.apache.spark.util.{Utils => SparkUtils} /** * Some classes or methods defined in the spark project are marked as private under @@ -63,4 +64,8 @@ object Utils { def fieldReference(name: String): NamedReference = { FieldReference.column(name) } + + def bytesToString(size: Long): String = { + SparkUtils.bytesToString(size) + } }