diff --git a/paimon-spark/paimon-spark-3.1/pom.xml b/paimon-spark/paimon-spark-3.1/pom.xml
index ba1f1d27e2a3..0fcf1bdeb838 100644
--- a/paimon-spark/paimon-spark-3.1/pom.xml
+++ b/paimon-spark/paimon-spark-3.1/pom.xml
@@ -91,6 +91,43 @@ under the License.
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.12</artifactId>
diff --git a/paimon-spark/paimon-spark-3.2/pom.xml b/paimon-spark/paimon-spark-3.2/pom.xml
index 07db172eff39..865f3a9eefb9 100644
--- a/paimon-spark/paimon-spark-3.2/pom.xml
+++ b/paimon-spark/paimon-spark-3.2/pom.xml
@@ -95,6 +95,43 @@ under the License.
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-spark-common</artifactId>
diff --git a/paimon-spark/paimon-spark-3.3/pom.xml b/paimon-spark/paimon-spark-3.3/pom.xml
index 1405bea615c9..8345f5b4ad87 100644
--- a/paimon-spark/paimon-spark-3.3/pom.xml
+++ b/paimon-spark/paimon-spark-3.3/pom.xml
@@ -91,6 +91,43 @@ under the License.
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-spark-common</artifactId>
diff --git a/paimon-spark/paimon-spark-3.4/pom.xml b/paimon-spark/paimon-spark-3.4/pom.xml
index 93ce4470517a..47e1952b6778 100644
--- a/paimon-spark/paimon-spark-3.4/pom.xml
+++ b/paimon-spark/paimon-spark-3.4/pom.xml
@@ -91,6 +91,43 @@ under the License.
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-spark-common</artifactId>
diff --git a/paimon-spark/paimon-spark-3.5/pom.xml b/paimon-spark/paimon-spark-3.5/pom.xml
index d37f1f10feb1..1794dfd93615 100644
--- a/paimon-spark/paimon-spark-3.5/pom.xml
+++ b/paimon-spark/paimon-spark-3.5/pom.xml
@@ -87,6 +87,43 @@ under the License.
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-spark-common</artifactId>
diff --git a/paimon-spark/paimon-spark-common/pom.xml b/paimon-spark/paimon-spark-common/pom.xml
index f08d86467095..17e4baa3c826 100644
--- a/paimon-spark/paimon-spark-common/pom.xml
+++ b/paimon-spark/paimon-spark-common/pom.xml
@@ -94,6 +94,43 @@ under the License.
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-mapreduce</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>parquet-column</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.protobuf</groupId>
+                    <artifactId>protobuf-java</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-hive_2.12</artifactId>
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTableWrite.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTableWrite.java
deleted file mode 100644
index 3e8926080727..000000000000
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTableWrite.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark;
-
-import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.spark.util.SparkRowUtils;
-import org.apache.paimon.table.sink.BatchTableWrite;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.CommitMessageSerializer;
-import org.apache.paimon.types.RowType;
-
-import org.apache.spark.sql.Row;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/** An util class for {@link BatchTableWrite}. */
-public class SparkTableWrite implements AutoCloseable {
-
- private final BatchTableWrite write;
- private final IOManager ioManager;
-
- private final RowType rowType;
- private final int rowKindColIdx;
-
- public SparkTableWrite(BatchWriteBuilder writeBuilder, RowType rowType, int rowKindColIdx) {
- this.write = writeBuilder.newWrite();
- this.rowType = rowType;
- this.rowKindColIdx = rowKindColIdx;
- this.ioManager = SparkUtils.createIOManager();
- write.withIOManager(ioManager);
- }
-
- public void write(Row row) throws Exception {
- write.write(toPaimonRow(row));
- }
-
- public void write(Row row, int bucket) throws Exception {
- write.write(toPaimonRow(row), bucket);
- }
-
- public Iterator<byte[]> finish() throws Exception {
- CommitMessageSerializer serializer = new CommitMessageSerializer();
- List<byte[]> commitMessages = new ArrayList<>();
- for (CommitMessage message : write.prepareCommit()) {
- commitMessages.add(serializer.serialize(message));
- }
- return commitMessages.iterator();
- }
-
- @Override
- public void close() throws Exception {
- write.close();
- ioManager.close();
- }
-
- private SparkRow toPaimonRow(Row row) {
- return new SparkRow(rowType, row, SparkRowUtils.getRowKind(row, rowKindColIdx));
- }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
new file mode 100644
index 000000000000..920d907e20a5
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTableWrite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.disk.IOManager
+import org.apache.paimon.spark.util.SparkRowUtils
+import org.apache.paimon.table.sink.{BatchTableWrite, BatchWriteBuilder, CommitMessageImpl, CommitMessageSerializer}
+import org.apache.paimon.types.RowType
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.{PaimonUtils, Row}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+class SparkTableWrite(writeBuilder: BatchWriteBuilder, rowType: RowType, rowKindColIdx: Int)
+ extends AutoCloseable {
+
+ val ioManager: IOManager = SparkUtils.createIOManager
+ val write: BatchTableWrite =
+ writeBuilder.newWrite().withIOManager(ioManager).asInstanceOf[BatchTableWrite]
+
+ def write(row: Row): Unit = {
+ write.write(toPaimonRow(row))
+ }
+
+ def write(row: Row, bucket: Int): Unit = {
+ write.write(toPaimonRow(row), bucket)
+ }
+
+ def finish(): Iterator[Array[Byte]] = {
+ var bytesWritten = 0L
+ var recordsWritten = 0L
+ val commitMessages = new ListBuffer[Array[Byte]]()
+ val serializer = new CommitMessageSerializer()
+ write.prepareCommit().asScala.foreach {
+ case message: CommitMessageImpl =>
+ message.newFilesIncrement().newFiles().asScala.foreach {
+ dataFileMeta =>
+ bytesWritten += dataFileMeta.fileSize()
+ recordsWritten += dataFileMeta.rowCount()
+ }
+ commitMessages += serializer.serialize(message)
+ }
+ reportOutputMetrics(bytesWritten, recordsWritten)
+ commitMessages.iterator
+ }
+
+ override def close(): Unit = {
+ write.close()
+ ioManager.close()
+ }
+
+ private def toPaimonRow(row: Row) =
+ new SparkRow(rowType, row, SparkRowUtils.getRowKind(row, rowKindColIdx))
+
+ private def reportOutputMetrics(bytesWritten: Long, recordsWritten: Long): Unit = {
+ val taskContext = TaskContext.get
+ if (taskContext != null) {
+ PaimonUtils.updateOutputMetrics(
+ taskContext.taskMetrics.outputMetrics,
+ bytesWritten,
+ recordsWritten)
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index a5681e54e979..d12909b8a173 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -80,7 +80,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
val write = newWrite()
try {
iter.foreach(row => write.write(row))
- write.finish().asScala
+ write.finish()
} finally {
write.close()
}
@@ -99,7 +99,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
val write = newWrite()
try {
iter.foreach(row => write.write(row, row.getInt(bucketColIdx)))
- write.finish().asScala
+ write.finish()
} finally {
write.close()
}
@@ -117,7 +117,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
val write = newWrite()
try {
iter.foreach(row => write.write(row, assigner.apply(row)))
- write.finish().asScala
+ write.finish()
} finally {
write.close()
}
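Note: SparkTableWrite.finish() now returns a Scala Iterator directly, which is why the .asScala conversions are dropped in the hunks above. For context, a minimal sketch of the per-partition loop these hunks belong to, assuming a newWrite factory that stands in for however PaimonSparkWriter actually constructs the writer:

    import org.apache.paimon.spark.SparkTableWrite
    import org.apache.spark.sql.Row

    // Hedged sketch, not the exact PaimonSparkWriter code: write every row of the
    // partition, collect the serialized CommitMessages from finish(), and always
    // close the writer (which also closes its IOManager).
    def writePartition(iter: Iterator[Row], newWrite: () => SparkTableWrite): Iterator[Array[Byte]] = {
      val write = newWrite()
      try {
        iter.foreach(row => write.write(row))
        write.finish() // also reports bytes/records written to the task's output metrics
      } finally {
        write.close()
      }
    }
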
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index 06a255ebbc62..658d43848f45 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
+import org.apache.spark.executor.OutputMetrics
import org.apache.spark.rdd.InputFileBlockHolder
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -78,4 +79,12 @@ object PaimonUtils {
def unsetInputFileName(): Unit = {
InputFileBlockHolder.unset()
}
+
+ def updateOutputMetrics(
+ outputMetrics: OutputMetrics,
+ bytesWritten: Long,
+ recordsWritten: Long): Unit = {
+ outputMetrics.setBytesWritten(bytesWritten)
+ outputMetrics.setRecordsWritten(recordsWritten)
+ }
}
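The setters on org.apache.spark.executor.OutputMetrics are package-private to Spark, which is presumably why this helper sits under org.apache.spark.sql rather than in a Paimon package. A minimal sketch of how a write task would call it (the byte and record counts here are placeholder values, not anything Paimon computes):

    import org.apache.spark.TaskContext
    import org.apache.spark.sql.PaimonUtils

    // Only meaningful inside a running task; TaskContext.get() is null on the driver.
    val ctx = TaskContext.get()
    if (ctx != null) {
      PaimonUtils.updateOutputMetrics(ctx.taskMetrics().outputMetrics, 1024L, 3L)
    }
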
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
index f223dabdd2aa..8866428f8ba4 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.spark.PaimonMetrics.{RESULTED_TABLE_FILES, SKIPPED_TABLE_FILES}
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.junit.jupiter.api.Assertions
@@ -60,6 +61,31 @@ class PaimonMetricTest extends PaimonSparkTestBase {
}
}
+ test("Paimon Metric: report output metric") {
+ sql(s"CREATE TABLE T (id int)")
+
+ var recordsWritten = 0L
+ var bytesWritten = 0L
+
+ val listener = new SparkListener() {
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+ val outputMetrics = taskEnd.taskMetrics.outputMetrics
+ recordsWritten += outputMetrics.recordsWritten
+ bytesWritten += outputMetrics.bytesWritten
+ }
+ }
+
+ try {
+ spark.sparkContext.addSparkListener(listener)
+ sql(s"INSERT INTO T VALUES 1, 2, 3")
+ } finally {
+ spark.sparkContext.removeSparkListener(listener)
+ }
+
+ Assertions.assertEquals(3, recordsWritten)
+ Assertions.assertTrue(bytesWritten > 0)
+ }
+
def metric(metrics: Array[CustomTaskMetric], name: String): Long = {
metrics.find(_.name() == name).get.value()
}