[core] Support nested types in Iceberg compatible metadata (#4626)
tsreaper authored Dec 4, 2024
1 parent 7495357 commit 9d64170
Showing 10 changed files with 529 additions and 39 deletions.
docs/content/migration/iceberg-compatibility.md (33 changes: 18 additions & 15 deletions)
@@ -479,21 +479,24 @@ SELECT * FROM animals WHERE class = 'mammal';

Paimon Iceberg compatibility currently supports the following data types.

| Paimon Data Type | Iceberg Data Type |
|------------------|-------------------|
| `BOOLEAN` | `boolean` |
| `INT` | `int` |
| `BIGINT` | `long` |
| `FLOAT` | `float` |
| `DOUBLE` | `double` |
| `DECIMAL` | `decimal` |
| `CHAR` | `string` |
| `VARCHAR` | `string` |
| `BINARY` | `binary` |
| `VARBINARY` | `binary` |
| `DATE` | `date` |
| `TIMESTAMP`* | `timestamp` |
| `TIMESTAMP_LTZ`* | `timestamptz` |
| `ARRAY` | `list` |
| `MAP` | `map` |
| `ROW` | `struct` |

*: `TIMESTAMP` and `TIMESTAMP_LTZ` types are only supported with precision from 4 to 6
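With nested types now covered, a column built from these types maps recursively. The sketch below is illustrative only (not from this commit); it assumes Paimon's `Schema` builder and `DataTypes` factory methods, and the table layout and column names are made up. Each nested column surfaces in the Iceberg metadata as a `list`, `map`, or `struct` type.

```java
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class NestedSchemaExample {
    public static Schema build() {
        // Each nested column below maps to a nested Iceberg type in the
        // generated metadata: ARRAY -> list, MAP -> map, ROW -> struct.
        return Schema.newBuilder()
                .column("id", DataTypes.INT())
                .column("tags", DataTypes.ARRAY(DataTypes.STRING()))
                .column("props", DataTypes.MAP(DataTypes.STRING(), DataTypes.BIGINT()))
                .column(
                        "address",
                        DataTypes.ROW(
                                DataTypes.FIELD(0, "city", DataTypes.STRING()),
                                DataTypes.FIELD(1, "zip", DataTypes.INT())))
                .build();
    }
}
```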

@@ -295,7 +295,8 @@ private List<IcebergManifestEntry> dataSplitToManifestEntries(
rawFile.rowCount(),
rawFile.fileSize(),
schemaCache.get(paimonFileMeta.schemaId()),
paimonFileMeta.valueStats(),
paimonFileMeta.valueStatsCols());
result.add(
new IcebergManifestEntry(
IcebergManifestEntry.Status.ADDED,
@@ -509,7 +510,8 @@ private List<IcebergManifestFileMeta> createNewlyAddedManifestFileMetas(
paimonFileMeta.rowCount(),
paimonFileMeta.fileSize(),
schemaCache.get(paimonFileMeta.schemaId()),
paimonFileMeta.valueStats(),
paimonFileMeta.valueStatsCols());
return new IcebergManifestEntry(
IcebergManifestEntry.Status.ADDED,
currentSnapshotId,
@@ -22,13 +22,15 @@
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.iceberg.metadata.IcebergDataField;
import org.apache.paimon.iceberg.metadata.IcebergSchema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -110,27 +112,44 @@ public static IcebergDataFileMeta create(
long recordCount,
long fileSizeInBytes,
IcebergSchema icebergSchema,
SimpleStats stats,
@Nullable List<String> statsColumns) {
int numFields = icebergSchema.fields().size();
Map<String, Integer> indexMap = new HashMap<>();
if (statsColumns == null) {
for (int i = 0; i < numFields; i++) {
indexMap.put(icebergSchema.fields().get(i).name(), i);
}
} else {
for (int i = 0; i < statsColumns.size(); i++) {
indexMap.put(statsColumns.get(i), i);
}
}

Map<Integer, Long> nullValueCounts = new HashMap<>();
Map<Integer, byte[]> lowerBounds = new HashMap<>();
Map<Integer, byte[]> upperBounds = new HashMap<>();

for (int i = 0; i < numFields; i++) {
IcebergDataField field = icebergSchema.fields().get(i);
if (!indexMap.containsKey(field.name())) {
continue;
}

int idx = indexMap.get(field.name());
nullValueCounts.put(field.id(), stats.nullCounts().getLong(idx));

InternalRow.FieldGetter fieldGetter =
InternalRow.createFieldGetter(field.dataType(), idx);
Object minValue = fieldGetter.getFieldOrNull(stats.minValues());
Object maxValue = fieldGetter.getFieldOrNull(stats.maxValues());
if (minValue != null && maxValue != null) {
lowerBounds.put(
field.id(),
IcebergConversions.toByteBuffer(field.dataType(), minValue).array());
upperBounds.put(
field.id(),
IcebergConversions.toByteBuffer(field.dataType(), maxValue).array());
}
}
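The new `statsColumns` parameter (fed from `paimonFileMeta.valueStatsCols()`) matters because the stats row no longer has to line up one-to-one with the schema: stats may cover only a subset of columns, in their own order. The toy snippet below (plain Java, not Paimon API; the column names are invented) illustrates the name-to-index lookup the method above now performs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StatsIndexExample {
    public static void main(String[] args) {
        // Suppose the Iceberg schema has fields [id, name, price] but stats were
        // only collected for [price, id], in that order.
        List<String> statsColumns = Arrays.asList("price", "id");

        // Same idea as the indexMap above: stats column name -> slot in the stats row.
        Map<String, Integer> indexMap = new HashMap<>();
        for (int i = 0; i < statsColumns.size(); i++) {
            indexMap.put(statsColumns.get(i), i);
        }

        System.out.println(indexMap.containsKey("name")); // false -> field is skipped
        System.out.println(indexMap.get("price"));        // 0
        System.out.println(indexMap.get("id"));           // 1
    }
}
```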

@@ -18,10 +18,14 @@

package org.apache.paimon.iceberg.metadata;

import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.Preconditions;

@@ -32,6 +36,7 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;
import java.util.stream.Collectors;

/**
* {@link DataField} in Iceberg.
@@ -57,7 +62,7 @@ public class IcebergDataField {
private final boolean required;

@JsonProperty(FIELD_TYPE)
private final Object type;

@JsonIgnore private final DataType dataType;

@@ -69,7 +74,7 @@ public IcebergDataField(DataField dataField) {
dataField.id(),
dataField.name(),
!dataField.type().isNullable(),
toTypeObject(dataField.type(), dataField.id(), 0),
dataField.type(),
dataField.description());
}
@@ -79,13 +84,13 @@ public IcebergDataField(
@JsonProperty(FIELD_ID) int id,
@JsonProperty(FIELD_NAME) String name,
@JsonProperty(FIELD_REQUIRED) boolean required,
@JsonProperty(FIELD_TYPE) Object type,
@JsonProperty(FIELD_DOC) String doc) {
this(id, name, required, type, null, doc);
}

public IcebergDataField(
int id, String name, boolean required, Object type, DataType dataType, String doc) {
this.id = id;
this.name = name;
this.required = required;
@@ -110,7 +115,7 @@ public boolean required() {
}

@JsonGetter(FIELD_TYPE)
public Object type() {
return type;
}

@@ -124,7 +129,7 @@ public DataType dataType() {
return Preconditions.checkNotNull(dataType);
}

private static Object toTypeObject(DataType dataType, int fieldId, int depth) {
switch (dataType.getTypeRoot()) {
case BOOLEAN:
return "boolean";
@@ -160,6 +165,26 @@ private static String toTypeString(DataType dataType) {
timestampLtzPrecision > 3 && timestampLtzPrecision <= 6,
"Paimon Iceberg compatibility only support timestamp type with precision from 4 to 6.");
return "timestamptz";
case ARRAY:
ArrayType arrayType = (ArrayType) dataType;
return new IcebergListType(
SpecialFields.getArrayElementFieldId(fieldId, depth + 1),
!dataType.isNullable(),
toTypeObject(arrayType.getElementType(), fieldId, depth + 1));
case MAP:
MapType mapType = (MapType) dataType;
return new IcebergMapType(
SpecialFields.getMapKeyFieldId(fieldId, depth + 1),
toTypeObject(mapType.getKeyType(), fieldId, depth + 1),
SpecialFields.getMapValueFieldId(fieldId, depth + 1),
!mapType.getValueType().isNullable(),
toTypeObject(mapType.getValueType(), fieldId, depth + 1));
case ROW:
RowType rowType = (RowType) dataType;
return new IcebergStructType(
rowType.getFields().stream()
.map(IcebergDataField::new)
.collect(Collectors.toList()));
default:
throw new UnsupportedOperationException("Unsupported data type: " + dataType);
}
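For a rough picture of what this conversion produces, the sketch below is not part of the commit: the field IDs and column name are invented, and the shaded Jackson `ObjectMapper` path is assumed. It converts one nested Paimon field and prints the Iceberg-style JSON; the nested element IDs are assigned by `SpecialFields`, so the exact numbers depend on the parent field ID and nesting depth.

```java
import org.apache.paimon.iceberg.metadata.IcebergDataField;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class NestedTypeJsonExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical column: addresses ARRAY<ROW<city STRING, zip INT>>
        DataField paimonField =
                new DataField(
                        3,
                        "addresses",
                        DataTypes.ARRAY(
                                DataTypes.ROW(
                                        DataTypes.FIELD(4, "city", DataTypes.STRING()),
                                        DataTypes.FIELD(5, "zip", DataTypes.INT()))));

        // Expected shape, per the Iceberg spec (nested element IDs elided here):
        // {"id":3,"name":"addresses","required":false,
        //  "type":{"type":"list","element-id":...,"element-required":false,
        //          "element":{"type":"struct","fields":[...]}}}
        System.out.println(
                new ObjectMapper().writeValueAsString(new IcebergDataField(paimonField)));
    }
}
```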
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.iceberg.metadata;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Objects;

/**
* {@link org.apache.paimon.types.ArrayType} in Iceberg.
*
* <p>See <a href="https://iceberg.apache.org/spec/#schemas">Iceberg spec</a>.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class IcebergListType {

private static final String FIELD_TYPE = "type";
private static final String FIELD_ELEMENT_ID = "element-id";
private static final String FIELD_ELEMENT_REQUIRED = "element-required";
private static final String FIELD_ELEMENT = "element";

@JsonProperty(FIELD_TYPE)
private final String type;

@JsonProperty(FIELD_ELEMENT_ID)
private final int elementId;

@JsonProperty(FIELD_ELEMENT_REQUIRED)
private final boolean elementRequired;

@JsonProperty(FIELD_ELEMENT)
private final Object element;

public IcebergListType(int elementId, boolean elementRequired, Object element) {
this("list", elementId, elementRequired, element);
}

@JsonCreator
public IcebergListType(
@JsonProperty(FIELD_TYPE) String type,
@JsonProperty(FIELD_ELEMENT_ID) int elementId,
@JsonProperty(FIELD_ELEMENT_REQUIRED) boolean elementRequired,
@JsonProperty(FIELD_ELEMENT) Object element) {
this.type = type;
this.elementId = elementId;
this.elementRequired = elementRequired;
this.element = element;
}

@JsonGetter(FIELD_TYPE)
public String type() {
return type;
}

@JsonGetter(FIELD_ELEMENT_ID)
public int elementId() {
return elementId;
}

@JsonGetter(FIELD_ELEMENT_REQUIRED)
public boolean elementRequired() {
return elementRequired;
}

@JsonGetter(FIELD_ELEMENT)
public Object element() {
return element;
}

@Override
public int hashCode() {
return Objects.hash(type, elementId, elementRequired, element);
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof IcebergListType)) {
return false;
}

IcebergListType that = (IcebergListType) o;
return Objects.equals(type, that.type)
&& elementId == that.elementId
&& elementRequired == that.elementRequired
&& Objects.equals(element, that.element);
}
}
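A brief usage note on the class above: `element` is deliberately typed `Object` so it can hold either a primitive Iceberg type name or another nested type object. The sketch below is a hedged illustration only; the element IDs are made up, whereas the real ones come from `SpecialFields`.

```java
import org.apache.paimon.iceberg.metadata.IcebergListType;

public class IcebergListTypeExample {
    public static void main(String[] args) {
        // A list of ints: the element slot holds the primitive type name.
        // Serializes as {"type":"list","element-id":1001,"element-required":true,"element":"int"}.
        IcebergListType listOfInts = new IcebergListType(1001, true, "int");

        // A list of lists of strings: the element slot holds another nested type object.
        IcebergListType nested =
                new IcebergListType(1002, false, new IcebergListType(1003, true, "string"));

        System.out.println(listOfInts.elementRequired());                 // true
        System.out.println(nested.element() instanceof IcebergListType);  // true
    }
}
```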