From 2df0c1e4b33ec3e7c1d2ccc8a01fa816912423c0 Mon Sep 17 00:00:00 2001
From: Jingsong Lee
Date: Mon, 18 Mar 2024 19:05:48 +0800
Subject: [PATCH] [core] Introduce BatchTableCommit.truncateTable (#3037)

---
 .../benchmark/TableWriterBenchmark.java       | 56 +++++++++----------
 .../paimon/operation/FileStoreCommit.java     |  2 +-
 .../paimon/operation/FileStoreCommitImpl.java |  2 +-
 .../paimon/table/sink/BatchTableCommit.java   |  6 ++
 .../paimon/table/sink/TableCommitImpl.java    | 13 ++++-
 .../paimon/flink/sink/FlinkTableSink.java     | 10 +---
 ...pportsRowLevelOperationFlinkTableSink.java |  2 +-
 .../DeleteFromPaimonTableCommand.scala        |  2 +-
 .../commands/PaimonTruncateTableCommand.scala |  2 +-
 9 files changed, 52 insertions(+), 43 deletions(-)

diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
index 5ced4248bffa..04d1d7342751 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/TableWriterBenchmark.java
@@ -20,14 +20,13 @@
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /** Benchmark for table writer. */
@@ -41,9 +40,9 @@ public void testAvro() throws Exception {
         /*
          * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-         * avro:             Best/Avg Time(ms)    Row Rate(M/s)      Per Row(ns)   Relative
+         * avro:             Best/Avg Time(ms)    Row Rate(K/s)      Per Row(ns)   Relative
          * ---------------------------------------------------------------------------------
-         * avro_write             5847 / 7296              0.1          19489.5       1.0X
+         * avro_write           40309 / 41161             74.4          13436.3       1.0X
          */
     }
 
@@ -56,9 +55,9 @@ public void testAvroWithoutStats() throws Exception {
         /*
          * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-         * avro:             Best/Avg Time(ms)    Row Rate(M/s)      Per Row(ns)   Relative
+         * avro:             Best/Avg Time(ms)    Row Rate(K/s)      Per Row(ns)   Relative
          * ---------------------------------------------------------------------------------
-         * avro_write             4701 / 5780              0.1          15669.6       1.0X
+         * avro_write           31817 / 32359             94.3          10605.6       1.0X
          */
     }
 
@@ -71,9 +70,9 @@ public void testOrcNoCompression() throws Exception {
         /*
          * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-         * orc:              Best/Avg Time(ms)    Row Rate(M/s)      Per Row(ns)   Relative
+         * orc:              Best/Avg Time(ms)    Row Rate(K/s)      Per Row(ns)   Relative
          * ---------------------------------------------------------------------------------
-         * orc_write              8448 / 9584              0.0          28160.1       1.0X
+         * orc_write            32751 / 33032             91.6          10917.0       1.0X
          */
     }
 
@@ -85,9 +84,9 @@ public void testParquet() throws Exception {
         /*
          * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-         * parquet:          Best/Avg Time(ms)    Row Rate(M/s)      Per Row(ns)   Relative
+         * parquet:          Best/Avg Time(ms)    Row Rate(K/s)      Per Row(ns)   Relative
          * ---------------------------------------------------------------------------------
-         * parquet_write         10872 / 12566              0.0          36239.7       1.0X
+         * parquet_write        46279 / 46715             64.8          15426.3       1.0X
          */
     }
 
@@ -99,44 +98,45 @@ public void testOrc() throws Exception {
         /*
          * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
          * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-         * orc:              Best/Avg Time(ms)    Row Rate(M/s)      Per Row(ns)   Relative
+         * orc:              Best/Avg Time(ms)    Row Rate(K/s)      Per Row(ns)   Relative
          * ---------------------------------------------------------------------------------
-         * orc_write              8690 / 9771              0.0          28968.0       1.0X
+         * orc_write            36489 / 36697             82.2          12163.1       1.0X
          */
     }
 
     public void innerTest(String name, Options options) throws Exception {
         options.set(CoreOptions.BUCKET, 1);
-        StreamWriteBuilder writeBuilder = createTable(options, "T").newStreamWriteBuilder();
-        StreamTableWrite write = writeBuilder.newWrite();
-        StreamTableCommit commit = writeBuilder.newCommit();
-        long valuesPerIteration = 300_000;
+        Table table = createTable(options, "T");
+        long valuesPerIteration = 3_000_000;
         Benchmark benchmark =
                 new Benchmark(name, valuesPerIteration)
                         .setNumWarmupIters(1)
                         .setOutputPerIteration(true);
         AtomicInteger writeCount = new AtomicInteger(0);
-        AtomicInteger commitIdentifier = new AtomicInteger(0);
         benchmark.addCase(
                 "write",
-                5,
+                3,
                 () -> {
+                    BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+                    BatchTableWrite write = writeBuilder.newWrite();
+                    BatchTableCommit commit = writeBuilder.newCommit();
                     for (int i = 0; i < valuesPerIteration; i++) {
                         try {
                             write.write(newRandomRow());
                             writeCount.incrementAndGet();
-                            if (writeCount.get() % 10_000 == 0) {
-                                List<CommitMessage> commitMessages =
-                                        write.prepareCommit(false, commitIdentifier.get());
-                                commit.commit(commitIdentifier.get(), commitMessages);
-                                commitIdentifier.incrementAndGet();
-                            }
                         } catch (Exception e) {
                             throw new RuntimeException(e);
                         }
                     }
+                    try {
+                        commit.commit(write.prepareCommit());
+                        writeBuilder.newCommit().truncateTable();
+                        write.close();
+                        commit.close();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
                 });
         benchmark.run();
-        write.close();
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index fad4c2b70cfe..7151f25124b1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -78,7 +78,7 @@ void overwrite(
      */
     void dropPartitions(List<Map<String, String>> partitions, long commitIdentifier);
 
-    void purgeTable(long commitIdentifier);
+    void truncateTable(long commitIdentifier);
 
     /** Abort an unsuccessful commit. The data files will be deleted. */
     void abort(List<CommitMessage> commitMessages);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 46cfceb145c7..d3ce76b0a65d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -493,7 +493,7 @@ public void dropPartitions(List<Map<String, String>> partitions, long commitIden
     }
 
     @Override
-    public void purgeTable(long commitIdentifier) {
+    public void truncateTable(long commitIdentifier) {
         tryOverwrite(
                 null,
                 Collections.emptyList(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
index 894aec3e5eb6..f0c9b59e3178 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
@@ -50,4 +50,10 @@ public interface BatchTableCommit extends TableCommit {
      * @param commitMessages commit messages from table write
      */
     void commit(List<CommitMessage> commitMessages);
+
+    /**
+     * Truncate the table. As with a normal {@link #commit}, files are not deleted immediately;
+     * they are only logically deleted and will be physically removed after the snapshot expires.
+     */
+    void truncateTable();
 }
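[Reviewer note, not part of the patch] The javadoc above is the contract of the new public API. A minimal usage sketch, assuming a `Table` instance obtained elsewhere (e.g. from a catalog); the class and method names are illustrative:

    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.sink.BatchTableCommit;

    public class TruncateTableExample {
        public static void truncate(Table table) throws Exception {
            // Truncation is itself a commit: all files are logically deleted in a
            // new snapshot, and physical deletion happens on snapshot expiration.
            BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
            commit.truncateTable();
            commit.close();
        }
    }
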
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index ab01943bbb60..c76b750a11b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -65,6 +65,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
+import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** An abstraction layer above {@link FileStoreCommit} to provide snapshot commit and expiration. */
@@ -159,9 +160,19 @@ public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {
 
     @Override
     public void commit(List<CommitMessage> commitMessages) {
+        checkCommitted();
+        commit(COMMIT_IDENTIFIER, commitMessages);
+    }
+
+    @Override
+    public void truncateTable() {
+        checkCommitted();
+        commit.truncateTable(COMMIT_IDENTIFIER);
+    }
+
+    private void checkCommitted() {
         checkState(!batchCommitted, "BatchTableCommit only support one-time committing.");
         batchCommitted = true;
-        commit(BatchWriteBuilder.COMMIT_IDENTIFIER, commitMessages);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 50bc45b752f8..b1211b0e738c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -19,10 +19,7 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
 
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.sink.abilities.SupportsTruncate;
@@ -30,8 +27,6 @@
 
 import javax.annotation.Nullable;
 
-import java.util.UUID;
-
 /** Table sink to create sink. */
 public class FlinkTableSink extends SupportsRowLevelOperationFlinkTableSink
         implements SupportsTruncate {
@@ -46,9 +41,6 @@ public FlinkTableSink(
 
     @Override
     public void executeTruncation() {
-        FileStoreCommit commit =
-                ((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
-        long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
-        commit.purgeTable(identifier);
+        table.newBatchWriteBuilder().newCommit().truncateTable();
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index c45fd168f946..0d2bd3962dee 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -165,7 +165,7 @@ public Optional<Long> executeDeletion() {
                 ((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
         long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
         if (deletePredicate == null) {
-            commit.purgeTable(identifier);
+            commit.truncateTable(identifier);
             return Optional.empty();
         } else if (deleteIsDropPartition()) {
             commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
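[Reviewer note, not part of the patch] Because truncateTable() and commit() share the checkCommitted() guard in TableCommitImpl above, a BatchTableCommit instance is single-use. An illustrative sketch, assuming a `Table` instance; names are hypothetical:

    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.sink.BatchTableCommit;
    import org.apache.paimon.table.sink.BatchWriteBuilder;

    public class OneTimeCommitExample {
        public static void demo(Table table) throws Exception {
            BatchWriteBuilder builder = table.newBatchWriteBuilder();
            BatchTableCommit commit = builder.newCommit();
            commit.truncateTable(); // first use: performs the truncating commit
            try {
                commit.truncateTable(); // second use on the same instance fails
            } catch (IllegalStateException e) {
                // "BatchTableCommit only support one-time committing."
            } finally {
                commit.close();
            }
        }
    }
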
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 3b8e801a2e3a..ff3aa253ca49 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -64,7 +64,7 @@ trait DeleteFromPaimonTableCommandBase extends PaimonLeafRunnableCommand with Pa
     if (forceDeleteByRows) {
       deleteRowsByCondition(sparkSession)
     } else if (deletePredicate.isEmpty) {
-      commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+      commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
     } else {
       val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
       if (deletePredicate.get.visit(visitor)) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
index 9ca27b631487..e9125e3e649d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
@@ -40,7 +40,7 @@ case class PaimonTruncateTableCommand(v2Table: SparkTable, partitionSpec: TableP
 
     val commit = table.store.newCommit(UUID.randomUUID.toString)
     if (partitionSpec.isEmpty) {
-      commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+      commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
     } else {
       commit.dropPartitions(
         Collections.singletonList(partitionSpec.asJava),
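[Reviewer note, not part of the patch] The Spark commands above still call the internal FileStoreCommit.truncateTable(commitIdentifier) with an explicit commit identifier, while Flink's executeTruncation() now goes through the public batch API. Taken together with the benchmark change, the end-to-end write-then-truncate pattern looks like this sketch (illustrative only; `rows` stands in for application data and the class name is hypothetical):

    import java.util.List;

    import org.apache.paimon.data.InternalRow;
    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.sink.BatchTableCommit;
    import org.apache.paimon.table.sink.BatchTableWrite;
    import org.apache.paimon.table.sink.BatchWriteBuilder;
    import org.apache.paimon.table.sink.CommitMessage;

    public class WriteThenTruncateExample {
        public static void run(Table table, List<InternalRow> rows) throws Exception {
            BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
            BatchTableWrite write = writeBuilder.newWrite();
            for (InternalRow row : rows) {
                write.write(row);
            }
            // Batch writes commit exactly once, with no user-managed identifier.
            List<CommitMessage> messages = write.prepareCommit();
            BatchTableCommit commit = writeBuilder.newCommit();
            commit.commit(messages);
            write.close();
            commit.close();
            // Each BatchTableCommit is single-use, so truncation needs a fresh one.
            BatchTableCommit truncate = writeBuilder.newCommit();
            truncate.truncateTable();
            truncate.close();
        }
    }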