Commit 2df0c1e
[core] Introduce BatchTableCommit.truncateTable (apache#3037)
JingsongLi authored Mar 18, 2024
1 parent b6dd27a commit 2df0c1e
Showing 9 changed files with 52 additions and 43 deletions.
----------------------------------------
@@ -20,14 +20,13 @@

 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;

 import org.junit.jupiter.api.Test;

-import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;

 /** Benchmark for table writer. */
@@ -41,9 +40,9 @@ public void testAvro() throws Exception {
        /*
         * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
         * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-        * avro:            Best/Avg Time(ms)   Row Rate(M/s)   Per Row(ns)   Relative
+        * avro:            Best/Avg Time(ms)   Row Rate(K/s)   Per Row(ns)   Relative
         * ---------------------------------------------------------------------------------
-        * avro_write            5847 / 7296             0.1       19489.5       1.0X
+        * avro_write          40309 / 41161            74.4       13436.3       1.0X
         */
    }

@@ -56,9 +55,9 @@ public void testAvroWithoutStats() throws Exception {
        /*
         * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
         * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-        * avro:            Best/Avg Time(ms)   Row Rate(M/s)   Per Row(ns)   Relative
+        * avro:            Best/Avg Time(ms)   Row Rate(K/s)   Per Row(ns)   Relative
         * ---------------------------------------------------------------------------------
-        * avro_write            4701 / 5780             0.1       15669.6       1.0X
+        * avro_write          31817 / 32359            94.3       10605.6       1.0X
         */
    }

@@ -71,9 +70,9 @@ public void testOrcNoCompression() throws Exception {
        /*
         * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
         * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-        * orc:             Best/Avg Time(ms)   Row Rate(M/s)   Per Row(ns)   Relative
+        * orc:             Best/Avg Time(ms)   Row Rate(K/s)   Per Row(ns)   Relative
         * ---------------------------------------------------------------------------------
-        * orc_write             8448 / 9584             0.0       28160.1       1.0X
+        * orc_write           32751 / 33032            91.6       10917.0       1.0X
         */
    }

@@ -85,9 +84,9 @@ public void testParquet() throws Exception {
        /*
         * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
         * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-        * parquet:         Best/Avg Time(ms)   Row Rate(M/s)   Per Row(ns)   Relative
+        * parquet:         Best/Avg Time(ms)   Row Rate(K/s)   Per Row(ns)   Relative
         * ---------------------------------------------------------------------------------
-        * parquet_write        10872 / 12566             0.0       36239.7       1.0X
+        * parquet_write       46279 / 46715            64.8       15426.3       1.0X
         */
    }

@@ -99,44 +98,45 @@ public void testOrc() throws Exception {
        /*
         * Java HotSpot(TM) 64-Bit Server VM 1.8.0_301-b09 on Mac OS X 10.16
         * Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
-        * orc:             Best/Avg Time(ms)   Row Rate(M/s)   Per Row(ns)   Relative
+        * orc:             Best/Avg Time(ms)   Row Rate(K/s)   Per Row(ns)   Relative
         * ---------------------------------------------------------------------------------
-        * orc_write             8690 / 9771             0.0       28968.0       1.0X
+        * orc_write           36489 / 36697            82.2       12163.1       1.0X
         */
    }

    public void innerTest(String name, Options options) throws Exception {
        options.set(CoreOptions.BUCKET, 1);
-       StreamWriteBuilder writeBuilder = createTable(options, "T").newStreamWriteBuilder();
-       StreamTableWrite write = writeBuilder.newWrite();
-       StreamTableCommit commit = writeBuilder.newCommit();
-       long valuesPerIteration = 300_000;
+       Table table = createTable(options, "T");
+       long valuesPerIteration = 3_000_000;
        Benchmark benchmark =
                new Benchmark(name, valuesPerIteration)
                        .setNumWarmupIters(1)
                        .setOutputPerIteration(true);
        AtomicInteger writeCount = new AtomicInteger(0);
-       AtomicInteger commitIdentifier = new AtomicInteger(0);
        benchmark.addCase(
                "write",
-               5,
+               3,
                () -> {
+                   BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+                   BatchTableWrite write = writeBuilder.newWrite();
+                   BatchTableCommit commit = writeBuilder.newCommit();
                    for (int i = 0; i < valuesPerIteration; i++) {
                        try {
                            write.write(newRandomRow());
                            writeCount.incrementAndGet();
-                           if (writeCount.get() % 10_000 == 0) {
-                               List<CommitMessage> commitMessages =
-                                       write.prepareCommit(false, commitIdentifier.get());
-                               commit.commit(commitIdentifier.get(), commitMessages);
-                               commitIdentifier.incrementAndGet();
-                           }
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                    }
+                   try {
+                       commit.commit(write.prepareCommit());
+                       writeBuilder.newCommit().truncateTable();
+                       write.close();
+                       commit.close();
+                   } catch (Exception e) {
+                       throw new RuntimeException(e);
+                   }
                });
        benchmark.run();
-       write.close();
    }
}
----------------------------------------
@@ -78,7 +78,7 @@ void overwrite(
     */
    void dropPartitions(List<Map<String, String>> partitions, long commitIdentifier);

-   void purgeTable(long commitIdentifier);
+   void truncateTable(long commitIdentifier);

    /** Abort an unsuccessful commit. The data files will be deleted. */
    void abort(List<CommitMessage> commitMessages);
----------------------------------------
@@ -493,7 +493,7 @@ public void dropPartitions(List<Map<String, String>> partitions, long commitIdentifier) {
    }

    @Override
-   public void purgeTable(long commitIdentifier) {
+   public void truncateTable(long commitIdentifier) {
        tryOverwrite(
                null,
                Collections.emptyList(),
----------------------------------------
@@ -50,4 +50,10 @@ public interface BatchTableCommit extends TableCommit {
     * @param commitMessages commit messages from table write
     */
    void commit(List<CommitMessage> commitMessages);
+
+   /**
+    * Truncate the table. As with a normal {@link #commit}, files are not immediately deleted;
+    * they are only logically deleted and will be physically removed once the snapshot expires.
+    */
+   void truncateTable();
 }
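For orientation, a minimal sketch of driving the new API, based only on the signatures visible in this diff; the Table handle is assumed to come from elsewhere (e.g. a catalog), and error handling is elided:

    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.sink.BatchTableCommit;

    public final class TruncateSketch {

        /** Truncates the given table through the new public batch-commit API. */
        public static void truncate(Table table) throws Exception {
            BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
            // As with a normal commit, files are only logically deleted here;
            // they are physically removed once the snapshot expires.
            commit.truncateTable();
            commit.close();
        }
    }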
----------------------------------------
@@ -65,6 +65,7 @@
 import java.util.stream.Collectors;

 import static org.apache.paimon.CoreOptions.ExpireExecutionMode;
+import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
 import static org.apache.paimon.utils.Preconditions.checkState;

 /** An abstraction layer above {@link FileStoreCommit} to provide snapshot commit and expiration. */
@@ -159,9 +160,19 @@ public Set<Long> filterCommitted(Set<Long> commitIdentifiers) {

    @Override
    public void commit(List<CommitMessage> commitMessages) {
+       checkCommitted();
+       commit(COMMIT_IDENTIFIER, commitMessages);
+   }
+
+   @Override
+   public void truncateTable() {
+       checkCommitted();
+       commit.truncateTable(COMMIT_IDENTIFIER);
+   }
+
+   private void checkCommitted() {
        checkState(!batchCommitted, "BatchTableCommit only supports one-time committing.");
        batchCommitted = true;
-       commit(BatchWriteBuilder.COMMIT_IDENTIFIER, commitMessages);
    }

    @Override
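Both commit() and truncateTable() now run through the same checkCommitted() guard, so one BatchTableCommit instance performs exactly one committing operation; presumably this is why the benchmark above asks the builder for a second commit just for the truncation. A sketch of that contract (everything except the Paimon API shown in this diff is illustrative):

    import org.apache.paimon.table.Table;
    import org.apache.paimon.table.sink.BatchTableCommit;
    import org.apache.paimon.table.sink.BatchTableWrite;
    import org.apache.paimon.table.sink.BatchWriteBuilder;

    public final class OneShotCommitSketch {

        /** Writes a batch, commits it, then truncates with a fresh commit instance. */
        public static void writeThenTruncate(Table table) throws Exception {
            BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
            BatchTableWrite write = writeBuilder.newWrite();
            // ... write.write(row) calls go here ...

            BatchTableCommit first = writeBuilder.newCommit();
            first.commit(write.prepareCommit()); // the one allowed operation on `first`
            // Calling first.truncateTable() now would trip the checkState guard:
            // "BatchTableCommit only supports one-time committing."

            BatchTableCommit second = writeBuilder.newCommit(); // fresh instance
            second.truncateTable();

            write.close();
            first.close();
            second.close();
        }
    }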
----------------------------------------
@@ -19,19 +19,14 @@
 package org.apache.paimon.flink.sink;

 import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;

 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.connector.sink.abilities.SupportsTruncate;
 import org.apache.flink.table.factories.DynamicTableFactory;

 import javax.annotation.Nullable;
-
-import java.util.UUID;

 /** Table sink to create sink. */
 public class FlinkTableSink extends SupportsRowLevelOperationFlinkTableSink
         implements SupportsTruncate {
@@ -46,9 +41,6 @@ public FlinkTableSink(

    @Override
    public void executeTruncation() {
-       FileStoreCommit commit =
-               ((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
-       long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
-       commit.purgeTable(identifier);
+       table.newBatchWriteBuilder().newCommit().truncateTable();
    }
}
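Note the design shift here: the Flink sink no longer builds an internal FileStoreCommit with a random UUID for truncation; it routes through the same public BatchTableCommit.truncateTable() entry point sketched above.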
----------------------------------------
@@ -165,7 +165,7 @@ public Optional<Long> executeDeletion() {
                ((FileStoreTable) table).store().newCommit(UUID.randomUUID().toString());
        long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
        if (deletePredicate == null) {
-           commit.purgeTable(identifier);
+           commit.truncateTable(identifier);
            return Optional.empty();
        } else if (deleteIsDropPartition()) {
            commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
----------------------------------------
@@ -64,7 +64,7 @@ trait DeleteFromPaimonTableCommandBase extends PaimonLeafRunnableCommand with Pa
    if (forceDeleteByRows) {
      deleteRowsByCondition(sparkSession)
    } else if (deletePredicate.isEmpty) {
-     commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+     commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
    } else {
      val visitor = new OnlyPartitionKeyEqualVisitor(table.partitionKeys)
      if (deletePredicate.get.visit(visitor)) {
----------------------------------------
@@ -40,7 +40,7 @@ case class PaimonTruncateTableCommand(v2Table: SparkTable, partitionSpec: TableP
    val commit = table.store.newCommit(UUID.randomUUID.toString)

    if (partitionSpec.isEmpty) {
-     commit.purgeTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+     commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
    } else {
      commit.dropPartitions(
        Collections.singletonList(partitionSpec.asJava),
