Skip to content

Commit

Permalink
[parquet] Add type id to parquet files (#4362)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper authored Oct 23, 2024
1 parent e1fa0b0 commit 20790af
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,38 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** System fields. */
public class SystemFields {
/**
* Special fields in a {@link org.apache.paimon.types.RowType} with specific field ids.
*
* <p><b>System fields</b>:
*
* <ul>
* <li><code>_KEY_&lt;key-field&gt;</code>: Keys of a key-value. ID = 1073741823 + <code>
* (field-id)
* </code>.
* <li><code>_SEQUENCE_NUMBER</code>: Sequence number of a key-value. ID = 2147483646.
* <li><code>_VALUE_KIND</code>: Type of a key-value. See {@link org.apache.paimon.types.RowKind}.
* ID = 2147483645.
* <li><code>_LEVEL</code>: Which LSM tree level does this key-value stay in. ID = 2147483644.
* <li><code>rowkind</code>: THw rowkind field in audit-log system tables. ID = 2147483643.
* </ul>
*
* <p><b>Structured type fields</b>:
*
* <p>These ids are mainly used as field ids in parquet files, so compute engines can read a field
* directly by id. These ids are not stored in {@link org.apache.paimon.types.DataField}.
*
* <ul>
* <li>Array element field: ID = 536870911 + <code>(array-field-id)</code>.
* <li>Map key field: ID = 536870911 + <code>(array-field-id)</code>.
* <li>Map value field: ID = 536870911 - <code>(array-field-id)</code>.
* </ul>
*/
public class SpecialFields {

// ----------------------------------------------------------------------------------------
// System fields
// ----------------------------------------------------------------------------------------

public static final int SYSTEM_FIELD_ID_START = Integer.MAX_VALUE / 2;

Expand Down Expand Up @@ -59,4 +89,22 @@ public static boolean isSystemField(int fieldId) {
public static boolean isSystemField(String field) {
return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field);
}

// ----------------------------------------------------------------------------------------
// Structured type fields
// ----------------------------------------------------------------------------------------

public static final int STRUCTURED_TYPE_FIELD_ID_BASE = Integer.MAX_VALUE / 4;

public static int getArrayElementFieldId(int arrayFieldId) {
return STRUCTURED_TYPE_FIELD_ID_BASE + arrayFieldId;
}

public static int getMapKeyFieldId(int mapFieldId) {
return STRUCTURED_TYPE_FIELD_ID_BASE + mapFieldId;
}

public static int getMapValueFieldId(int mapFieldId) {
return STRUCTURED_TYPE_FIELD_ID_BASE - mapFieldId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.SystemFields;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

Expand Down Expand Up @@ -332,7 +332,7 @@ public static int currentHighestFieldId(List<DataField> fields) {
Set<Integer> fieldIds = new HashSet<>();
new RowType(fields).collectFieldIds(fieldIds);
return fieldIds.stream()
.filter(i -> !SystemFields.isSystemField(i))
.filter(i -> !SpecialFields.isSystemField(i))
.max(Integer::compareTo)
.orElse(-1);
}
Expand Down
6 changes: 3 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.paimon.table.SystemFields.LEVEL;
import static org.apache.paimon.table.SystemFields.SEQUENCE_NUMBER;
import static org.apache.paimon.table.SystemFields.VALUE_KIND;
import static org.apache.paimon.table.SpecialFields.LEVEL;
import static org.apache.paimon.table.SpecialFields.SEQUENCE_NUMBER;
import static org.apache.paimon.table.SpecialFields.VALUE_KIND;

/**
* A key value, including user key, sequence number, value kind and value. This object can be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ public class IcebergDataField {
@JsonProperty(FIELD_DOC)
private final String doc;

public IcebergDataField(DataField dataField, int bias) {
public IcebergDataField(DataField dataField) {
this(
dataField.id() + bias,
dataField.id(),
dataField.name(),
!dataField.type().isNullable(),
toTypeString(dataField.type()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.iceberg.metadata;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.schema.TableSchema;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -53,26 +52,10 @@ public class IcebergSchema {
private final List<IcebergDataField> fields;

public static IcebergSchema create(TableSchema tableSchema) {
int bias;
if (new CoreOptions(tableSchema.options())
.formatType()
.equals(CoreOptions.FILE_FORMAT_PARQUET)) {
if (tableSchema.primaryKeys().isEmpty()) {
// ParquetSchemaUtil.addFallbackIds starts enumerating id from 1 instead of 0
bias = 1;
} else {
// data files start with trimmed primary keys + sequence number + value kind
// also ParquetSchemaUtil.addFallbackIds starts enumerating id from 1 instead of 0
bias = tableSchema.trimmedPrimaryKeys().size() + 3;
}
} else {
bias = 0;
}

return new IcebergSchema(
(int) tableSchema.id(),
tableSchema.fields().stream()
.map(f -> new IcebergDataField(f, bias))
.map(IcebergDataField::new)
.collect(Collectors.toList()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SystemFields.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.types.DataTypeRoot.ARRAY;
import static org.apache.paimon.types.DataTypeRoot.MAP;
import static org.apache.paimon.types.DataTypeRoot.MULTISET;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.paimon.table.SystemFields.KEY_FIELD_ID_START;
import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_ID_START;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;

/** Utils for creating changelog table with primary keys. */
public class PrimaryKeyTableUtils {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.SystemFields;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.InnerTableRead;
Expand Down Expand Up @@ -137,7 +137,7 @@ public String name() {
@Override
public RowType rowType() {
List<DataField> fields = new ArrayList<>();
fields.add(SystemFields.ROW_KIND);
fields.add(SpecialFields.ROW_KIND);
fields.addAll(wrapped.rowType().getFields());
return new RowType(fields);
}
Expand Down Expand Up @@ -566,7 +566,7 @@ public InnerTableRead withReadType(RowType readType) {
boolean rowKindAppeared = false;
for (int i = 0; i < fields.size(); i++) {
String fieldName = fields.get(i).name();
if (fieldName.equals(SystemFields.ROW_KIND.name())) {
if (fieldName.equals(SpecialFields.ROW_KIND.name())) {
rowKindAppeared = true;
readProjection[i] = -1;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.statistics.SimpleColStatsCollector;
import org.apache.paimon.statistics.TruncateSimpleColStatsCollector;
import org.apache.paimon.table.SystemFields;
import org.apache.paimon.table.SpecialFields;

import java.util.List;

Expand All @@ -47,7 +47,7 @@ public static SimpleColStatsCollector.Factory[] createStatsFactories(
.noDefaultValue());
if (fieldMode != null) {
modes[i] = SimpleColStatsCollector.from(fieldMode);
} else if (SystemFields.isSystemField(field)) {
} else if (SpecialFields.isSystemField(field)) {
modes[i] = () -> new TruncateSimpleColStatsCollector(128);
} else {
modes[i] = SimpleColStatsCollector.from(cfg.get(CoreOptions.METADATA_STATS_MODE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.SchemaEvolutionTableTestBase;
import org.apache.paimon.table.SystemFields;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
Expand Down Expand Up @@ -95,9 +95,9 @@ public class TestKeyValueGenerator {
public static final RowType KEY_TYPE =
RowType.of(
new DataField(
2 + SystemFields.KEY_FIELD_ID_START, "key_shopId", new IntType(false)),
2 + SpecialFields.KEY_FIELD_ID_START, "key_shopId", new IntType(false)),
new DataField(
3 + SystemFields.KEY_FIELD_ID_START,
3 + SpecialFields.KEY_FIELD_ID_START,
"key_orderId",
new BigIntType(false)));

Expand Down Expand Up @@ -393,7 +393,7 @@ public List<DataField> keyFields(TableSchema schema) {
.map(
f ->
new DataField(
f.id() + SystemFields.KEY_FIELD_ID_START,
f.id() + SpecialFields.KEY_FIELD_ID_START,
"key_" + f.name(),
f.type(),
f.description()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
import java.util.UUID;
import java.util.function.Consumer;

import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SystemFields.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -595,7 +595,7 @@ public void testCalcDataBytesSend() throws Exception {
table, commit, Committer.createContext("", metricGroup, true, false, null));
committer.commit(Collections.singletonList(manifestCommittable));
CommitterMetrics metrics = committer.getCommitterMetrics();
assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(529);
assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(533);
assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
committer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

package org.apache.paimon.format.parquet;

import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
Expand Down Expand Up @@ -53,11 +56,7 @@ public class ParquetSchemaConverter {
static final String LIST_ELEMENT_NAME = "element";

public static MessageType convertToParquetMessageType(String name, RowType rowType) {
Type[] types = new Type[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); i++) {
types[i] = convertToParquetType(rowType.getFieldNames().get(i), rowType.getTypeAt(i));
}
return new MessageType(name, types);
return new MessageType(name, convertToParquetTypes(rowType));
}

public static Type convertToParquetType(String name, DataType type) {
Expand Down Expand Up @@ -197,8 +196,47 @@ private static Type createTimestampWithLogicalType(

private static List<Type> convertToParquetTypes(RowType rowType) {
List<Type> types = new ArrayList<>(rowType.getFieldCount());
for (int i = 0; i < rowType.getFieldCount(); i++) {
types.add(convertToParquetType(rowType.getFieldNames().get(i), rowType.getTypeAt(i)));
for (DataField field : rowType.getFields()) {
Type parquetType = convertToParquetType(field.name(), field.type());
Type typeWithId = parquetType.withId(field.id());
if (field.type().getTypeRoot() == DataTypeRoot.ARRAY) {
GroupType groupType = (GroupType) parquetType;
GroupType wrapperType = (GroupType) groupType.getFields().get(0);
Type elementTypeWithId =
wrapperType
.getFields()
.get(0)
.withId(SpecialFields.getArrayElementFieldId(field.id()));
typeWithId =
ConversionPatterns.listOfElements(
groupType.getRepetition(),
groupType.getName(),
elementTypeWithId)
.withId(field.id());
} else if (field.type().getTypeRoot() == DataTypeRoot.MAP
|| field.type().getTypeRoot() == DataTypeRoot.MULTISET) {
GroupType groupType = (GroupType) parquetType;
GroupType wrapperType = (GroupType) groupType.getFields().get(0);
Type keyTypeWithId =
wrapperType
.getFields()
.get(0)
.withId(SpecialFields.getMapKeyFieldId(field.id()));
Type valueTypeWithId =
wrapperType
.getFields()
.get(1)
.withId(SpecialFields.getMapValueFieldId(field.id()));
typeWithId =
ConversionPatterns.mapType(
groupType.getRepetition(),
groupType.getName(),
MAP_REPEATED_NAME,
keyTypeWithId,
valueTypeWithId)
.withId(field.id());
}
types.add(typeWithId);
}
return types;
}
Expand Down
Loading

0 comments on commit 20790af

Please sign in to comment.