Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[flink] Avoid deprecated usage on TableSchema, DataType and DescriptorProperties #4611

Merged
merged 3 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,55 @@
import org.apache.paimon.types.DataField;

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

/** A {@link CatalogTableImpl} to wrap {@link FileStoreTable}. */
public class DataCatalogTable extends CatalogTableImpl {
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** A {@link CatalogTable} to wrap {@link FileStoreTable}. */
public class DataCatalogTable implements CatalogTable {
// Schema of the table (column names and types)
private final Schema schema;

// Partition keys if this is a partitioned table. It's an empty set if the table is not
// partitioned
private final List<String> partitionKeys;

// Properties of the table
private final Map<String, String> options;

// Comment of the table
private final String comment;

private final Table table;
private final Map<String, String> nonPhysicalColumnComments;

public DataCatalogTable(
Table table,
TableSchema tableSchema,
Schema resolvedSchema,
List<String> partitionKeys,
Map<String, String> properties,
Map<String, String> options,
String comment,
Map<String, String> nonPhysicalColumnComments) {
super(tableSchema, partitionKeys, properties, comment);
this.schema = resolvedSchema;
this.partitionKeys = checkNotNull(partitionKeys, "partitionKeys cannot be null");
this.options = checkNotNull(options, "options cannot be null");

checkArgument(
options.entrySet().stream()
.allMatch(e -> e.getKey() != null && e.getValue() != null),
"properties cannot have null keys or values");

this.comment = comment;

this.table = table;
this.nonPhysicalColumnComments = nonPhysicalColumnComments;
}
Expand All @@ -66,32 +88,30 @@ public Schema getUnresolvedSchema() {
.filter(dataField -> dataField.description() != null)
.collect(Collectors.toMap(DataField::name, DataField::description));

return toSchema(getSchema(), columnComments);
return toSchema(schema, columnComments);
}

/** Copied from {@link TableSchema#toSchema(Map)} to support versions lower than 1.17. */
private Schema toSchema(TableSchema tableSchema, Map<String, String> comments) {
private Schema toSchema(Schema tableSchema, Map<String, String> comments) {
final Schema.Builder builder = Schema.newBuilder();

tableSchema
.getTableColumns()
.getColumns()
.forEach(
column -> {
if (column instanceof TableColumn.PhysicalColumn) {
final TableColumn.PhysicalColumn c =
(TableColumn.PhysicalColumn) column;
builder.column(c.getName(), c.getType());
} else if (column instanceof TableColumn.MetadataColumn) {
final TableColumn.MetadataColumn c =
(TableColumn.MetadataColumn) column;
if (column instanceof Schema.UnresolvedPhysicalColumn) {
final Schema.UnresolvedPhysicalColumn c =
(Schema.UnresolvedPhysicalColumn) column;
builder.column(c.getName(), c.getDataType());
} else if (column instanceof Schema.UnresolvedMetadataColumn) {
final Schema.UnresolvedMetadataColumn c =
(Schema.UnresolvedMetadataColumn) column;
builder.columnByMetadata(
c.getName(),
c.getType(),
c.getMetadataAlias().orElse(null),
c.getDataType(),
c.getMetadataKey(),
c.isVirtual());
} else if (column instanceof TableColumn.ComputedColumn) {
final TableColumn.ComputedColumn c =
(TableColumn.ComputedColumn) column;
} else if (column instanceof Schema.UnresolvedComputedColumn) {
final Schema.UnresolvedComputedColumn c =
(Schema.UnresolvedComputedColumn) column;
builder.columnByExpression(c.getName(), c.getExpression());
} else {
throw new IllegalArgumentException(
Expand All @@ -104,27 +124,24 @@ private Schema toSchema(TableSchema tableSchema, Map<String, String> comments) {
builder.withComment(nonPhysicalColumnComments.get(colName));
}
});

tableSchema
.getWatermarkSpecs()
.forEach(
spec ->
builder.watermark(
spec.getRowtimeAttribute(), spec.getWatermarkExpr()));

spec.getColumnName(), spec.getWatermarkExpression()));
if (tableSchema.getPrimaryKey().isPresent()) {
UniqueConstraint primaryKey = tableSchema.getPrimaryKey().get();
builder.primaryKeyNamed(primaryKey.getName(), primaryKey.getColumns());
Schema.UnresolvedPrimaryKey primaryKey = tableSchema.getPrimaryKey().get();
builder.primaryKeyNamed(primaryKey.getConstraintName(), primaryKey.getColumnNames());
}

return builder.build();
}

@Override
public CatalogBaseTable copy() {
return new DataCatalogTable(
table,
getSchema().copy(),
Schema.newBuilder().fromSchema(schema).build(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why need to copy it? I see there is no copy in DefaultCatalogTable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The copy was added because the copy had been made in the existing implementation. But I agree with it that the copy is unnecessary. I'll remove it.

new ArrayList<>(getPartitionKeys()),
new HashMap<>(getOptions()),
getComment(),
Expand All @@ -135,10 +152,40 @@ public CatalogBaseTable copy() {
public CatalogTable copy(Map<String, String> options) {
return new DataCatalogTable(
table,
getSchema(),
Schema.newBuilder().fromSchema(schema).build(),
getPartitionKeys(),
options,
getComment(),
nonPhysicalColumnComments);
}

@Override
public Optional<String> getDescription() {
return Optional.of(getComment());
}

@Override
public Optional<String> getDetailedDescription() {
return Optional.of("This is a catalog table in an im-memory catalog");
}

@Override
public boolean isPartitioned() {
return !partitionKeys.isEmpty();
}

@Override
public List<String> getPartitionKeys() {
return partitionKeys;
}

@Override
public Map<String, String> getOptions() {
return options;
}

@Override
public String getComment() {
return comment != null ? comment : "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.FileStoreCommit;
Expand All @@ -46,7 +47,6 @@
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewImpl;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
Expand Down Expand Up @@ -96,7 +96,6 @@
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.procedures.Procedure;
Expand All @@ -121,13 +120,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.table.descriptors.DescriptorProperties.COMMENT;
import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.flink.table.utils.EncodingUtils.decodeBase64ToBytes;
Expand All @@ -152,11 +144,18 @@
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.log.LogStoreRegister.registerLogSystem;
import static org.apache.paimon.flink.log.LogStoreRegister.unRegisterLogSystem;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNewWatermarkSpec;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.COMMENT;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.NAME;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.paimon.flink.utils.TableStatsUtil.createTableColumnStats;
import static org.apache.paimon.flink.utils.TableStatsUtil.createTableStats;
import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand Down Expand Up @@ -1002,18 +1001,18 @@ private static void validateAlterTable(CatalogBaseTable ct1, CatalogBaseTable ct
}
// materialized table is not resolved at this time.
if (!table1IsMaterialized) {
org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
org.apache.flink.table.api.Schema ts1 = ct1.getUnresolvedSchema();
org.apache.flink.table.api.Schema ts2 = ct2.getUnresolvedSchema();
boolean pkEquality = false;

if (ts1.getPrimaryKey().isPresent() && ts2.getPrimaryKey().isPresent()) {
pkEquality =
Objects.equals(
ts1.getPrimaryKey().get().getType(),
ts2.getPrimaryKey().get().getType())
ts1.getPrimaryKey().get().getConstraintName(),
ts2.getPrimaryKey().get().getConstraintName())
&& Objects.equals(
ts1.getPrimaryKey().get().getColumns(),
ts2.getPrimaryKey().get().getColumns());
ts1.getPrimaryKey().get().getColumnNames(),
ts2.getPrimaryKey().get().getColumnNames());
} else if (!ts1.getPrimaryKey().isPresent() && !ts2.getPrimaryKey().isPresent()) {
pkEquality = true;
}
Expand Down Expand Up @@ -1057,7 +1056,8 @@ public final void close() throws CatalogException {
private CatalogBaseTable toCatalogTable(Table table) {
Map<String, String> newOptions = new HashMap<>(table.options());

TableSchema.Builder builder = TableSchema.builder();
org.apache.flink.table.api.Schema.Builder builder =
org.apache.flink.table.api.Schema.newBuilder();
Map<String, String> nonPhysicalColumnComments = new HashMap<>();

// add columns
Expand All @@ -1072,10 +1072,10 @@ private CatalogBaseTable toCatalogTable(Table table) {
if (optionalName == null || physicalColumns.contains(optionalName)) {
// build physical column from table row field
RowType.RowField field = physicalRowFields.get(physicalColumnIndex++);
builder.field(field.getName(), fromLogicalToDataType(field.getType()));
builder.column(field.getName(), fromLogicalToDataType(field.getType()));
} else {
// build non-physical column from options
builder.add(deserializeNonPhysicalColumn(newOptions, i));
deserializeNonPhysicalColumn(newOptions, i, builder);
if (newOptions.containsKey(compoundKey(SCHEMA, i, COMMENT))) {
nonPhysicalColumnComments.put(
optionalName, newOptions.get(compoundKey(SCHEMA, i, COMMENT)));
Expand All @@ -1087,22 +1087,18 @@ private CatalogBaseTable toCatalogTable(Table table) {
// extract watermark information
if (newOptions.keySet().stream()
.anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) {
builder.watermark(deserializeWatermarkSpec(newOptions));
deserializeWatermarkSpec(newOptions, builder);
}

// add primary keys
if (table.primaryKeys().size() > 0) {
builder.primaryKey(
table.primaryKeys().stream().collect(Collectors.joining("_", "PK_", "")),
table.primaryKeys().toArray(new String[0]));
builder.primaryKey(table.primaryKeys());
}

TableSchema schema = builder.build();
org.apache.flink.table.api.Schema schema = builder.build();

// remove schema from options
DescriptorProperties removeProperties = new DescriptorProperties(false);
removeProperties.putTableSchema(SCHEMA, schema);
removeProperties.asMap().keySet().forEach(newOptions::remove);
FlinkDescriptorProperties.removeSchemaKeys(SCHEMA, schema, newOptions);

Options options = Options.fromMap(newOptions);
if (TableType.MATERIALIZED_TABLE == options.get(CoreOptions.TYPE)) {
Expand All @@ -1118,7 +1114,10 @@ private CatalogBaseTable toCatalogTable(Table table) {
}

private CatalogMaterializedTable buildMaterializedTable(
Table table, Map<String, String> newOptions, TableSchema schema, Options options) {
Table table,
Map<String, String> newOptions,
org.apache.flink.table.api.Schema schema,
Options options) {
String definitionQuery = options.get(MATERIALIZED_TABLE_DEFINITION_QUERY);
IntervalFreshness freshness =
IntervalFreshness.of(
Expand All @@ -1142,7 +1141,7 @@ private CatalogMaterializedTable buildMaterializedTable(
// remove materialized table related options
allMaterializedTableAttributes().forEach(newOptions::remove);
return CatalogMaterializedTable.newBuilder()
.schema(schema.toSchema())
.schema(schema)
.comment(table.comment().orElse(""))
.partitionKeys(table.partitionKeys())
.options(newOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.procedures.Procedure;

import java.util.List;
Expand Down Expand Up @@ -86,11 +85,6 @@ public Optional<Factory> getFactory() {
new FlinkGenericTableFactory(paimon.getFactory().get(), flink.getFactory().get()));
}

@Override
public Optional<TableFactory> getTableFactory() {
return flink.getTableFactory();
}

@Override
public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
return flink.getFunctionDefinitionFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.paimon.table.system.AuditLogTable;

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.types.utils.TypeConversions;

Expand All @@ -32,11 +31,11 @@
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.SCHEMA;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
import static org.apache.paimon.flink.utils.FlinkDescriptorProperties.WATERMARK;

/** A {@link CatalogTable} to represent system table. */
public class SystemCatalogTable implements CatalogTable {
Expand All @@ -60,11 +59,8 @@ public Schema getUnresolvedSchema() {
Map<String, String> newOptions = new HashMap<>(table.options());
if (newOptions.keySet().stream()
.anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) {
WatermarkSpec watermarkSpec = deserializeWatermarkSpec(newOptions);
return builder.watermark(
watermarkSpec.getRowtimeAttribute(),
watermarkSpec.getWatermarkExpr())
.build();
deserializeWatermarkSpec(newOptions, builder);
return builder.build();
}
}
return builder.build();
Expand Down
Loading
Loading