diff --git a/src/main/java/org/apache/flink/table/store/trino/TrinoMetadataBase.java b/src/main/java/org/apache/flink/table/store/trino/TrinoMetadataBase.java index 520fb7dc..e5057e30 100644 --- a/src/main/java/org/apache/flink/table/store/trino/TrinoMetadataBase.java +++ b/src/main/java/org/apache/flink/table/store/trino/TrinoMetadataBase.java @@ -22,7 +22,7 @@ import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.store.file.catalog.Catalog; import org.apache.flink.table.store.file.catalog.CatalogFactory; -import org.apache.flink.table.store.file.schema.TableSchema; +import org.apache.flink.util.InstantiationUtil; import io.trino.spi.connector.Assignment; import io.trino.spi.connector.ColumnHandle; @@ -38,9 +38,11 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.expression.ConnectorExpression; -import io.trino.spi.expression.Variable; import io.trino.spi.predicate.TupleDomain; + +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -50,7 +52,6 @@ import java.util.Optional; import java.util.function.Function; -import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; @@ -86,18 +87,19 @@ public ConnectorTableProperties getTableProperties( public TrinoTableHandle getTableHandle(SchemaTableName tableName) { ObjectPath tablePath = new ObjectPath(tableName.getSchemaName(), tableName.getTableName()); - TableSchema tableSchema; + byte[] serializedTable; try { - tableSchema = catalog.getTableSchema(tablePath); + serializedTable = InstantiationUtil.serializeObject(catalog.getTable(tablePath)); } catch (Catalog.TableNotExistException e) { return null; + } catch (IOException e) { + throw new UncheckedIOException(e); } return new TrinoTableHandle( tableName.getSchemaName(), tableName.getTableName(), - catalog.getTableLocation(tablePath).toString(), - tableSchema.id()); + serializedTable); } @Override diff --git a/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceProvider.java b/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceProvider.java index ac3cf7e8..b923a038 100644 --- a/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceProvider.java +++ b/src/main/java/org/apache/flink/table/store/trino/TrinoPageSourceProvider.java @@ -18,10 +18,9 @@ package org.apache.flink.table.store.trino; -import org.apache.flink.core.fs.Path; -import org.apache.flink.table.store.file.schema.TableSchema; -import org.apache.flink.table.store.table.FileStoreTableFactory; +import org.apache.flink.table.store.table.Table; import org.apache.flink.table.store.table.source.TableRead; +import org.apache.flink.table.types.logical.RowType; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ConnectorPageSource; @@ -54,12 +53,11 @@ public ConnectorPageSource createPageSource( TrinoPageSourceProvider.class.getClassLoader()); } - private ConnectorPageSource createPageSource(TrinoTableHandle table, TrinoSplit split, List columns) { - TableSchema tableSchema = table.tableSchema(); - TableRead read = - FileStoreTableFactory.create(new Path(table.getLocation()), tableSchema) - .newRead(); - List fieldNames = tableSchema.fieldNames(); + private ConnectorPageSource createPageSource(TrinoTableHandle tableHandle, TrinoSplit split, List columns) { + Table table = tableHandle.table(); + TableRead read = table.newRead(); + RowType rowType = table.rowType(); + List fieldNames = rowType.getFieldNames(); List projectedFields = columns.stream() .map(TrinoColumnHandle.class::cast) @@ -70,8 +68,8 @@ private ConnectorPageSource createPageSource(TrinoTableHandle table, TrinoSplit read.withProjection(projected); } - new TrinoFilterConverter(tableSchema.logicalRowType()) - .convert(table.getFilter()) + new TrinoFilterConverter(rowType) + .convert(tableHandle.getFilter()) .ifPresent(read::withFilter); try { diff --git a/src/main/java/org/apache/flink/table/store/trino/TrinoSplitManagerBase.java b/src/main/java/org/apache/flink/table/store/trino/TrinoSplitManagerBase.java index 290638cf..b7499aaf 100644 --- a/src/main/java/org/apache/flink/table/store/trino/TrinoSplitManagerBase.java +++ b/src/main/java/org/apache/flink/table/store/trino/TrinoSplitManagerBase.java @@ -18,9 +18,7 @@ package org.apache.flink.table.store.trino; -import org.apache.flink.core.fs.Path; -import org.apache.flink.table.store.file.schema.TableSchema; -import org.apache.flink.table.store.table.FileStoreTableFactory; +import org.apache.flink.table.store.table.Table; import org.apache.flink.table.store.table.source.Split; import org.apache.flink.table.store.table.source.TableScan; @@ -28,7 +26,6 @@ import io.trino.spi.connector.ConnectorSplitSource; import io.trino.spi.connector.ConnectorTableHandle; - import java.util.List; import java.util.stream.Collectors; @@ -40,11 +37,9 @@ protected ConnectorSplitSource getSplits(ConnectorTableHandle connectorTableHand // TODO what is constraint? TrinoTableHandle tableHandle = (TrinoTableHandle) connectorTableHandle; - TableSchema tableSchema = tableHandle.tableSchema(); - TableScan tableScan = - FileStoreTableFactory.create(new Path(tableHandle.getLocation()), tableSchema) - .newScan(); - new TrinoFilterConverter(tableSchema.logicalRowType()) + Table table = tableHandle.table(); + TableScan tableScan = table.newScan(); + new TrinoFilterConverter(table.rowType()) .convert(tableHandle.getFilter()) .ifPresent(tableScan::withFilter); List splits = tableScan.plan().splits(); diff --git a/src/main/java/org/apache/flink/table/store/trino/TrinoTableHandle.java b/src/main/java/org/apache/flink/table/store/trino/TrinoTableHandle.java index 2ce12844..a92d2df4 100644 --- a/src/main/java/org/apache/flink/table/store/trino/TrinoTableHandle.java +++ b/src/main/java/org/apache/flink/table/store/trino/TrinoTableHandle.java @@ -18,9 +18,8 @@ package org.apache.flink.table.store.trino; -import org.apache.flink.core.fs.Path; -import org.apache.flink.table.store.file.schema.SchemaManager; -import org.apache.flink.table.store.file.schema.TableSchema; +import org.apache.flink.table.store.table.Table; +import org.apache.flink.util.InstantiationUtil; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -31,6 +30,8 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; +import java.io.IOException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -41,30 +42,27 @@ public final class TrinoTableHandle implements ConnectorTableHandle { private final String schemaName; private final String tableName; - private final String location; - private final long tableSchemaId; + private final byte[] serializedTable; private final TupleDomain filter; private final Optional> projectedColumns; - private TableSchema tableSchema; + private Table lazyTable; public TrinoTableHandle( - String schemaName, String tableName, String location, long tableSchemaId) { - this(schemaName, tableName, location, tableSchemaId, TupleDomain.all(), Optional.empty()); + String schemaName, String tableName, byte[] serializedTable) { + this(schemaName, tableName, serializedTable, TupleDomain.all(), Optional.empty()); } @JsonCreator public TrinoTableHandle( @JsonProperty("schemaName") String schemaName, @JsonProperty("tableName") String tableName, - @JsonProperty("location") String location, - @JsonProperty("tableSchemaId") long tableSchemaId, + @JsonProperty("serializedTable") byte[] serializedTable, @JsonProperty("filter") TupleDomain filter, @JsonProperty("projection") Optional> projectedColumns) { this.schemaName = schemaName; this.tableName = tableName; - this.location = location; - this.tableSchemaId = tableSchemaId; + this.serializedTable = serializedTable; this.filter = filter; this.projectedColumns = projectedColumns; } @@ -80,13 +78,8 @@ public String getTableName() { } @JsonProperty - public String getLocation() { - return location; - } - - @JsonProperty - public long getTableSchemaId() { - return tableSchemaId; + public byte[] getSerializedTable() { + return serializedTable; } @JsonProperty @@ -101,19 +94,23 @@ public Optional> getProjectedColumns() { public TrinoTableHandle copy(TupleDomain filter) { return new TrinoTableHandle( - schemaName, tableName, location, tableSchemaId, filter, projectedColumns); + schemaName, tableName, serializedTable, filter, projectedColumns); } public TrinoTableHandle copy(Optional> projectedColumns) { return new TrinoTableHandle( - schemaName, tableName, location, tableSchemaId, filter, projectedColumns); + schemaName, tableName, serializedTable, filter, projectedColumns); } - public TableSchema tableSchema() { - if (tableSchema == null) { - tableSchema = new SchemaManager(new Path(location)).schema(tableSchemaId); + public Table table() { + if (lazyTable == null) { + try { + lazyTable = InstantiationUtil.deserializeObject(serializedTable, this.getClass().getClassLoader()); + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException(e); + } } - return tableSchema; + return lazyTable; } public ConnectorTableMetadata tableMetadata() { @@ -121,27 +118,27 @@ public ConnectorTableMetadata tableMetadata() { SchemaTableName.schemaTableName(schemaName, tableName), columnMetadatas(), Collections.emptyMap(), - Optional.ofNullable(tableSchema().comment())); + Optional.empty()); } public List columnMetadatas() { - return tableSchema().fields().stream() + return table().rowType().getFields().stream() .map( column -> ColumnMetadata.builder() - .setName(column.name()) + .setName(column.getName()) .setType( TrinoTypeUtils.fromFlinkType( - column.type().logicalType())) - .setNullable(column.type().logicalType().isNullable()) - .setComment(Optional.ofNullable(column.description())) + column.getType())) + .setNullable(column.getType().isNullable()) + .setComment(column.getDescription()) .build()) .collect(Collectors.toList()); } public TrinoColumnHandle columnHandle(String field) { - int index = tableSchema().fieldNames().indexOf(field); - return TrinoColumnHandle.of(field, tableSchema().fields().get(index).type().logicalType()); + int index = table().rowType().getFieldNames().indexOf(field); + return TrinoColumnHandle.of(field, table().rowType().getTypeAt(index)); } @Override @@ -153,10 +150,9 @@ public boolean equals(Object o) { return false; } TrinoTableHandle that = (TrinoTableHandle) o; - return tableSchemaId == that.tableSchemaId + return Arrays.equals(serializedTable, that.serializedTable) && Objects.equals(schemaName, that.schemaName) && Objects.equals(tableName, that.tableName) - && Objects.equals(location, that.location) && Objects.equals(filter, that.filter) && Objects.equals(projectedColumns, that.projectedColumns); } @@ -164,6 +160,10 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash( - schemaName, tableName, location, tableSchemaId, filter, projectedColumns); + schemaName, + tableName, + filter, + projectedColumns, + Arrays.hashCode(serializedTable)); } } diff --git a/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java b/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java index 71394d27..12c51cd0 100644 --- a/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java +++ b/src/test/java/org/apache/flink/table/store/trino/TestTrinoITCase.java @@ -176,6 +176,11 @@ public void testProjection() { assertThat(sql("SELECT SUM(b) FROM tablestore.default.t1")).isEqualTo("[[8]]"); } + @Test + public void testSystemTable() { + assertThat(sql("SELECT snapshot_id,schema_id,commit_user,commit_identifier,commit_kind FROM \"t1$snapshots\"")).isEqualTo("[[1, 0, user, 0, APPEND]]"); + } + @Test public void testFilter() { assertThat(sql("SELECT a, c FROM tablestore.default.t2 WHERE a < 4"))