From 6fb887f47f2e79f6b3142f094b20b6d7a3f86846 Mon Sep 17 00:00:00 2001 From: yunfengzhou-hub Date: Mon, 2 Dec 2024 21:11:23 +0800 Subject: [PATCH] [flink] Avoid deprecated usage on TableSchema, DataType and DescriptorProperties (#4611) --- .../apache/paimon/flink/DataCatalogTable.java | 115 +++++++++++----- .../org/apache/paimon/flink/FlinkCatalog.java | 55 ++++---- .../paimon/flink/FlinkGenericCatalog.java | 6 - .../paimon/flink/SystemCatalogTable.java | 12 +- .../utils/FlinkCatalogPropertiesUtil.java | 102 ++++---------- .../utils/FlinkDescriptorProperties.java | 99 +++++++++++++ .../flink/FlinkCatalogPropertiesUtilTest.java | 130 +++++++++++++----- .../apache/paimon/flink/FlinkCatalogTest.java | 9 +- 8 files changed, 342 insertions(+), 186 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java index 019d7bd6892f..e141581b476b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/DataCatalogTable.java @@ -23,33 +23,55 @@ import org.apache.paimon.types.DataField; import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CatalogTableImpl; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; -/** A {@link CatalogTableImpl} to wrap {@link FileStoreTable}. */ -public class DataCatalogTable extends CatalogTableImpl { +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** A {@link CatalogTable} to wrap {@link FileStoreTable}. */ +public class DataCatalogTable implements CatalogTable { + // Schema of the table (column names and types) + private final Schema schema; + + // Partition keys if this is a partitioned table. It's an empty set if the table is not + // partitioned + private final List partitionKeys; + + // Properties of the table + private final Map options; + + // Comment of the table + private final String comment; private final Table table; private final Map nonPhysicalColumnComments; public DataCatalogTable( Table table, - TableSchema tableSchema, + Schema resolvedSchema, List partitionKeys, - Map properties, + Map options, String comment, Map nonPhysicalColumnComments) { - super(tableSchema, partitionKeys, properties, comment); + this.schema = resolvedSchema; + this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null"); + this.options = checkNotNull(options, "options cannot be null"); + + checkArgument( + options.entrySet().stream() + .allMatch(e -> e.getKey() != null && e.getValue() != null), + "properties cannot have null keys or values"); + + this.comment = comment; + this.table = table; this.nonPhysicalColumnComments = nonPhysicalColumnComments; } @@ -66,32 +88,30 @@ public Schema getUnresolvedSchema() { .filter(dataField -> dataField.description() != null) .collect(Collectors.toMap(DataField::name, DataField::description)); - return toSchema(getSchema(), columnComments); + return toSchema(schema, columnComments); } - /** Copied from {@link TableSchema#toSchema(Map)} to support versions lower than 1.17. */ - private Schema toSchema(TableSchema tableSchema, Map comments) { + private Schema toSchema(Schema tableSchema, Map comments) { final Schema.Builder builder = Schema.newBuilder(); - tableSchema - .getTableColumns() + .getColumns() .forEach( column -> { - if (column instanceof TableColumn.PhysicalColumn) { - final TableColumn.PhysicalColumn c = - (TableColumn.PhysicalColumn) column; - builder.column(c.getName(), c.getType()); - } else if (column instanceof TableColumn.MetadataColumn) { - final TableColumn.MetadataColumn c = - (TableColumn.MetadataColumn) column; + if (column instanceof Schema.UnresolvedPhysicalColumn) { + final Schema.UnresolvedPhysicalColumn c = + (Schema.UnresolvedPhysicalColumn) column; + builder.column(c.getName(), c.getDataType()); + } else if (column instanceof Schema.UnresolvedMetadataColumn) { + final Schema.UnresolvedMetadataColumn c = + (Schema.UnresolvedMetadataColumn) column; builder.columnByMetadata( c.getName(), - c.getType(), - c.getMetadataAlias().orElse(null), + c.getDataType(), + c.getMetadataKey(), c.isVirtual()); - } else if (column instanceof TableColumn.ComputedColumn) { - final TableColumn.ComputedColumn c = - (TableColumn.ComputedColumn) column; + } else if (column instanceof Schema.UnresolvedComputedColumn) { + final Schema.UnresolvedComputedColumn c = + (Schema.UnresolvedComputedColumn) column; builder.columnByExpression(c.getName(), c.getExpression()); } else { throw new IllegalArgumentException( @@ -104,19 +124,16 @@ private Schema toSchema(TableSchema tableSchema, Map comments) { builder.withComment(nonPhysicalColumnComments.get(colName)); } }); - tableSchema .getWatermarkSpecs() .forEach( spec -> builder.watermark( - spec.getRowtimeAttribute(), spec.getWatermarkExpr())); - + spec.getColumnName(), spec.getWatermarkExpression())); if (tableSchema.getPrimaryKey().isPresent()) { - UniqueConstraint primaryKey = tableSchema.getPrimaryKey().get(); - builder.primaryKeyNamed(primaryKey.getName(), primaryKey.getColumns()); + Schema.UnresolvedPrimaryKey primaryKey = tableSchema.getPrimaryKey().get(); + builder.primaryKeyNamed(primaryKey.getConstraintName(), primaryKey.getColumnNames()); } - return builder.build(); } @@ -124,7 +141,7 @@ private Schema toSchema(TableSchema tableSchema, Map comments) { public CatalogBaseTable copy() { return new DataCatalogTable( table, - getSchema().copy(), + schema, new ArrayList<>(getPartitionKeys()), new HashMap<>(getOptions()), getComment(), @@ -135,10 +152,40 @@ public CatalogBaseTable copy() { public CatalogTable copy(Map options) { return new DataCatalogTable( table, - getSchema(), + schema, getPartitionKeys(), options, getComment(), nonPhysicalColumnComments); } + + @Override + public Optional getDescription() { + return Optional.of(getComment()); + } + + @Override + public Optional getDetailedDescription() { + return Optional.of("This is a catalog table in an im-memory catalog"); + } + + @Override + public boolean isPartitioned() { + return !partitionKeys.isEmpty(); + } + + @Override + public List getPartitionKeys() { + return partitionKeys; + } + + @Override + public Map getOptions() { + return options; + } + + @Override + public String getComment() { + return comment != null ? comment : ""; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index c67e79c1c06b..3a7f9790ccca 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -24,6 +24,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.procedure.ProcedureUtil; import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; +import org.apache.paimon.flink.utils.FlinkDescriptorProperties; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.operation.FileStoreCommit; @@ -46,7 +47,6 @@ import org.apache.paimon.view.View; import org.apache.paimon.view.ViewImpl; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; @@ -96,7 +96,6 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; -import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.procedures.Procedure; @@ -121,13 +120,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT; -import static org.apache.flink.table.descriptors.DescriptorProperties.NAME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; import static org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes; @@ -152,11 +144,18 @@ import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem; import static org.apache.paimon.flink.log.LogStoreRegister.unRegisterLogSystem; +import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNewWatermarkSpec; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.COMMENT; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR; import static org.apache.paimon.flink.utils.TableStatsUtil.createTableColumnStats; import static org.apache.paimon.flink.utils.TableStatsUtil.createTableStats; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -1008,18 +1007,18 @@ private static void validateAlterTable(CatalogBaseTable ct1, CatalogBaseTable ct } // materialized table is not resolved at this time. if (!table1IsMaterialized) { - org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema(); - org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema(); + org.apache.flink.table.api.Schema ts1 = ct1.getUnresolvedSchema(); + org.apache.flink.table.api.Schema ts2 = ct2.getUnresolvedSchema(); boolean pkEquality = false; if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) { pkEquality = Objects.equals( - ts1.getPrimaryKey().get().getType(), - ts2.getPrimaryKey().get().getType()) + ts1.getPrimaryKey().get().getConstraintName(), + ts2.getPrimaryKey().get().getConstraintName()) && Objects.equals( - ts1.getPrimaryKey().get().getColumns(), - ts2.getPrimaryKey().get().getColumns()); + ts1.getPrimaryKey().get().getColumnNames(), + ts2.getPrimaryKey().get().getColumnNames()); } else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) { pkEquality = true; } @@ -1063,7 +1062,8 @@ public final void close() throws CatalogException { private CatalogBaseTable toCatalogTable(Table table) { Map newOptions = new HashMap<>(table.options()); - TableSchema.Builder builder = TableSchema.builder(); + org.apache.flink.table.api.Schema.Builder builder = + org.apache.flink.table.api.Schema.newBuilder(); Map nonPhysicalColumnComments = new HashMap<>(); // add columns @@ -1078,10 +1078,10 @@ private CatalogBaseTable toCatalogTable(Table table) { if (optionalName == null || physicalColumns.contains(optionalName)) { // build physical column from table row field RowType.RowField field = physicalRowFields.get(physicalColumnIndex++); - builder.field(field.getName(), fromLogicalToDataType(field.getType())); + builder.column(field.getName(), fromLogicalToDataType(field.getType())); } else { // build non-physical column from options - builder.add(deserializeNonPhysicalColumn(newOptions, i)); + deserializeNonPhysicalColumn(newOptions, i, builder); if (newOptions.containsKey(compoundKey(SCHEMA, i, COMMENT))) { nonPhysicalColumnComments.put( optionalName, newOptions.get(compoundKey(SCHEMA, i, COMMENT))); @@ -1093,22 +1093,18 @@ private CatalogBaseTable toCatalogTable(Table table) { // extract watermark information if (newOptions.keySet().stream() .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) { - builder.watermark(deserializeWatermarkSpec(newOptions)); + deserializeWatermarkSpec(newOptions, builder); } // add primary keys if (table.primaryKeys().size() > 0) { - builder.primaryKey( - table.primaryKeys().stream().collect(Collectors.joining("_", "PK_", "")), - table.primaryKeys().toArray(new String[0])); + builder.primaryKey(table.primaryKeys()); } - TableSchema schema = builder.build(); + org.apache.flink.table.api.Schema schema = builder.build(); // remove schema from options - DescriptorProperties removeProperties = new DescriptorProperties(false); - removeProperties.putTableSchema(SCHEMA, schema); - removeProperties.asMap().keySet().forEach(newOptions::remove); + FlinkDescriptorProperties.removeSchemaKeys(SCHEMA, schema, newOptions); Options options = Options.fromMap(newOptions); if (TableType.MATERIALIZED_TABLE == options.get(CoreOptions.TYPE)) { @@ -1124,7 +1120,10 @@ private CatalogBaseTable toCatalogTable(Table table) { } private CatalogMaterializedTable buildMaterializedTable( - Table table, Map newOptions, TableSchema schema, Options options) { + Table table, + Map newOptions, + org.apache.flink.table.api.Schema schema, + Options options) { String definitionQuery = options.get(MATERIALIZED_TABLE_DEFINITION_QUERY); IntervalFreshness freshness = IntervalFreshness.of( @@ -1148,7 +1147,7 @@ private CatalogMaterializedTable buildMaterializedTable( // remove materialized table related options allMaterializedTableAttributes().forEach(newOptions::remove); return CatalogMaterializedTable.newBuilder() - .schema(schema.toSchema()) + .schema(schema) .comment(table.comment().orElse("")) .partitionKeys(table.partitionKeys()) .options(newOptions) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java index 37bed2d0480f..75af5917bb49 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkGenericCatalog.java @@ -48,7 +48,6 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.factories.FunctionDefinitionFactory; -import org.apache.flink.table.factories.TableFactory; import org.apache.flink.table.procedures.Procedure; import java.util.List; @@ -86,11 +85,6 @@ public Optional getFactory() { new FlinkGenericTableFactory(paimon.getFactory().get(), flink.getFactory().get())); } - @Override - public Optional getTableFactory() { - return flink.getTableFactory(); - } - @Override public Optional getFunctionDefinitionFactory() { return flink.getFunctionDefinitionFactory(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java index d5d843d91bb1..f88a808713c2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/SystemCatalogTable.java @@ -22,7 +22,6 @@ import org.apache.paimon.table.system.AuditLogTable; import org.apache.flink.table.api.Schema; -import org.apache.flink.table.api.WatermarkSpec; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.types.utils.TypeConversions; @@ -32,11 +31,11 @@ import java.util.Map; import java.util.Optional; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; +import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK; /** A {@link CatalogTable} to represent system table. */ public class SystemCatalogTable implements CatalogTable { @@ -60,11 +59,8 @@ public Schema getUnresolvedSchema() { Map newOptions = new HashMap<>(table.options()); if (newOptions.keySet().stream() .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) { - WatermarkSpec watermarkSpec = deserializeWatermarkSpec(newOptions); - return builder.watermark( - watermarkSpec.getRowtimeAttribute(), - watermarkSpec.getWatermarkExpr()) - .build(); + deserializeWatermarkSpec(newOptions, builder); + return builder.build(); } } return builder.build(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java index b0f99a6e89e4..fa84a1ca070d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java @@ -20,8 +20,7 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.WatermarkSpec; +import org.apache.flink.table.api.Schema; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.types.DataType; @@ -36,48 +35,23 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT; -import static org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR; -import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA; -import static org.apache.flink.table.descriptors.DescriptorProperties.NAME; -import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.COMMENT; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.DATA_TYPE; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.EXPR; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.METADATA; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.VIRTUAL; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR; /** * Utilities for ser/deserializing non-physical columns and watermark into/from a map of string * properties. */ public class FlinkCatalogPropertiesUtil { - - public static Map serializeNonPhysicalColumns( - Map indexMap, List nonPhysicalColumns) { - Map serialized = new HashMap<>(); - for (TableColumn c : nonPhysicalColumns) { - int index = indexMap.get(c.getName()); - serialized.put(compoundKey(SCHEMA, index, NAME), c.getName()); - serialized.put( - compoundKey(SCHEMA, index, DATA_TYPE), - c.getType().getLogicalType().asSerializableString()); - if (c instanceof TableColumn.ComputedColumn) { - TableColumn.ComputedColumn computedColumn = (TableColumn.ComputedColumn) c; - serialized.put(compoundKey(SCHEMA, index, EXPR), computedColumn.getExpression()); - } else { - TableColumn.MetadataColumn metadataColumn = (TableColumn.MetadataColumn) c; - serialized.put( - compoundKey(SCHEMA, index, METADATA), - metadataColumn.getMetadataAlias().orElse(metadataColumn.getName())); - serialized.put( - compoundKey(SCHEMA, index, VIRTUAL), - Boolean.toString(metadataColumn.isVirtual())); - } - } - return serialized; - } + public static final String SCHEMA = "schema"; /** Serialize non-physical columns of new api. */ public static Map serializeNonPhysicalNewColumns(ResolvedSchema schema) { @@ -119,22 +93,6 @@ public static Map serializeNonPhysicalNewColumns(ResolvedSchema return serialized; } - public static Map serializeWatermarkSpec(WatermarkSpec watermarkSpec) { - Map serializedWatermarkSpec = new HashMap<>(); - String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0); - serializedWatermarkSpec.put( - compoundKey(watermarkPrefix, WATERMARK_ROWTIME), - watermarkSpec.getRowtimeAttribute()); - serializedWatermarkSpec.put( - compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR), - watermarkSpec.getWatermarkExpr()); - serializedWatermarkSpec.put( - compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE), - watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString()); - - return serializedWatermarkSpec; - } - public static Map serializeNewWatermarkSpec( org.apache.flink.table.catalog.WatermarkSpec watermarkSpec) { Map serializedWatermarkSpec = new HashMap<>(); @@ -219,7 +177,8 @@ private static boolean isColumnNameKey(String key) { && SCHEMA_COLUMN_NAME_SUFFIX.matcher(key.substring(SCHEMA.length() + 1)).matches(); } - public static TableColumn deserializeNonPhysicalColumn(Map options, int index) { + public static void deserializeNonPhysicalColumn( + Map options, int index, Schema.Builder builder) { String nameKey = compoundKey(SCHEMA, index, NAME); String dataTypeKey = compoundKey(SCHEMA, index, DATA_TYPE); String exprKey = compoundKey(SCHEMA, index, EXPR); @@ -227,45 +186,42 @@ public static TableColumn deserializeNonPhysicalColumn(Map optio String virtualKey = compoundKey(SCHEMA, index, VIRTUAL); String name = options.get(nameKey); - DataType dataType = - TypeConversions.fromLogicalToDataType( - LogicalTypeParser.parse(options.get(dataTypeKey))); - TableColumn column; if (options.containsKey(exprKey)) { - column = TableColumn.computed(name, dataType, options.get(exprKey)); + final String expr = options.get(exprKey); + builder.columnByExpression(name, expr); } else if (options.containsKey(metadataKey)) { String metadataAlias = options.get(metadataKey); boolean isVirtual = Boolean.parseBoolean(options.get(virtualKey)); - column = - metadataAlias.equals(name) - ? TableColumn.metadata(name, dataType, isVirtual) - : TableColumn.metadata(name, dataType, metadataAlias, isVirtual); + DataType dataType = + TypeConversions.fromLogicalToDataType( + LogicalTypeParser.parse( + options.get(dataTypeKey), + Thread.currentThread().getContextClassLoader())); + if (metadataAlias.equals(name)) { + builder.columnByMetadata(name, dataType, isVirtual); + } else { + builder.columnByMetadata(name, dataType, metadataAlias, isVirtual); + } } else { throw new RuntimeException( String.format( "Failed to build non-physical column. Current index is %s, options are %s", index, options)); } - - return column; } - public static WatermarkSpec deserializeWatermarkSpec(Map options) { + public static void deserializeWatermarkSpec( + Map options, Schema.Builder builder) { String watermarkPrefixKey = compoundKey(SCHEMA, WATERMARK); String rowtimeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_ROWTIME); String exprKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_EXPR); - String dataTypeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_DATA_TYPE); String rowtimeAttribute = options.get(rowtimeKey); String watermarkExpressionString = options.get(exprKey); - DataType watermarkExprOutputType = - TypeConversions.fromLogicalToDataType( - LogicalTypeParser.parse(options.get(dataTypeKey))); - return new WatermarkSpec( - rowtimeAttribute, watermarkExpressionString, watermarkExprOutputType); + builder.watermark(rowtimeAttribute, watermarkExpressionString); } public static String compoundKey(Object... components) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java new file mode 100644 index 000000000000..edc73ca7bf41 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkDescriptorProperties.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.utils; + +import org.apache.flink.table.api.Schema; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utility class for having a unified string-based representation of Table API related classes such + * as Schema, TypeInformation, etc. + * + *

Note to implementers: Please try to reuse key names as much as possible. Key-names should be + * hierarchical and lower case. Use "-" instead of dots or camel case. E.g., + * connector.schema.start-from = from-earliest. Try not to use the higher level in a key-name. E.g., + * instead of connector.kafka.kafka-version use connector.kafka.version. + * + *

Properties with key normalization enabled contain only lower-case keys. + */ +public class FlinkDescriptorProperties { + + public static final String NAME = "name"; + + public static final String DATA_TYPE = "data-type"; + + public static final String EXPR = "expr"; + + public static final String METADATA = "metadata"; + + public static final String VIRTUAL = "virtual"; + + public static final String WATERMARK = "watermark"; + + public static final String WATERMARK_ROWTIME = "rowtime"; + + public static final String WATERMARK_STRATEGY = "strategy"; + + public static final String WATERMARK_STRATEGY_EXPR = WATERMARK_STRATEGY + '.' + EXPR; + + public static final String WATERMARK_STRATEGY_DATA_TYPE = WATERMARK_STRATEGY + '.' + DATA_TYPE; + + public static final String PRIMARY_KEY_NAME = "primary-key.name"; + + public static final String PRIMARY_KEY_COLUMNS = "primary-key.columns"; + + public static final String COMMENT = "comment"; + + public static void removeSchemaKeys(String key, Schema schema, Map options) { + checkNotNull(key); + checkNotNull(schema); + + List subKeys = Arrays.asList(NAME, DATA_TYPE, EXPR, METADATA, VIRTUAL); + for (int idx = 0; idx < schema.getColumns().size(); idx++) { + for (String subKey : subKeys) { + options.remove(key + '.' + idx + '.' + subKey); + } + } + + if (!schema.getWatermarkSpecs().isEmpty()) { + subKeys = + Arrays.asList( + WATERMARK_ROWTIME, + WATERMARK_STRATEGY_EXPR, + WATERMARK_STRATEGY_DATA_TYPE); + for (int idx = 0; idx < schema.getWatermarkSpecs().size(); idx++) { + for (String subKey : subKeys) { + options.remove(key + '.' + WATERMARK + '.' + idx + '.' + subKey); + } + } + } + + schema.getPrimaryKey() + .ifPresent( + pk -> { + options.remove(key + '.' + PRIMARY_KEY_NAME); + options.remove(key + '.' + PRIMARY_KEY_COLUMNS); + }); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java index 9268a236b6cb..e32150b1fe82 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java @@ -21,27 +21,35 @@ import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.WatermarkSpec; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.WatermarkSpec; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.expressions.ExpressionVisitor; +import org.apache.flink.table.expressions.ResolvedExpression; +import org.apache.flink.table.expressions.SqlCallExpression; +import org.apache.flink.table.types.DataType; import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR; -import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA; -import static org.apache.flink.table.descriptors.DescriptorProperties.NAME; -import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; +import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA; import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.DATA_TYPE; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.EXPR; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.METADATA; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.VIRTUAL; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; +import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link FlinkCatalogPropertiesUtil}. */ @@ -49,18 +57,27 @@ public class FlinkCatalogPropertiesUtilTest { @Test public void testSerDeNonPhysicalColumns() { - Map indexMap = new HashMap<>(); - indexMap.put("comp", 2); - indexMap.put("meta1", 3); - indexMap.put("meta2", 5); - List columns = new ArrayList<>(); - columns.add(TableColumn.computed("comp", DataTypes.INT(), "`k` * 2")); - columns.add(TableColumn.metadata("meta1", DataTypes.VARCHAR(10))); - columns.add(TableColumn.metadata("meta2", DataTypes.BIGINT().notNull(), "price", true)); + List columns = new ArrayList<>(); + columns.add(new Schema.UnresolvedComputedColumn("comp", new SqlCallExpression("`k` * 2"))); + columns.add( + new Schema.UnresolvedMetadataColumn("meta1", DataTypes.VARCHAR(10), null, false)); + columns.add( + new Schema.UnresolvedMetadataColumn( + "meta2", DataTypes.BIGINT().notNull(), "price", true, null)); + + List resolvedColumns = new ArrayList<>(); + resolvedColumns.add(Column.physical("phy1", DataTypes.INT())); + resolvedColumns.add(Column.physical("phy2", DataTypes.INT())); + resolvedColumns.add( + Column.computed("comp", new TestResolvedExpression("`k` * 2", DataTypes.INT()))); + resolvedColumns.add(Column.metadata("meta1", DataTypes.VARCHAR(10), null, false)); + resolvedColumns.add(Column.physical("phy3", DataTypes.INT())); + resolvedColumns.add(Column.metadata("meta2", DataTypes.BIGINT().notNull(), "price", true)); // validate serialization Map serialized = - FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns(indexMap, columns); + FlinkCatalogPropertiesUtil.serializeNonPhysicalNewColumns( + new ResolvedSchema(resolvedColumns, Collections.emptyList(), null)); Map expected = new HashMap<>(); expected.put(compoundKey(SCHEMA, 2, NAME), "comp"); @@ -80,27 +97,26 @@ public void testSerDeNonPhysicalColumns() { assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected); // validate deserialization - List deserialized = new ArrayList<>(); - deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 2)); - deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 3)); - deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 5)); + Schema.Builder builder = Schema.newBuilder(); + FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 2, builder); + FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 3, builder); + FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 5, builder); - assertThat(deserialized).isEqualTo(columns); - - // validate that + assertThat(builder.build().getColumns()) + .containsExactly(columns.toArray(new Schema.UnresolvedColumn[0])); } @Test public void testSerDeWatermarkSpec() { WatermarkSpec watermarkSpec = - new WatermarkSpec( + WatermarkSpec.of( "test_time", - "`test_time` - INTERVAL '0.001' SECOND", - DataTypes.TIMESTAMP(3)); + new TestResolvedExpression( + "`test_time` - INTERVAL '0.001' SECOND", DataTypes.TIMESTAMP(3))); // validate serialization Map serialized = - FlinkCatalogPropertiesUtil.serializeWatermarkSpec(watermarkSpec); + FlinkCatalogPropertiesUtil.serializeNewWatermarkSpec(watermarkSpec); Map expected = new HashMap<>(); String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0); @@ -113,9 +129,13 @@ public void testSerDeWatermarkSpec() { assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected); // validate serialization - WatermarkSpec deserialized = - FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized); - assertThat(deserialized).isEqualTo(watermarkSpec); + Schema.Builder builder = Schema.newBuilder(); + FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized, builder); + assertThat(builder.build().getWatermarkSpecs()).hasSize(1); + Schema.UnresolvedWatermarkSpec actual = builder.build().getWatermarkSpecs().get(0); + assertThat(actual.getColumnName()).isEqualTo(watermarkSpec.getRowtimeAttribute()); + assertThat(actual.getWatermarkExpression().asSummaryString()) + .isEqualTo(watermarkSpec.getWatermarkExpression().asSummaryString()); } @Test @@ -150,4 +170,44 @@ public void testNonPhysicalColumnsCount() { oldStyleOptions, Arrays.asList("phy1", "phy2"))) .isEqualTo(3); } + + private static class TestResolvedExpression implements ResolvedExpression { + private final String name; + private final DataType outputDataType; + + private TestResolvedExpression(String name, DataType outputDataType) { + this.name = name; + this.outputDataType = outputDataType; + } + + @Override + public DataType getOutputDataType() { + return outputDataType; + } + + @Override + public List getResolvedChildren() { + return Collections.emptyList(); + } + + @Override + public String asSummaryString() { + return new SqlCallExpression(name).asSummaryString(); + } + + @Override + public String asSerializableString() { + return name; + } + + @Override + public List getChildren() { + return Collections.emptyList(); + } + + @Override + public R accept(ExpressionVisitor expressionVisitor) { + return null; + } + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index 27a89510975f..e4286eb18172 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -850,7 +850,7 @@ private static void checkEquals(CatalogBaseTable t1, CatalogBaseTable t2) { assertThat(t2.getComment()).isEqualTo(t1.getComment()); assertThat(t2.getOptions()).isEqualTo(t1.getOptions()); if (t1.getTableKind() == CatalogBaseTable.TableKind.TABLE) { - assertThat(t2.getSchema()).isEqualTo(t1.getSchema()); + assertThat(t2.getUnresolvedSchema()).isEqualTo(t1.getUnresolvedSchema()); assertThat(((CatalogTable) (t2)).getPartitionKeys()) .isEqualTo(((CatalogTable) (t1)).getPartitionKeys()); assertThat(((CatalogTable) (t2)).isPartitioned()) @@ -864,7 +864,12 @@ private static void checkEquals(CatalogBaseTable t1, CatalogBaseTable t2) { t2.getUnresolvedSchema() .resolve(new TestSchemaResolver())) .build()) - .isEqualTo(t1.getSchema().toSchema()); + .isEqualTo( + Schema.newBuilder() + .fromResolvedSchema( + t1.getUnresolvedSchema() + .resolve(new TestSchemaResolver())) + .build()); assertThat(mt2.getPartitionKeys()).isEqualTo(mt1.getPartitionKeys()); assertThat(mt2.isPartitioned()).isEqualTo(mt1.isPartitioned()); // validate definition query