Skip to content

Commit

Permalink
[flink] Avoid deprecated usage on TableSchema, DataType and Descripto…
Browse files Browse the repository at this point in the history
…rProperties (#4611)
  • Loading branch information
yunfengzhou-hub authored Dec 2, 2024
1 parent 419b02a commit 6fb887f
Show file tree
Hide file tree
Showing 8 changed files with 342 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,55 @@
import org.apache.paimon.types.DataField;

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/** A {@link CatalogTableImpl} to wrap {@link FileStoreTable}. */
public class DataCatalogTable extends CatalogTableImpl {
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** A {@link CatalogTable} to wrap {@link FileStoreTable}. */
public class DataCatalogTable implements CatalogTable {
// Schema of the table (column names and types)
private final Schema schema;

// Partition keys if this is a partitioned table. It's an empty set if the table is not
// partitioned
private final List<String> partitionKeys;

// Properties of the table
private final Map<String, String> options;

// Comment of the table
private final String comment;

private final Table table;
private final Map<String, String> nonPhysicalColumnComments;

public DataCatalogTable(
Table table,
TableSchema tableSchema,
Schema resolvedSchema,
List<String> partitionKeys,
Map<String, String> properties,
Map<String, String> options,
String comment,
Map<String, String> nonPhysicalColumnComments) {
super(tableSchema, partitionKeys, properties, comment);
this.schema = resolvedSchema;
this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
this.options = checkNotNull(options, "options cannot be null");

checkArgument(
options.entrySet().stream()
.allMatch(e -> e.getKey() != null && e.getValue() != null),
"properties cannot have null keys or values");

this.comment = comment;

this.table = table;
this.nonPhysicalColumnComments = nonPhysicalColumnComments;
}
Expand All @@ -66,32 +88,30 @@ public Schema getUnresolvedSchema() {
.filter(dataField -> dataField.description() != null)
.collect(Collectors.toMap(DataField::name, DataField::description));

return toSchema(getSchema(), columnComments);
return toSchema(schema, columnComments);
}

/** Copied from {@link TableSchema#toSchema(Map)} to support versions lower than 1.17. */
private Schema toSchema(TableSchema tableSchema, Map<String, String> comments) {
private Schema toSchema(Schema tableSchema, Map<String, String> comments) {
final Schema.Builder builder = Schema.newBuilder();

tableSchema
.getTableColumns()
.getColumns()
.forEach(
column -> {
if (column instanceof TableColumn.PhysicalColumn) {
final TableColumn.PhysicalColumn c =
(TableColumn.PhysicalColumn) column;
builder.column(c.getName(), c.getType());
} else if (column instanceof TableColumn.MetadataColumn) {
final TableColumn.MetadataColumn c =
(TableColumn.MetadataColumn) column;
if (column instanceof Schema.UnresolvedPhysicalColumn) {
final Schema.UnresolvedPhysicalColumn c =
(Schema.UnresolvedPhysicalColumn) column;
builder.column(c.getName(), c.getDataType());
} else if (column instanceof Schema.UnresolvedMetadataColumn) {
final Schema.UnresolvedMetadataColumn c =
(Schema.UnresolvedMetadataColumn) column;
builder.columnByMetadata(
c.getName(),
c.getType(),
c.getMetadataAlias().orElse(null),
c.getDataType(),
c.getMetadataKey(),
c.isVirtual());
} else if (column instanceof TableColumn.ComputedColumn) {
final TableColumn.ComputedColumn c =
(TableColumn.ComputedColumn) column;
} else if (column instanceof Schema.UnresolvedComputedColumn) {
final Schema.UnresolvedComputedColumn c =
(Schema.UnresolvedComputedColumn) column;
builder.columnByExpression(c.getName(), c.getExpression());
} else {
throw new IllegalArgumentException(
Expand All @@ -104,27 +124,24 @@ private Schema toSchema(TableSchema tableSchema, Map<String, String> comments) {
builder.withComment(nonPhysicalColumnComments.get(colName));
}
});

tableSchema
.getWatermarkSpecs()
.forEach(
spec ->
builder.watermark(
spec.getRowtimeAttribute(), spec.getWatermarkExpr()));

spec.getColumnName(), spec.getWatermarkExpression()));
if (tableSchema.getPrimaryKey().isPresent()) {
UniqueConstraint primaryKey = tableSchema.getPrimaryKey().get();
builder.primaryKeyNamed(primaryKey.getName(), primaryKey.getColumns());
Schema.UnresolvedPrimaryKey primaryKey = tableSchema.getPrimaryKey().get();
builder.primaryKeyNamed(primaryKey.getConstraintName(), primaryKey.getColumnNames());
}

return builder.build();
}

@Override
public CatalogBaseTable copy() {
return new DataCatalogTable(
table,
getSchema().copy(),
schema,
new ArrayList<>(getPartitionKeys()),
new HashMap<>(getOptions()),
getComment(),
Expand All @@ -135,10 +152,40 @@ public CatalogBaseTable copy() {
public CatalogTable copy(Map<String, String> options) {
return new DataCatalogTable(
table,
getSchema(),
schema,
getPartitionKeys(),
options,
getComment(),
nonPhysicalColumnComments);
}

@Override
public Optional<String> getDescription() {
return Optional.of(getComment());
}

@Override
public Optional<String> getDetailedDescription() {
return Optional.of("This is a catalog table in an im-memory catalog");
}

@Override
public boolean isPartitioned() {
return !partitionKeys.isEmpty();
}

@Override
public List<String> getPartitionKeys() {
return partitionKeys;
}

@Override
public Map<String, String> getOptions() {
return options;
}

@Override
public String getComment() {
return comment != null ? comment : "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.FileStoreCommit;
Expand All @@ -46,7 +47,6 @@
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewImpl;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
Expand Down Expand Up @@ -96,7 +96,6 @@
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.procedures.Procedure;
Expand All @@ -121,13 +120,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT;
import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes;
Expand All @@ -152,11 +144,18 @@
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
import static org.apache.paimon.flink.log.LogStoreRegister.unRegisterLogSystem;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNewWatermarkSpec;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.COMMENT;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.paimon.flink.utils.TableStatsUtil.createTableColumnStats;
import static org.apache.paimon.flink.utils.TableStatsUtil.createTableStats;
import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand Down Expand Up @@ -1008,18 +1007,18 @@ private static void validateAlterTable(CatalogBaseTable ct1, CatalogBaseTable ct
}
// materialized table is not resolved at this time.
if (!table1IsMaterialized) {
org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
org.apache.flink.table.api.Schema ts1 = ct1.getUnresolvedSchema();
org.apache.flink.table.api.Schema ts2 = ct2.getUnresolvedSchema();
boolean pkEquality = false;

if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
pkEquality =
Objects.equals(
ts1.getPrimaryKey().get().getType(),
ts2.getPrimaryKey().get().getType())
ts1.getPrimaryKey().get().getConstraintName(),
ts2.getPrimaryKey().get().getConstraintName())
&& Objects.equals(
ts1.getPrimaryKey().get().getColumns(),
ts2.getPrimaryKey().get().getColumns());
ts1.getPrimaryKey().get().getColumnNames(),
ts2.getPrimaryKey().get().getColumnNames());
} else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
pkEquality = true;
}
Expand Down Expand Up @@ -1063,7 +1062,8 @@ public final void close() throws CatalogException {
private CatalogBaseTable toCatalogTable(Table table) {
Map<String, String> newOptions = new HashMap<>(table.options());

TableSchema.Builder builder = TableSchema.builder();
org.apache.flink.table.api.Schema.Builder builder =
org.apache.flink.table.api.Schema.newBuilder();
Map<String, String> nonPhysicalColumnComments = new HashMap<>();

// add columns
Expand All @@ -1078,10 +1078,10 @@ private CatalogBaseTable toCatalogTable(Table table) {
if (optionalName == null || physicalColumns.contains(optionalName)) {
// build physical column from table row field
RowType.RowField field = physicalRowFields.get(physicalColumnIndex++);
builder.field(field.getName(), fromLogicalToDataType(field.getType()));
builder.column(field.getName(), fromLogicalToDataType(field.getType()));
} else {
// build non-physical column from options
builder.add(deserializeNonPhysicalColumn(newOptions, i));
deserializeNonPhysicalColumn(newOptions, i, builder);
if (newOptions.containsKey(compoundKey(SCHEMA, i, COMMENT))) {
nonPhysicalColumnComments.put(
optionalName, newOptions.get(compoundKey(SCHEMA, i, COMMENT)));
Expand All @@ -1093,22 +1093,18 @@ private CatalogBaseTable toCatalogTable(Table table) {
// extract watermark information
if (newOptions.keySet().stream()
.anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) {
builder.watermark(deserializeWatermarkSpec(newOptions));
deserializeWatermarkSpec(newOptions, builder);
}

// add primary keys
if (table.primaryKeys().size() > 0) {
builder.primaryKey(
table.primaryKeys().stream().collect(Collectors.joining("_", "PK_", "")),
table.primaryKeys().toArray(new String[0]));
builder.primaryKey(table.primaryKeys());
}

TableSchema schema = builder.build();
org.apache.flink.table.api.Schema schema = builder.build();

// remove schema from options
DescriptorProperties removeProperties = new DescriptorProperties(false);
removeProperties.putTableSchema(SCHEMA, schema);
removeProperties.asMap().keySet().forEach(newOptions::remove);
FlinkDescriptorProperties.removeSchemaKeys(SCHEMA, schema, newOptions);

Options options = Options.fromMap(newOptions);
if (TableType.MATERIALIZED_TABLE == options.get(CoreOptions.TYPE)) {
Expand All @@ -1124,7 +1120,10 @@ private CatalogBaseTable toCatalogTable(Table table) {
}

private CatalogMaterializedTable buildMaterializedTable(
Table table, Map<String, String> newOptions, TableSchema schema, Options options) {
Table table,
Map<String, String> newOptions,
org.apache.flink.table.api.Schema schema,
Options options) {
String definitionQuery = options.get(MATERIALIZED_TABLE_DEFINITION_QUERY);
IntervalFreshness freshness =
IntervalFreshness.of(
Expand All @@ -1148,7 +1147,7 @@ private CatalogMaterializedTable buildMaterializedTable(
// remove materialized table related options
allMaterializedTableAttributes().forEach(newOptions::remove);
return CatalogMaterializedTable.newBuilder()
.schema(schema.toSchema())
.schema(schema)
.comment(table.comment().orElse(""))
.partitionKeys(table.partitionKeys())
.options(newOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.procedures.Procedure;

import java.util.List;
Expand Down Expand Up @@ -86,11 +85,6 @@ public Optional<Factory> getFactory() {
new FlinkGenericTableFactory(paimon.getFactory().get(), flink.getFactory().get()));
}

@Override
public Optional<TableFactory> getTableFactory() {
return flink.getTableFactory();
}

@Override
public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
return flink.getFunctionDefinitionFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.table.system.AuditLogTable;

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.types.utils.TypeConversions;

Expand All @@ -32,11 +31,11 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;

/** A {@link CatalogTable} to represent system table. */
public class SystemCatalogTable implements CatalogTable {
Expand All @@ -60,11 +59,8 @@ public Schema getUnresolvedSchema() {
Map<String, String> newOptions = new HashMap<>(table.options());
if (newOptions.keySet().stream()
.anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) {
WatermarkSpec watermarkSpec = deserializeWatermarkSpec(newOptions);
return builder.watermark(
watermarkSpec.getRowtimeAttribute(),
watermarkSpec.getWatermarkExpr())
.build();
deserializeWatermarkSpec(newOptions, builder);
return builder.build();
}
}
return builder.build();
Expand Down
Loading

0 comments on commit 6fb887f

Please sign in to comment.