Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Introduce metadata.stats-dense-store to reduce meta size for multiple columns table #4322

Merged
merged 5 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/content/flink/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ Paimon will automatically collect the statistics of the data file for speeding u
The statistics collector mode can be configured by `'metadata.stats-mode'`, by default is `'truncate(16)'`.
You can configure the field level by setting `'fields.{field_name}.stats-mode'`.

For the stats mode of `none`, we suggest that you configure `metadata.stats-dense-store` = `true`, which will
significantly reduce the storage size of the manifest.

### Field Default Value

Paimon table currently supports setting default values for fields in table properties by `'fields.item_id.default-value'`,
Expand Down
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,12 @@
<td><p>Enum</p></td>
<td>Specify the merge engine for table with primary key.<br /><br />Possible values:<ul><li>"deduplicate": De-duplicate and keep the last row.</li><li>"partial-update": Partial update non-null fields.</li><li>"aggregation": Aggregate fields with same primary key.</li><li>"first-row": De-duplicate and keep the first row.</li></ul></td>
</tr>
<tr>
<td><h5>metadata.stats-dense-store</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to store statistic densely in metadata (manifest files), which will significantly reduce the storage size of metadata when the none statistic mode is set.<br />Note, when this mode is enabled, the Paimon sdk in reading engine requires at least version 0.9.1 or 1.0.0 or higher.</td>
</tr>
<tr>
<td><h5>metadata.stats-mode</h5></td>
<td style="word-wrap: break-word;">"truncate(16)"</td>
Expand Down
22 changes: 21 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ public class CoreOptions implements Serializable {
public static final String STATS_MODE_SUFFIX = "stats-mode";

public static final ConfigOption<String> METADATA_STATS_MODE =
key("metadata." + STATS_MODE_SUFFIX)
key("metadata.stats-mode")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can become more intuitive in the code.

.stringType()
.defaultValue("truncate(16)")
.withDescription(
Expand All @@ -1053,6 +1053,22 @@ public class CoreOptions implements Serializable {
+ STATS_MODE_SUFFIX))
.build());

public static final ConfigOption<Boolean> METADATA_STATS_DENSE_STORE =
key("metadata.stats-dense-store")
.booleanType()
.defaultValue(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the default value is not true?
You are worry about that many users are using old versions of Paimon sdk in their reading engine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is true

.withDescription(
Description.builder()
.text(
"Whether to store statistic densely in metadata (manifest files), which"
+ " will significantly reduce the storage size of metadata when the"
+ " none statistic mode is set.")
.linebreak()
.text(
"Note, when this mode is enabled, the Paimon sdk in reading engine requires"
+ " at least version 0.9.1 or 1.0.0 or higher.")
.build());

public static final ConfigOption<String> COMMIT_CALLBACKS =
key("commit.callbacks")
.stringType()
Expand Down Expand Up @@ -2233,6 +2249,10 @@ public boolean asyncFileWrite() {
return options.get(ASYNC_FILE_WRITE);
}

public boolean statsDenseStore() {
return options.get(METADATA_STATS_DENSE_STORE);
}

/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
*/
public class SimpleColStats {

public static final SimpleColStats NONE = new SimpleColStats(null, null, null);

@Nullable private final Object min;
@Nullable private final Object max;
private final Long nullCount;
Expand All @@ -58,6 +60,10 @@ public Long nullCount() {
return nullCount;
}

public boolean isNone() {
return min == null && max == null && nullCount == null;
}

@Override
public boolean equals(Object o) {
if (!(o instanceof SimpleColStats)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ public void collect(Object field, Serializer<Object> fieldSerializer) {}

@Override
public SimpleColStats result() {
return new SimpleColStats(null, null, null);
return SimpleColStats.NONE;
}

@Override
public SimpleColStats convert(SimpleColStats source) {
return new SimpleColStats(null, null, null);
return SimpleColStats.NONE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,8 @@ public class SystemFields {
public static boolean isSystemField(int fieldId) {
return fieldId >= SYSTEM_FIELD_ID_START;
}

public static boolean isSystemField(String field) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add KEY_FIELD_PREFIX to SYSTEM_FIELD_NAMES?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What to solve?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If KEY_FIELD_PREFIX is not in SYSTEM_FIELD_NAMES, then the funtion name "isSystemField" is inappropriate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So add KEY_FIELD_PREFIX to SYSTEM_FIELD_NAMES, what problem can be solved? Can you write a ut demo?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A solution is using a SYSTEM_FIELD_PREFIXS, and always using starWith. But it is not good for performance.

Let it go now.

return field.startsWith(KEY_FIELD_PREFIX) || SYSTEM_FIELD_NAMES.contains(field);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;

import javax.annotation.Nullable;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
Expand Down Expand Up @@ -198,7 +200,11 @@ public static Object get(DataGetters dataGetters, int pos, DataType fieldType) {
}
}

public static InternalArray toStringArrayData(List<String> list) {
public static InternalArray toStringArrayData(@Nullable List<String> list) {
if (list == null) {
return null;
}

return new GenericArray(list.stream().map(BinaryString::fromString).toArray());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;

/**
* An implementation of {@link InternalArray} which provides a projected view of the underlying
* {@link InternalArray}.
*
* <p>Projection includes both reducing the accessible fields and reordering them.
*
* <p>Note: This class supports only top-level projections, not nested projections.
*/
public class ProjectedArray implements InternalArray {

private final int[] indexMapping;

private InternalArray array;

private ProjectedArray(int[] indexMapping) {
this.indexMapping = indexMapping;
}

/**
* Replaces the underlying {@link InternalArray} backing this {@link ProjectedArray}.
*
* <p>This method replaces the row data in place and does not return a new object. This is done
* for performance reasons.
*/
public ProjectedArray replaceArray(InternalArray array) {
this.array = array;
return this;
}

// ---------------------------------------------------------------------------------------------

@Override
public int size() {
return indexMapping.length;
}

@Override
public boolean isNullAt(int pos) {
if (indexMapping[pos] < 0) {
return true;
}
return array.isNullAt(indexMapping[pos]);
}

@Override
public boolean getBoolean(int pos) {
return array.getBoolean(indexMapping[pos]);
}

@Override
public byte getByte(int pos) {
return array.getByte(indexMapping[pos]);
}

@Override
public short getShort(int pos) {
return array.getShort(indexMapping[pos]);
}

@Override
public int getInt(int pos) {
return array.getInt(indexMapping[pos]);
}

@Override
public long getLong(int pos) {
return array.getLong(indexMapping[pos]);
}

@Override
public float getFloat(int pos) {
return array.getFloat(indexMapping[pos]);
}

@Override
public double getDouble(int pos) {
return array.getDouble(indexMapping[pos]);
}

@Override
public BinaryString getString(int pos) {
return array.getString(indexMapping[pos]);
}

@Override
public Decimal getDecimal(int pos, int precision, int scale) {
return array.getDecimal(indexMapping[pos], precision, scale);
}

@Override
public Timestamp getTimestamp(int pos, int precision) {
return array.getTimestamp(indexMapping[pos], precision);
}

@Override
public byte[] getBinary(int pos) {
return array.getBinary(indexMapping[pos]);
}

@Override
public InternalArray getArray(int pos) {
return array.getArray(indexMapping[pos]);
}

@Override
public InternalMap getMap(int pos) {
return array.getMap(indexMapping[pos]);
}

@Override
public InternalRow getRow(int pos, int numFields) {
return array.getRow(indexMapping[pos], numFields);
}

@Override
public boolean equals(Object o) {
throw new UnsupportedOperationException("Projected row data cannot be compared");
}

@Override
public int hashCode() {
throw new UnsupportedOperationException("Projected row data cannot be hashed");
}

@Override
public String toString() {
throw new UnsupportedOperationException("Projected row data cannot be toString");
}

/**
* Create an empty {@link ProjectedArray} starting from a {@code projection} array.
*
* <p>The array represents the mapping of the fields of the original {@link DataType}. For
* example, {@code [0, 2, 1]} specifies to include in the following order the 1st field, the 3rd
* field and the 2nd field of the row.
*
* @see Projection
* @see ProjectedArray
*/
public static ProjectedArray from(int[] projection) {
return new ProjectedArray(projection);
}

@Override
public boolean[] toBooleanArray() {
throw new UnsupportedOperationException();
}

@Override
public byte[] toByteArray() {
throw new UnsupportedOperationException();
}

@Override
public short[] toShortArray() {
throw new UnsupportedOperationException();
}

@Override
public int[] toIntArray() {
throw new UnsupportedOperationException();
}

@Override
public long[] toLongArray() {
throw new UnsupportedOperationException();
}

@Override
public float[] toFloatArray() {
throw new UnsupportedOperationException();
}

@Override
public double[] toDoubleArray() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
private final IOFunction<List<DataFileMeta>, RecordReaderIterator<InternalRow>> bucketFileRead;
private final boolean forceCompact;
private final boolean asyncFileWrite;
private final boolean statsDenseStore;
private final List<DataFileMeta> newFiles;
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> compactBefore;
Expand Down Expand Up @@ -111,7 +112,8 @@ public AppendOnlyWriter(
SimpleColStatsCollector.Factory[] statsCollectors,
MemorySize maxDiskSize,
FileIndexOptions fileIndexOptions,
boolean asyncFileWrite) {
boolean asyncFileWrite,
boolean statsDenseStore) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
Expand All @@ -122,6 +124,7 @@ public AppendOnlyWriter(
this.bucketFileRead = bucketFileRead;
this.forceCompact = forceCompact;
this.asyncFileWrite = asyncFileWrite;
this.statsDenseStore = statsDenseStore;
this.newFiles = new ArrayList<>();
this.deletedFiles = new ArrayList<>();
this.compactBefore = new ArrayList<>();
Expand Down Expand Up @@ -286,7 +289,8 @@ private RowDataRollingFileWriter createRollingRowWriter() {
statsCollectors,
fileIndexOptions,
FileSource.APPEND,
asyncFileWrite);
asyncFileWrite,
statsDenseStore);
}

private void trySyncLatestCompaction(boolean blocking)
Expand Down
Loading
Loading