diff --git a/docs/layouts/shortcodes/generated/catalog_configuration.html b/docs/layouts/shortcodes/generated/catalog_configuration.html index cab6e731e851..a583c74714ba 100644 --- a/docs/layouts/shortcodes/generated/catalog_configuration.html +++ b/docs/layouts/shortcodes/generated/catalog_configuration.html @@ -26,6 +26,12 @@ + +
allow-upper-case
+ (none) + Boolean + Indicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog. +
client-pool-size
2 diff --git a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java index f00a35a75094..9ef681809eab 100644 --- a/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/options/CatalogOptions.java @@ -110,4 +110,12 @@ public class CatalogOptions { TextElement.text( "\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage.")) .build()); + + public static final ConfigOption ALLOW_UPPER_CASE = + ConfigOptions.key("allow-upper-case") + .booleanType() + .noDefaultValue() + .withDescription( + "Indicates whether this catalog allow upper case, " + + "its default value depends on the implementation of the specific catalog."); } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java index 94ce975aa5de..aeea97ffd9f6 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java @@ -551,8 +551,8 @@ public static boolean isBlank(String str) { return true; } - public static String caseSensitiveConversion(String str, boolean caseSensitive) { - return caseSensitive ? str : str.toLowerCase(); + public static String caseSensitiveConversion(String str, boolean allowUpperCase) { + return allowUpperCase ? str : str.toLowerCase(); } public static boolean isNumeric(final CharSequence cs) { diff --git a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java index 10ad114e60a7..054d6fca17c5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java +++ b/paimon-core/src/main/java/org/apache/paimon/casting/CastExecutors.java @@ -45,6 +45,7 @@ public class CastExecutors { .addRule(NumericPrimitiveToDecimalCastRule.INSTANCE) .addRule(DecimalToNumericPrimitiveCastRule.INSTANCE) .addRule(NumericPrimitiveCastRule.INSTANCE) + .addRule(NumericPrimitiveToTimestamp.INSTANCE) // Boolean <-> numeric rules .addRule(BooleanToNumericCastRule.INSTANCE) .addRule(NumericToBooleanCastRule.INSTANCE) diff --git a/paimon-core/src/main/java/org/apache/paimon/casting/NumericPrimitiveToTimestamp.java b/paimon-core/src/main/java/org/apache/paimon/casting/NumericPrimitiveToTimestamp.java new file mode 100644 index 000000000000..b8c667cee13f --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/casting/NumericPrimitiveToTimestamp.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.casting; + +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeFamily; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.utils.DateTimeUtils; + +import java.time.ZoneId; + +/** + * {{@link DataTypeFamily#INTEGER_NUMERIC} to @link DataTypeRoot#TIMESTAMP_WITHOUT_TIME_ZONE}/{@link + * DataTypeRoot#TIMESTAMP_WITH_LOCAL_TIME_ZONE}. + */ +public class NumericPrimitiveToTimestamp extends AbstractCastRule { + + static final NumericPrimitiveToTimestamp INSTANCE = new NumericPrimitiveToTimestamp(); + + private NumericPrimitiveToTimestamp() { + super( + CastRulePredicate.builder() + .input(DataTypeFamily.NUMERIC) + .target(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE) + .target(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) + .build()); + } + + @Override + public CastExecutor create(DataType inputType, DataType targetType) { + ZoneId zoneId = + targetType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) + ? ZoneId.systemDefault() + : DateTimeUtils.UTC_ZONE.toZoneId(); + switch (inputType.getTypeRoot()) { + case INTEGER: + case BIGINT: + return value -> + Timestamp.fromLocalDateTime( + DateTimeUtils.toLocalDateTime(value.longValue() * 1000, zoneId)); + default: + return null; + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index bccf2a82b825..afe64666a875 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -54,6 +54,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.createCommitUser; +import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; import static org.apache.paimon.options.CatalogOptions.LINEAGE_META; import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; @@ -129,6 +130,11 @@ protected boolean lockEnabled() { return catalogOptions.get(LOCK_ENABLED); } + @Override + public boolean allowUpperCase() { + return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true); + } + @Override public void createDatabase(String name, boolean ignoreIfExists, Map properties) throws DatabaseAlreadyExistException { @@ -520,8 +526,8 @@ public static void validateCaseInsensitive( } protected void validateIdentifierNameCaseInsensitive(Identifier identifier) { - validateCaseInsensitive(caseSensitive(), "Database", identifier.getDatabaseName()); - validateCaseInsensitive(caseSensitive(), "Table", identifier.getObjectName()); + validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName()); + validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName()); } private void validateFieldNameCaseInsensitiveInSchemaChange(List changes) { @@ -539,7 +545,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List c } protected void validateFieldNameCaseInsensitive(List fieldNames) { - validateCaseInsensitive(caseSensitive(), "Field", fieldNames); + validateCaseInsensitive(allowUpperCase(), "Field", fieldNames); } private void validateAutoCreateClose(Map options) { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 2d3f52901e9c..fe8e0b68b813 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -262,10 +262,8 @@ default void alterTable(Identifier identifier, SchemaChange change, boolean igno alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists); } - /** Return a boolean that indicates whether this catalog is case-sensitive. */ - default boolean caseSensitive() { - return true; - } + /** Return a boolean that indicates whether this catalog allow upper case. */ + boolean allowUpperCase(); default void repairCatalog() { throw new UnsupportedOperationException(); diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 5f24d5cf8535..20bb99f1aa31 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -42,8 +42,8 @@ public Catalog wrapped() { } @Override - public boolean caseSensitive() { - return wrapped.caseSensitive(); + public boolean allowUpperCase() { + return wrapped.allowUpperCase(); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 20a7f8b2c355..c2ff376019bf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -176,7 +176,7 @@ public String warehouse() { } @Override - public boolean caseSensitive() { + public boolean allowUpperCase() { return catalogOptions.get(CASE_SENSITIVE); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 54fd6a0cea06..556c071f2e75 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -324,7 +324,7 @@ protected TableSchema getDataTableSchema(Identifier identifier, String branchNam } @Override - public boolean caseSensitive() { + public boolean allowUpperCase() { return false; } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java index b7575837471c..c6c79f4d4afd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java @@ -161,7 +161,11 @@ private List normalizePrimaryKeys(List primaryKeys) { "Cannot define primary key on DDL and table options at the same time."); } String pk = options.get(CoreOptions.PRIMARY_KEY.key()); - primaryKeys = Arrays.asList(pk.split(",")); + primaryKeys = + Arrays.stream(pk.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); options.remove(CoreOptions.PRIMARY_KEY.key()); } return primaryKeys; @@ -174,7 +178,11 @@ private List normalizePartitionKeys(List partitionKeys) { "Cannot define partition on DDL and table options at the same time."); } String partitions = options.get(CoreOptions.PARTITION.key()); - partitionKeys = Arrays.asList(partitions.split(",")); + partitionKeys = + Arrays.stream(partitions.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); options.remove(CoreOptions.PARTITION.key()); } return partitionKeys; diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 4f70ac725e48..684adfe9da72 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -45,6 +45,7 @@ import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.StringUtils; import javax.annotation.Nullable; @@ -108,14 +109,14 @@ public Optional latest() { try { return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) .reduce(Math::max) - .map(id -> schema(id)); + .map(this::schema); } catch (IOException e) { throw new UncheckedIOException(e); } } public List listAll() { - return listAllIds().stream().map(id -> schema(id)).collect(Collectors.toList()); + return listAllIds().stream().map(this::schema).collect(Collectors.toList()); } /** List all schema IDs. */ @@ -184,24 +185,31 @@ public TableSchema commitChanges(SchemaChange... changes) throws Exception { public TableSchema commitChanges(List changes) throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException { + SnapshotManager snapshotManager = new SnapshotManager(fileIO, tableRoot, branch); + boolean hasSnapshots = (snapshotManager.latestSnapshotId() != null); + while (true) { - TableSchema schema = + TableSchema oldTableSchema = latest().orElseThrow( () -> new Catalog.TableNotExistException( fromPath(branchPath(), true))); - Map newOptions = new HashMap<>(schema.options()); - List newFields = new ArrayList<>(schema.fields()); - AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId()); - String newComment = schema.comment(); + Map newOptions = new HashMap<>(oldTableSchema.options()); + List newFields = new ArrayList<>(oldTableSchema.fields()); + AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId()); + String newComment = oldTableSchema.comment(); for (SchemaChange change : changes) { if (change instanceof SetOption) { SetOption setOption = (SetOption) change; - checkAlterTableOption(setOption.key()); + if (hasSnapshots) { + checkAlterTableOption(setOption.key()); + } newOptions.put(setOption.key(), setOption.value()); } else if (change instanceof RemoveOption) { RemoveOption removeOption = (RemoveOption) change; - checkAlterTableOption(removeOption.key()); + if (hasSnapshots) { + checkAlterTableOption(removeOption.key()); + } newOptions.remove(removeOption.key()); } else if (change instanceof UpdateComment) { UpdateComment updateComment = (UpdateComment) change; @@ -245,7 +253,7 @@ public TableSchema commitChanges(List changes) } else if (change instanceof RenameColumn) { RenameColumn rename = (RenameColumn) change; - validateNotPrimaryAndPartitionKey(schema, rename.fieldName()); + validateNotPrimaryAndPartitionKey(oldTableSchema, rename.fieldName()); if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) { throw new Catalog.ColumnAlreadyExistException( fromPath(branchPath(), true), rename.fieldName()); @@ -263,7 +271,7 @@ public TableSchema commitChanges(List changes) field.description())); } else if (change instanceof DropColumn) { DropColumn drop = (DropColumn) change; - validateNotPrimaryAndPartitionKey(schema, drop.fieldName()); + validateNotPrimaryAndPartitionKey(oldTableSchema, drop.fieldName()); if (!newFields.removeIf( f -> f.name().equals(((DropColumn) change).fieldName()))) { throw new Catalog.ColumnNotExistException( @@ -274,7 +282,7 @@ public TableSchema commitChanges(List changes) } } else if (change instanceof UpdateColumnType) { UpdateColumnType update = (UpdateColumnType) change; - if (schema.partitionKeys().contains(update.fieldName())) { + if (oldTableSchema.partitionKeys().contains(update.fieldName())) { throw new IllegalArgumentException( String.format( "Cannot update partition column [%s] type in the table[%s].", @@ -310,7 +318,7 @@ public TableSchema commitChanges(List changes) UpdateColumnNullability update = (UpdateColumnNullability) change; if (update.fieldNames().length == 1 && update.newNullability() - && schema.primaryKeys().contains(update.fieldNames()[0])) { + && oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) { throw new UnsupportedOperationException( "Cannot change nullability of primary key"); } @@ -346,20 +354,29 @@ public TableSchema commitChanges(List changes) } } - TableSchema newSchema = - new TableSchema( - schema.id() + 1, + // We change TableSchema to Schema, because we want to deal with primary-key and + // partition in options. + Schema newSchema = + new Schema( newFields, - highestFieldId.get(), - schema.partitionKeys(), - schema.primaryKeys(), + oldTableSchema.partitionKeys(), + oldTableSchema.primaryKeys(), newOptions, newComment); + TableSchema newTableSchema = + new TableSchema( + oldTableSchema.id() + 1, + newSchema.fields(), + highestFieldId.get(), + newSchema.partitionKeys(), + newSchema.primaryKeys(), + newSchema.options(), + newSchema.comment()); try { - boolean success = commit(newSchema); + boolean success = commit(newTableSchema); if (success) { - return newSchema; + return newTableSchema; } } catch (Exception e) { throw new RuntimeException(e); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 40eeb4d28789..0af78a5dac8b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -139,6 +139,7 @@ public TableWriteImpl newWrite( AppendOnlyFileStoreWrite writer = store().newWrite(commitUser, manifestFilter).withBucketMode(bucketMode()); return new TableWriteImpl<>( + rowType(), writer, createRowKeyExtractor(), (record, rowKind) -> { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index 6ac2763ace66..b1e5b5366c3d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -160,6 +160,7 @@ public TableWriteImpl newWrite( String commitUser, ManifestCacheFilter manifestFilter) { KeyValue kv = new KeyValue(); return new TableWriteImpl<>( + rowType(), store().newWrite(commitUser, manifestFilter), createRowKeyExtractor(), (record, rowKind) -> diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index 6e2194646d2a..580d7f4c4f6e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -30,13 +30,16 @@ import org.apache.paimon.operation.FileStoreWrite; import org.apache.paimon.operation.FileStoreWrite.State; import org.apache.paimon.table.BucketMode; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Restorable; import javax.annotation.Nullable; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkState; @@ -47,6 +50,7 @@ */ public class TableWriteImpl implements InnerTableWrite, Restorable>> { + private final RowType rowType; private final FileStoreWrite write; private final KeyAndBucketExtractor keyAndBucketExtractor; private final RecordExtractor recordExtractor; @@ -56,17 +60,28 @@ public class TableWriteImpl implements InnerTableWrite, Restorable write, KeyAndBucketExtractor keyAndBucketExtractor, RecordExtractor recordExtractor, @Nullable RowKindGenerator rowKindGenerator, boolean ignoreDelete) { + this.rowType = rowType; this.write = write; this.keyAndBucketExtractor = keyAndBucketExtractor; this.recordExtractor = recordExtractor; this.rowKindGenerator = rowKindGenerator; this.ignoreDelete = ignoreDelete; + + List notNullColumnNames = + rowType.getFields().stream() + .filter(field -> !field.type().isNullable()) + .map(DataField::name) + .collect(Collectors.toList()); + this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames); } @Override @@ -137,6 +152,7 @@ public void write(InternalRow row, int bucket) throws Exception { @Nullable public SinkRecord writeAndReturn(InternalRow row) throws Exception { + checkNullability(row); RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); if (ignoreDelete && rowKind.isRetract()) { return null; @@ -148,6 +164,7 @@ public SinkRecord writeAndReturn(InternalRow row) throws Exception { @Nullable public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception { + checkNullability(row); RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); if (ignoreDelete && rowKind.isRetract()) { return null; @@ -157,6 +174,16 @@ public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception { return record; } + private void checkNullability(InternalRow row) { + for (int idx : notNullFieldIndex) { + if (row.isNullAt(idx)) { + String columnName = rowType.getFields().get(idx).name(); + throw new RuntimeException( + String.format("Cannot write null to non-null column(%s)", columnName)); + } + } + } + private SinkRecord toSinkRecord(InternalRow row) { keyAndBucketExtractor.setRecord(row); return new SinkRecord( diff --git a/paimon-core/src/test/java/org/apache/paimon/casting/CastExecutorTest.java b/paimon-core/src/test/java/org/apache/paimon/casting/CastExecutorTest.java index d80cb1a6c7bd..ff805993c5bd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/casting/CastExecutorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/casting/CastExecutorTest.java @@ -101,6 +101,23 @@ public void testNumericToNumeric() { CastExecutors.resolve(new FloatType(false), new DoubleType(false)), 1F, 1D); } + @Test + public void testNumericToTimestamp() { + compareCastResult( + CastExecutors.resolve(new BigIntType(false), new TimestampType(3)), + 1721898748, + DateTimeUtils.parseTimestampData("2024-07-25 09:12:28.000", 3)); + + Timestamp timestamp = Timestamp.fromEpochMillis(1721898748000L); + String tsString = DateTimeUtils.formatTimestamp(timestamp, TimeZone.getDefault(), 3); + Timestamp timestamp1 = DateTimeUtils.parseTimestampData(tsString, 3); + + compareCastResult( + CastExecutors.resolve(new BigIntType(false), new LocalZonedTimestampType(3)), + 1721898748L, + timestamp1); + } + @Test public void testNumericToDecimal() { compareCastResult( diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index 742e2188f2b0..4bd965268f00 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -19,9 +19,18 @@ package org.apache.paimon.schema; import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIOFinder; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -441,4 +450,81 @@ public void testMoveBefore() { Assertions.assertEquals( 2, fields.get(0).id(), "The field id should remain as 2 after moving f2 before f0"); } + + @Test + public void testAlterImmutableOptionsOnEmptyTable() throws Exception { + // create table without primary keys + Schema schema = + new Schema( + rowType.getFields(), + Collections.emptyList(), + Collections.emptyList(), + options, + ""); + Path tableRoot = new Path(tempDir.toString(), "table"); + SchemaManager manager = new SchemaManager(LocalFileIO.create(), tableRoot); + manager.createTable(schema); + + // set immutable options and set primary keys + manager.commitChanges( + SchemaChange.setOption("primary-key", "f0, f1"), + SchemaChange.setOption("partition", "f0"), + SchemaChange.setOption("bucket", "2"), + SchemaChange.setOption("merge-engine", "first-row")); + + FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tableRoot); + assertThat(table.schema().partitionKeys()).containsExactly("f0"); + assertThat(table.schema().primaryKeys()).containsExactly("f0", "f1"); + + // read and write data to check that table is really a primary key table with first-row + // merge engine + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = + table.newWrite(commitUser).withIOManager(IOManager.create(tempDir + "/io")); + TableCommitImpl commit = table.newCommit(commitUser); + write.write(GenericRow.of(1, 10L, BinaryString.fromString("apple"))); + write.write(GenericRow.of(1, 20L, BinaryString.fromString("banana"))); + write.write(GenericRow.of(2, 10L, BinaryString.fromString("cat"))); + write.write(GenericRow.of(2, 20L, BinaryString.fromString("dog"))); + commit.commit(1, write.prepareCommit(false, 1)); + write.write(GenericRow.of(1, 20L, BinaryString.fromString("peach"))); + write.write(GenericRow.of(1, 30L, BinaryString.fromString("mango"))); + write.write(GenericRow.of(2, 20L, BinaryString.fromString("tiger"))); + write.write(GenericRow.of(2, 30L, BinaryString.fromString("wolf"))); + commit.commit(2, write.prepareCommit(false, 2)); + write.close(); + commit.close(); + + List actual = new ArrayList<>(); + try (RecordReaderIterator it = + new RecordReaderIterator<>( + table.newRead().createReader(table.newSnapshotReader().read()))) { + while (it.hasNext()) { + InternalRow row = it.next(); + actual.add( + String.format( + "%s %d %d %s", + row.getRowKind().shortString(), + row.getInt(0), + row.getLong(1), + row.getString(2))); + } + } + assertThat(actual) + .containsExactlyInAnyOrder( + "+I 1 10 apple", + "+I 1 20 banana", + "+I 1 30 mango", + "+I 2 10 cat", + "+I 2 20 dog", + "+I 2 30 wolf"); + + // now that table is not empty, we cannot change immutable options + assertThatThrownBy( + () -> + manager.commitChanges( + SchemaChange.setOption("merge-engine", "deduplicate"))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Change 'merge-engine' is not supported yet."); + } } diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index adaa5b28c4e8..19ad41cae5ac 100644 --- a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink; +import org.apache.flink.types.Row; import org.junit.jupiter.api.Test; import java.util.Map; @@ -44,9 +45,29 @@ public void testSetAndRemoveOption() throws Exception { } @Test - public void testSetAndResetImmutableOptions() { + public void testSetAndResetImmutableOptionsOnEmptyTables() { + sql("CREATE TABLE T1 (a INT, b INT)"); + sql( + "ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1', 'merge-engine' = 'first-row')"); + sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)"); + assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1, 10), Row.of(2, 20)); + assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' = 'deduplicate')")) + .rootCause() + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Change 'merge-engine' is not supported yet."); + + sql( + "CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED) WITH ('bucket' = '1', 'merge-engine' = 'first-row')"); + sql("ALTER TABLE T2 RESET ('merge-engine')"); + sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)"); + assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1, 11), Row.of(2, 21)); + } + + @Test + public void testSetAndResetImmutableOptionsOnNonEmptyTables() { // bucket-key is immutable sql("CREATE TABLE T1 (a STRING, b STRING, c STRING)"); + sql("INSERT INTO T1 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' = 'c')")) .rootCause() @@ -55,6 +76,7 @@ public void testSetAndResetImmutableOptions() { sql( "CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'c')"); + sql("INSERT INTO T2 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class) @@ -63,6 +85,7 @@ public void testSetAndResetImmutableOptions() { // merge-engine is immutable sql( "CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH ('merge-engine' = 'partial-update')"); + sql("INSERT INTO T4 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class) @@ -70,6 +93,7 @@ public void testSetAndResetImmutableOptions() { // sequence.field is immutable sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH ('sequence.field' = 'b')"); + sql("INSERT INTO T5 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' = 'c')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java index ffc05aec09bb..28ad5b52aec5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java @@ -79,7 +79,7 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) { tableConfig, retrievedSchema, metadataConverters, - caseSensitive, + allowUpperCase, true, false); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index bddebd229eaa..d6c4c12ecdd3 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -120,9 +120,9 @@ public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) { @Override protected void validateCaseSensitivity() { - AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", database); - AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table prefix", tablePrefix); - AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table suffix", tableSuffix); + AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database", database); + AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table prefix", tablePrefix); + AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table suffix", tableSuffix); } @Override @@ -135,12 +135,16 @@ protected FlatMapFunction recordParse() protected EventParser.Factory buildEventParserFactory() { NewTableSchemaBuilder schemaBuilder = new NewTableSchemaBuilder( - tableConfig, caseSensitive, partitionKeys, primaryKeys, metadataConverters); + tableConfig, + allowUpperCase, + partitionKeys, + primaryKeys, + metadataConverters); Pattern includingPattern = Pattern.compile(includingTables); Pattern excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables); TableNameConverter tableNameConverter = - new TableNameConverter(caseSensitive, mergeShards, tablePrefix, tableSuffix); + new TableNameConverter(allowUpperCase, mergeShards, tablePrefix, tableSuffix); Set createdTables; try { createdTables = new HashSet<>(catalog.listTables(database)); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java index e335fc2be348..be8cedf0baf6 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java @@ -107,15 +107,15 @@ protected Schema buildPaimonSchema(Schema retrievedSchema) { tableConfig, retrievedSchema, metadataConverters, - caseSensitive, + allowUpperCase, true, true); } @Override protected void validateCaseSensitivity() { - AbstractCatalog.validateCaseInsensitive(caseSensitive, "Database", database); - AbstractCatalog.validateCaseInsensitive(caseSensitive, "Table", table); + AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Database", database); + AbstractCatalog.validateCaseInsensitive(allowUpperCase, "Table", table); } @Override @@ -142,7 +142,7 @@ protected void beforeBuildingSourceSink() throws Exception { buildComputedColumns( computedColumnArgs, fileStoreTable.schema().fields(), - caseSensitive); + allowUpperCase); // check partition keys and primary keys in case that user specified them checkConstraints(); } @@ -162,7 +162,7 @@ protected FlatMapFunction recordParse() @Override protected EventParser.Factory buildEventParserFactory() { - boolean caseSensitive = this.caseSensitive; + boolean caseSensitive = this.allowUpperCase; return () -> new RichCdcMultiplexRecordEventParser(caseSensitive); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index 944185347c35..dc2562d91470 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -62,7 +62,7 @@ public abstract class SynchronizationActionBase extends ActionBase { protected final String database; protected final Configuration cdcSourceConfig; protected final SyncJobHandler syncJobHandler; - protected final boolean caseSensitive; + protected final boolean allowUpperCase; protected Map tableConfig = new HashMap<>(); protected TypeMapping typeMapping = TypeMapping.defaultMapping(); @@ -78,7 +78,7 @@ public SynchronizationActionBase( this.database = database; this.cdcSourceConfig = Configuration.fromMap(cdcSourceConfig); this.syncJobHandler = syncJobHandler; - this.caseSensitive = catalog.caseSensitive(); + this.allowUpperCase = catalog.allowUpperCase(); this.syncJobHandler.registerJdbcDriver(); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java index 1aab6653d4d4..e61b33d0ed1e 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java @@ -225,6 +225,7 @@ private static DataType fromDebeziumType(String dbzType) { return DataTypes.INT(); case "int64": return DataTypes.BIGINT(); + case "float": case "float32": case "float64": return DataTypes.FLOAT(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index a33f2c978321..d27fa32d0233 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -137,7 +137,7 @@ protected void beforeBuildingSourceSink() throws Exception { + ", or MySQL database does not exist."); TableNameConverter tableNameConverter = - new TableNameConverter(caseSensitive, mergeShards, tablePrefix, tableSuffix); + new TableNameConverter(allowUpperCase, mergeShards, tablePrefix, tableSuffix); for (JdbcTableInfo tableInfo : jdbcTableInfos) { Identifier identifier = Identifier.create( @@ -152,7 +152,7 @@ protected void beforeBuildingSourceSink() throws Exception { tableConfig, tableInfo.schema(), metadataConverters, - caseSensitive, + allowUpperCase, false, true); try { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java index 49890a11ccbc..4892aee03024 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CaseSensitiveUtils.java @@ -29,7 +29,7 @@ public class CaseSensitiveUtils { public static DataStream cdcRecordConvert( Catalog.Loader catalogLoader, DataStream input) { - if (caseSensitive(catalogLoader)) { + if (allowUpperCase(catalogLoader)) { return input; } @@ -47,7 +47,7 @@ public void processElement( public static DataStream cdcMultiplexRecordConvert( Catalog.Loader catalogLoader, DataStream input) { - if (caseSensitive(catalogLoader)) { + if (allowUpperCase(catalogLoader)) { return input; } @@ -65,9 +65,9 @@ public void processElement( .name("Case-insensitive Convert"); } - private static boolean caseSensitive(Catalog.Loader catalogLoader) { + private static boolean allowUpperCase(Catalog.Loader catalogLoader) { try (Catalog catalog = catalogLoader.load()) { - return catalog.caseSensitive(); + return catalog.allowUpperCase(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java index 3d832d33949b..77c49e8f3da2 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunctionBase.java @@ -49,7 +49,7 @@ public abstract class UpdatedDataFieldsProcessFunctionBase extends Process protected final Catalog.Loader catalogLoader; protected Catalog catalog; - private boolean caseSensitive; + private boolean allowUpperCase; private static final List STRING_TYPES = Arrays.asList(DataTypeRoot.CHAR, DataTypeRoot.VARCHAR); @@ -76,7 +76,7 @@ protected UpdatedDataFieldsProcessFunctionBase(Catalog.Loader catalogLoader) { @Override public void open(Configuration parameters) { this.catalog = catalogLoader.load(); - this.caseSensitive = this.catalog.caseSensitive(); + this.allowUpperCase = this.catalog.allowUpperCase(); } protected void applySchemaChange( @@ -203,7 +203,7 @@ protected List extractSchemaChanges( List result = new ArrayList<>(); for (DataField newField : updatedDataFields) { String newFieldName = - StringUtils.caseSensitiveConversion(newField.name(), caseSensitive); + StringUtils.caseSensitiveConversion(newField.name(), allowUpperCase); if (oldFields.containsKey(newFieldName)) { DataField oldField = oldFields.get(newFieldName); // we compare by ignoring nullable, because partition keys and primary keys might be diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java index fc5a3dbe0545..81f07b224ca7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java @@ -846,10 +846,30 @@ public void testSetAndRemoveOption() throws Exception { } @Test - public void testSetAndResetImmutableOptions() throws Exception { + public void testSetAndResetImmutableOptionsOnEmptyTables() { + sql("CREATE TABLE T1 (a INT, b INT)"); + sql( + "ALTER TABLE T1 SET ('primary-key' = 'a', 'bucket' = '1', 'merge-engine' = 'first-row')"); + sql("INSERT INTO T1 VALUES (1, 10), (2, 20), (1, 11), (2, 21)"); + assertThat(queryAndSort("SELECT * FROM T1")).containsExactly(Row.of(1, 10), Row.of(2, 20)); + assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('merge-engine' = 'deduplicate')")) + .rootCause() + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Change 'merge-engine' is not supported yet."); + + sql( + "CREATE TABLE T2 (a INT, b INT, PRIMARY KEY (a) NOT ENFORCED) WITH ('bucket' = '1', 'merge-engine' = 'first-row')"); + sql("ALTER TABLE T2 RESET ('merge-engine')"); + sql("INSERT INTO T2 VALUES (1, 10), (2, 20), (1, 11), (2, 21)"); + assertThat(queryAndSort("SELECT * FROM T2")).containsExactly(Row.of(1, 11), Row.of(2, 21)); + } + + @Test + public void testSetAndResetImmutableOptionsOnNonEmptyTables() { // bucket-key is immutable sql( "CREATE TABLE T1 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'a')"); + sql("INSERT INTO T1 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T1 SET ('bucket-key' = 'c')")) .rootCause() @@ -858,6 +878,7 @@ public void testSetAndResetImmutableOptions() throws Exception { sql( "CREATE TABLE T2 (a STRING, b STRING, c STRING) WITH ('bucket' = '1', 'bucket-key' = 'c')"); + sql("INSERT INTO T2 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T2 RESET ('bucket-key')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class) @@ -866,6 +887,7 @@ public void testSetAndResetImmutableOptions() throws Exception { // merge-engine is immutable sql( "CREATE TABLE T4 (a STRING, b STRING, c STRING) WITH ('merge-engine' = 'partial-update')"); + sql("INSERT INTO T4 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T4 RESET ('merge-engine')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class) @@ -873,6 +895,7 @@ public void testSetAndResetImmutableOptions() throws Exception { // sequence.field is immutable sql("CREATE TABLE T5 (a STRING, b STRING, c STRING) WITH ('sequence.field' = 'b')"); + sql("INSERT INTO T5 VALUES ('a', 'b', 'c')"); assertThatThrownBy(() -> sql("ALTER TABLE T5 SET ('sequence.field' = 'c')")) .rootCause() .isInstanceOf(UnsupportedOperationException.class) diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 7edb49f5867a..1bbf4b3871e8 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -91,6 +91,7 @@ import static org.apache.paimon.hive.HiveCatalogOptions.HIVE_CONF_DIR; import static org.apache.paimon.hive.HiveCatalogOptions.IDENTIFIER; import static org.apache.paimon.hive.HiveCatalogOptions.LOCATION_IN_PROPERTIES; +import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; import static org.apache.paimon.options.CatalogOptions.TABLE_TYPE; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -566,8 +567,8 @@ private void alterTableToHms(Table table, Identifier identifier, TableSchema new } @Override - public boolean caseSensitive() { - return false; + public boolean allowUpperCase() { + return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(false); } @Override diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index e42d48c8b08d..d400ec40b323 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -363,6 +363,59 @@ public void testCreateExternalTable() throws Exception { assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue(); } + @Test + public void testCreateInsensitiveTable() throws Exception { + tEnv.executeSql( + String.join( + "\n", + "CREATE CATALOG paimon_catalog_01 WITH (", + " 'type' = 'paimon',", + " 'metastore' = 'hive',", + " 'uri' = '',", + " 'warehouse' = '" + path + "',", + " 'lock.enabled' = 'true',", + " 'table.type' = 'EXTERNAL',", + " 'allow-upper-case' = 'true'", + ")")) + .await(); + tEnv.executeSql("USE CATALOG paimon_catalog_01").await(); + tEnv.executeSql("USE test_db").await(); + tEnv.executeSql("CREATE TABLE t ( aa INT, Bb STRING ) WITH ( 'file.format' = 'avro' )") + .await(); + assertThat( + hiveShell + .executeQuery("DESC FORMATTED t") + .contains("Table Type: \tEXTERNAL_TABLE \tNULL")) + .isTrue(); + tEnv.executeSql("DROP TABLE t").await(); + Path tablePath = new Path(path, "test_db.db/t"); + assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue(); + + tEnv.executeSql( + String.join( + "\n", + "CREATE CATALOG paimon_catalog_02 WITH (", + " 'type' = 'paimon',", + " 'metastore' = 'hive',", + " 'uri' = '',", + " 'warehouse' = '" + path + "',", + " 'lock.enabled' = 'true',", + " 'table.type' = 'EXTERNAL',", + " 'allow-upper-case' = 'false'", + ")")) + .await(); + tEnv.executeSql("USE CATALOG paimon_catalog_02").await(); + tEnv.executeSql("USE test_db").await(); + + // set case-sensitive = false would throw exception out + assertThrows( + RuntimeException.class, + () -> + tEnv.executeSql( + "CREATE TABLE t1 ( aa INT, Bb STRING ) WITH ( 'file.format' = 'avro' )") + .await()); + } + @Test public void testFlinkWriteAndHiveRead() throws Exception { tEnv.executeSql( diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 4e8d8eaf7be7..b04dce2fa482 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -26,7 +26,6 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.spark.catalog.SparkBaseCatalog; -import org.apache.paimon.table.Table; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; @@ -40,6 +39,7 @@ import org.apache.spark.sql.connector.expressions.FieldReference; import org.apache.spark.sql.connector.expressions.NamedReference; import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.internal.SessionState; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -53,8 +53,11 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE; import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType; +import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf; +import static org.apache.paimon.spark.util.OptionUtils.mergeSQLConf; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Spark {@link TableCatalog} for paimon. */ @@ -71,10 +74,18 @@ public class SparkCatalog extends SparkBaseCatalog { @Override public void initialize(String name, CaseInsensitiveStringMap options) { this.catalogName = name; + Map newOptions = new HashMap<>(options.asCaseSensitiveMap()); + SessionState sessionState = SparkSession.active().sessionState(); + CatalogContext catalogContext = - CatalogContext.create( - Options.fromMap(options), - SparkSession.active().sessionState().newHadoopConf()); + CatalogContext.create(Options.fromMap(options), sessionState.newHadoopConf()); + + // if spark is case-insensitive, set allow upper case to catalog + if (!sessionState.conf().caseSensitiveAnalysis()) { + newOptions.put(ALLOW_UPPER_CASE.key(), "true"); + } + options = new CaseInsensitiveStringMap(newOptions); + this.catalog = CatalogFactory.createCatalog(catalogContext); this.defaultDatabase = options.getOrDefault(DEFAULT_DATABASE.key(), DEFAULT_DATABASE.defaultValue()); @@ -210,21 +221,16 @@ public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceExcepti @Override public SparkTable loadTable(Identifier ident) throws NoSuchTableException { - try { - return new SparkTable(load(ident)); - } catch (Catalog.TableNotExistException e) { - throw new NoSuchTableException(ident); - } + return loadSparkTable(ident, Collections.emptyMap()); } /** * Do not annotate with @override here to maintain compatibility with Spark 3.2-. */ public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException { - Table table = loadPaimonTable(ident); LOG.info("Time travel to version '{}'.", version); - return new SparkTable( - table.copy(Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), version))); + return loadSparkTable( + ident, Collections.singletonMap(CoreOptions.SCAN_VERSION.key(), version)); } /** @@ -234,22 +240,13 @@ public SparkTable loadTable(Identifier ident, String version) throws NoSuchTable * TableCatalog#loadTable(Identifier, long)}). But in SQL you should use seconds. */ public SparkTable loadTable(Identifier ident, long timestamp) throws NoSuchTableException { - Table table = loadPaimonTable(ident); // Paimon's timestamp use millisecond timestamp = timestamp / 1000; - LOG.info("Time travel target timestamp is {} milliseconds.", timestamp); - - Options option = new Options().set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp); - return new SparkTable(table.copy(option.toMap())); - } - - private Table loadPaimonTable(Identifier ident) throws NoSuchTableException { - try { - return load(ident); - } catch (Catalog.TableNotExistException e) { - throw new NoSuchTableException(ident); - } + return loadSparkTable( + ident, + Collections.singletonMap( + CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(timestamp))); } @Override @@ -390,7 +387,7 @@ private Schema toInitialSchema( return references.length == 1 && references[0] instanceof FieldReference; })); - Map normalizedProperties = new HashMap<>(properties); + Map normalizedProperties = mergeSQLConf(properties); normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER); normalizedProperties.remove(TableCatalog.PROP_COMMENT); String pkAsString = properties.get(PRIMARY_KEY_IDENTIFIER); @@ -459,10 +456,14 @@ protected org.apache.paimon.catalog.Identifier toIdentifier(Identifier ident) return new org.apache.paimon.catalog.Identifier(ident.namespace()[0], ident.name()); } - /** Load a Table Store table. */ - protected org.apache.paimon.table.Table load(Identifier ident) - throws Catalog.TableNotExistException, NoSuchTableException { - return catalog.getTable(toIdentifier(ident)); + protected SparkTable loadSparkTable(Identifier ident, Map extraOptions) + throws NoSuchTableException { + try { + return new SparkTable( + copyWithSQLConf(catalog.getTable(toIdentifier(ident)), extraOptions)); + } catch (Catalog.TableNotExistException e) { + throw new NoSuchTableException(ident); + } } // --------------------- unsupported methods ---------------------------- diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java index 96b87701b90f..6c7f4252433e 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java @@ -61,6 +61,7 @@ import java.util.Map; import java.util.concurrent.Callable; +import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE; import static org.apache.paimon.options.CatalogOptions.METASTORE; import static org.apache.paimon.options.CatalogOptions.WAREHOUSE; import static org.apache.paimon.spark.SparkCatalogOptions.CREATE_UNDERLYING_SESSION_CATALOG; @@ -284,6 +285,12 @@ private CaseInsensitiveStringMap autoFillConfigurations( Map newOptions = new HashMap<>(options.asCaseSensitiveMap()); fillAliyunConfigurations(newOptions, hadoopConf); fillCommonConfigurations(newOptions, sqlConf); + + // if spark is case-insensitive, set allow upper case to catalog + if (!sqlConf.caseSensitiveAnalysis()) { + newOptions.put(ALLOW_UPPER_CASE.key(), "true"); + } + return new CaseInsensitiveStringMap(newOptions); } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala index e7e744f37c8a..8ea2c31bc8f6 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala @@ -22,6 +22,7 @@ import org.apache.paimon.catalog.CatalogContext import org.apache.paimon.options.Options import org.apache.paimon.spark.commands.WriteIntoPaimonTable import org.apache.paimon.spark.sources.PaimonSink +import org.apache.paimon.spark.util.OptionUtils.mergeSQLConf import org.apache.paimon.table.{DataTable, FileStoreTable, FileStoreTableFactory} import org.apache.paimon.table.system.AuditLogTable @@ -64,7 +65,7 @@ class SparkSource schema: StructType, partitioning: Array[Transform], properties: JMap[String, String]): Table = { - new SparkTable(loadTable(properties)) + SparkTable(loadTable(properties)) } override def createRelation( @@ -80,7 +81,7 @@ class SparkSource private def loadTable(options: JMap[String, String]): DataTable = { val catalogContext = CatalogContext.create( - Options.fromMap(options), + Options.fromMap(mergeSQLConf(options)), SparkSession.active.sessionState.newHadoopConf()) val table = FileStoreTableFactory.create(catalogContext) if (Options.fromMap(options).get(SparkConnectorOptions.READ_CHANGELOG)) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala index 67685612664d..3dc0e40c9eff 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala @@ -26,11 +26,11 @@ import org.apache.paimon.table.FileStoreTable import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.analysis.ResolvedTable -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Expression, NamedExpression} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} import scala.collection.JavaConverters._ @@ -58,8 +58,8 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] { } private def schemaCompatible( - tableSchema: StructType, dataSchema: StructType, + tableSchema: StructType, partitionCols: Seq[String], parent: Array[String] = Array.empty): Boolean = { @@ -82,9 +82,8 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] { } } - tableSchema.zip(dataSchema).forall { + dataSchema.zip(tableSchema).forall { case (f1, f2) => - checkNullability(f1, f2, partitionCols, parent) f1.name == f2.name && dataTypeCompatible(f1.name, f1.dataType, f2.dataType) } } @@ -115,17 +114,6 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] { cast.setTagValue(Compatibility.castByTableInsertionTag, ()) cast } - - private def checkNullability( - input: StructField, - expected: StructField, - partitionCols: Seq[String], - parent: Array[String] = Array.empty): Unit = { - val fullColumnName = (parent ++ Array(input.name)).mkString(".") - if (!partitionCols.contains(fullColumnName) && input.nullable && !expected.nullable) { - throw new RuntimeException("Cannot write nullable values to non-null column") - } - } } case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[LogicalPlan] { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala new file mode 100644 index 000000000000..af7ff7204cda --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.util + +import org.apache.paimon.table.Table + +import org.apache.spark.sql.catalyst.SQLConfHelper + +import java.util.{HashMap => JHashMap, Map => JMap} + +import scala.collection.JavaConverters._ + +object OptionUtils extends SQLConfHelper { + + private val PAIMON_OPTION_PREFIX = "spark.paimon." + + def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = { + val mergedOptions = new JHashMap[String, String]( + conf.getAllConfs + .filterKeys(_.startsWith(PAIMON_OPTION_PREFIX)) + .map { + case (key, value) => + key.stripPrefix(PAIMON_OPTION_PREFIX) -> value + } + .asJava) + mergedOptions.putAll(extraOptions) + mergedOptions + } + + def copyWithSQLConf[T <: Table](table: T, extraOptions: JMap[String, String]): T = { + val mergedOptions = mergeSQLConf(extraOptions) + if (mergedOptions.isEmpty) { + table + } else { + table.copy(mergedOptions).asInstanceOf[T] + } + } +} diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java index 4377bc94a716..b0f1749dfeb3 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.io.TempDir; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; /** Base tests for spark read. */ public class SparkGenericCatalogWithHiveTest { @@ -44,6 +45,59 @@ public static void closeMetastore() throws Exception { testHiveMetastore.stop(); } + @Test + public void testCreateTableCaseSensitive(@TempDir java.nio.file.Path tempDir) { + // firstly, we use hive metastore to creata table, and check the result. + Path warehousePath = new Path("file:" + tempDir.toString()); + SparkSession spark = + SparkSession.builder() + .config("spark.sql.warehouse.dir", warehousePath.toString()) + // with case-sensitive false + .config("spark.sql.caseSensitive", "false") + // with hive metastore + .config("spark.sql.catalogImplementation", "hive") + .config( + "spark.sql.catalog.spark_catalog", + SparkGenericCatalog.class.getName()) + .master("local[2]") + .getOrCreate(); + + spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); + spark.sql("USE my_db1"); + spark.sql( + "CREATE TABLE IF NOT EXISTS t2 (a INT, Bb INT, c STRING) USING paimon TBLPROPERTIES" + + " ('file.format'='avro')"); + + assertThat( + spark.sql("SHOW TABLES").collectAsList().stream() + .map(s -> s.get(1)) + .map(Object::toString)) + .containsExactlyInAnyOrder("t2"); + spark.close(); + + SparkSession spark1 = + SparkSession.builder() + .config("spark.sql.warehouse.dir", warehousePath.toString()) + // with case-sensitive true + .config("spark.sql.caseSensitive", "true") + // with hive metastore + .config("spark.sql.catalogImplementation", "hive") + .config( + "spark.sql.catalog.spark_catalog", + SparkGenericCatalog.class.getName()) + .master("local[2]") + .getOrCreate(); + + spark1.sql("USE my_db1"); + assertThrows( + RuntimeException.class, + () -> + spark1.sql( + "CREATE TABLE IF NOT EXISTS t3 (a INT, Bb INT, c STRING) USING paimon TBLPROPERTIES" + + " ('file.format'='avro')")); + spark1.close(); + } + @Test public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) { // firstly, we use hive metastore to creata table, and check the result. @@ -66,7 +120,7 @@ public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) { + " ('file.format'='avro')"); assertThat(spark.sql("SHOW NAMESPACES").collectAsList().stream().map(Object::toString)) - .containsExactlyInAnyOrder("[default]", "[my_db]"); + .containsExactlyInAnyOrder("[default]", "[my_db]", "[my_db1]"); assertThat( spark.sql("SHOW TABLES").collectAsList().stream() diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala index da40171042a1..db749a63619b 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala @@ -23,6 +23,7 @@ import org.apache.paimon.schema.Schema import org.apache.paimon.spark.PaimonSparkTestBase import org.apache.paimon.types.DataTypes +import org.apache.spark.SparkException import org.apache.spark.sql.Row import org.junit.jupiter.api.Assertions @@ -33,33 +34,70 @@ abstract class DDLTestBase extends PaimonSparkTestBase { import testImplicits._ - test("Paimon DDL: create table with not null") { + test("Paimon DDL: create append table with not null") { withTable("T") { - sql(""" - |CREATE TABLE T (id INT NOT NULL, name STRING) - |""".stripMargin) + sql("CREATE TABLE T (id INT NOT NULL, name STRING)") - val exception = intercept[RuntimeException] { - sql(""" - |INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c") - |""".stripMargin) + val e1 = intercept[SparkException] { + sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c")""") } - Assertions.assertTrue( - exception.getMessage().contains("Cannot write nullable values to non-null column")) + Assertions.assertTrue(e1.getMessage().contains("Cannot write null to non-null column")) + + sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null)""") + checkAnswer( + sql("SELECT * FROM T ORDER BY id"), + Seq((1, "a"), (2, "b"), (3, null)).toDF() + ) + val schema = spark.table("T").schema + Assertions.assertEquals(schema.size, 2) + Assertions.assertFalse(schema("id").nullable) + Assertions.assertTrue(schema("name").nullable) + } + } + test("Paimon DDL: create primary-key table with not null") { + withTable("T") { sql(""" - |INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null) + |CREATE TABLE T (id INT, name STRING, pt STRING) + |TBLPROPERTIES ('primary-key' = 'id,pt') |""".stripMargin) + val e1 = intercept[SparkException] { + sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", null)""") + } + Assertions.assertTrue(e1.getMessage().contains("Cannot write null to non-null column")) + + val e2 = intercept[SparkException] { + sql("""INSERT INTO T VALUES (1, "a", "pt1"), (null, "b", "pt2")""") + } + Assertions.assertTrue(e2.getMessage().contains("Cannot write null to non-null column")) + + sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", "pt1"), (3, null, "pt2")""") checkAnswer( sql("SELECT * FROM T ORDER BY id"), - Seq((1, "a"), (2, "b"), (3, null)).toDF() + Seq((1, "a", "pt1"), (2, "b", "pt1"), (3, null, "pt2")).toDF() ) val schema = spark.table("T").schema - Assertions.assertEquals(schema.size, 2) + Assertions.assertEquals(schema.size, 3) Assertions.assertFalse(schema("id").nullable) Assertions.assertTrue(schema("name").nullable) + Assertions.assertFalse(schema("pt").nullable) + } + } + + test("Paimon DDL: write nullable expression to non-null column") { + withTable("T") { + sql(""" + |CREATE TABLE T (id INT NOT NULL, ts TIMESTAMP NOT NULL) + |""".stripMargin) + + sql("INSERT INTO T SELECT 1, TO_TIMESTAMP('2024-07-01 16:00:00')") + + checkAnswer( + sql("SELECT * FROM T ORDER BY id"), + Row(1, Timestamp.valueOf("2024-07-01 16:00:00")) :: Nil + ) } } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala new file mode 100644 index 000000000000..9fc571634645 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonOptionTest.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.table.FileStoreTableFactory + +import org.apache.spark.sql.Row +import org.junit.jupiter.api.Assertions + +class PaimonOptionTest extends PaimonSparkTestBase { + + import testImplicits._ + + test("Paimon Option: create table with sql conf") { + withSQLConf("spark.paimon.file.block-size" -> "512M") { + sql("CREATE TABLE T (id INT)") + val table = loadTable("T") + // check options in schema file directly + val fileStoreTable = FileStoreTableFactory.create(table.fileIO(), table.location()) + Assertions.assertEquals("512M", fileStoreTable.options().get("file.block-size")) + } + } + + test("Paimon Option: create table by dataframe with sql conf") { + withSQLConf("spark.paimon.file.block-size" -> "512M") { + Seq((1L, "x1"), (2L, "x2")) + .toDF("a", "b") + .write + .format("paimon") + .mode("append") + .saveAsTable("T") + val table = loadTable("T") + // check options in schema file directly + val fileStoreTable = FileStoreTableFactory.create(table.fileIO(), table.location()) + Assertions.assertEquals("512M", fileStoreTable.options().get("file.block-size")) + } + } + + test("Paimon Option: query table with sql conf") { + sql("CREATE TABLE T (id INT)") + sql("INSERT INTO T VALUES 1") + sql("INSERT INTO T VALUES 2") + checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1) :: Row(2) :: Nil) + val table = loadTable("T") + + // query with mutable option + withSQLConf("spark.paimon.scan.snapshot-id" -> "1") { + checkAnswer(sql("SELECT * FROM T ORDER BY id"), Row(1)) + checkAnswer(spark.read.format("paimon").load(table.location().toString), Row(1)) + } + + // query with immutable option + withSQLConf("spark.paimon.bucket" -> "1") { + assertThrows[UnsupportedOperationException] { + sql("SELECT * FROM T ORDER BY id") + } + assertThrows[UnsupportedOperationException] { + spark.read.format("paimon").load(table.location().toString) + } + } + } +}