diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java index 7d8a0a849cba1..42702298d2a85 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CallbackUtils.java @@ -57,6 +57,7 @@ private static <T> List<T> loadCallbacks( "Class " + clazz + " must implement " + expectClass); try { + // if (param == null) { result.add((T) clazz.newInstance()); } else { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java index d9aa14ce64906..07dd6692e4506 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java @@ -248,4 +248,5 @@ public void testRecoverDeletedFiles() throws Exception { "Cannot recover from this checkpoint because some files in the" + " snapshot that need to be resubmitted have been deleted"); } + } diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/sql/AskwangCommitCallback.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/sql/AskwangCommitCallback.java new file mode 100644 index 0000000000000..5f7cf31d40b9f --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/sql/AskwangCommitCallback.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
 * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql; + +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.table.sink.CommitCallback; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author askwang + * @date 2024/7/27 + */ +public class AskwangCommitCallback implements CommitCallback { + private String appid; + + public AskwangCommitCallback() {} + + public AskwangCommitCallback(String appid) { + this.appid = appid; + } + + @Override + public void call(List<ManifestCommittable> committables) { + Map<Integer, List<String>> bucketToFiles = new ConcurrentHashMap<>(); + + System.out.println("=====enter commit callback"); + System.out.println("appid: " + appid); + for (ManifestCommittable committable : committables) { + for (CommitMessage message : committable.fileCommittables()) { + CommitMessageImpl msg = (CommitMessageImpl) message; + List<String> files = bucketToFiles.computeIfAbsent(msg.bucket(), f -> new ArrayList<>()); + msg.newFilesIncrement().newFiles().stream() + .map(DataFileMeta::fileName).forEach(files::add); + } + } + + bucketToFiles.forEach((bucket, files) -> { + System.out.println("bucket: " + bucket + "; files: " + files); + }); + } + + @Override + public void close() throws Exception {} +} diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AskwangSQLQueryTest.scala 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AskwangSQLQueryTest.scala index 4fab9cc7228f8..de332c991ccba 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AskwangSQLQueryTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AskwangSQLQueryTest.scala @@ -18,8 +18,10 @@ package org.apache.paimon.spark.sql +import org.apache.paimon.CoreOptions +import org.apache.paimon.manifest.ManifestCommittable import org.apache.paimon.spark.PaimonSparkTestBase - +import org.apache.paimon.table.sink.CommitCallback import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.analysis.Analyzer import org.apache.spark.sql.catalyst.optimizer.Optimizer @@ -27,6 +29,8 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.internal.SQLConf +import java.util + /** * @author * askwang @@ -70,6 +74,36 @@ class AskwangSQLQueryTest extends PaimonSparkTestBase { } } + test("commit callback") { + withTable("tb") { + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "false") { + + spark.sql( + """ + |CREATE TABLE tb (id int, dt string) using paimon + |TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1') + |""".stripMargin) + // val ds = sql("INSERT INTO `tb` VALUES (cast(NULL as long),cast(NULL as string))") + + val sp = spark + + // "org.apache.paimon.spark.sql.AskwangCommitCallback" + println("commit class: " + classOf[AskwangCommitCallback].getName) + val location = loadTable("tb").location().toString + + import sp.implicits._ + val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b") + df1.write.format("paimon") + .option(CoreOptions.COMMIT_CALLBACKS.key(), classOf[AskwangCommitCallback].getName) + .option(CoreOptions.COMMIT_CALLBACK_PARAM.key() + .replace("#", classOf[AskwangCommitCallback].getName), "appid-100") + .mode("append").save(location) + } + 
} + } + + + def explainPlan(query: String, spark: SparkSession) = { val (parser, analyzer, optimizer, planner) = analysisEntry(spark) val parsedPlan = parser.parsePlan(query)