Skip to content

Commit

Permalink
Merge branch 'refs/heads/master' into tc-paimon-0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
wxplovecc committed Jul 30, 2024
2 parents 335df83 + 63cb0f9 commit 2d63e53
Show file tree
Hide file tree
Showing 37 changed files with 690 additions and 124 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/catalog_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>allow-upper-case</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Boolean</td>
<td>Indicates whether this catalog allow upper case, its default value depends on the implementation of the specific catalog.</td>
</tr>
<tr>
<td><h5>client-pool-size</h5></td>
<td style="word-wrap: break-word;">2</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,12 @@ public class CatalogOptions {
TextElement.text(
"\"custom\": You can implement LineageMetaFactory and LineageMeta to store lineage information in customized storage."))
.build());

public static final ConfigOption<Boolean> ALLOW_UPPER_CASE =
ConfigOptions.key("allow-upper-case")
.booleanType()
.noDefaultValue()
.withDescription(
"Indicates whether this catalog allow upper case, "
+ "its default value depends on the implementation of the specific catalog.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,8 @@ public static boolean isBlank(String str) {
return true;
}

public static String caseSensitiveConversion(String str, boolean caseSensitive) {
return caseSensitive ? str : str.toLowerCase();
public static String caseSensitiveConversion(String str, boolean allowUpperCase) {
return allowUpperCase ? str : str.toLowerCase();
}

public static boolean isNumeric(final CharSequence cs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class CastExecutors {
.addRule(NumericPrimitiveToDecimalCastRule.INSTANCE)
.addRule(DecimalToNumericPrimitiveCastRule.INSTANCE)
.addRule(NumericPrimitiveCastRule.INSTANCE)
.addRule(NumericPrimitiveToTimestamp.INSTANCE)
// Boolean <-> numeric rules
.addRule(BooleanToNumericCastRule.INSTANCE)
.addRule(NumericToBooleanCastRule.INSTANCE)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.casting;

import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeFamily;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.utils.DateTimeUtils;

import java.time.ZoneId;

/**
* {{@link DataTypeFamily#INTEGER_NUMERIC} to @link DataTypeRoot#TIMESTAMP_WITHOUT_TIME_ZONE}/{@link
* DataTypeRoot#TIMESTAMP_WITH_LOCAL_TIME_ZONE}.
*/
public class NumericPrimitiveToTimestamp extends AbstractCastRule<Number, Timestamp> {

static final NumericPrimitiveToTimestamp INSTANCE = new NumericPrimitiveToTimestamp();

private NumericPrimitiveToTimestamp() {
super(
CastRulePredicate.builder()
.input(DataTypeFamily.NUMERIC)
.target(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
.target(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
.build());
}

@Override
public CastExecutor<Number, Timestamp> create(DataType inputType, DataType targetType) {
ZoneId zoneId =
targetType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
? ZoneId.systemDefault()
: DateTimeUtils.UTC_ZONE.toZoneId();
switch (inputType.getTypeRoot()) {
case INTEGER:
case BIGINT:
return value ->
Timestamp.fromLocalDateTime(
DateTimeUtils.toLocalDateTime(value.longValue() * 1000, zoneId));
default:
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.options.CatalogOptions.ALLOW_UPPER_CASE;
import static org.apache.paimon.options.CatalogOptions.LINEAGE_META;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
Expand Down Expand Up @@ -129,6 +130,11 @@ protected boolean lockEnabled() {
return catalogOptions.get(LOCK_ENABLED);
}

@Override
public boolean allowUpperCase() {
return catalogOptions.getOptional(ALLOW_UPPER_CASE).orElse(true);
}

@Override
public void createDatabase(String name, boolean ignoreIfExists, Map<String, String> properties)
throws DatabaseAlreadyExistException {
Expand Down Expand Up @@ -520,8 +526,8 @@ public static void validateCaseInsensitive(
}

protected void validateIdentifierNameCaseInsensitive(Identifier identifier) {
validateCaseInsensitive(caseSensitive(), "Database", identifier.getDatabaseName());
validateCaseInsensitive(caseSensitive(), "Table", identifier.getObjectName());
validateCaseInsensitive(allowUpperCase(), "Database", identifier.getDatabaseName());
validateCaseInsensitive(allowUpperCase(), "Table", identifier.getObjectName());
}

private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
Expand All @@ -539,7 +545,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
}

protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
validateCaseInsensitive(caseSensitive(), "Field", fieldNames);
validateCaseInsensitive(allowUpperCase(), "Field", fieldNames);
}

private void validateAutoCreateClose(Map<String, String> options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,8 @@ default void alterTable(Identifier identifier, SchemaChange change, boolean igno
alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists);
}

/** Return a boolean that indicates whether this catalog is case-sensitive. */
default boolean caseSensitive() {
return true;
}
/** Return a boolean that indicates whether this catalog allow upper case. */
boolean allowUpperCase();

default void repairCatalog() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ public Catalog wrapped() {
}

@Override
public boolean caseSensitive() {
return wrapped.caseSensitive();
public boolean allowUpperCase() {
return wrapped.allowUpperCase();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public String warehouse() {
}

@Override
public boolean caseSensitive() {
public boolean allowUpperCase() {
return catalogOptions.get(CASE_SENSITIVE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ protected TableSchema getDataTableSchema(Identifier identifier, String branchNam
}

@Override
public boolean caseSensitive() {
public boolean allowUpperCase() {
return false;
}

Expand Down
12 changes: 10 additions & 2 deletions paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ private List<String> normalizePrimaryKeys(List<String> primaryKeys) {
"Cannot define primary key on DDL and table options at the same time.");
}
String pk = options.get(CoreOptions.PRIMARY_KEY.key());
primaryKeys = Arrays.asList(pk.split(","));
primaryKeys =
Arrays.stream(pk.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
options.remove(CoreOptions.PRIMARY_KEY.key());
}
return primaryKeys;
Expand All @@ -174,7 +178,11 @@ private List<String> normalizePartitionKeys(List<String> partitionKeys) {
"Cannot define partition on DDL and table options at the same time.");
}
String partitions = options.get(CoreOptions.PARTITION.key());
partitionKeys = Arrays.asList(partitions.split(","));
partitionKeys =
Arrays.stream(partitions.split(","))
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
options.remove(CoreOptions.PARTITION.key());
}
return partitionKeys;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StringUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -108,14 +109,14 @@ public Optional<TableSchema> latest() {
try {
return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
.reduce(Math::max)
.map(id -> schema(id));
.map(this::schema);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public List<TableSchema> listAll() {
return listAllIds().stream().map(id -> schema(id)).collect(Collectors.toList());
return listAllIds().stream().map(this::schema).collect(Collectors.toList());
}

/** List all schema IDs. */
Expand Down Expand Up @@ -184,24 +185,31 @@ public TableSchema commitChanges(SchemaChange... changes) throws Exception {
public TableSchema commitChanges(List<SchemaChange> changes)
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
SnapshotManager snapshotManager = new SnapshotManager(fileIO, tableRoot, branch);
boolean hasSnapshots = (snapshotManager.latestSnapshotId() != null);

while (true) {
TableSchema schema =
TableSchema oldTableSchema =
latest().orElseThrow(
() ->
new Catalog.TableNotExistException(
fromPath(branchPath(), true)));
Map<String, String> newOptions = new HashMap<>(schema.options());
List<DataField> newFields = new ArrayList<>(schema.fields());
AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId());
String newComment = schema.comment();
Map<String, String> newOptions = new HashMap<>(oldTableSchema.options());
List<DataField> newFields = new ArrayList<>(oldTableSchema.fields());
AtomicInteger highestFieldId = new AtomicInteger(oldTableSchema.highestFieldId());
String newComment = oldTableSchema.comment();
for (SchemaChange change : changes) {
if (change instanceof SetOption) {
SetOption setOption = (SetOption) change;
checkAlterTableOption(setOption.key());
if (hasSnapshots) {
checkAlterTableOption(setOption.key());
}
newOptions.put(setOption.key(), setOption.value());
} else if (change instanceof RemoveOption) {
RemoveOption removeOption = (RemoveOption) change;
checkAlterTableOption(removeOption.key());
if (hasSnapshots) {
checkAlterTableOption(removeOption.key());
}
newOptions.remove(removeOption.key());
} else if (change instanceof UpdateComment) {
UpdateComment updateComment = (UpdateComment) change;
Expand Down Expand Up @@ -245,7 +253,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)

} else if (change instanceof RenameColumn) {
RenameColumn rename = (RenameColumn) change;
validateNotPrimaryAndPartitionKey(schema, rename.fieldName());
validateNotPrimaryAndPartitionKey(oldTableSchema, rename.fieldName());
if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
fromPath(branchPath(), true), rename.fieldName());
Expand All @@ -263,7 +271,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
field.description()));
} else if (change instanceof DropColumn) {
DropColumn drop = (DropColumn) change;
validateNotPrimaryAndPartitionKey(schema, drop.fieldName());
validateNotPrimaryAndPartitionKey(oldTableSchema, drop.fieldName());
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn) change).fieldName()))) {
throw new Catalog.ColumnNotExistException(
Expand All @@ -274,7 +282,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
}
} else if (change instanceof UpdateColumnType) {
UpdateColumnType update = (UpdateColumnType) change;
if (schema.partitionKeys().contains(update.fieldName())) {
if (oldTableSchema.partitionKeys().contains(update.fieldName())) {
throw new IllegalArgumentException(
String.format(
"Cannot update partition column [%s] type in the table[%s].",
Expand Down Expand Up @@ -310,7 +318,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
UpdateColumnNullability update = (UpdateColumnNullability) change;
if (update.fieldNames().length == 1
&& update.newNullability()
&& schema.primaryKeys().contains(update.fieldNames()[0])) {
&& oldTableSchema.primaryKeys().contains(update.fieldNames()[0])) {
throw new UnsupportedOperationException(
"Cannot change nullability of primary key");
}
Expand Down Expand Up @@ -346,20 +354,29 @@ public TableSchema commitChanges(List<SchemaChange> changes)
}
}

TableSchema newSchema =
new TableSchema(
schema.id() + 1,
// We change TableSchema to Schema, because we want to deal with primary-key and
// partition in options.
Schema newSchema =
new Schema(
newFields,
highestFieldId.get(),
schema.partitionKeys(),
schema.primaryKeys(),
oldTableSchema.partitionKeys(),
oldTableSchema.primaryKeys(),
newOptions,
newComment);
TableSchema newTableSchema =
new TableSchema(
oldTableSchema.id() + 1,
newSchema.fields(),
highestFieldId.get(),
newSchema.partitionKeys(),
newSchema.primaryKeys(),
newSchema.options(),
newSchema.comment());

try {
boolean success = commit(newSchema);
boolean success = commit(newTableSchema);
if (success) {
return newSchema;
return newTableSchema;
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public TableWriteImpl<InternalRow> newWrite(
AppendOnlyFileStoreWrite writer =
store().newWrite(commitUser, manifestFilter).withBucketMode(bucketMode());
return new TableWriteImpl<>(
rowType(),
writer,
createRowKeyExtractor(),
(record, rowKind) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ public TableWriteImpl<KeyValue> newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
KeyValue kv = new KeyValue();
return new TableWriteImpl<>(
rowType(),
store().newWrite(commitUser, manifestFilter),
createRowKeyExtractor(),
(record, rowKind) ->
Expand Down
Loading

0 comments on commit 2d63e53

Please sign in to comment.