Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][flink] Fix Iceberg compatible metadata cannot be read when data file format is parquet #4288

Merged
merged 9 commits into from
Oct 11, 2024

Conversation

tsreaper
Copy link
Contributor

@tsreaper tsreaper commented Oct 8, 2024

Purpose

Currently there is not IT cases for Paimon Iceberg compatibility in Flink. This PR adds IT cases and fixes some issues with parquet format.

Tests

IT cases.

API and Format

No format changes.

Documentation

Document for the whole Iceberg compatibility feature is also added.

@tsreaper tsreaper changed the title [flink] Add IT cases for Iceberg compatibility [fix][flink] Fix Iceberg compatible metadata cannot be read when data file format is parquet Oct 9, 2024
.defaultValue(50);

/** Where to store Iceberg metadata. */
public enum StorageType {
Copy link
Contributor

@JingsongLi JingsongLi Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implements DescribedEnum

.defaultValue(50);

/** Where to store Iceberg metadata. */
public enum StorageType {
Copy link
Contributor

@JingsongLi JingsongLi Oct 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Values can be: NONE, TABLE_LOCATION, HADOOP_CATALOG?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why NONE? This value should not be set if users don't need Iceberg compatibility.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or name it DISABLED

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally speaking, using ENUM already has a state inside, and disable should also be one of its states

HADOOP_CATALOG(
"Store Iceberg metadata in a separate directory. "
+ "This directory can be specified as the warehouse directory of an Iceberg Hadoop catalog.");

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can add a value and implement toString. For the paimon style, the value should be table-location and hadoop-catalog.

dbPath.getParent(),
"iceberg/"
+ dbPath.getName()
.substring(0, dbPath.getName().length() - 3)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1、private final String xxx = ".db";
2、Change "substring(0, dbPath.getName().length() - 3)" to ""substring(0, dbPath.getName().length() - xxx.length())"

private List<IcebergPartitionField> getPartitionFields(RowType partitionType) {
private List<IcebergPartitionField> getPartitionFields(
List<String> partitionKeys, IcebergSchema icebergSchema) {
Map<String, IcebergDataField> fields = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map<String, IcebergDataField> fields = new HashMap<>(icebergSchema.fields().size());

for (IcebergDataField field : icebergSchema.fields()) {
fields.put(field.name(), field);
}

List<IcebergPartitionField> result = new ArrayList<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

List result = new ArrayList<>(partitionKeys.size());

public static final ConfigOption<Integer> COMPACT_MIN_FILE_NUM =
ConfigOptions.key("metadata.iceberg.compaction.min.file-num")
.intType()
.defaultValue(10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add withDescription?

public static final ConfigOption<Integer> COMPACT_MAX_FILE_NUM =
ConfigOptions.key("metadata.iceberg.compaction.max.file-num")
.intType()
.defaultValue(50);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add withDescription?

this(
public static IcebergSchema create(TableSchema tableSchema) {
int bias;
if (new CoreOptions(tableSchema.options()).formatType().equals("parquet")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use CoreOptions.FILE_FORMAT_PARQUET

(int) tableSchema.id(),
tableSchema.fields().stream()
.map(IcebergDataField::new)
.map(f -> new IcebergDataField(f, bias))
.collect(Collectors.toList()));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add "private static final String STRUCT = "struct";" for line 77

@@ -165,7 +166,8 @@ protected List<CommitCallback> createCommitCallbacks(String commitUser) {
List<CommitCallback> callbacks = super.createCommitCallbacks(commitUser);
CoreOptions options = coreOptions();

if (options.metadataIcebergCompatible()) {
if (options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to AbstractFileStoreTable for AppendOnlyFileStoreTable and PrimaryKeyFileStoreTable?

@@ -185,7 +186,8 @@ protected List<CommitCallback> createCommitCallbacks(String commitUser) {
List<CommitCallback> callbacks = super.createCommitCallbacks(commitUser);
CoreOptions options = coreOptions();

if (options.metadataIcebergCompatible()) {
if (options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
!= IcebergOptions.StorageType.DISABLED) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this to AbstractFileStoreTable for AppendOnlyFileStoreTable and PrimaryKeyFileStoreTable?


import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for Paimon Iceberg compatibility. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Base IT cases for Paimon Iceberg compatibility.

@tsreaper tsreaper marked this pull request as draft October 10, 2024 04:34
@tsreaper tsreaper marked this pull request as ready for review October 10, 2024 11:22
Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me!

@JingsongLi JingsongLi merged commit 524915e into apache:master Oct 11, 2024
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants