From 92550da4c41a57bb14a30ae8d2d2f49e0e9462bb Mon Sep 17 00:00:00 2001
From: Jingsong
Date: Wed, 23 Oct 2024 16:23:00 +0200
Subject: [PATCH] [core] Reuse partitionCastExecutors in InternalRowPartitionComputer

---
Review notes (this section is ignored when the patch is applied):

 * InternalRowPartitionComputer now resolves one CastExecutor per partition
   column in the constructor instead of calling CastExecutors.resolve for
   every row in generatePartValues.
 * partitionFieldGetters and partitionCastExecutors are sized by
   partitionColumns.length and are read by partition position in
   generatePartValues, so the constructor must fill slot i (partition
   position) while using the column's index in rowType only to look up the
   type and build the getter. Writing to the slot at the rowType index breaks
   as soon as rowType holds non-partition columns or orders the partition
   columns differently.
 * NOTE(review): the removed TypeUtils.castPartitionValueToString threw
   UnsupportedOperationException when CastExecutors.resolve returned null;
   the new constructor stores the result unchecked. Confirm whether that
   guard should be kept.
 * NOTE(review): generic type arguments and the author e-mail look stripped
   in this copy of the patch, and indentation was reconstructed; verify the
   context lines against the repository before applying.

 .../utils/InternalRowPartitionComputer.java   | 40 +++++++++++++------
 .../org/apache/paimon/utils/TypeUtils.java    | 20 ----------
 .../spark/commands/WriteIntoPaimonTable.scala |  3 +-
 3 files changed, 29 insertions(+), 34 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
index 1bc8eecc88d1..8de2720a26e9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
@@ -18,16 +18,20 @@
 
 package org.apache.paimon.utils;
 
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldGetter;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
 
-import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.utils.InternalRowUtils.createNullCheckingFieldGetter;
 import static org.apache.paimon.utils.TypeUtils.castFromString;
 
 /** PartitionComputer for {@link InternalRow}. */
@@ -35,7 +39,8 @@ public class InternalRowPartitionComputer {
 
     protected final String defaultPartValue;
     protected final String[] partitionColumns;
-    protected final InternalRow.FieldGetter[] partitionFieldGetters;
+    protected final FieldGetter[] partitionFieldGetters;
+    protected final CastExecutor[] partitionCastExecutors;
     protected final List types;
     protected final boolean legacyPartitionName;
 
@@ -49,14 +54,14 @@ public InternalRowPartitionComputer(
         this.types = rowType.getFieldTypes();
         this.legacyPartitionName = legacyPartitionName;
         List columnList = rowType.getFieldNames();
-        this.partitionFieldGetters =
-                Arrays.stream(partitionColumns)
-                        .mapToInt(columnList::indexOf)
-                        .mapToObj(
-                                i ->
-                                        InternalRowUtils.createNullCheckingFieldGetter(
-                                                rowType.getTypeAt(i), i))
-                        .toArray(InternalRow.FieldGetter[]::new);
+        this.partitionFieldGetters = new FieldGetter[partitionColumns.length];
+        this.partitionCastExecutors = new CastExecutor[partitionColumns.length];
+        for (int i = 0; i < partitionColumns.length; i++) {
+            int index = columnList.indexOf(partitionColumns[i]);
+            DataType type = rowType.getTypeAt(index);
+            partitionFieldGetters[i] = createNullCheckingFieldGetter(type, index);
+            partitionCastExecutors[i] = CastExecutors.resolve(type, VarCharType.STRING_TYPE);
+        }
     }
 
     public LinkedHashMap generatePartValues(InternalRow in) {
@@ -64,8 +69,17 @@ public LinkedHashMap generatePartValues(InternalRow in) {
 
         for (int i = 0; i < partitionFieldGetters.length; i++) {
             Object field = partitionFieldGetters[i].getFieldOrNull(in);
-            String partitionValue =
-                    TypeUtils.castPartitionValueToString(field, types.get(i), legacyPartitionName);
+            String partitionValue = null;
+            if (field != null) {
+                if (legacyPartitionName) {
+                    partitionValue = field.toString();
+                } else {
+                    Object casted = partitionCastExecutors[i].cast(field);
+                    if (casted != null) {
+                        partitionValue = casted.toString();
+                    }
+                }
+            }
             if (StringUtils.isNullOrWhitespaceOnly(partitionValue)) {
                 partitionValue = defaultPartValue;
             }
@@ -90,7 +104,7 @@ public static Map convertSpecToInternal(
 
     public static String partToSimpleString(
             RowType partitionType, BinaryRow partition, String delimiter, int maxLength) {
-        InternalRow.FieldGetter[] getters = partitionType.fieldGetters();
+        FieldGetter[] getters = partitionType.fieldGetters();
         StringBuilder builder = new StringBuilder();
         for (int i = 0; i < getters.length; i++) {
             Object part = getters[i].getFieldOrNull(partition);
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
index 046e3d4b3e67..8d8d84953da7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.utils;
 
-import org.apache.paimon.casting.CastExecutor;
-import org.apache.paimon.casting.CastExecutors;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericArray;
@@ -90,24 +88,6 @@ public static RowType project(RowType inputType, List names) {
                         .collect(Collectors.toList()));
     }
 
-    public static String castPartitionValueToString(
-            Object value, DataType type, boolean legacyPartitionName) {
-        if (value == null) {
-            return null;
-        }
-        if (legacyPartitionName) {
-            return value.toString();
-        }
-
-        CastExecutor castExecutor =
-                (CastExecutor) CastExecutors.resolve(type, VarCharType.STRING_TYPE);
-        if (castExecutor == null) {
-            throw new UnsupportedOperationException(type + " is not supported");
-        }
-        Object result = castExecutor.cast(value);
-        return result == null ? null : result.toString();
-    }
-
     public static Object castFromString(String s, DataType type) {
         return castFromStringInternal(s, type, false);
     }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index eda39ae1e02d..27d9a0786a56 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -78,7 +78,8 @@ case class WriteIntoPaimonTable(
       val partitionComputer = new InternalRowPartitionComputer(
         coreOptions.partitionDefaultName,
         TypeUtils.project(table.rowType(), table.partitionKeys()),
-        table.partitionKeys().asScala.toArray
+        table.partitionKeys().asScala.toArray,
+        coreOptions.legacyPartitionName()
       )
       val partitions = commitMessages
         .map(c => c.partition())