[core] Add field statistics in IcebergDataFileMeta (apache#4090)
tsreaper authored Aug 29, 2024
1 parent 7005670 commit 98071a1
Showing 9 changed files with 314 additions and 201 deletions.

IcebergCommitCallback.java

@@ -44,6 +44,8 @@
 import org.apache.paimon.options.ConfigOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.source.DataSplit;
@@ -62,6 +64,7 @@
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -170,10 +173,11 @@ private void createMetadata(long snapshotId, FileChangesCollector fileChangesCol
 
     private void createMetadataWithoutBase(long snapshotId) throws IOException {
         SnapshotReader snapshotReader = table.newSnapshotReader().withSnapshot(snapshotId);
+        SchemaCache schemas = new SchemaCache();
         Iterator<IcebergManifestEntry> entryIterator =
                 snapshotReader.read().dataSplits().stream()
                         .filter(DataSplit::rawConvertible)
-                        .flatMap(s -> dataSplitToManifestEntries(s, snapshotId).stream())
+                        .flatMap(s -> dataSplitToManifestEntries(s, snapshotId, schemas).stream())
                         .iterator();
         List<IcebergManifestFileMeta> manifestFileMetas =
                 manifestFile.rollingWrite(entryIterator, snapshotId);
@@ -219,17 +223,22 @@ private void createMetadataWithoutBase(long snapshotId) throws IOException {
     }
 
     private List<IcebergManifestEntry> dataSplitToManifestEntries(
-            DataSplit dataSplit, long snapshotId) {
+            DataSplit dataSplit, long snapshotId, SchemaCache schemas) {
         List<IcebergManifestEntry> result = new ArrayList<>();
-        for (RawFile rawFile : dataSplit.convertToRawFiles().get()) {
+        List<RawFile> rawFiles = dataSplit.convertToRawFiles().get();
+        for (int i = 0; i < dataSplit.dataFiles().size(); i++) {
+            DataFileMeta paimonFileMeta = dataSplit.dataFiles().get(i);
+            RawFile rawFile = rawFiles.get(i);
             IcebergDataFileMeta fileMeta =
-                    new IcebergDataFileMeta(
+                    IcebergDataFileMeta.create(
                             IcebergDataFileMeta.Content.DATA,
                             rawFile.path(),
                             rawFile.format(),
                             dataSplit.partition(),
                             rawFile.rowCount(),
-                            rawFile.fileSize());
+                            rawFile.fileSize(),
+                            schemas.get(paimonFileMeta.schemaId()),
+                            paimonFileMeta.valueStats());
             result.add(
                     new IcebergManifestEntry(
                             IcebergManifestEntry.Status.ADDED,
@@ -414,24 +423,28 @@ private List<IcebergManifestFileMeta> createNewlyAddedManifestFileMetas(
             return Collections.emptyList();
         }
 
+        SchemaCache schemas = new SchemaCache();
         return manifestFile.rollingWrite(
                 addedFiles.entrySet().stream()
                         .map(
                                 e -> {
-                                    IcebergDataFileMeta fileMeta =
-                                            new IcebergDataFileMeta(
+                                    DataFileMeta paimonFileMeta = e.getValue().getRight();
+                                    IcebergDataFileMeta icebergFileMeta =
+                                            IcebergDataFileMeta.create(
                                                     IcebergDataFileMeta.Content.DATA,
                                                     e.getKey(),
-                                                    e.getValue().getRight().fileFormat(),
+                                                    paimonFileMeta.fileFormat(),
                                                     e.getValue().getLeft(),
-                                                    e.getValue().getRight().rowCount(),
-                                                    e.getValue().getRight().fileSize());
+                                                    paimonFileMeta.rowCount(),
+                                                    paimonFileMeta.fileSize(),
+                                                    schemas.get(paimonFileMeta.schemaId()),
+                                                    paimonFileMeta.valueStats());
                                     return new IcebergManifestEntry(
                                             IcebergManifestEntry.Status.ADDED,
                                             currentSnapshotId,
                                             currentSnapshotId,
                                             currentSnapshotId,
-                                            fileMeta);
+                                            icebergFileMeta);
                                 })
                         .iterator(),
                 currentSnapshotId);
@@ -660,4 +673,18 @@ private void expireAllBefore(long snapshotId) throws IOException {
             table.fileIO().deleteQuietly(path);
         }
     }
+
+    // -------------------------------------------------------------------------------------
+    // Utils
+    // -------------------------------------------------------------------------------------
+
+    private class SchemaCache {
+
+        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
+        Map<Long, TableSchema> tableSchemas = new HashMap<>();
+
+        private TableSchema get(long schemaId) {
+            return tableSchemas.computeIfAbsent(schemaId, id -> schemaManager.schema(id));
+        }
+    }
 }
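
Editor's note on the new SchemaCache (commentary, not part of the commit): dataSplitToManifestEntries and createNewlyAddedManifestFileMetas may visit many data files that share only a few distinct schema ids, while SchemaManager.schema(id) reads the schema file from the table's file system on every call. A cache created per metadata build keeps that to one read per distinct schema id and never holds stale schemas across commits. A minimal standalone sketch of the same computeIfAbsent memoization pattern; MemoizedLookup and its loader are illustrative names, not Paimon API (the loader stands in for schemaManager::schema):

import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

// Memoizes an expensive id -> value lookup, mirroring SchemaCache above:
// the loader runs only on the first lookup of each distinct id.
final class MemoizedLookup<V> {
    private final LongFunction<V> loader; // e.g. schemaManager::schema
    private final Map<Long, V> cache = new HashMap<>();

    MemoizedLookup(LongFunction<V> loader) {
        this.loader = loader;
    }

    V get(long id) {
        return cache.computeIfAbsent(id, loader::apply);
    }
}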

IcebergDataFileMeta.java

@@ -19,12 +19,20 @@
 package org.apache.paimon.iceberg.manifest;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -69,20 +77,72 @@ public static Content fromId(int id) {
     private final BinaryRow partition;
     private final long recordCount;
     private final long fileSizeInBytes;
+    private final InternalMap nullValueCounts;
+    private final InternalMap lowerBounds;
+    private final InternalMap upperBounds;
 
-    public IcebergDataFileMeta(
+    IcebergDataFileMeta(
             Content content,
             String filePath,
             String fileFormat,
             BinaryRow partition,
             long recordCount,
-            long fileSizeInBytes) {
+            long fileSizeInBytes,
+            InternalMap nullValueCounts,
+            InternalMap lowerBounds,
+            InternalMap upperBounds) {
         this.content = content;
         this.filePath = filePath;
         this.fileFormat = fileFormat;
         this.partition = partition;
         this.recordCount = recordCount;
         this.fileSizeInBytes = fileSizeInBytes;
+        this.nullValueCounts = nullValueCounts;
+        this.lowerBounds = lowerBounds;
+        this.upperBounds = upperBounds;
     }
+
+    public static IcebergDataFileMeta create(
+            Content content,
+            String filePath,
+            String fileFormat,
+            BinaryRow partition,
+            long recordCount,
+            long fileSizeInBytes,
+            TableSchema tableSchema,
+            SimpleStats stats) {
+        Map<Integer, Long> nullValueCounts = new HashMap<>();
+        Map<Integer, byte[]> lowerBounds = new HashMap<>();
+        Map<Integer, byte[]> upperBounds = new HashMap<>();
+
+        List<InternalRow.FieldGetter> fieldGetters = new ArrayList<>();
+        int numFields = tableSchema.fields().size();
+        for (int i = 0; i < numFields; i++) {
+            fieldGetters.add(InternalRow.createFieldGetter(tableSchema.fields().get(i).type(), i));
+        }
+
+        for (int i = 0; i < numFields; i++) {
+            int fieldId = tableSchema.fields().get(i).id();
+            DataType type = tableSchema.fields().get(i).type();
+            nullValueCounts.put(fieldId, stats.nullCounts().getLong(i));
+            Object minValue = fieldGetters.get(i).getFieldOrNull(stats.minValues());
+            Object maxValue = fieldGetters.get(i).getFieldOrNull(stats.maxValues());
+            if (minValue != null && maxValue != null) {
+                lowerBounds.put(fieldId, IcebergConversions.toByteBuffer(type, minValue).array());
+                upperBounds.put(fieldId, IcebergConversions.toByteBuffer(type, maxValue).array());
+            }
+        }
+
+        return new IcebergDataFileMeta(
+                content,
+                filePath,
+                fileFormat,
+                partition,
+                recordCount,
+                fileSizeInBytes,
+                new GenericMap(nullValueCounts),
+                new GenericMap(lowerBounds),
+                new GenericMap(upperBounds));
+    }
 
     public Content content() {
@@ -109,6 +169,18 @@ public long fileSizeInBytes() {
         return fileSizeInBytes;
     }
 
+    public InternalMap nullValueCounts() {
+        return nullValueCounts;
+    }
+
+    public InternalMap lowerBounds() {
+        return lowerBounds;
+    }
+
+    public InternalMap upperBounds() {
+        return upperBounds;
+    }
+
     public static RowType schema(RowType partitionType) {
         List<DataField> fields = new ArrayList<>();
         fields.add(new DataField(134, "content", DataTypes.INT().notNull()));
@@ -117,6 +189,21 @@ public static RowType schema(RowType partitionType) {
         fields.add(new DataField(102, "partition", partitionType));
         fields.add(new DataField(103, "record_count", DataTypes.BIGINT().notNull()));
         fields.add(new DataField(104, "file_size_in_bytes", DataTypes.BIGINT().notNull()));
+        fields.add(
+                new DataField(
+                        110,
+                        "null_value_counts",
+                        DataTypes.MAP(DataTypes.INT().notNull(), DataTypes.BIGINT().notNull())));
+        fields.add(
+                new DataField(
+                        125,
+                        "lower_bounds",
+                        DataTypes.MAP(DataTypes.INT().notNull(), DataTypes.BYTES().notNull())));
+        fields.add(
+                new DataField(
+                        128,
+                        "upper_bounds",
+                        DataTypes.MAP(DataTypes.INT().notNull(), DataTypes.BYTES().notNull())));
         return new RowType(fields);
     }
 
@@ -134,11 +221,23 @@ public boolean equals(Object o) {
                 && fileSizeInBytes == that.fileSizeInBytes
                 && Objects.equals(filePath, that.filePath)
                 && Objects.equals(fileFormat, that.fileFormat)
-                && Objects.equals(partition, that.partition);
+                && Objects.equals(partition, that.partition)
+                && Objects.equals(nullValueCounts, that.nullValueCounts)
+                && Objects.equals(lowerBounds, that.lowerBounds)
+                && Objects.equals(upperBounds, that.upperBounds);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(content, filePath, fileFormat, partition, recordCount, fileSizeInBytes);
+        return Objects.hash(
+                content,
+                filePath,
+                fileFormat,
+                partition,
+                recordCount,
+                fileSizeInBytes,
+                nullValueCounts,
+                lowerBounds,
+                upperBounds);
     }
 }
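
Editor's note on the stats conversion (commentary, not part of the commit): SimpleStats carries per-field min values, max values, and null counts laid out in schema field order; create re-keys them by field id, and emits bounds only when both min and max are present, so lower_bounds and upper_bounds stay populated in lockstep per field. The ids 110, 125, and 128 used in schema() are the reserved ids for null_value_counts, lower_bounds, and upper_bounds in the Iceberg manifest spec, and IcebergConversions.toByteBuffer is Paimon's implementation of Iceberg's single-value binary serialization. A sketch of what that encoding produces for two common types, assuming little-endian numerics and raw UTF-8 strings as the Iceberg spec prescribes (this is an illustration, not the actual IcebergConversions code):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

public class BoundEncodingSketch {

    // Iceberg single-value serialization for INT: 4 bytes, little-endian.
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(value).array();
    }

    // Iceberg single-value serialization for STRING: UTF-8 bytes, no length prefix.
    static byte[] encodeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] lower = encodeInt(42); // stored as 2A 00 00 00
        byte[] upper = encodeString("zebra"); // stored as 7A 65 62 72 61
        System.out.println(lower.length + " " + upper.length); // prints: 4 5
    }
}

Readers decode these byte arrays using the field's type, which is why the map key is the field id rather than the field name or position: the id stays stable across schema evolution.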

IcebergDataFileMetaSerializer.java

@@ -21,7 +21,9 @@
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalMapSerializer;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ObjectSerializer;
 
@@ -31,10 +33,17 @@ public class IcebergDataFileMetaSerializer extends ObjectSerializer<IcebergDataF
     private static final long serialVersionUID = 1L;
 
     private final InternalRowSerializer partSerializer;
+    private final InternalMapSerializer nullValueCountsSerializer;
+    private final InternalMapSerializer lowerBoundsSerializer;
+    private final InternalMapSerializer upperBoundsSerializer;
 
     public IcebergDataFileMetaSerializer(RowType partitionType) {
         super(IcebergDataFileMeta.schema(partitionType));
         this.partSerializer = new InternalRowSerializer(partitionType);
+        this.nullValueCountsSerializer =
+                new InternalMapSerializer(DataTypes.INT(), DataTypes.BIGINT());
+        this.lowerBoundsSerializer = new InternalMapSerializer(DataTypes.INT(), DataTypes.BYTES());
+        this.upperBoundsSerializer = new InternalMapSerializer(DataTypes.INT(), DataTypes.BYTES());
     }
 
     @Override
@@ -45,7 +54,10 @@ public InternalRow toRow(IcebergDataFileMeta file) {
                 BinaryString.fromString(file.fileFormat()),
                 file.partition(),
                 file.recordCount(),
-                file.fileSizeInBytes());
+                file.fileSizeInBytes(),
+                file.nullValueCounts(),
+                file.lowerBounds(),
+                file.upperBounds());
     }
 
     @Override
@@ -56,6 +68,9 @@ public IcebergDataFileMeta fromRow(InternalRow row) {
                 row.getString(2).toString(),
                 partSerializer.toBinaryRow(row.getRow(3, partSerializer.getArity())).copy(),
                 row.getLong(4),
-                row.getLong(5));
+                row.getLong(5),
+                nullValueCountsSerializer.copy(row.getMap(6)),
+                lowerBoundsSerializer.copy(row.getMap(7)),
+                upperBoundsSerializer.copy(row.getMap(8)));
     }
 }
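
Editor's note on the copies in fromRow (commentary, not part of the commit): object serializers are handed InternalRow instances that readers typically reuse across records, so any nested structure retained beyond the current record must be deep-copied, exactly as the partition row already was via .copy(). The hazard in miniature, in plain Java rather than Paimon API:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyOnRetainSketch {
    public static void main(String[] args) {
        byte[] reused = new byte[1]; // stands in for a reused InternalRow
        List<byte[]> retained = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            reused[0] = (byte) i; // the "reader" overwrites the buffer in place
            retained.add(Arrays.copyOf(reused, reused.length)); // copy, not alias
        }
        // With the copy, each element keeps its own snapshot (0, 1, 2); without
        // it, all three entries would alias one buffer and show the last value.
        System.out.println(retained.get(0)[0] + " " + retained.get(2)[0]); // prints: 0 2
    }
}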
[Diffs for the remaining 6 changed files are not shown.]
