Skip to content

Commit

Permalink
[core] Add compaction when creating Iceberg compatible metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper committed Aug 20, 2024
1 parent 76b7f9c commit bcd9a60
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -52,6 +54,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

Expand All @@ -66,6 +69,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -77,6 +81,15 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
// see org.apache.iceberg.hadoop.Util
private static final String VERSION_HINT_FILENAME = "version-hint.text";

/**
 * Minimum number of small manifest files required before {@code compactMetadataIfNeeded}
 * considers a compaction; with fewer candidates the manifest list is returned unchanged.
 * Defaults to 10.
 */
static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM =
ConfigOptions.key("metadata.iceberg.compaction.min.file-num")
.intType()
.defaultValue(10);
/**
 * Number of small manifest files at which {@code compactMetadataIfNeeded} compacts regardless
 * of their total size; below this count compaction is skipped while the candidates' total
 * size is still smaller than the manifest target size. Defaults to 50.
 */
static final ConfigOption<Integer> COMPACT_MAX_FILE_NUM =
ConfigOptions.key("metadata.iceberg.compaction.max.file-num")
.intType()
.defaultValue(50);

protected final FileStoreTable table;
private final String commitUser;
private final IcebergPathFactory pathFactory;
Expand Down Expand Up @@ -289,7 +302,9 @@ private void createMetadataWithBase(
newManifestFileMetas = result.getLeft();
snapshotSummary = result.getRight();
}
String manifestListFileName = manifestList.writeWithoutRolling(newManifestFileMetas);
String manifestListFileName =
manifestList.writeWithoutRolling(
compactMetadataIfNeeded(newManifestFileMetas, snapshotId));

// add new schema if needed
int schemaId = (int) table.schema().id();
Expand Down Expand Up @@ -491,6 +506,10 @@ private List<IcebergManifestFileMeta> createNewlyAddedManifestFileMetas(
newManifestFileMetas.addAll(
manifestFile.rollingWrite(newEntries.iterator(), currentSnapshotId));
}
} else {
// partition of this file meta is not modified in this snapshot,
// use this file meta again
newManifestFileMetas.add(fileMeta);
}
}

Expand All @@ -509,6 +528,72 @@ private List<IcebergPartitionField> getPartitionFields(RowType partitionType) {
return result;
}

/**
 * Merges small Iceberg manifest files into larger ones once enough of them have accumulated.
 *
 * <p>A manifest file is a compaction candidate when its length is below two thirds of the
 * table's manifest target size. Compaction is skipped, and {@code toCompact} is returned
 * as-is, when there are fewer candidates than {@link #COMPACT_MIN_FILE_NUM}, or when there
 * are fewer candidates than {@link #COMPACT_MAX_FILE_NUM} and their total size is still
 * below the manifest target size.
 *
 * <p>When compacting, entries of older snapshots are rewritten: {@code ADDED} becomes
 * {@code EXISTING} and {@code DELETED} entries are dropped. Entries whose file sequence
 * number equals {@code currentSnapshotId}, and entries already {@code EXISTING}, are copied
 * unchanged.
 *
 * @param toCompact manifest file metas that are about to be written into the manifest list
 * @param currentSnapshotId id of the snapshot whose metadata is being created
 * @return {@code toCompact} itself if no compaction happens; otherwise a new list holding
 *     the non-candidate metas (in their original order) followed by the newly written ones
 * @throws IOException declared by this method; presumably raised while writing the merged
 *     manifest files (confirm against {@code rollingWrite})
 */
private List<IcebergManifestFileMeta> compactMetadataIfNeeded(
        List<IcebergManifestFileMeta> toCompact, long currentSnapshotId) throws IOException {
    long targetSizeInBytes = table.coreOptions().manifestTargetSize().getBytes();
    // files shorter than this are considered small enough to be merged
    long smallFileThreshold = targetSizeInBytes * 2 / 3;

    List<IcebergManifestFileMeta> untouched = new ArrayList<>();
    List<IcebergManifestFileMeta> smallFiles = new ArrayList<>();
    long smallFilesTotalSize = 0;
    for (IcebergManifestFileMeta candidate : toCompact) {
        if (candidate.manifestLength() >= smallFileThreshold) {
            untouched.add(candidate);
            continue;
        }
        smallFiles.add(candidate);
        smallFilesTotalSize += candidate.manifestLength();
    }

    Options options = new Options(table.options());
    int smallFileCount = smallFiles.size();
    boolean tooFewFiles = smallFileCount < options.get(COMPACT_MIN_FILE_NUM);
    if (tooFewFiles
            || (smallFileCount < options.get(COMPACT_MAX_FILE_NUM)
                    && smallFilesTotalSize < targetSizeInBytes)) {
        // not worth compacting yet, keep the manifest list exactly as given
        return toCompact;
    }

    Function<IcebergManifestFileMeta, List<IcebergManifestEntry>> readAndRewrite =
            smallFile -> {
                String fileName = new Path(smallFile.manifestPath()).getName();
                List<IcebergManifestEntry> kept = new ArrayList<>();
                for (IcebergManifestEntry entry : manifestFile.read(fileName)) {
                    boolean keepAsIs =
                            entry.fileSequenceNumber() == currentSnapshotId
                                    || entry.status() == IcebergManifestEntry.Status.EXISTING;
                    if (keepAsIs) {
                        kept.add(entry);
                        continue;
                    }

                    // the entry comes from an older snapshot, so its status must be rewritten
                    if (entry.status() == IcebergManifestEntry.Status.DELETED) {
                        // deletions of older snapshots are not carried over
                        continue;
                    }
                    if (entry.status() != IcebergManifestEntry.Status.ADDED) {
                        throw new UnsupportedOperationException(
                                "Unknown IcebergManifestEntry.Status " + entry.status());
                    }
                    kept.add(
                            new IcebergManifestEntry(
                                    IcebergManifestEntry.Status.EXISTING,
                                    entry.snapshotId(),
                                    entry.sequenceNumber(),
                                    entry.fileSequenceNumber(),
                                    entry.file()));
                }

                if (meta(smallFile) == currentSnapshotId) {
                    // the file was created for this very snapshot and is not referenced by
                    // any existing Iceberg metadata, so it can be removed once it is read
                    table.fileIO().deleteQuietly(new Path(smallFile.manifestPath()));
                }
                return kept;
            };

    Iterable<IcebergManifestEntry> mergedEntries =
            ManifestReadThreadPool.sequentialBatchedExecute(readAndRewrite, smallFiles, null);
    untouched.addAll(manifestFile.rollingWrite(mergedEntries.iterator(), currentSnapshotId));
    return untouched;
}

/** Returns the sequence number recorded for the given manifest file meta. */
private static long meta(IcebergManifestFileMeta fileMeta) {
    return fileMeta.sequenceNumber();
}

/** No-op: this base implementation performs no cleanup when the callback is closed. */
@Override
public void close() throws Exception {
    // intentionally empty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -258,8 +259,8 @@ public void testUnPartitionedPrimaryKeyTable() throws Exception {
},
new String[] {"k1", "k2", "v1", "v2"});

int numRounds = 5;
int numRecords = 500;
int numRounds = 20;
int numRecords = 1000;
ThreadLocalRandom random = ThreadLocalRandom.current();
List<List<TestRecord>> testRecords = new ArrayList<>();
List<List<String>> expected = new ArrayList<>();
Expand Down Expand Up @@ -316,16 +317,18 @@ public void testPartitionedPrimaryKeyTable() throws Exception {
return b;
};

int numRounds = 2;
int numRecords = 3;
int numRounds = 20;
int numRecords = 500;
ThreadLocalRandom random = ThreadLocalRandom.current();
boolean samePartitionEachRound = random.nextBoolean();

List<List<TestRecord>> testRecords = new ArrayList<>();
List<List<String>> expected = new ArrayList<>();
Map<String, String> expectedMap = new LinkedHashMap<>();
for (int r = 0; r < numRounds; r++) {
List<TestRecord> round = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
int pt1 = random.nextInt(0, 2);
int pt1 = (random.nextInt(0, samePartitionEachRound ? 1 : 2) + r) % 3;
String pt2 = String.valueOf(random.nextInt(10, 12));
String k = String.valueOf(random.nextInt(0, 100));
int v1 = random.nextInt();
Expand Down Expand Up @@ -551,6 +554,10 @@ private FileStoreTable createPaimonTable(
options.set(CoreOptions.BUCKET, numBuckets);
options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
options.set(CoreOptions.FILE_FORMAT, "avro");
options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
// Lower both compaction thresholds below their defaults (10 and 50). The upper bound must be
// set through COMPACT_MAX_FILE_NUM; setting COMPACT_MIN_FILE_NUM twice would only overwrite
// the lower bound and leave the upper bound at its default.
options.set(AbstractIcebergCommitCallback.COMPACT_MIN_FILE_NUM, 4);
options.set(AbstractIcebergCommitCallback.COMPACT_MAX_FILE_NUM, 8);
options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, MemorySize.ofKibiBytes(8));
Schema schema =
new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");

Expand Down

0 comments on commit bcd9a60

Please sign in to comment.