Commit

update
Zouxxyy committed Dec 5, 2023
1 parent 1758644 commit 9117e13
Showing 9 changed files with 278 additions and 43 deletions.
docs/content/maintenance/dedicated-compaction.md (4 changes: 2 additions & 2 deletions)
@@ -223,8 +223,8 @@ For more usage of the compact_database action, see
## Sort Compact
If your table is configured with [dynamic bucket]({{< ref "concepts/primary-key-table#dynamic-bucket" >}})
or [append table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}) ,
If your table is configured with [dynamic bucket primary key table]({{< ref "concepts/primary-key-table#dynamic-bucket" >}})
or [unaware bucket append table]({{< ref "concepts/append-only-table#append-for-scalable-table" >}}) ,
you can trigger a compact with specified column sort to speed up queries.
```bash
@@ -18,6 +18,10 @@

package org.apache.paimon.utils;

import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.types.RowType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -34,6 +38,14 @@ public static List<Map<String, String>> getPartitions(String... partitionStrings
return partitions;
}

public static Predicate getPartitionFilter(
List<Map<String, String>> specifiedPartitions, RowType rowType) {
return PredicateBuilder.or(
specifiedPartitions.stream()
.map(p -> PredicateBuilder.partition(p, rowType))
.toArray(Predicate[]::new));
}

public static Map<String, String> parseCommaSeparatedKeyValues(String keyValues) {
Map<String, String> kvs = new HashMap<>();
if (!StringUtils.isBlank(keyValues)) {
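
As an illustration of the new helper (not part of the commit): each partition string is parsed into a key-value map, and getPartitionFilter ORs one partition predicate per map. The RowType.builder()/DataTypes calls and the dt/hh partition columns below are assumptions made for this sketch.

```java
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ParameterUtils;

import java.util.List;
import java.util.Map;

public class PartitionFilterExample {

    public static Predicate buildFilter() {
        // Partition schema assumed for this sketch: dt (string), hh (string).
        RowType partitionType =
                RowType.builder()
                        .field("dt", DataTypes.STRING())
                        .field("hh", DataTypes.STRING())
                        .build();
        // Each string is one partition spec; key-value pairs are comma separated.
        List<Map<String, String>> partitions =
                ParameterUtils.getPartitions("dt=20231205,hh=10", "dt=20231206,hh=11");
        // OR of the per-partition predicates built by PredicateBuilder.partition.
        return ParameterUtils.getPartitionFilter(partitions, partitionType);
    }
}
```
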
@@ -212,12 +212,12 @@ public void addFiles(List<DataFileMeta> dataFileMetas) {
}

public boolean readyToRemove() {
return toCompact.size() == 0 || age > REMOVE_AGE;
return toCompact.isEmpty() || age > REMOVE_AGE;
}

private List<List<DataFileMeta>> agePack() {
List<List<DataFileMeta>> packed = pack();
if (packed.size() == 0) {
if (packed.isEmpty()) {
// non-packed, we need to grow up age, and check whether to compact once
if (++age > COMPACT_AGE && toCompact.size() > 1) {
List<DataFileMeta> all = new ArrayList<>(toCompact);
@@ -105,7 +105,7 @@ public void testCompactLessFile() {
assertThat(compactionCoordinator.partitionCompactCoordinators.size()).isEqualTo(1);
}

// age enough, generate less file comaction
// age enough, generate less file compaction
List<AppendOnlyCompactionTask> tasks = compactionCoordinator.compactPlan();
assertThat(tasks.size()).isEqualTo(1);
assertThat(new HashSet<>(files))
@@ -24,9 +24,8 @@
import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
import org.apache.paimon.flink.source.BucketUnawareCompactSource;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.utils.ParameterUtils;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -95,7 +94,13 @@ private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
long scanInterval = table.coreOptions().continuousDiscoveryInterval().toMillis();
BucketUnawareCompactSource source =
new BucketUnawareCompactSource(
table, isContinuous, scanInterval, getPartitionFilter());
table,
isContinuous,
scanInterval,
specifiedPartitions != null
? ParameterUtils.getPartitionFilter(
specifiedPartitions, table.rowType())
: null);

return BucketUnawareCompactSource.buildSource(env, source, isContinuous, tableIdentifier);
}
@@ -123,16 +128,4 @@ private DataStream<AppendOnlyCompactionTask> rebalanceInput(
}
return new DataStream<>(env, transformation);
}

private Predicate getPartitionFilter() {
Predicate partitionPredicate = null;
if (specifiedPartitions != null) {
partitionPredicate =
PredicateBuilder.or(
specifiedPartitions.stream()
.map(p -> PredicateBuilder.partition(p, table.rowType()))
.toArray(Predicate[]::new));
}
return partitionPredicate;
}
}
@@ -20,16 +20,36 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.spark.DynamicOverWrite$;
import org.apache.paimon.spark.SparkUtils;
import org.apache.paimon.spark.commands.WriteIntoPaimonTable;
import org.apache.paimon.spark.sort.TableSorter;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CompactionTaskSerializer;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -42,11 +62,15 @@

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static org.apache.spark.sql.types.DataTypes.StringType;

@@ -85,7 +109,7 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
Preconditions.checkArgument(args.numFields() >= 1);
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
String partitionFilter = blank(args, 1) ? null : toWhere(args.getString(1));
String partitions = blank(args, 1) ? null : args.getString(1);
String sortType = blank(args, 2) ? TableSorter.OrderType.NONE.name() : args.getString(2);
List<String> sortColumns =
blank(args, 3)
@@ -106,7 +130,7 @@ public InternalRow[] call(InternalRow args) {
(FileStoreTable) table,
sortType,
sortColumns,
partitionFilter));
partitions));
return new InternalRow[] {internalRow};
});
}
@@ -124,28 +148,155 @@ private boolean execute(
FileStoreTable table,
String sortType,
List<String> sortColumns,
@Nullable String filter) {
CoreOptions coreOptions = table.store().options();

// sort only works with bucket=-1 yet
if (!TableSorter.OrderType.of(sortType).equals(TableSorter.OrderType.NONE)) {
if (!(table instanceof AppendOnlyFileStoreTable) || coreOptions.bucket() != -1) {
throw new UnsupportedOperationException(
"Spark compact with sort_type "
+ sortType
+ " only support unaware-bucket append-only table yet.");
@Nullable String partitions) {
table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
BucketMode bucketMode = table.bucketMode();
TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);

if (orderType.equals(TableSorter.OrderType.NONE)) {
JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
Predicate filter =
StringUtils.isBlank(partitions)
? null
: ParameterUtils.getPartitionFilter(
ParameterUtils.getPartitions(partitions), table.rowType());
switch (bucketMode) {
case FIXED:
case DYNAMIC:
compactAwareBucketTable(table, filter, javaSparkContext);
break;
case UNAWARE:
compactUnAwareBucketTable(table, filter, javaSparkContext);
break;
default:
throw new UnsupportedOperationException(
"Spark compact with " + bucketMode + " is not support yet.");
}
} else {
switch (bucketMode) {
case UNAWARE:
sortCompactUnAwareTable(table, orderType, sortColumns, partitions);
break;
default:
throw new UnsupportedOperationException(
"Spark compact with sort_type "
+ sortType
+ " only support unaware-bucket append-only table yet.");
}
}
return true;
}
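
For orientation, the rewritten procedure could be invoked from Spark SQL roughly as sketched below. The diff only fixes the argument positions (table identifier, partitions, sort type, sort columns); the procedure name sys.compact, the argument names partitions, order_strategy and order_by, and the table/column names are assumptions for this example.

```java
import org.apache.spark.sql.SparkSession;

public class CompactCallExample {

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("paimon-compact").getOrCreate();

        // Plain compaction of selected partitions (FIXED/DYNAMIC or UNAWARE bucket mode).
        spark.sql("CALL sys.compact(table => 'default.T', partitions => 'dt=20231205')");

        // Sort compaction, only supported for unaware-bucket append-only tables.
        spark.sql(
                "CALL sys.compact(table => 'default.T', "
                        + "order_strategy => 'zorder', order_by => 'a,b')");

        spark.stop();
    }
}
```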

private void compactAwareBucketTable(
FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
InnerTableScan scan = table.newScan();
if (filter != null) {
scan.withFilter(filter);
}

List<Split> splits = scan.plan().splits();
if (splits.isEmpty()) {
return;
}

BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
JavaRDD<CommitMessage> commitMessageJavaRDD =
javaSparkContext
.parallelize(splits)
.mapPartitions(
(FlatMapFunction<Iterator<Split>, CommitMessage>)
splitIterator -> {
IOManager ioManager = SparkUtils.createIOManager();
BatchTableWrite write = writeBuilder.newWrite();
write.withIOManager(ioManager);
try {
while (splitIterator.hasNext()) {
DataSplit dataSplit =
(DataSplit) splitIterator.next();
BinaryRow partition = dataSplit.partition();
int bucket = dataSplit.bucket();
write.compact(partition, bucket, true);
}
return write.prepareCommit().iterator();
} finally {
write.close();
ioManager.close();
}
});

try (BatchTableCommit commit = writeBuilder.newCommit()) {
commit.commit(commitMessageJavaRDD.collect());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
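
As a reading aid, the Spark plumbing above reduces to the following single-process sketch built only from the calls used in this method (error handling trimmed, IOManager wiring omitted); it assumes a FileStoreTable handle is already available.

```java
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;

import java.util.ArrayList;
import java.util.List;

public class LocalBucketCompactSketch {

    // Compacts every (partition, bucket) of the table in the current JVM.
    public static void compact(FileStoreTable table) throws Exception {
        // Plan the current splits; a partition predicate could be pushed via withFilter.
        List<Split> splits = table.newScan().plan().splits();
        if (splits.isEmpty()) {
            return;
        }

        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        List<CommitMessage> messages = new ArrayList<>();
        BatchTableWrite write = writeBuilder.newWrite();
        try {
            for (Split split : splits) {
                DataSplit dataSplit = (DataSplit) split;
                // true => fully compact this (partition, bucket) pair.
                write.compact(dataSplit.partition(), dataSplit.bucket(), true);
            }
            messages.addAll(write.prepareCommit());
        } finally {
            // The distributed version also attaches an IOManager before writing;
            // omitted here for brevity.
            write.close();
        }

        try (BatchTableCommit commit = writeBuilder.newCommit()) {
            commit.commit(messages);
        }
    }
}
```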

private void compactUnAwareBucketTable(
FileStoreTable table, @Nullable Predicate filter, JavaSparkContext javaSparkContext) {
AppendOnlyFileStoreTable fileStoreTable = (AppendOnlyFileStoreTable) table;
List<AppendOnlyCompactionTask> compactionTasks =
new AppendOnlyTableCompactionCoordinator(fileStoreTable, false, filter).run();
if (compactionTasks.isEmpty()) {
return;
}

CompactionTaskSerializer serializer = new CompactionTaskSerializer();
List<byte[]> serializedTasks = new ArrayList<>();
try {
for (AppendOnlyCompactionTask compactionTask : compactionTasks) {
serializedTasks.add(serializer.serialize(compactionTask));
}
} catch (IOException e) {
throw new RuntimeException("serialize compaction task failed");
}

String commitUser = UUID.randomUUID().toString();
JavaRDD<CommitMessage> commitMessageJavaRDD =
javaSparkContext
.parallelize(serializedTasks)
.mapPartitions(
(FlatMapFunction<Iterator<byte[]>, CommitMessage>)
taskIterator -> {
AppendOnlyFileStoreWrite write =
fileStoreTable.store().newWrite(commitUser);
CompactionTaskSerializer ser =
new CompactionTaskSerializer();
ArrayList<CommitMessage> messages = new ArrayList<>();
try {
while (taskIterator.hasNext()) {
AppendOnlyCompactionTask task =
ser.deserialize(
ser.getVersion(),
taskIterator.next());
messages.add(task.doCompact(write));
}
return messages.iterator();
} finally {
write.close();
}
});

try (TableCommitImpl commit = table.newCommit(commitUser)) {
commit.commit(commitMessageJavaRDD.collect());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
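
The unaware-bucket path instead plans whole-file compaction tasks on the driver and ships them to executors as byte[] via CompactionTaskSerializer, presumably so the tasks need not be Java-serializable. Stripped of the RDD machinery, the flow looks roughly like this sketch (only calls from this method are used; error handling trimmed).

```java
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommitImpl;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class LocalUnawareCompactSketch {

    public static void compact(AppendOnlyFileStoreTable table) throws Exception {
        // Plan whole-file compaction tasks once; `false` mirrors the call above
        // (presumably the streaming flag), `null` means no partition filter.
        List<AppendOnlyCompactionTask> tasks =
                new AppendOnlyTableCompactionCoordinator(table, false, null).run();
        if (tasks.isEmpty()) {
            return;
        }

        String commitUser = UUID.randomUUID().toString();
        AppendOnlyFileStoreWrite write = table.store().newWrite(commitUser);
        List<CommitMessage> messages = new ArrayList<>();
        try {
            for (AppendOnlyCompactionTask task : tasks) {
                messages.add(task.doCompact(write));
            }
        } finally {
            write.close();
        }

        try (TableCommitImpl commit = table.newCommit(commitUser)) {
            commit.commit(messages);
        }
    }
}
```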

Dataset<Row> row = spark().read().format("paimon").load(coreOptions.path().toString());
row = StringUtils.isBlank(filter) ? row : row.where(filter);
private void sortCompactUnAwareTable(
FileStoreTable table,
TableSorter.OrderType orderType,
List<String> sortColumns,
@Nullable String partitions) {
Dataset<Row> row =
spark().read().format("paimon").load(table.coreOptions().path().toString());
row = StringUtils.isBlank(partitions) ? row : row.where(toWhere(partitions));
new WriteIntoPaimonTable(
table,
DynamicOverWrite$.MODULE$,
TableSorter.getSorter(table, sortType, sortColumns).sort(row),
TableSorter.getSorter(table, orderType, sortColumns).sort(row),
new Options())
.run(spark());
return true;
}

@VisibleForTesting
@@ -62,8 +62,8 @@ private void checkColNames() {
public abstract Dataset<Row> sort(Dataset<Row> input);

public static TableSorter getSorter(
FileStoreTable table, String sortStrategy, List<String> orderColumns) {
switch (OrderType.of(sortStrategy)) {
FileStoreTable table, TableSorter.OrderType orderType, List<String> orderColumns) {
switch (orderType) {
case ORDER:
return new OrderSorter(table, orderColumns);
case ZORDER:
Expand All @@ -79,7 +79,7 @@ public Dataset<Row> sort(Dataset<Row> input) {
}
};
default:
throw new IllegalArgumentException("cannot match order type: " + sortStrategy);
throw new IllegalArgumentException("cannot match order type: " + orderType);
}
}

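For reference, a small sketch of the updated entry point, which now takes a resolved OrderType instead of the raw strategy string; the table handle, the input Dataset and the a/b column names are placeholders for this example.

```java
import org.apache.paimon.spark.sort.TableSorter;
import org.apache.paimon.table.FileStoreTable;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.Arrays;

public class SorterExample {

    public static Dataset<Row> zorder(FileStoreTable table, Dataset<Row> input) {
        // The caller resolves the strategy string to an OrderType up front.
        TableSorter sorter =
                TableSorter.getSorter(table, TableSorter.OrderType.ZORDER, Arrays.asList("a", "b"));
        return sorter.sort(input);
    }
}
```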