[core] Fix partition column generating wrong partition spec #4349

Merged: 2 commits, Oct 23, 2024
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -569,6 +569,12 @@
<td>Duration</td>
<td>The expiration interval of a partition. A partition will be expired if its lifetime is over this value. Partition time is extracted from the partition value.</td>
</tr>
<tr>
<td><h5>partition.legacy-name</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>The legacy partition name uses `toString` for all types. If false, partition values are cast to string for all types.</td>
</tr>
<tr>
<td><h5>partition.mark-done-action</h5></td>
<td style="word-wrap: break-word;">"success-file"</td>
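For illustration (not part of the diff): a minimal sketch of opting a table into the new cast-based naming, assuming the usual `Schema` builder API; only the option key and its default come from the table row above.

```java
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class DisableLegacyPartitionName {
    // Builds a schema with a DATE partition column and the new cast-based naming enabled.
    public static Schema schema() {
        return Schema.newBuilder()
                .column("dt", DataTypes.DATE())
                .column("v", DataTypes.BIGINT())
                .partitionKeys("dt")
                // default is true (legacy toString naming), per the configuration table above
                .option("partition.legacy-name", "false")
                .build();
    }
}
```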
12 changes: 12 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -249,6 +249,14 @@ public class CoreOptions implements Serializable {
"The default partition name in case the dynamic partition"
+ " column value is null/empty string.");

public static final ConfigOption<Boolean> PARTITION_GENERATE_LEGCY_NAME =
key("partition.legacy-name")
.booleanType()
.defaultValue(true)
.withDescription(
"The legacy partition name is using `toString` fpr all types. If false, using "
+ "cast to string for all types.");

public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MIN =
key("snapshot.num-retained.min")
.intType()
@@ -1531,6 +1539,10 @@ public String partitionDefaultName() {
return options.get(PARTITION_DEFAULT_NAME);
}

public boolean legacyPartitionName() {
return options.get(PARTITION_GENERATE_LEGCY_NAME);
}

public boolean sortBySize() {
return options.get(SORT_RANG_STRATEGY) == RangeStrategy.SIZE;
}
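A quick sketch of reading the flag through the new accessor (assuming the standard `Options`/`CoreOptions` constructors; not part of the diff):

```java
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

public class ReadLegacyFlag {
    public static void main(String[] args) {
        Options options = new Options();
        options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false);
        CoreOptions coreOptions = new CoreOptions(options.toMap());
        System.out.println(coreOptions.legacyPartitionName()); // false
    }
}
```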
Move cast-related files from core to common, since InternalRowPartitionComputer lives in the common module.

File renamed without changes.
@@ -20,6 +20,7 @@

import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;

import java.util.Arrays;
@@ -35,11 +36,18 @@ public class InternalRowPartitionComputer {
protected final String defaultPartValue;
protected final String[] partitionColumns;
protected final InternalRow.FieldGetter[] partitionFieldGetters;
protected final List<DataType> types;
protected final boolean legacyPartitionName;

public InternalRowPartitionComputer(
String defaultPartValue, RowType rowType, String[] partitionColumns) {
String defaultPartValue,
RowType rowType,
String[] partitionColumns,
boolean legacyPartitionName) {
this.defaultPartValue = defaultPartValue;
this.partitionColumns = partitionColumns;
this.types = rowType.getFieldTypes();
this.legacyPartitionName = legacyPartitionName;
List<String> columnList = rowType.getFieldNames();
this.partitionFieldGetters =
Arrays.stream(partitionColumns)
@@ -56,7 +64,8 @@ public LinkedHashMap<String, String> generatePartValues(InternalRow in) {

for (int i = 0; i < partitionFieldGetters.length; i++) {
Object field = partitionFieldGetters[i].getFieldOrNull(in);
String partitionValue = field != null ? field.toString() : null;
String partitionValue =
TypeUtils.castPartitionValueToString(field, types.get(i), legacyPartitionName);
if (StringUtils.isNullOrWhitespaceOnly(partitionValue)) {
partitionValue = defaultPartValue;
}
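To show the behavior change, a sketch comparing the two modes for a DATE partition column (assumptions: DATE is stored internally as days since epoch, the default-name string is arbitrary here, and the cast-based output is the expected ISO date rather than a verified result):

```java
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;

public class PartitionNameComparison {
    public static void main(String[] args) {
        RowType partitionType = RowType.builder().field("dt", DataTypes.DATE()).build();
        // 20019 = days since 1970-01-01 for 2024-10-23
        GenericRow row = GenericRow.of(20019);

        InternalRowPartitionComputer legacy =
                new InternalRowPartitionComputer("__DEFAULT__", partitionType, new String[] {"dt"}, true);
        InternalRowPartitionComputer cast =
                new InternalRowPartitionComputer("__DEFAULT__", partitionType, new String[] {"dt"}, false);

        System.out.println(legacy.generatePartValues(row)); // expected: {dt=20019}
        System.out.println(cast.generatePartValues(row));   // expected: {dt=2024-10-23}
    }
}
```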
20 changes: 20 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -18,6 +18,8 @@

package org.apache.paimon.utils;

import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
@@ -88,6 +90,24 @@ public static RowType project(RowType inputType, List<String> names) {
.collect(Collectors.toList()));
}

public static String castPartitionValueToString(
Object value, DataType type, boolean legacyPartitionName) {
if (value == null) {
return null;
}
if (legacyPartitionName) {
return value.toString();
}

CastExecutor<Object, Object> castExecutor =
(CastExecutor<Object, Object>) CastExecutors.resolve(type, VarCharType.STRING_TYPE);
if (castExecutor == null) {
throw new UnsupportedOperationException(type + " is not supported");
}
Object result = castExecutor.cast(value);
return result == null ? null : result.toString();
}

public static Object castFromString(String s, DataType type) {
return castFromStringInternal(s, type, false);
}
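A small usage sketch of the new helper (the cast-mode output string is an assumption about the registered DATE-to-STRING cast rule; the null and unsupported-type behavior follows directly from the code above):

```java
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.TypeUtils;

public class CastPartitionValueDemo {
    public static void main(String[] args) {
        // null values stay null, so the caller can substitute the default partition name
        System.out.println(TypeUtils.castPartitionValueToString(null, DataTypes.DATE(), false)); // null

        // legacy mode simply calls toString() on the internal value
        System.out.println(TypeUtils.castPartitionValueToString(20019, DataTypes.DATE(), true)); // 20019

        // cast mode resolves a DATE -> STRING cast executor; a type with no registered
        // cast to string would throw UnsupportedOperationException instead
        System.out.println(TypeUtils.castPartitionValueToString(20019, DataTypes.DATE(), false)); // e.g. 2024-10-23
    }
}
```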
@@ -105,7 +105,8 @@ public FileStorePathFactory pathFactory() {
options.partitionDefaultName(),
options.fileFormat().getFormatIdentifier(),
options.dataFilePrefix(),
options.changelogFilePrefix());
options.changelogFilePrefix(),
options.legacyPartitionName());
}

@Override
@@ -206,7 +206,8 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
options.partitionDefaultName(),
format,
options.dataFilePrefix(),
options.changelogFilePrefix())));
options.changelogFilePrefix(),
options.legacyPartitionName())));
return pathFactoryMap;
}

@@ -88,6 +88,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
protected CompactionMetrics compactionMetrics = null;
protected final String tableName;
private boolean isInsertOnly;
private boolean legacyPartitionName;

protected AbstractFileStoreWrite(
SnapshotManager snapshotManager,
@@ -97,7 +98,8 @@ protected AbstractFileStoreWrite(
String tableName,
int totalBuckets,
RowType partitionType,
int writerNumberMax) {
int writerNumberMax,
boolean legacyPartitionName) {
this.snapshotManager = snapshotManager;
this.scan = scan;
this.indexFactory = indexFactory;
@@ -107,6 +109,7 @@ protected AbstractFileStoreWrite(
this.writers = new HashMap<>();
this.tableName = tableName;
this.writerNumberMax = writerNumberMax;
this.legacyPartitionName = legacyPartitionName;
}

@Override
@@ -469,7 +472,8 @@ private List<DataFileMeta> scanExistingFileMetas(
? "partition "
+ getPartitionComputer(
partitionType,
PARTITION_DEFAULT_NAME.defaultValue())
PARTITION_DEFAULT_NAME.defaultValue(),
legacyPartitionName)
.generatePartValues(partition)
: "table";
throw new RuntimeException(
@@ -75,7 +75,8 @@ public MemoryFileStoreWrite(
tableName,
options.bucket(),
partitionType,
options.writeMaxWritersToSpill());
options.writeMaxWritersToSpill(),
options.legacyPartitionName());
this.options = options;
this.cacheManager = new CacheManager(options.lookupCacheMaxMemory());
}
@@ -56,11 +56,13 @@ public FileStorePathFactory(
String defaultPartValue,
String formatIdentifier,
String dataFilePrefix,
String changelogFilePrefix) {
String changelogFilePrefix,
boolean legacyPartitionName) {
this.root = root;
this.uuid = UUID.randomUUID().toString();

this.partitionComputer = getPartitionComputer(partitionType, defaultPartValue);
this.partitionComputer =
getPartitionComputer(partitionType, defaultPartValue, legacyPartitionName);
this.formatIdentifier = formatIdentifier;
this.dataFilePrefix = dataFilePrefix;
this.changelogFilePrefix = changelogFilePrefix;
@@ -78,9 +80,10 @@ public Path root() {

@VisibleForTesting
public static InternalRowPartitionComputer getPartitionComputer(
RowType partitionType, String defaultPartValue) {
RowType partitionType, String defaultPartValue, boolean legacyPartitionName) {
String[] partitionColumns = partitionType.getFieldNames().toArray(new String[0]);
return new InternalRowPartitionComputer(defaultPartValue, partitionType, partitionColumns);
return new InternalRowPartitionComputer(
defaultPartValue, partitionType, partitionColumns, legacyPartitionName);
}

public Path newManifestFile() {
@@ -228,7 +228,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
format,
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
Options options = new Options();
@@ -244,7 +245,8 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
CoreOptions.PARTITION_DEFAULT_NAME.defaultValue(),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue()));
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue()));

return KeyValueFileWriterFactory.builder(
fileIO,
@@ -146,7 +146,8 @@ protected ManifestFile createManifestFile(String pathStr) {
"default",
CoreOptions.FILE_FORMAT.defaultValue(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue()),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue()),
Long.MAX_VALUE,
null)
.create();
@@ -102,7 +102,8 @@ private ManifestFile createManifestFile(String pathStr) {
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
return new ManifestFile.Factory(
@@ -106,7 +106,8 @@ private ManifestList createManifestList(String pathStr) {
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", pathFactory, null)
.create();
}
@@ -87,7 +87,8 @@ public void testCreateDataFilePathFactoryWithPartition() {
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());

assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
assertPartition("20211224", null, pathFactory, "/dt=20211224/hr=default");
@@ -124,6 +125,7 @@ public static FileStorePathFactory createNonPartFactory(Path root) {
PARTITION_DEFAULT_NAME.defaultValue(),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
}
}
@@ -1075,9 +1075,12 @@ private List<CatalogPartitionSpec> getPartitionSpecs(
getPartitionEntries(table, tablePath, partitionSpec);
org.apache.paimon.types.RowType partitionRowType = table.schema().logicalPartitionType();

CoreOptions options = new CoreOptions(table.options());
InternalRowPartitionComputer partitionComputer =
FileStorePathFactory.getPartitionComputer(
partitionRowType, new CoreOptions(table.options()).partitionDefaultName());
partitionRowType,
options.partitionDefaultName(),
options.legacyPartitionName());

return partitionEntries.stream()
.map(
@@ -71,7 +71,8 @@ public static PartitionMarkDone create(
new InternalRowPartitionComputer(
coreOptions.partitionDefaultName(),
table.schema().logicalPartitionType(),
table.partitionKeys().toArray(new String[0]));
table.partitionKeys().toArray(new String[0]),
coreOptions.legacyPartitionName());

PartitionMarkDoneTrigger trigger =
PartitionMarkDoneTrigger.create(coreOptions, isRestored, stateStore);
@@ -107,7 +107,8 @@ public TestChangelogDataReadWrite(String root) {
"default",
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root));
this.commitUser = UUID.randomUUID().toString();
}
@@ -56,11 +56,13 @@ public class HiveMetastoreClient implements MetastoreClient {
ClientPool<IMetaStoreClient, TException> clients)
throws TException, InterruptedException {
this.identifier = identifier;
CoreOptions options = new CoreOptions(schema.options());
this.partitionComputer =
new InternalRowPartitionComputer(
new CoreOptions(schema.options()).partitionDefaultName(),
options.partitionDefaultName(),
schema.logicalPartitionType(),
schema.partitionKeys().toArray(new String[0]));
schema.partitionKeys().toArray(new String[0]),
options.legacyPartitionName());

this.clients = clients;
this.sd =
@@ -18,6 +18,7 @@

package org.apache.paimon.spark

import org.apache.paimon.CoreOptions
import org.apache.paimon.metastore.MetastoreClient
import org.apache.paimon.operation.FileStoreCommit
import org.apache.paimon.table.FileStoreTable
@@ -31,7 +32,6 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.SupportsAtomicPartitionManagement
import org.apache.spark.sql.types.StructType

import java.util
import java.util.{Map => JMap, Objects, UUID}

import scala.collection.JavaConverters._
@@ -52,7 +52,8 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
val rowDataPartitionComputer = new InternalRowPartitionComputer(
fileStoreTable.coreOptions().partitionDefaultName(),
partitionRowType,
table.partitionKeys().asScala.toArray)
table.partitionKeys().asScala.toArray,
CoreOptions.fromMap(table.options()).legacyPartitionName)

rows.map {
r =>
@@ -83,7 +83,8 @@ case class DeleteFromPaimonTableCommand(
val rowDataPartitionComputer = new InternalRowPartitionComputer(
table.coreOptions().partitionDefaultName(),
table.schema().logicalPartitionType(),
table.partitionKeys.asScala.toArray
table.partitionKeys.asScala.toArray,
table.coreOptions().legacyPartitionName()
)
val dropPartitions = matchedPartitions.map {
partition => rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
@@ -49,12 +49,15 @@ private[spark] trait StreamHelper {
private lazy val partitionSchema: StructType =
SparkTypeUtils.fromPaimonRowType(TypeUtils.project(table.rowType(), table.partitionKeys()))

private lazy val partitionComputer: InternalRowPartitionComputer =
private lazy val partitionComputer: InternalRowPartitionComputer = {
val options = new CoreOptions(table.options)
new InternalRowPartitionComputer(
new CoreOptions(table.options).partitionDefaultName,
options.partitionDefaultName,
TypeUtils.project(table.rowType(), table.partitionKeys()),
table.partitionKeys().asScala.toArray
table.partitionKeys().asScala.toArray,
options.legacyPartitionName()
)
}

// Used to get the initial offset.
lazy val streamScanStartingContext: StartingContext = streamScan.startingContext()
@@ -130,7 +130,8 @@ protected void foreachIndexReader(Consumer<FileIndexReader> consumer)
new CoreOptions(new Options()).partitionDefaultName(),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());

Table table = fileSystemCatalog.getTable(Identifier.create("db", "T"));
ReadBuilder readBuilder = table.newReadBuilder();