CommitCallback use
askwang committed Jul 29, 2024
1 parent 5595881 commit 25b9d91
Showing 4 changed files with 104 additions and 1 deletion.
@@ -57,6 +57,7 @@ private static <T> List<T> loadCallbacks(
"Class " + clazz + " must implement " + expectClass);

try {
//
if (param == null) {
result.add((T) clazz.newInstance());
} else {
Expand Down
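For context, the null check above is the branch that decides how a configured callback class is instantiated. A minimal, self-contained sketch of that dispatch (the class name and scaffolding below are illustrative, not Paimon's exact internals): with no parameter the no-arg constructor is used; otherwise the parameter is passed to a single-String-argument constructor.

import java.util.ArrayList;
import java.util.List;

final class CallbackLoaderSketch {
    @SuppressWarnings("unchecked")
    static <T> List<T> loadCallbacks(List<Class<?>> classes, String param) throws Exception {
        List<T> result = new ArrayList<>();
        for (Class<?> clazz : classes) {
            if (param == null) {
                // No parameter configured: use the no-arg constructor.
                result.add((T) clazz.getDeclaredConstructor().newInstance());
            } else {
                // Parameter configured: route it into a String-argument constructor.
                result.add((T) clazz.getDeclaredConstructor(String.class).newInstance(param));
            }
        }
        return result;
    }
}

This dispatch is why the AskwangCommitCallback added below declares both a no-arg and a String-argument constructor.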
@@ -248,4 +248,5 @@ public void testRecoverDeletedFiles() throws Exception {
"Cannot recover from this checkpoint because some files in the"
+ " snapshot that need to be resubmitted have been deleted");
}

}
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql;

import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * A {@link CommitCallback} test implementation that groups the data files of each commit by
 * bucket and prints them.
 *
 * @author askwang
 * @date 2024/7/27
 */
public class AskwangCommitCallback implements CommitCallback {

    private String appid;

    public AskwangCommitCallback() {}

    public AskwangCommitCallback(String appid) {
        this.appid = appid;
    }

    @Override
    public void call(List<ManifestCommittable> committables) {
        // Collect the names of newly written data files, grouped by bucket.
        Map<Integer, List<String>> bucketToFiles = new ConcurrentHashMap<>();

        System.out.println("=====enter commit callback");
        System.out.println("appid: " + appid);
        for (ManifestCommittable committable : committables) {
            for (CommitMessage message : committable.fileCommittables()) {
                CommitMessageImpl msg = (CommitMessageImpl) message;
                List<String> files =
                        bucketToFiles.computeIfAbsent(msg.bucket(), f -> new ArrayList<>());
                msg.newFilesIncrement().newFiles().stream()
                        .map(DataFileMeta::fileName)
                        .forEach(files::add);
            }
        }

        bucketToFiles.forEach(
                (bucket, files) -> System.out.println("bucket: " + bucket + "; files: " + files));
    }

    @Override
    public void close() throws Exception {}
}
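The pair of constructors above mirrors the loader's dispatch: registering the class alone exercises the no-arg constructor, while additionally setting the per-class parameter option routes a string into the String-argument constructor. A sketch of the option wiring, assuming the key literals commit.callbacks and commit.callback.#.param behind CoreOptions.COMMIT_CALLBACKS and CoreOptions.COMMIT_CALLBACK_PARAM (the same keys the Scala test below resolves):

import java.util.HashMap;
import java.util.Map;

class CallbackOptionsSketch {
    static Map<String, String> callbackOptions() {
        String cb = "org.apache.paimon.spark.sql.AskwangCommitCallback";
        Map<String, String> options = new HashMap<>();
        // Register the callback class with the write options ...
        options.put("commit.callbacks", cb);
        // ... and substitute '#' with the class name to target its constructor parameter.
        options.put("commit.callback." + cb + ".param", "appid-100");
        return options;
    }
}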
@@ -18,15 +18,19 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.CoreOptions
import org.apache.paimon.manifest.ManifestCommittable
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.table.sink.CommitCallback

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.execution.SparkPlanner
import org.apache.spark.sql.internal.SQLConf

import java.util

/**
 * @author askwang
@@ -70,6 +74,36 @@ class AskwangSQLQueryTest extends PaimonSparkTestBase {
}
}

test("commit callback") {
withTable("tb") {
withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> "false") {

spark.sql(
"""
|CREATE TABLE tb (id int, dt string) using paimon
|TBLPROPERTIES ('file.format'='parquet', 'primary-key'='id', 'bucket'='1')
|""".stripMargin)
// val ds = sql("INSERT INTO `tb` VALUES (cast(NULL as long),cast(NULL as string))")

val sp = spark

// "org.apache.paimon.spark.sql.AskwangCommitCallback"
println("commit class: " + classOf[AskwangCommitCallback].getName)
val location = loadTable("tb").location().toString

import sp.implicits._
val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b")
df1.write.format("paimon")
.option(CoreOptions.COMMIT_CALLBACKS.key(), classOf[AskwangCommitCallback].getName)
.option(CoreOptions.COMMIT_CALLBACK_PARAM.key()
.replace("#", classOf[AskwangCommitCallback].getName), "appid-100")
.mode("append").save(location)
}
}
}



  def explainPlan(query: String, spark: SparkSession) = {
    val (parser, analyzer, optimizer, planner) = analysisEntry(spark)
    val parsedPlan = parser.parsePlan(query)
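Since the table is created with 'bucket'='1', both inserted rows land in bucket 0, so a run of this test should print output along these lines on commit (the data file name is generated at runtime and shown here only as a placeholder):

commit class: org.apache.paimon.spark.sql.AskwangCommitCallback
=====enter commit callback
appid: appid-100
bucket: 0; files: [<generated-data-file>.parquet]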
