Commit

[flink] Add IT cases for Iceberg compatibility

tsreaper committed Oct 8, 2024
1 parent 58d3f90 commit e350080
Showing 19 changed files with 403 additions and 51 deletions.
14 changes: 1 addition & 13 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

@@ -1342,14 +1342,6 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "When need to lookup, commit will wait for compaction by lookup.");

-    public static final ConfigOption<Boolean> METADATA_ICEBERG_COMPATIBLE =
-            key("metadata.iceberg-compatible")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription(
-                            "When set to true, produce Iceberg metadata after a snapshot is committed, "
-                                    + "so that Iceberg readers can read Paimon's raw files.");
-
     public static final ConfigOption<Integer> DELETE_FILE_THREAD_NUM =
             key("delete-file.thread-num")
                     .intType()
@@ -2083,7 +2075,7 @@ private Map<String, String> callbacks(
         Map<String, String> result = new HashMap<>();
         for (String className : options.get(callbacks).split(",")) {
             className = className.trim();
-            if (className.length() == 0) {
+            if (className.isEmpty()) {
                 continue;
             }

@@ -2168,10 +2160,6 @@ public boolean asyncFileWrite() {
         return options.get(ASYNC_FILE_WRITE);
     }

-    public boolean metadataIcebergCompatible() {
-        return options.get(METADATA_ICEBERG_COMPATIBLE);
-    }
-
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
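Note (illustration, not part of the commit): the boolean `metadata.iceberg-compatible` option removed above is superseded by the `metadata.iceberg.storage` enum introduced in `IcebergOptions` later in this diff. A minimal migration sketch, mirroring the test changes at the end of this commit (`PER_TABLE` keeps the old behavior of writing Iceberg metadata inside the table directory):

    import org.apache.paimon.iceberg.IcebergOptions;
    import org.apache.paimon.options.Options;

    Options options = new Options();
    // before this commit:
    // options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
    // after this commit:
    options.set(IcebergOptions.METADATA_ICEBERG_STORAGE, IcebergOptions.StorageType.PER_TABLE);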
paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java

@@ -31,6 +31,7 @@
 import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
 import org.apache.paimon.iceberg.manifest.IcebergManifestList;
 import org.apache.paimon.iceberg.manifest.IcebergPartitionSummary;
+import org.apache.paimon.iceberg.metadata.IcebergDataField;
 import org.apache.paimon.iceberg.metadata.IcebergMetadata;
 import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
 import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
@@ -40,8 +41,6 @@
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.ConfigOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.SchemaManager;
@@ -52,7 +51,6 @@
 import org.apache.paimon.table.source.RawFile;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -85,15 +83,6 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
     // see org.apache.iceberg.hadoop.Util
     private static final String VERSION_HINT_FILENAME = "version-hint.text";

-    static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM =
-            ConfigOptions.key("metadata.iceberg.compaction.min.file-num")
-                    .intType()
-                    .defaultValue(10);
-    static final ConfigOption<Integer> COMPACT_MAX_FILE_NUM =
-            ConfigOptions.key("metadata.iceberg.compaction.max.file-num")
-                    .intType()
-                    .defaultValue(50);
-
     protected final FileStoreTable table;
     private final String commitUser;
     private final IcebergPathFactory pathFactory;
@@ -109,7 +98,37 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
     public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) {
         this.table = table;
         this.commitUser = commitUser;
-        this.pathFactory = new IcebergPathFactory(table.location());
+
+        IcebergOptions.StorageType storageType =
+                table.coreOptions().toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE);
+        switch (storageType) {
+            case PER_TABLE:
+                this.pathFactory = new IcebergPathFactory(new Path(table.location(), "metadata"));
+                break;
+            case ICEBERG_WAREHOUSE:
+                String tableName = table.location().getName();
+                Path dbPath = table.location().getParent();
+                if (dbPath.getName().endsWith(".db")) {
+                    Path separatePath =
+                            new Path(
+                                    dbPath.getParent(),
+                                    "iceberg/"
+                                            + dbPath.getName()
+                                                    .substring(0, dbPath.getName().length() - 3)
+                                            + "/"
+                                            + tableName
+                                            + "/metadata");
+                    this.pathFactory = new IcebergPathFactory(separatePath);
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse.");
+                }
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown storage type " + storageType.name());
+        }
+
         this.fileStorePathFactory = table.store().pathFactory();
         this.manifestFile = IcebergManifestFile.create(table, pathFactory);
         this.manifestList = IcebergManifestList.create(table, pathFactory);
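Note (illustration, not part of the commit): the ICEBERG_WAREHOUSE branch above maps a Paimon table directory to a parallel Iceberg metadata directory, so the latter can be handed to an Iceberg catalog as its warehouse. Assuming a hypothetical warehouse root /warehouse and a table my_db.my_tbl, the resulting layout would be:

    /warehouse/my_db.db/my_tbl/...              <- Paimon table (data files, Paimon metadata)
    /warehouse/iceberg/my_db/my_tbl/metadata/   <- Iceberg metadata written by this callback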
@@ -183,8 +202,9 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException {
         manifestFile.rollingWrite(entryIterator, snapshotId);
         String manifestListFileName = manifestList.writeWithoutRolling(manifestFileMetas);

+        IcebergSchema icebergSchema = IcebergSchema.create(table.schema());
         List<IcebergPartitionField> partitionFields =
-                getPartitionFields(table.schema().logicalPartitionType());
+                getPartitionFields(table.schema().partitionKeys(), icebergSchema);
         int schemaId = (int) table.schema().id();
         IcebergSnapshot snapshot =
                 new IcebergSnapshot(
@@ -202,7 +222,7 @@
                         table.location().toString(),
                         snapshotId,
                         table.schema().highestFieldId(),
-                        Collections.singletonList(new IcebergSchema(table.schema())),
+                        Collections.singletonList(icebergSchema),
                         schemaId,
                         Collections.singletonList(new IcebergPartitionSpec(partitionFields)),
                         partitionFields.stream()
@@ -250,11 +270,17 @@ private List<IcebergManifestEntry> dataSplitToManifestEntries(
         return result;
     }

-    private List<IcebergPartitionField> getPartitionFields(RowType partitionType) {
+    private List<IcebergPartitionField> getPartitionFields(
+            List<String> partitionKeys, IcebergSchema icebergSchema) {
+        Map<String, IcebergDataField> fields = new HashMap<>();
+        for (IcebergDataField field : icebergSchema.fields()) {
+            fields.put(field.name(), field);
+        }
+
         List<IcebergPartitionField> result = new ArrayList<>();
         int fieldId = IcebergPartitionField.FIRST_FIELD_ID;
-        for (DataField field : partitionType.getFields()) {
-            result.add(new IcebergPartitionField(field, fieldId));
+        for (String partitionKey : partitionKeys) {
+            result.add(new IcebergPartitionField(fields.get(partitionKey), fieldId));
             fieldId++;
         }
         return result;
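Note (illustration, not part of the commit): partition fields are now resolved by name from the Iceberg schema rather than from Paimon's raw DataFields, so their source ids pick up any id bias applied by IcebergSchema.create (see below). A sketch, assuming FIRST_FIELD_ID is 1000 (Iceberg's conventional first partition-field id) and partition keys dt and hr:

    getPartitionFields([dt, hr], icebergSchema)
        -> IcebergPartitionField(dt, fieldId = 1000)  // source id = biased id of dt
        -> IcebergPartitionField(hr, fieldId = 1001)  // source id = biased id of hr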
@@ -311,7 +337,7 @@ private void createMetadataWithBase(
         List<IcebergSchema> schemas = baseMetadata.schemas();
         if (baseMetadata.currentSchemaId() != schemaId) {
             schemas = new ArrayList<>(schemas);
-            schemas.add(new IcebergSchema(table.schema()));
+            schemas.add(IcebergSchema.create(table.schema()));
         }

         List<IcebergSnapshot> snapshots = new ArrayList<>(baseMetadata.snapshots());
@@ -562,10 +588,10 @@ private List<IcebergManifestFileMeta> compactMetadataIfNeeded(
         }

         Options options = new Options(table.options());
-        if (candidates.size() < options.get(COMPACT_MIN_FILE_NUM)) {
+        if (candidates.size() < options.get(IcebergOptions.COMPACT_MIN_FILE_NUM)) {
             return toCompact;
         }
-        if (candidates.size() < options.get(COMPACT_MAX_FILE_NUM)
+        if (candidates.size() < options.get(IcebergOptions.COMPACT_MAX_FILE_NUM)
                 && totalSizeInBytes < targetSizeInBytes) {
             return toCompact;
         }
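Note (simplified sketch, not part of the commit): the two early returns above amount to the following trigger, with names as in this diff. Manifest metadata is compacted only once enough small files have accumulated, and the size check is skipped once the candidate count reaches the maximum:

    int min = options.get(IcebergOptions.COMPACT_MIN_FILE_NUM); // default 10
    int max = options.get(IcebergOptions.COMPACT_MAX_FILE_NUM); // default 50
    boolean compact =
            candidates.size() >= min
                    && (candidates.size() >= max || totalSizeInBytes >= targetSizeInBytes);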
paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java

@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.ConfigOptions;
+
+import static org.apache.paimon.options.ConfigOptions.key;
+
+/** Config options for Paimon Iceberg compatibility. */
+public class IcebergOptions {
+
+    public static final ConfigOption<StorageType> METADATA_ICEBERG_STORAGE =
+            key("metadata.iceberg.storage")
+                    .enumType(StorageType.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            "When set, produce Iceberg metadata after a snapshot is committed, "
+                                    + "so that Iceberg readers can read Paimon's raw files.\n"
+                                    + "PER_TABLE: Store Iceberg metadata with each table.\n"
+                                    + "ICEBERG_WAREHOUSE: Store Iceberg metadata in a separate directory. "
+                                    + "This directory can be specified as the Iceberg warehouse directory.");
+
+    public static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM =
+            ConfigOptions.key("metadata.iceberg.compaction.min.file-num")
+                    .intType()
+                    .defaultValue(10);
+
+    public static final ConfigOption<Integer> COMPACT_MAX_FILE_NUM =
+            ConfigOptions.key("metadata.iceberg.compaction.max.file-num")
+                    .intType()
+                    .defaultValue(50);
+
+    /** Where to store Iceberg metadata. */
+    public enum StorageType {
+        // Store Iceberg metadata with each table.
+        PER_TABLE,
+        // Store Iceberg metadata in a separate directory.
+        // This directory can be specified as the Iceberg warehouse directory.
+        ICEBERG_WAREHOUSE
+    }
+}
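Note (illustration, not part of the commit): these are ordinary table options. A minimal sketch of enabling the warehouse-level layout and tuning metadata compaction, mirroring the test changes at the end of this diff:

    import org.apache.paimon.iceberg.IcebergOptions;
    import org.apache.paimon.options.Options;

    Options options = new Options();
    options.set(
            IcebergOptions.METADATA_ICEBERG_STORAGE,
            IcebergOptions.StorageType.ICEBERG_WAREHOUSE);
    options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 4); // compact sooner than the default 10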
paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java

@@ -37,8 +37,8 @@ public class IcebergPathFactory {
     private int manifestFileCount;
     private int manifestListCount;

-    public IcebergPathFactory(Path root) {
-        this.metadataDirectory = new Path(root, "metadata");
+    public IcebergPathFactory(Path metadataDirectory) {
+        this.metadataDirectory = metadataDirectory;
         this.uuid = UUID.randomUUID().toString();
     }

paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java

@@ -58,9 +58,9 @@ public class IcebergDataField {
     @JsonProperty(FIELD_DOC)
     private final String doc;

-    public IcebergDataField(DataField dataField) {
+    public IcebergDataField(DataField dataField, int bias) {
         this(
-                dataField.id(),
+                dataField.id() + bias,
                 dataField.name(),
                 !dataField.type().isNullable(),
                 toTypeString(dataField.type()),
paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java

@@ -18,8 +18,6 @@

 package org.apache.paimon.iceberg.metadata;

-import org.apache.paimon.types.DataField;
-
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -55,7 +53,7 @@ public class IcebergPartitionField {
     @JsonProperty(FIELD_FIELD_ID)
     private final int fieldId;

-    public IcebergPartitionField(DataField dataField, int fieldId) {
+    public IcebergPartitionField(IcebergDataField dataField, int fieldId) {
         this(
                 dataField.name(),
                 // currently Paimon's partition value does not have any transformation
paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java

@@ -18,6 +18,7 @@

 package org.apache.paimon.iceberg.metadata;

+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.schema.TableSchema;

 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -50,11 +51,20 @@ public class IcebergSchema {
     @JsonProperty(FIELD_FIELDS)
     private final List<IcebergDataField> fields;

-    public IcebergSchema(TableSchema tableSchema) {
-        this(
+    public static IcebergSchema create(TableSchema tableSchema) {
+        int bias;
+        if (new CoreOptions(tableSchema.options()).formatType().equals("parquet")) {
+            // data files start with trimmed primary keys + sequence number + value kind
+            // also ParquetSchemaUtil.addFallbackIds starts enumerating id from 1 instead of 0
+            bias = tableSchema.trimmedPrimaryKeys().size() + 3;
+        } else {
+            bias = 0;
+        }
+
+        return new IcebergSchema(
                 (int) tableSchema.id(),
                 tableSchema.fields().stream()
-                        .map(IcebergDataField::new)
+                        .map(f -> new IcebergDataField(f, bias))
                         .collect(Collectors.toList()));
     }

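Note (worked example, not part of the commit): per the comment in the diff, the bias compensates for the physical layout of Paimon's Parquet data files for primary-key tables, which lead with the trimmed primary-key columns plus the sequence-number and value-kind system columns, while Iceberg's ParquetSchemaUtil.addFallbackIds numbers columns from 1 rather than 0. Under those assumptions:

    // trimmed primary keys: [k1, k2]  ->  bias = 2 + 3 = 5
    // Paimon field ids:     f0 = 0, f1 = 1, ...
    // Iceberg field ids:    f0 = 5, f1 = 6, ...   (parquet tables only)
    // For other formats bias = 0, so ids pass through unchanged.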
paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java

@@ -24,6 +24,7 @@
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.iceberg.AppendOnlyIcebergCommitCallback;
+import org.apache.paimon.iceberg.IcebergOptions;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -165,7 +166,7 @@ protected List<CommitCallback> createCommitCallbacks(String commitUser) {
         List<CommitCallback> callbacks = super.createCommitCallbacks(commitUser);
         CoreOptions options = coreOptions();

-        if (options.metadataIcebergCompatible()) {
+        if (options.toConfiguration().contains(IcebergOptions.METADATA_ICEBERG_STORAGE)) {
             callbacks.add(new AppendOnlyIcebergCommitCallback(this, commitUser));
         }

paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java

@@ -23,6 +23,7 @@
 import org.apache.paimon.KeyValueFileStore;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergOptions;
 import org.apache.paimon.iceberg.PrimaryKeyIcebergCommitCallback;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.LookupMergeFunction;
@@ -185,7 +186,7 @@ protected List<CommitCallback> createCommitCallbacks(String commitUser) {
         List<CommitCallback> callbacks = super.createCommitCallbacks(commitUser);
         CoreOptions options = coreOptions();

-        if (options.metadataIcebergCompatible()) {
+        if (options.toConfiguration().contains(IcebergOptions.METADATA_ICEBERG_STORAGE)) {
             callbacks.add(new PrimaryKeyIcebergCommitCallback(this, commitUser));
         }

paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java

@@ -199,7 +199,8 @@ public void testRetryCreateMetadata() throws Exception {
         commit.commit(2, commitMessages2);
         assertThat(table.latestSnapshotId()).hasValue(3L);

-        IcebergPathFactory pathFactory = new IcebergPathFactory(table.location());
+        IcebergPathFactory pathFactory =
+                new IcebergPathFactory(new Path(table.location(), "metadata"));
         Path metadata3Path = pathFactory.toMetadataPath(3);
         assertThat(table.fileIO().exists(metadata3Path)).isTrue();

@@ -296,7 +297,8 @@ public void testIcebergSnapshotExpire() throws Exception {
         // Number of snapshots will become 5 with the next commit, however only 3 Iceberg snapshots
         // are kept. So the first 2 Iceberg snapshots will be expired.

-        IcebergPathFactory pathFactory = new IcebergPathFactory(table.location());
+        IcebergPathFactory pathFactory =
+                new IcebergPathFactory(new Path(table.location(), "metadata"));
         IcebergManifestList manifestList = IcebergManifestList.create(table, pathFactory);
         Set<String> usingManifests = new HashSet<>();
         for (IcebergManifestFileMeta fileMeta :
@@ -689,11 +691,11 @@ private FileStoreTable createPaimonTable(

         Options options = new Options(customOptions);
         options.set(CoreOptions.BUCKET, numBuckets);
-        options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true);
+        options.set(IcebergOptions.METADATA_ICEBERG_STORAGE, IcebergOptions.StorageType.PER_TABLE);
         options.set(CoreOptions.FILE_FORMAT, "avro");
         options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32));
-        options.set(AbstractIcebergCommitCallback.COMPACT_MIN_FILE_NUM, 4);
-        options.set(AbstractIcebergCommitCallback.COMPACT_MIN_FILE_NUM, 8);
+        options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 4);
+        options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 8);
         options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, MemorySize.ofKibiBytes(8));
         Schema schema =
                 new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), "");
