Skip to content

Commit

Permalink
[iceberg] Introduce metadata.iceberg.manifest-legacy-version (#4621)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Dec 2, 2024
1 parent 4e1b749 commit 512e2ce
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 16 deletions.
11 changes: 11 additions & 0 deletions docs/content/migration/iceberg-compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,9 +377,20 @@ you also need to set some (or all) of the following table options when creating
<td>String</td>
<td>Compression for Iceberg manifest files.</td>
</tr>
<tr>
<td><h5>metadata.iceberg.manifest-legacy-version</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to use the legacy manifest version to generate Iceberg 1.4 manifest files.</td>
</tr>
</tbody>
</table>

## AWS Athena

AWS Athena may use an old manifest reader that reads Iceberg manifests by field name, so Paimon needs to produce a legacy
Iceberg manifest list file. To enable this, set the table option `'metadata.iceberg.manifest-legacy-version' = 'true'`.

## Trino Iceberg

In this example, we use Trino Iceberg connector to access Paimon table through Iceberg Hive catalog.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ public class IcebergOptions {
"gzip") // some Iceberg reader cannot support zstd, for example DuckDB
.withDescription("Compression for Iceberg manifest files.");

/**
 * Boolean table option {@code metadata.iceberg.manifest-legacy-version}; defaults to {@code
 * false}.
 *
 * <p>When set to {@code true}, the legacy (Iceberg 1.4) manifest version is used when
 * generating Iceberg manifest files.
 */
public static final ConfigOption<Boolean> MANIFEST_LEGACY_VERSION =
key("metadata.iceberg.manifest-legacy-version")
.booleanType()
.defaultValue(false)
.withDescription(
"Should use the legacy manifest version to generate Iceberg's 1.4 manifest files.");

/** Where to store Iceberg metadata. */
public enum StorageType implements DescribedEnum {
DISABLED("disabled", "Disable Iceberg compatibility support."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ public List<IcebergPartitionSummary> partitions() {
return partitions;
}

public static RowType schema() {
/**
 * Returns the row type used for entries of the Iceberg manifest list.
 *
 * @param legacyVersion {@code true} to get the legacy Iceberg 1.4 layout, {@code false} to get
 *     the current layout
 * @return the row type matching the requested layout
 */
public static RowType schema(boolean legacyVersion) {
    if (legacyVersion) {
        return schemaForIceberg1_4();
    }
    return schemaForIcebergNew();
}

private static RowType schemaForIcebergNew() {
List<DataField> fields = new ArrayList<>();
fields.add(new DataField(500, "manifest_path", DataTypes.STRING().notNull()));
fields.add(new DataField(501, "manifest_length", DataTypes.BIGINT().notNull()));
Expand All @@ -186,6 +190,29 @@ public static RowType schema() {
return new RowType(false, fields);
}

/**
 * Builds the manifest-list row type in the legacy Iceberg 1.4 layout.
 *
 * <p>See https://github.com/apache/iceberg/pull/5338. Some readers, for example AWS Athena,
 * still expect this older schema.
 *
 * @return a non-nullable row type holding the legacy manifest-list fields
 */
private static RowType schemaForIceberg1_4() {
    List<DataField> columns = new ArrayList<>();

    // Location, size and partition spec of the manifest file.
    columns.add(new DataField(500, "manifest_path", DataTypes.STRING().notNull()));
    columns.add(new DataField(501, "manifest_length", DataTypes.BIGINT().notNull()));
    columns.add(new DataField(502, "partition_spec_id", DataTypes.INT().notNull()));

    // Content kind and sequence numbers.
    columns.add(new DataField(517, "content", DataTypes.INT().notNull()));
    columns.add(new DataField(515, "sequence_number", DataTypes.BIGINT().notNull()));
    columns.add(new DataField(516, "min_sequence_number", DataTypes.BIGINT().notNull()));

    // The snapshot id is the only nullable scalar field.
    columns.add(new DataField(503, "added_snapshot_id", DataTypes.BIGINT()));

    // File counters.
    columns.add(new DataField(504, "added_data_files_count", DataTypes.INT().notNull()));
    columns.add(new DataField(505, "existing_data_files_count", DataTypes.INT().notNull()));
    columns.add(new DataField(506, "deleted_data_files_count", DataTypes.INT().notNull()));

    // Row counters.
    columns.add(new DataField(512, "added_rows_count", DataTypes.BIGINT().notNull()));
    columns.add(new DataField(513, "existing_rows_count", DataTypes.BIGINT().notNull()));
    columns.add(new DataField(514, "deleted_rows_count", DataTypes.BIGINT().notNull()));

    // Per-partition summaries, stored as an array of summary rows.
    DataField partitionSummaries =
            new DataField(508, "partitions", DataTypes.ARRAY(IcebergPartitionSummary.schema()));
    columns.add(partitionSummaries);

    return new RowType(false, columns);
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta.Content;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ObjectSerializer;

import java.util.ArrayList;
Expand All @@ -36,8 +37,8 @@ public class IcebergManifestFileMetaSerializer extends ObjectSerializer<IcebergM

private final IcebergPartitionSummarySerializer partitionSummarySerializer;

public IcebergManifestFileMetaSerializer() {
super(IcebergManifestFileMeta.schema());
/**
 * Creates a serializer for {@link IcebergManifestFileMeta} using the given row type.
 *
 * @param schema row type passed to the parent {@link ObjectSerializer}; the caller chooses
 *     which manifest-list layout to use
 */
public IcebergManifestFileMetaSerializer(RowType schema) {
super(schema);
// Serializer for partition summaries, created alongside this serializer.
this.partitionSummarySerializer = new IcebergPartitionSummarySerializer();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.iceberg.IcebergOptions;
import org.apache.paimon.iceberg.IcebergPathFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ObjectsFile;
import org.apache.paimon.utils.PathFactory;

Expand All @@ -38,16 +37,16 @@ public class IcebergManifestList extends ObjectsFile<IcebergManifestFileMeta> {

public IcebergManifestList(
FileIO fileIO,
FormatReaderFactory readerFactory,
FormatWriterFactory writerFactory,
FileFormat fileFormat,
RowType manifestType,
String compression,
PathFactory pathFactory) {
super(
fileIO,
new IcebergManifestFileMetaSerializer(),
IcebergManifestFileMeta.schema(),
readerFactory,
writerFactory,
new IcebergManifestFileMetaSerializer(manifestType),
manifestType,
fileFormat.createReaderFactory(manifestType),
fileFormat.createWriterFactory(manifestType),
compression,
pathFactory,
null);
Expand All @@ -65,11 +64,14 @@ public static IcebergManifestList create(FileStoreTable table, IcebergPathFactor
"avro.row-name-mapping",
"org.apache.paimon.avro.generated.record:manifest_file,"
+ "manifest_file_partitions:r508");
FileFormat manifestListAvro = FileFormat.fromIdentifier("avro", avroOptions);
FileFormat fileFormat = FileFormat.fromIdentifier("avro", avroOptions);
RowType manifestType =
IcebergManifestFileMeta.schema(
avroOptions.get(IcebergOptions.MANIFEST_LEGACY_VERSION));
return new IcebergManifestList(
table.fileIO(),
manifestListAvro.createReaderFactory(IcebergManifestFileMeta.schema()),
manifestListAvro.createWriterFactory(IcebergManifestFileMeta.schema()),
fileFormat,
manifestType,
avroOptions.get(IcebergOptions.MANIFEST_COMPRESSION),
pathFactory.manifestListFactory());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for Iceberg compatibility. */
public class IcebergCompatibilityTest {
Expand Down Expand Up @@ -309,11 +310,21 @@ public void testIcebergSnapshotExpire() throws Exception {
assertThat(manifestFile.compression()).isEqualTo("gzip");

Set<String> usingManifests = new HashSet<>();
for (IcebergManifestFileMeta fileMeta :
manifestList.read(new Path(metadata.currentSnapshot().manifestList()).getName())) {
String manifestListFile = new Path(metadata.currentSnapshot().manifestList()).getName();
for (IcebergManifestFileMeta fileMeta : manifestList.read(manifestListFile)) {
usingManifests.add(fileMeta.manifestPath());
}

IcebergManifestList legacyManifestList =
IcebergManifestList.create(
table.copy(
Collections.singletonMap(
IcebergOptions.MANIFEST_LEGACY_VERSION.key(), "true")),
pathFactory);
assertThatThrownBy(() -> legacyManifestList.read(manifestListFile))
.rootCause()
.isInstanceOf(NullPointerException.class);

Set<String> unusedFiles = new HashSet<>();
for (int i = 0; i < 2; i++) {
unusedFiles.add(metadata.snapshots().get(i).manifestList());
Expand Down

0 comments on commit 512e2ce

Please sign in to comment.