Skip to content

Commit

Permalink
range based size
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Jan 29, 2024
1 parent 9abfbd1 commit 89b1867
Show file tree
Hide file tree
Showing 8 changed files with 753 additions and 47 deletions.
7 changes: 7 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,13 @@
<td>Duration</td>
<td>In watermarking, if a source remains idle beyond the specified timeout duration, it triggers snapshot advancement and facilitates tag creation.</td>
</tr>
<tr>
<td><h5>sort-compaction.range-strategy</h5></td>
<td style="word-wrap: break-word;">QUANTITY</td>
<td><p>Enum</p></td>
<td>The range strategy of sort compaction, the default value is quantity.
If the data size allocated for the sorting task is uneven,which may lead to performance bottlenecks, the config can be set to size.<br /><br />Possible values:<ul><li>"SIZE"</li><li>"QUANTITY"</li></ul></td>
</tr>
<tr>
<td><h5>sort-engine</h5></td>
<td style="word-wrap: break-word;">loser-tree</td>
Expand Down
19 changes: 19 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,15 @@ public class CoreOptions implements Serializable {
.defaultValue(MemorySize.ofMebiBytes(10))
.withDescription("The threshold for read file async.");

/**
 * Option {@code sort-compaction.range-strategy}: selects the {@link RangeStrategy} used by sort
 * compaction. Defaults to {@link RangeStrategy#QUANTITY}; per the description below, {@link
 * RangeStrategy#SIZE} is intended for cases where the data size allocated to sorting tasks is
 * uneven.
 *
 * <p>NOTE(review): the constant is spelled {@code RANG}, not {@code RANGE}; it is kept as-is
 * because renaming it would break existing references.
 */
public static final ConfigOption<RangeStrategy> SORT_RANG_STRATEGY =
key("sort-compaction.range-strategy")
.enumType(RangeStrategy.class)
.defaultValue(RangeStrategy.QUANTITY)
.withDescription(
"The range strategy of sort compaction, the default value is quantity.\n"
+ "If the data size allocated for the sorting task is uneven,which may lead to performance bottlenecks, "
+ "the config can be set to size.");

private final Options options;

public CoreOptions(Map<String, String> options) {
Expand Down Expand Up @@ -1144,6 +1153,10 @@ public String partitionDefaultName() {
return options.get(PARTITION_DEFAULT_NAME);
}

/**
 * Tells whether sort compaction is configured to use the size-based range strategy.
 *
 * @return {@code true} if the configured range strategy is {@link RangeStrategy#SIZE}, {@code
 *     false} otherwise
 */
public boolean sortBySize() {
    RangeStrategy configuredStrategy = options.get(SORT_RANG_STRATEGY);
    return RangeStrategy.SIZE == configuredStrategy;
}

public static FileFormat createFileFormat(
Options options, ConfigOption<FileFormatType> formatOption) {
String formatIdentifier = options.get(formatOption).toString();
Expand Down Expand Up @@ -2217,6 +2230,12 @@ public InlineElement getDescription() {
}
}

/**
 * Specifies the range strategy of sort compaction, as selected through the {@code
 * sort-compaction.range-strategy} option.
 */
public enum RangeStrategy {
// Opt-in strategy: the option description suggests it when the data size allocated to sorting
// tasks is uneven. NOTE(review): presumably ranges are then balanced by data size; confirm
// against the code that consumes this enum.
SIZE,
// Default value of the option. NOTE(review): presumably ranges are balanced by record count;
// confirm against the code that consumes this enum.
QUANTITY
}

/** Specifies the log consistency mode for table. */
public enum ConsumerMode implements DescribedEnum {
EXACTLY_ONCE(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.types;

import org.apache.paimon.data.DataGetters;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/**
 * A {@link DataTypeVisitor} that builds, for a given {@link DataType}, a function estimating how
 * many bytes a value of that type occupies.
 *
 * <p>Every {@code visit} method returns a {@link BiFunction} that takes the container to read
 * from (any {@link DataGetters}, such as a row or an array) and the position of the value inside
 * that container, and returns the estimated size in bytes:
 *
 * <ul>
 *   <li>a null value always counts as {@link #NULL_SIZE};
 *   <li>fixed-width types count as a constant number of bytes;
 *   <li>strings, binaries and decimals count as the length of their byte representation;
 *   <li>arrays, maps and rows count as the sum of the sizes of their nested values. For a
 *       multiset only the elements (the keys of the backing map) are counted.
 * </ul>
 *
 * <p>Estimators for nested types are resolved once, when the enclosing type is visited, so
 * applying the returned function to many values does not re-run the visitor.
 */
public class InternalRowToSizeVisitor
        implements DataTypeVisitor<BiFunction<DataGetters, Integer, Integer>> {

    /** Size, in bytes, reported for a null value. */
    public static final int NULL_SIZE = 0;

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(CharType charType) {
        return stringSize();
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(VarCharType varCharType) {
        return stringSize();
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(BooleanType booleanType) {
        return fixedSize(1);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(BinaryType binaryType) {
        return binarySize();
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(VarBinaryType varBinaryType) {
        return binarySize();
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(DecimalType decimalType) {
        int precision = decimalType.getPrecision();
        int scale = decimalType.getScale();
        return (row, index) ->
                row.isNullAt(index)
                        ? NULL_SIZE
                        : row.getDecimal(index, precision, scale).toUnscaledBytes().length;
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(TinyIntType tinyIntType) {
        return fixedSize(1);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(SmallIntType smallIntType) {
        return fixedSize(2);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(IntType intType) {
        return fixedSize(4);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(BigIntType bigIntType) {
        return fixedSize(8);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(FloatType floatType) {
        // The value itself is irrelevant for a fixed-width type, so it is not read.
        return fixedSize(4);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(DoubleType doubleType) {
        return fixedSize(8);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(DateType dateType) {
        // The value itself is irrelevant for a fixed-width type, so it is not read.
        return fixedSize(4);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(TimeType timeType) {
        return fixedSize(4);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(TimestampType timestampType) {
        return fixedSize(8);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(
            LocalZonedTimestampType localZonedTimestampType) {
        // The value itself is irrelevant for a fixed-width type, so it is not read.
        return fixedSize(8);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(ArrayType arrayType) {
        BiFunction<DataGetters, Integer, Integer> elementSize =
                arrayType.getElementType().accept(this);
        return (row, index) ->
                row.isNullAt(index) ? NULL_SIZE : sumSizes(row.getArray(index), elementSize);
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(MultisetType multisetType) {
        BiFunction<DataGetters, Integer, Integer> elementSize =
                multisetType.getElementType().accept(this);
        return (row, index) -> {
            if (row.isNullAt(index)) {
                return NULL_SIZE;
            }
            // Only the elements (map keys) are measured; the values of the map are not.
            InternalMap map = row.getMap(index);
            return sumSizes(map.keyArray(), elementSize);
        };
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(MapType mapType) {
        BiFunction<DataGetters, Integer, Integer> keySize = mapType.getKeyType().accept(this);
        BiFunction<DataGetters, Integer, Integer> valueSize =
                mapType.getValueType().accept(this);
        return (row, index) -> {
            if (row.isNullAt(index)) {
                return NULL_SIZE;
            }
            InternalMap map = row.getMap(index);
            // Fetch each array once instead of once per entry.
            int total = sumSizes(map.keyArray(), keySize);
            total += sumSizes(map.valueArray(), valueSize);
            return total;
        };
    }

    @Override
    public BiFunction<DataGetters, Integer, Integer> visit(RowType rowType) {
        int fieldCount = rowType.getFieldCount();
        List<DataType> fieldTypes = rowType.getFieldTypes();
        // Resolve one estimator per field up front; position i matches field i of the row.
        List<BiFunction<DataGetters, Integer, Integer>> fieldSizes =
                new ArrayList<>(fieldTypes.size());
        for (DataType fieldType : fieldTypes) {
            fieldSizes.add(fieldType.accept(this));
        }
        return (row, index) -> {
            if (row.isNullAt(index)) {
                return NULL_SIZE;
            }
            InternalRow nestedRow = row.getRow(index, fieldCount);
            int total = 0;
            for (int i = 0; i < fieldSizes.size(); i++) {
                total += fieldSizes.get(i).apply(nestedRow, i);
            }
            return total;
        };
    }

    /** Estimator for a fixed-width type: {@code bytes} when non-null, {@link #NULL_SIZE} else. */
    private static BiFunction<DataGetters, Integer, Integer> fixedSize(int bytes) {
        return (row, index) -> row.isNullAt(index) ? NULL_SIZE : bytes;
    }

    /** Estimator for string types: the length of the string's byte representation. */
    private static BiFunction<DataGetters, Integer, Integer> stringSize() {
        return (row, index) ->
                row.isNullAt(index) ? NULL_SIZE : row.getString(index).toBytes().length;
    }

    /** Estimator for binary types: the length of the byte array. */
    private static BiFunction<DataGetters, Integer, Integer> binarySize() {
        return (row, index) -> row.isNullAt(index) ? NULL_SIZE : row.getBinary(index).length;
    }

    /** Sums the estimated sizes of all elements of {@code array} using {@code elementSize}. */
    private static int sumSizes(
            InternalArray array, BiFunction<DataGetters, Integer, Integer> elementSize) {
        int total = 0;
        int count = array.size();
        for (int i = 0; i < count; i++) {
            total += elementSize.apply(array, i);
        }
        return total;
    }
}
Loading

0 comments on commit 89b1867

Please sign in to comment.