diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index bc5af04d6de6..e53a04c4f31e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1342,14 +1342,6 @@ public class CoreOptions implements Serializable { .withDescription( "When need to lookup, commit will wait for compaction by lookup."); - public static final ConfigOption METADATA_ICEBERG_COMPATIBLE = - key("metadata.iceberg-compatible") - .booleanType() - .defaultValue(false) - .withDescription( - "When set to true, produce Iceberg metadata after a snapshot is committed, " - + "so that Iceberg readers can read Paimon's raw files."); - public static final ConfigOption DELETE_FILE_THREAD_NUM = key("delete-file.thread-num") .intType() @@ -2083,7 +2075,7 @@ private Map callbacks( Map result = new HashMap<>(); for (String className : options.get(callbacks).split(",")) { className = className.trim(); - if (className.length() == 0) { + if (className.isEmpty()) { continue; } @@ -2168,10 +2160,6 @@ public boolean asyncFileWrite() { return options.get(ASYNC_FILE_WRITE); } - public boolean metadataIcebergCompatible() { - return options.get(METADATA_ICEBERG_COMPATIBLE); - } - /** Specifies the merge engine for table with primary key. */ public enum MergeEngine implements DescribedEnum { DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."), diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java index e78ca5818d28..c6ebe6609faa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java @@ -31,6 +31,7 @@ import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta; import org.apache.paimon.iceberg.manifest.IcebergManifestList; import org.apache.paimon.iceberg.manifest.IcebergPartitionSummary; +import org.apache.paimon.iceberg.metadata.IcebergDataField; import org.apache.paimon.iceberg.metadata.IcebergMetadata; import org.apache.paimon.iceberg.metadata.IcebergPartitionField; import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec; @@ -40,8 +41,6 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; -import org.apache.paimon.options.ConfigOption; -import org.apache.paimon.options.ConfigOptions; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.schema.SchemaManager; @@ -52,7 +51,6 @@ import org.apache.paimon.table.source.RawFile; import org.apache.paimon.table.source.ScanMode; import org.apache.paimon.table.source.snapshot.SnapshotReader; -import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; @@ -85,15 +83,6 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback { // see org.apache.iceberg.hadoop.Util private static final String VERSION_HINT_FILENAME = "version-hint.text"; - static final ConfigOption COMPACT_MIN_FILE_NUM = - ConfigOptions.key("metadata.iceberg.compaction.min.file-num") - .intType() - .defaultValue(10); - static final ConfigOption COMPACT_MAX_FILE_NUM = - ConfigOptions.key("metadata.iceberg.compaction.max.file-num") - .intType() - .defaultValue(50); - protected final FileStoreTable table; private final String commitUser; private final IcebergPathFactory pathFactory; @@ -109,7 +98,37 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback { public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) { this.table = table; this.commitUser = commitUser; - this.pathFactory = new IcebergPathFactory(table.location()); + + IcebergOptions.StorageType storageType = + table.coreOptions().toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE); + switch (storageType) { + case PER_TABLE: + this.pathFactory = new IcebergPathFactory(new Path(table.location(), "metadata")); + break; + case ICEBERG_WAREHOUSE: + String tableName = table.location().getName(); + Path dbPath = table.location().getParent(); + if (dbPath.getName().endsWith(".db")) { + Path separatePath = + new Path( + dbPath.getParent(), + "iceberg/" + + dbPath.getName() + .substring(0, dbPath.getName().length() - 3) + + "/" + + tableName + + "/metadata"); + this.pathFactory = new IcebergPathFactory(separatePath); + } else { + throw new UnsupportedOperationException( + "Storage type ICEBERG_WAREHOUSE can only be used on Paimon tables in a Paimon warehouse."); + } + break; + default: + throw new UnsupportedOperationException( + "Unknown storage type " + storageType.name()); + } + this.fileStorePathFactory = table.store().pathFactory(); this.manifestFile = IcebergManifestFile.create(table, pathFactory); this.manifestList = IcebergManifestList.create(table, pathFactory); @@ -183,8 +202,9 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException { manifestFile.rollingWrite(entryIterator, snapshotId); String manifestListFileName = manifestList.writeWithoutRolling(manifestFileMetas); + IcebergSchema icebergSchema = IcebergSchema.create(table.schema()); List partitionFields = - getPartitionFields(table.schema().logicalPartitionType()); + getPartitionFields(table.schema().partitionKeys(), icebergSchema); int schemaId = (int) table.schema().id(); IcebergSnapshot snapshot = new IcebergSnapshot( @@ -202,7 +222,7 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException { table.location().toString(), snapshotId, table.schema().highestFieldId(), - Collections.singletonList(new IcebergSchema(table.schema())), + Collections.singletonList(icebergSchema), schemaId, Collections.singletonList(new IcebergPartitionSpec(partitionFields)), partitionFields.stream() @@ -250,11 +270,17 @@ private List dataSplitToManifestEntries( return result; } - private List getPartitionFields(RowType partitionType) { + private List getPartitionFields( + List partitionKeys, IcebergSchema icebergSchema) { + Map fields = new HashMap<>(); + for (IcebergDataField field : icebergSchema.fields()) { + fields.put(field.name(), field); + } + List result = new ArrayList<>(); int fieldId = IcebergPartitionField.FIRST_FIELD_ID; - for (DataField field : partitionType.getFields()) { - result.add(new IcebergPartitionField(field, fieldId)); + for (String partitionKey : partitionKeys) { + result.add(new IcebergPartitionField(fields.get(partitionKey), fieldId)); fieldId++; } return result; @@ -311,7 +337,7 @@ private void createMetadataWithBase( List schemas = baseMetadata.schemas(); if (baseMetadata.currentSchemaId() != schemaId) { schemas = new ArrayList<>(schemas); - schemas.add(new IcebergSchema(table.schema())); + schemas.add(IcebergSchema.create(table.schema())); } List snapshots = new ArrayList<>(baseMetadata.snapshots()); @@ -562,10 +588,10 @@ private List compactMetadataIfNeeded( } Options options = new Options(table.options()); - if (candidates.size() < options.get(COMPACT_MIN_FILE_NUM)) { + if (candidates.size() < options.get(IcebergOptions.COMPACT_MIN_FILE_NUM)) { return toCompact; } - if (candidates.size() < options.get(COMPACT_MAX_FILE_NUM) + if (candidates.size() < options.get(IcebergOptions.COMPACT_MAX_FILE_NUM) && totalSizeInBytes < targetSizeInBytes) { return toCompact; } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java new file mode 100644 index 000000000000..58d9c5fb5152 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg; + +import org.apache.paimon.options.ConfigOption; +import org.apache.paimon.options.ConfigOptions; + +import static org.apache.paimon.options.ConfigOptions.key; + +/** Config options for Paimon Iceberg compatibility. */ +public class IcebergOptions { + + public static final ConfigOption METADATA_ICEBERG_STORAGE = + key("metadata.iceberg.storage") + .enumType(StorageType.class) + .noDefaultValue() + .withDescription( + "When set, produce Iceberg metadata after a snapshot is committed, " + + "so that Iceberg readers can read Paimon's raw files.\n" + + "PER_TABLE: Store Iceberg metadata with each table.\n" + + "ICEBERG_WAREHOUSE: Store Iceberg metadata in a separate directory. " + + "This directory can be specified as the Iceberg warehouse directory."); + + public static final ConfigOption COMPACT_MIN_FILE_NUM = + ConfigOptions.key("metadata.iceberg.compaction.min.file-num") + .intType() + .defaultValue(10); + + public static final ConfigOption COMPACT_MAX_FILE_NUM = + ConfigOptions.key("metadata.iceberg.compaction.max.file-num") + .intType() + .defaultValue(50); + + /** Where to store Iceberg metadata. */ + public enum StorageType { + // Store Iceberg metadata with each table. + PER_TABLE, + // Store Iceberg metadata in a separate directory. + // This directory can be specified as the Iceberg warehouse directory. + ICEBERG_WAREHOUSE + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java index fee590abfbaa..74d2e8e48f1b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java @@ -37,8 +37,8 @@ public class IcebergPathFactory { private int manifestFileCount; private int manifestListCount; - public IcebergPathFactory(Path root) { - this.metadataDirectory = new Path(root, "metadata"); + public IcebergPathFactory(Path metadataDirectory) { + this.metadataDirectory = metadataDirectory; this.uuid = UUID.randomUUID().toString(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java index fd05183b6dc9..009d892ca42c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java @@ -58,9 +58,9 @@ public class IcebergDataField { @JsonProperty(FIELD_DOC) private final String doc; - public IcebergDataField(DataField dataField) { + public IcebergDataField(DataField dataField, int bias) { this( - dataField.id(), + dataField.id() + bias, dataField.name(), !dataField.type().isNullable(), toTypeString(dataField.type()), diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java index 7be0d0493b84..5b8af183e100 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergPartitionField.java @@ -18,8 +18,6 @@ package org.apache.paimon.iceberg.metadata; -import org.apache.paimon.types.DataField; - import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; @@ -55,7 +53,7 @@ public class IcebergPartitionField { @JsonProperty(FIELD_FIELD_ID) private final int fieldId; - public IcebergPartitionField(DataField dataField, int fieldId) { + public IcebergPartitionField(IcebergDataField dataField, int fieldId) { this( dataField.name(), // currently Paimon's partition value does not have any transformation diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java index b3c82021ec95..f6491731c4b5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergSchema.java @@ -18,6 +18,7 @@ package org.apache.paimon.iceberg.metadata; +import org.apache.paimon.CoreOptions; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -50,11 +51,20 @@ public class IcebergSchema { @JsonProperty(FIELD_FIELDS) private final List fields; - public IcebergSchema(TableSchema tableSchema) { - this( + public static IcebergSchema create(TableSchema tableSchema) { + int bias; + if (new CoreOptions(tableSchema.options()).formatType().equals("parquet")) { + // data files start with trimmed primary keys + sequence number + value kind + // also ParquetSchemaUtil.addFallbackIds starts enumerating id from 1 instead of 0 + bias = tableSchema.trimmedPrimaryKeys().size() + 3; + } else { + bias = 0; + } + + return new IcebergSchema( (int) tableSchema.id(), tableSchema.fields().stream() - .map(IcebergDataField::new) + .map(f -> new IcebergDataField(f, bias)) .collect(Collectors.toList())); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index b93cf8e44237..acb4ba41b52f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.iceberg.AppendOnlyIcebergCommitCallback; +import org.apache.paimon.iceberg.IcebergOptions; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.operation.AppendOnlyFileStoreScan; import org.apache.paimon.operation.AppendOnlyFileStoreWrite; @@ -165,7 +166,7 @@ protected List createCommitCallbacks(String commitUser) { List callbacks = super.createCommitCallbacks(commitUser); CoreOptions options = coreOptions(); - if (options.metadataIcebergCompatible()) { + if (options.toConfiguration().contains(IcebergOptions.METADATA_ICEBERG_STORAGE)) { callbacks.add(new AppendOnlyIcebergCommitCallback(this, commitUser)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 9c15a7bd1f3d..3f1bac314624 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -23,6 +23,7 @@ import org.apache.paimon.KeyValueFileStore; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.iceberg.IcebergOptions; import org.apache.paimon.iceberg.PrimaryKeyIcebergCommitCallback; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.mergetree.compact.LookupMergeFunction; @@ -185,7 +186,7 @@ protected List createCommitCallbacks(String commitUser) { List callbacks = super.createCommitCallbacks(commitUser); CoreOptions options = coreOptions(); - if (options.metadataIcebergCompatible()) { + if (options.toConfiguration().contains(IcebergOptions.METADATA_ICEBERG_STORAGE)) { callbacks.add(new PrimaryKeyIcebergCommitCallback(this, commitUser)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index 141def4e224a..2137061519e0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -199,7 +199,8 @@ public void testRetryCreateMetadata() throws Exception { commit.commit(2, commitMessages2); assertThat(table.latestSnapshotId()).hasValue(3L); - IcebergPathFactory pathFactory = new IcebergPathFactory(table.location()); + IcebergPathFactory pathFactory = + new IcebergPathFactory(new Path(table.location(), "metadata")); Path metadata3Path = pathFactory.toMetadataPath(3); assertThat(table.fileIO().exists(metadata3Path)).isTrue(); @@ -296,7 +297,8 @@ public void testIcebergSnapshotExpire() throws Exception { // Number of snapshots will become 5 with the next commit, however only 3 Iceberg snapshots // are kept. So the first 2 Iceberg snapshots will be expired. - IcebergPathFactory pathFactory = new IcebergPathFactory(table.location()); + IcebergPathFactory pathFactory = + new IcebergPathFactory(new Path(table.location(), "metadata")); IcebergManifestList manifestList = IcebergManifestList.create(table, pathFactory); Set usingManifests = new HashSet<>(); for (IcebergManifestFileMeta fileMeta : @@ -689,11 +691,11 @@ private FileStoreTable createPaimonTable( Options options = new Options(customOptions); options.set(CoreOptions.BUCKET, numBuckets); - options.set(CoreOptions.METADATA_ICEBERG_COMPATIBLE, true); + options.set(IcebergOptions.METADATA_ICEBERG_STORAGE, IcebergOptions.StorageType.PER_TABLE); options.set(CoreOptions.FILE_FORMAT, "avro"); options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofKibiBytes(32)); - options.set(AbstractIcebergCommitCallback.COMPACT_MIN_FILE_NUM, 4); - options.set(AbstractIcebergCommitCallback.COMPACT_MIN_FILE_NUM, 8); + options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 4); + options.set(IcebergOptions.COMPACT_MIN_FILE_NUM, 8); options.set(CoreOptions.MANIFEST_TARGET_FILE_SIZE, MemorySize.ofKibiBytes(8)); Schema schema = new Schema(rowType.getFields(), partitionKeys, primaryKeys, options.toMap(), ""); diff --git a/paimon-flink/paimon-flink-1.16/pom.xml b/paimon-flink/paimon-flink-1.16/pom.xml index 0e1abbe6674a..fd63695354df 100644 --- a/paimon-flink/paimon-flink-1.16/pom.xml +++ b/paimon-flink/paimon-flink-1.16/pom.xml @@ -35,6 +35,8 @@ under the License. 1.16.3 + 1.16 + 1.5.2 @@ -100,6 +102,18 @@ under the License. test + + org.apache.iceberg + iceberg-flink-${iceberg.flink.version} + ${iceberg.version} + test + + + org.apache.orc + orc-core + + + diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java new file mode 100644 index 000000000000..0097700c70fa --- /dev/null +++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/iceberg/Flink116IcebergITCase.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.iceberg; + +/** IT cases for Paimon Iceberg compatibility in Flink 1.16. */ +public class Flink116IcebergITCase extends FlinkIcebergITCaseBase {} diff --git a/paimon-flink/paimon-flink-1.17/pom.xml b/paimon-flink/paimon-flink-1.17/pom.xml index 99492af0f248..10151a02169f 100644 --- a/paimon-flink/paimon-flink-1.17/pom.xml +++ b/paimon-flink/paimon-flink-1.17/pom.xml @@ -35,6 +35,8 @@ under the License. 1.17.2 + 1.17 + 1.6.1 @@ -106,6 +108,19 @@ under the License. ${flink.version} test + + + org.apache.iceberg + iceberg-flink-${iceberg.flink.version} + ${iceberg.version} + test + + + org.apache.orc + orc-core + + + diff --git a/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java new file mode 100644 index 000000000000..3628043bdfa6 --- /dev/null +++ b/paimon-flink/paimon-flink-1.17/src/test/java/org/apache/paimon/flink/iceberg/Flink117IcebergITCase.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.iceberg; + +/** IT cases for Paimon Iceberg compatibility in Flink 1.17. */ +public class Flink117IcebergITCase extends FlinkIcebergITCaseBase {} diff --git a/paimon-flink/paimon-flink-1.18/pom.xml b/paimon-flink/paimon-flink-1.18/pom.xml index 0220bcc19612..19161de2022c 100644 --- a/paimon-flink/paimon-flink-1.18/pom.xml +++ b/paimon-flink/paimon-flink-1.18/pom.xml @@ -35,6 +35,8 @@ under the License. 1.18.1 + 1.18 + 1.6.1 @@ -99,6 +101,19 @@ under the License. ${flink.version} test + + + org.apache.iceberg + iceberg-flink-${iceberg.flink.version} + ${iceberg.version} + test + + + org.apache.orc + orc-core + + + diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/iceberg/Flink118IcebergITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/iceberg/Flink118IcebergITCase.java new file mode 100644 index 000000000000..bf309a9f4182 --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/iceberg/Flink118IcebergITCase.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.iceberg; + +/** IT cases for Paimon Iceberg compatibility in Flink 1.18. */ +public class Flink118IcebergITCase extends FlinkIcebergITCaseBase {} diff --git a/paimon-flink/paimon-flink-1.19/pom.xml b/paimon-flink/paimon-flink-1.19/pom.xml index 666756f877d9..b4c9d2ab9619 100644 --- a/paimon-flink/paimon-flink-1.19/pom.xml +++ b/paimon-flink/paimon-flink-1.19/pom.xml @@ -35,6 +35,8 @@ under the License. 1.19.1 + 1.19 + 1.6.1 @@ -106,6 +108,19 @@ under the License. 1.21 test + + + org.apache.iceberg + iceberg-flink-${iceberg.flink.version} + ${iceberg.version} + test + + + org.apache.orc + orc-core + + + diff --git a/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/iceberg/Flink119IcebergITCase.java b/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/iceberg/Flink119IcebergITCase.java new file mode 100644 index 000000000000..eb64a6c9de37 --- /dev/null +++ b/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/iceberg/Flink119IcebergITCase.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.iceberg; + +/** IT cases for Paimon Iceberg compatibility in Flink 1.19. */ +public class Flink119IcebergITCase extends FlinkIcebergITCaseBase {} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java new file mode 100644 index 000000000000..b136b171d5cd --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/iceberg/FlinkIcebergITCaseBase.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.iceberg; + +import org.apache.paimon.flink.util.AbstractTestBase; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT cases for Paimon Iceberg compatibility. */ +public abstract class FlinkIcebergITCaseBase extends AbstractTestBase { + + @ParameterizedTest + @ValueSource(strings = {"parquet", "avro"}) + public void testIcebergWarehouse(String format) throws Exception { + String warehouse = getTempDirPath(); + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().parallelism(2).build(); + tEnv.executeSql( + "CREATE CATALOG paimon WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '" + + warehouse + + "'\n" + + ")"); + tEnv.executeSql( + "CREATE TABLE paimon.`default`.T (\n" + + " pt INT,\n" + + " k INT,\n" + + " v1 INT,\n" + + " v2 STRING,\n" + + " PRIMARY KEY (pt, k) NOT ENFORCED\n" + + ") PARTITIONED BY (pt) WITH (\n" + + " 'metadata.iceberg.storage' = 'ICEBERG_WAREHOUSE',\n" + // make sure all changes are visible in iceberg metadata + + " 'full-compaction.delta-commits' = '1',\n" + + " 'file.format' = '" + + format + + "'\n" + + ")"); + tEnv.executeSql( + "INSERT INTO paimon.`default`.T VALUES " + + "(1, 10, 100, 'apple'), " + + "(1, 11, 110, 'banana'), " + + "(2, 20, 200, 'cat'), " + + "(2, 21, 210, 'dog')") + .await(); + + tEnv.executeSql( + "CREATE TABLE T (\n" + + " pt INT,\n" + + " k INT,\n" + + " v1 INT,\n" + + " v2 STRING\n" + + ") PARTITIONED BY (pt) WITH (\n" + + " 'connector' = 'iceberg',\n" + + " 'catalog-type' = 'hadoop',\n" + + " 'catalog-name' = 'test',\n" + + " 'catalog-database' = 'default',\n" + + " 'warehouse' = '" + + warehouse + + "/iceberg'\n" + + ")"); + assertThat(collect(tEnv.executeSql("SELECT v1, k, v2, pt FROM T ORDER BY pt, k"))) + .containsExactly( + Row.of(100, 10, "apple", 1), + Row.of(110, 11, "banana", 1), + Row.of(200, 20, "cat", 2), + Row.of(210, 21, "dog", 2)); + + tEnv.executeSql( + "INSERT INTO paimon.`default`.T VALUES " + + "(1, 10, 101, 'red'), " + + "(1, 12, 121, 'green'), " + + "(2, 20, 201, 'blue'), " + + "(2, 22, 221, 'yellow')") + .await(); + assertThat(collect(tEnv.executeSql("SELECT v1, k, v2, pt FROM T ORDER BY pt, k"))) + .containsExactly( + Row.of(101, 10, "red", 1), + Row.of(110, 11, "banana", 1), + Row.of(121, 12, "green", 1), + Row.of(201, 20, "blue", 2), + Row.of(210, 21, "dog", 2), + Row.of(221, 22, "yellow", 2)); + } + + private List collect(TableResult result) throws Exception { + List rows = new ArrayList<>(); + try (CloseableIterator it = result.collect()) { + while (it.hasNext()) { + rows.add(it.next()); + } + } + return rows; + } +}