Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Add compaction when creating Iceberg compatible metadata #4005

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
Expand All @@ -52,6 +54,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;

Expand All @@ -66,6 +69,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand All @@ -77,6 +81,15 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
// see org.apache.iceberg.hadoop.Util
private static final String VERSION_HINT_FILENAME = "version-hint.text";

static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM =
ConfigOptions.key("metadata.iceberg.compaction.min.file-num")
.intType()
.defaultValue(10);
static final ConfigOption<Integer> COMPACT_MAX_FILE_NUM =
ConfigOptions.key("metadata.iceberg.compaction.max.file-num")
.intType()
.defaultValue(50);

protected final FileStoreTable table;
private final String commitUser;
private final IcebergPathFactory pathFactory;
Expand Down Expand Up @@ -289,7 +302,9 @@ private void createMetadataWithBase(
newManifestFileMetas = result.getLeft();
snapshotSummary = result.getRight();
}
String manifestListFileName = manifestList.writeWithoutRolling(newManifestFileMetas);
String manifestListFileName =
manifestList.writeWithoutRolling(
compactMetadataIfNeeded(newManifestFileMetas, snapshotId));

// add new schema if needed
int schemaId = (int) table.schema().id();
Expand Down Expand Up @@ -491,6 +506,10 @@ private List<IcebergManifestFileMeta> createNewlyAddedManifestFileMetas(
newManifestFileMetas.addAll(
manifestFile.rollingWrite(newEntries.iterator(), currentSnapshotId));
}
} else {
// partition of this file meta is not modified in this snapshot,
// use this file meta again
newManifestFileMetas.add(fileMeta);
}
}

Expand All @@ -509,6 +528,72 @@ private List<IcebergPartitionField> getPartitionFields(RowType partitionType) {
return result;
}

private List<IcebergManifestFileMeta> compactMetadataIfNeeded(
List<IcebergManifestFileMeta> toCompact, long currentSnapshotId) throws IOException {
List<IcebergManifestFileMeta> result = new ArrayList<>();
long targetSizeInBytes = table.coreOptions().manifestTargetSize().getBytes();

List<IcebergManifestFileMeta> candidates = new ArrayList<>();
long totalSizeInBytes = 0;
for (IcebergManifestFileMeta meta : toCompact) {
if (meta.manifestLength() < targetSizeInBytes * 2 / 3) {
candidates.add(meta);
totalSizeInBytes += meta.manifestLength();
} else {
result.add(meta);
}
}

Options options = new Options(table.options());
if (candidates.size() < options.get(COMPACT_MIN_FILE_NUM)) {
return toCompact;
}
if (candidates.size() < options.get(COMPACT_MAX_FILE_NUM)
&& totalSizeInBytes < targetSizeInBytes) {
return toCompact;
}

Function<IcebergManifestFileMeta, List<IcebergManifestEntry>> processor =
meta -> {
List<IcebergManifestEntry> entries = new ArrayList<>();
for (IcebergManifestEntry entry :
manifestFile.read(new Path(meta.manifestPath()).getName())) {
if (entry.fileSequenceNumber() == currentSnapshotId
|| entry.status() == IcebergManifestEntry.Status.EXISTING) {
entries.add(entry);
} else {
// rewrite status if this entry is from an older snapshot
IcebergManifestEntry.Status newStatus;
if (entry.status() == IcebergManifestEntry.Status.ADDED) {
newStatus = IcebergManifestEntry.Status.EXISTING;
} else if (entry.status() == IcebergManifestEntry.Status.DELETED) {
continue;
} else {
throw new UnsupportedOperationException(
"Unknown IcebergManifestEntry.Status " + entry.status());
}
entries.add(
new IcebergManifestEntry(
newStatus,
entry.snapshotId(),
entry.sequenceNumber(),
entry.fileSequenceNumber(),
entry.file()));
}
}
if (meta.sequenceNumber() == currentSnapshotId) {
// this file is created for this snapshot, so it is not recorded in any
// iceberg metas, we need to clean it
table.fileIO().deleteQuietly(new Path(meta.manifestPath()));
}
return entries;
};
Iterable<IcebergManifestEntry> newEntries =
ManifestReadThreadPool.sequentialBatchedExecute(processor, candidates, null);
result.addAll(manifestFile.rollingWrite(newEntries.iterator(), currentSnapshotId));
return result;
}

@Override
public void close() throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -258,8 +259,8 @@ public void testUnPartitionedPrimaryKeyTable() throws Exception {
},
new String[] {"k1", "k2", "v1", "v2"});

int numRounds = 5;
int numRecords = 500;
int numRounds = 20;
int numRecords = 1000;
ThreadLocalRandom random = ThreadLocalRandom.current();
List<List<TestRecord>> testRecords = new ArrayList<>();
List<List<String>> expected = new ArrayList<>();
Expand Down Expand Up @@ -316,16 +317,18 @@ public void testPartitionedPrimaryKeyTable() throws Exception {
return b;
};

int numRounds = 2;
int numRecords = 3;
int numRounds = 20;
int numRecords = 500;
ThreadLocalRandom random = ThreadLocalRandom.current();
boolean samePartitionEachRound = random.nextBoolean();

List<List<TestRecord>> testRecords = new ArrayList<>();
List<List<String>> expected = new ArrayList<>();
Map<String, String> expectedMap = new LinkedHashMap<>();
for (int r = 0; r < numRounds; r++) {
List<TestRecord> round = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
int pt1 = random.nextInt(0, 2);
int pt1 = (random.nextInt(0, samePartitionEachRound ? 1 : 2) + r) % 3;
String pt2 = String.valueOf(random.nextInt(10, 12));
String k = String.valueOf(random.nextInt(0, 100));
int v1 = random.nextInt();
Expand Down Expand Up @@ -551,6 +554,10 @@ private FileStoreTable createPaimonTable(
options.set(CoreOptions.BUCKET, numBuckets);
options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
options.set(CoreOptions.FILE_FORMAT, "avro");
options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
options.set(AbstractIcebergCommitCallback.COMPACT_MIN_FILE_NUM, 4);
options.set(AbstractIcebergCommitCallback.COMPACT_MIN_FILE_NUM, 8);
options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, MemorySize.ofKibiBytes(8));
Schema schema =
new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");

Expand Down
Loading