Skip to content

Commit

Permalink
[core] Fix partition column generate wrong partition spec (apache#4349)
Browse files Browse the repository at this point in the history
  • Loading branch information
ulysses-you authored Oct 23, 2024
1 parent 576b04d commit a9f2d2c
Show file tree
Hide file tree
Showing 60 changed files with 159 additions and 31 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,12 @@
<td>Duration</td>
<td>The expiration interval of a partition. A partition will be expired if it‘s lifetime is over this value. Partition time is extracted from the partition value.</td>
</tr>
<tr>
<td><h5>partition.legacy-name</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>The legacy partition name is using `toString` fpr all types. If false, using cast to string for all types.</td>
</tr>
<tr>
<td><h5>partition.mark-done-action</h5></td>
<td style="word-wrap: break-word;">"success-file"</td>
Expand Down
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ public class CoreOptions implements Serializable {
"The default partition name in case the dynamic partition"
+ " column value is null/empty string.");

public static final ConfigOption<Boolean> PARTITION_GENERATE_LEGCY_NAME =
key("partition.legacy-name")
.booleanType()
.defaultValue(true)
.withDescription(
"The legacy partition name is using `toString` fpr all types. If false, using "
+ "cast to string for all types.");

public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MIN =
key("snapshot.num-retained.min")
.intType()
Expand Down Expand Up @@ -1539,6 +1547,10 @@ public String partitionDefaultName() {
return options.get(PARTITION_DEFAULT_NAME);
}

public boolean legacyPartitionName() {
return options.get(PARTITION_GENERATE_LEGCY_NAME);
}

public boolean sortBySize() {
return options.get(SORT_RANG_STRATEGY) == RangeStrategy.SIZE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;

import java.util.Arrays;
Expand All @@ -35,11 +36,18 @@ public class InternalRowPartitionComputer {
protected final String defaultPartValue;
protected final String[] partitionColumns;
protected final InternalRow.FieldGetter[] partitionFieldGetters;
protected final List<DataType> types;
protected final boolean legacyPartitionName;

public InternalRowPartitionComputer(
String defaultPartValue, RowType rowType, String[] partitionColumns) {
String defaultPartValue,
RowType rowType,
String[] partitionColumns,
boolean legacyPartitionName) {
this.defaultPartValue = defaultPartValue;
this.partitionColumns = partitionColumns;
this.types = rowType.getFieldTypes();
this.legacyPartitionName = legacyPartitionName;
List<String> columnList = rowType.getFieldNames();
this.partitionFieldGetters =
Arrays.stream(partitionColumns)
Expand All @@ -56,7 +64,8 @@ public LinkedHashMap<String, String> generatePartValues(InternalRow in) {

for (int i = 0; i < partitionFieldGetters.length; i++) {
Object field = partitionFieldGetters[i].getFieldOrNull(in);
String partitionValue = field != null ? field.toString() : null;
String partitionValue =
TypeUtils.castPartitionValueToString(field, types.get(i), legacyPartitionName);
if (StringUtils.isNullOrWhitespaceOnly(partitionValue)) {
partitionValue = defaultPartValue;
}
Expand Down
20 changes: 20 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.paimon.utils;

import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
Expand Down Expand Up @@ -88,6 +90,24 @@ public static RowType project(RowType inputType, List<String> names) {
.collect(Collectors.toList()));
}

public static String castPartitionValueToString(
Object value, DataType type, boolean legacyPartitionName) {
if (value == null) {
return null;
}
if (legacyPartitionName) {
return value.toString();
}

CastExecutor<Object, Object> castExecutor =
(CastExecutor<Object, Object>) CastExecutors.resolve(type, VarCharType.STRING_TYPE);
if (castExecutor == null) {
throw new UnsupportedOperationException(type + " is not supported");
}
Object result = castExecutor.cast(value);
return result == null ? null : result.toString();
}

public static Object castFromString(String s, DataType type) {
return castFromStringInternal(s, type, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ public FileStorePathFactory pathFactory() {
options.partitionDefaultName(),
options.fileFormat().getFormatIdentifier(),
options.dataFilePrefix(),
options.changelogFilePrefix());
options.changelogFilePrefix(),
options.legacyPartitionName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
options.partitionDefaultName(),
format,
options.dataFilePrefix(),
options.changelogFilePrefix())));
options.changelogFilePrefix(),
options.legacyPartitionName())));
return pathFactoryMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
protected CompactionMetrics compactionMetrics = null;
protected final String tableName;
private boolean isInsertOnly;
private boolean legacyPartitionName;

protected AbstractFileStoreWrite(
SnapshotManager snapshotManager,
Expand All @@ -97,7 +98,8 @@ protected AbstractFileStoreWrite(
String tableName,
int totalBuckets,
RowType partitionType,
int writerNumberMax) {
int writerNumberMax,
boolean legacyPartitionName) {
this.snapshotManager = snapshotManager;
this.scan = scan;
this.indexFactory = indexFactory;
Expand All @@ -107,6 +109,7 @@ protected AbstractFileStoreWrite(
this.writers = new HashMap<>();
this.tableName = tableName;
this.writerNumberMax = writerNumberMax;
this.legacyPartitionName = legacyPartitionName;
}

@Override
Expand Down Expand Up @@ -469,7 +472,8 @@ private List<DataFileMeta> scanExistingFileMetas(
? "partition "
+ getPartitionComputer(
partitionType,
PARTITION_DEFAULT_NAME.defaultValue())
PARTITION_DEFAULT_NAME.defaultValue(),
legacyPartitionName)
.generatePartValues(partition)
: "table";
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public MemoryFileStoreWrite(
tableName,
options.bucket(),
partitionType,
options.writeMaxWritersToSpill());
options.writeMaxWritersToSpill(),
options.legacyPartitionName());
this.options = options;
this.cacheManager = new CacheManager(options.lookupCacheMaxMemory());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ public FileStorePathFactory(
String defaultPartValue,
String formatIdentifier,
String dataFilePrefix,
String changelogFilePrefix) {
String changelogFilePrefix,
boolean legacyPartitionName) {
this.root = root;
this.uuid = UUID.randomUUID().toString();

this.partitionComputer = getPartitionComputer(partitionType, defaultPartValue);
this.partitionComputer =
getPartitionComputer(partitionType, defaultPartValue, legacyPartitionName);
this.formatIdentifier = formatIdentifier;
this.dataFilePrefix = dataFilePrefix;
this.changelogFilePrefix = changelogFilePrefix;
Expand All @@ -78,9 +80,10 @@ public Path root() {

@VisibleForTesting
public static InternalRowPartitionComputer getPartitionComputer(
RowType partitionType, String defaultPartValue) {
RowType partitionType, String defaultPartValue, boolean legacyPartitionName) {
String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
return new InternalRowPartitionComputer(defaultPartValue, partitionType, partitionColumns);
return new InternalRowPartitionComputer(
defaultPartValue, partitionType, partitionColumns, legacyPartitionName);
}

public Path newManifestFile() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
format,
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
Options options = new Options();
Expand All @@ -244,7 +245,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue()));
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue()));

return KeyValueFileWriterFactory.builder(
fileIO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ protected ManifestFile createManifestFile(String pathStr) {
"default",
CoreOptions.FILE_FORMAT.defaultValue(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue()),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue()),
Long.MAX_VALUE,
null)
.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ private ManifestFile createManifestFile(String pathStr) {
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
return new ManifestFile.Factory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ private ManifestList createManifestList(String pathStr) {
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", pathFactory, null)
.create();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public void testCreateDataFilePathFactoryWithPartition() {
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());

assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
assertPartition("20211224", null, pathFactory, "/dt=20211224/hr=default");
Expand Down Expand Up @@ -124,6 +125,7 @@ public static FileStorePathFactory createNonPartFactory(Path root) {
PARTITION_DEFAULT_NAME.defaultValue(),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1075,9 +1075,12 @@ private List<CatalogPartitionSpec> getPartitionSpecs(
getPartitionEntries(table, tablePath, partitionSpec);
org.apache.paimon.types.RowType partitionRowType = table.schema().logicalPartitionType();

CoreOptions options = new CoreOptions(table.options());
InternalRowPartitionComputer partitionComputer =
FileStorePathFactory.getPartitionComputer(
partitionRowType, new CoreOptions(table.options()).partitionDefaultName());
partitionRowType,
options.partitionDefaultName(),
options.legacyPartitionName());

return partitionEntries.stream()
.map(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public static PartitionMarkDone create(
new InternalRowPartitionComputer(
coreOptions.partitionDefaultName(),
table.schema().logicalPartitionType(),
table.partitionKeys().toArray(new String[0]));
table.partitionKeys().toArray(new String[0]),
coreOptions.legacyPartitionName());

PartitionMarkDoneTrigger trigger =
PartitionMarkDoneTrigger.create(coreOptions, isRestored, stateStore);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ public TestChangelogDataReadWrite(String root) {
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root));
this.commitUser = UUID.randomUUID().toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,13 @@ public class HiveMetastoreClient implements MetastoreClient {
ClientPool<IMetaStoreClient, TException> clients)
throws TException, InterruptedException {
this.identifier = identifier;
CoreOptions options = new CoreOptions(schema.options());
this.partitionComputer =
new InternalRowPartitionComputer(
new CoreOptions(schema.options()).partitionDefaultName(),
options.partitionDefaultName(),
schema.logicalPartitionType(),
schema.partitionKeys().toArray(new String[0]));
schema.partitionKeys().toArray(new String[0]),
options.legacyPartitionName());

this.clients = clients;
this.sd =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.spark

import org.apache.paimon.CoreOptions
import org.apache.paimon.metastore.MetastoreClient
import org.apache.paimon.operation.FileStoreCommit
import org.apache.paimon.table.FileStoreTable
Expand All @@ -31,7 +32,6 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
import org.apache.spark.sql.types.StructType

import java.util
import java.util.{Map => JMap, Objects, UUID}

import scala.collection.JavaConverters._
Expand All @@ -52,7 +52,8 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
val rowDataPartitionComputer = new InternalRowPartitionComputer(
fileStoreTable.coreOptions().partitionDefaultName(),
partitionRowType,
table.partitionKeys().asScala.toArray)
table.partitionKeys().asScala.toArray,
CoreOptions.fromMap(table.options()).legacyPartitionName)

rows.map {
r =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ case class DeleteFromPaimonTableCommand(
val rowDataPartitionComputer = new InternalRowPartitionComputer(
table.coreOptions().partitionDefaultName(),
table.schema().logicalPartitionType(),
table.partitionKeys.asScala.toArray
table.partitionKeys.asScala.toArray,
table.coreOptions().legacyPartitionName()
)
val dropPartitions = matchedPartitions.map {
partition => rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,15 @@ private[spark] trait StreamHelper {
private lazy val partitionSchema: StructType =
SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(), table.partitionKeys()))

private lazy val partitionComputer: InternalRowPartitionComputer =
private lazy val partitionComputer: InternalRowPartitionComputer = {
val options = new CoreOptions(table.options)
new InternalRowPartitionComputer(
new CoreOptions(table.options).partitionDefaultName,
options.partitionDefaultName,
TypeUtils.project(table.rowType(), table.partitionKeys()),
table.partitionKeys().asScala.toArray
table.partitionKeys().asScala.toArray,
options.legacyPartitionName()
)
}

// Used to get the initial offset.
lazy val streamScanStartingContext: StartingContext = streamScan.startingContext()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ protected void foreachIndexReader(Consumer<FileIndexReader> consumer)
new CoreOptions(new Options()).partitionDefaultName(),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());

Table table = fileSystemCatalog.getTable(Identifier.create("db", "T"));
ReadBuilder readBuilder = table.newReadBuilder();
Expand Down
Loading

0 comments on commit a9f2d2c

Please sign in to comment.