Skip to content

Commit

Permalink
[core] Reuse partitionCastExecutors in InternalRowPartitionComputer
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Oct 23, 2024
1 parent a9f2d2c commit 92550da
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,29 @@

package org.apache.paimon.utils;

import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import static org.apache.paimon.utils.InternalRowUtils.createNullCheckingFieldGetter;
import static org.apache.paimon.utils.TypeUtils.castFromString;

/** PartitionComputer for {@link InternalRow}. */
public class InternalRowPartitionComputer {

protected final String defaultPartValue;
protected final String[] partitionColumns;
protected final InternalRow.FieldGetter[] partitionFieldGetters;
protected final FieldGetter[] partitionFieldGetters;
protected final CastExecutor[] partitionCastExecutors;
protected final List<DataType> types;
protected final boolean legacyPartitionName;

Expand All @@ -49,23 +54,32 @@ public InternalRowPartitionComputer(
this.types = rowType.getFieldTypes();
this.legacyPartitionName = legacyPartitionName;
List<String> columnList = rowType.getFieldNames();
this.partitionFieldGetters =
Arrays.stream(partitionColumns)
.mapToInt(columnList::indexOf)
.mapToObj(
i ->
InternalRowUtils.createNullCheckingFieldGetter(
rowType.getTypeAt(i), i))
.toArray(InternalRow.FieldGetter[]::new);
this.partitionFieldGetters = new FieldGetter[partitionColumns.length];
this.partitionCastExecutors = new CastExecutor[partitionColumns.length];
for (String partitionColumn : partitionColumns) {
int i = columnList.indexOf(partitionColumn);
DataType type = rowType.getTypeAt(i);
partitionFieldGetters[i] = createNullCheckingFieldGetter(type, i);
partitionCastExecutors[i] = CastExecutors.resolve(type, VarCharType.STRING_TYPE);
}
}

public LinkedHashMap<String, String> generatePartValues(InternalRow in) {
LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();

for (int i = 0; i < partitionFieldGetters.length; i++) {
Object field = partitionFieldGetters[i].getFieldOrNull(in);
String partitionValue =
TypeUtils.castPartitionValueToString(field, types.get(i), legacyPartitionName);
String partitionValue = null;
if (field != null) {
if (legacyPartitionName) {
partitionValue = field.toString();
} else {
Object casted = partitionCastExecutors[i].cast(field);
if (casted != null) {
partitionValue = casted.toString();
}
}
}
if (StringUtils.isNullOrWhitespaceOnly(partitionValue)) {
partitionValue = defaultPartValue;
}
Expand All @@ -90,7 +104,7 @@ public static Map<String, Object> convertSpecToInternal(

public static String partToSimpleString(
RowType partitionType, BinaryRow partition, String delimiter, int maxLength) {
InternalRow.FieldGetter[] getters = partitionType.fieldGetters();
FieldGetter[] getters = partitionType.fieldGetters();
StringBuilder builder = new StringBuilder();
for (int i = 0; i < getters.length; i++) {
Object part = getters[i].getFieldOrNull(partition);
Expand Down
20 changes: 0 additions & 20 deletions paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.paimon.utils;

import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
Expand Down Expand Up @@ -90,24 +88,6 @@ public static RowType project(RowType inputType, List<String> names) {
.collect(Collectors.toList()));
}

public static String castPartitionValueToString(
Object value, DataType type, boolean legacyPartitionName) {
if (value == null) {
return null;
}
if (legacyPartitionName) {
return value.toString();
}

CastExecutor<Object, Object> castExecutor =
(CastExecutor<Object, Object>) CastExecutors.resolve(type, VarCharType.STRING_TYPE);
if (castExecutor == null) {
throw new UnsupportedOperationException(type + " is not supported");
}
Object result = castExecutor.cast(value);
return result == null ? null : result.toString();
}

public static Object castFromString(String s, DataType type) {
return castFromStringInternal(s, type, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ case class WriteIntoPaimonTable(
val partitionComputer = new InternalRowPartitionComputer(
coreOptions.partitionDefaultName,
TypeUtils.project(table.rowType(), table.partitionKeys()),
table.partitionKeys().asScala.toArray
table.partitionKeys().asScala.toArray,
coreOptions.legacyPartitionName()
)
val partitions = commitMessages
.map(c => c.partition())
Expand Down

0 comments on commit 92550da

Please sign in to comment.