diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 9f069dc3626d..7fbb777e3310 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -25,6 +25,7 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; +import org.apache.paimon.view.View; import java.io.Serializable; import java.util.Arrays; @@ -299,6 +300,69 @@ default void alterTable(Identifier identifier, SchemaChange change, boolean igno alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists); } + /** + * Check if a view exists in this catalog. + * + * @param identifier Path of the view + * @return true if the given view exists in the catalog false otherwise + */ + default boolean viewExists(Identifier identifier) { + try { + return getView(identifier) != null; + } catch (ViewNotExistException e) { + return false; + } + } + + /** + * Return a {@link View} identified by the given {@link Identifier}. + * + * @param identifier Path of the view + * @return The requested view + * @throws ViewNotExistException if the target does not exist + */ + default View getView(Identifier identifier) throws ViewNotExistException { + throw new ViewNotExistException(identifier); + } + + /** + * Drop a view. + * + * @param identifier Path of the view to be dropped + * @param ignoreIfNotExists Flag to specify behavior when the view does not exist: if set to + * false, throw an exception, if set to true, do nothing. + * @throws ViewNotExistException if the view does not exist + */ + default void dropView(Identifier identifier, boolean ignoreIfNotExists) + throws ViewNotExistException { + throw new UnsupportedOperationException(); + } + + /** + * Create a new view. + * + * @param identifier path of the view to be created + * @param view the view definition + * @param ignoreIfExists flag to specify behavior when a view already exists at the given path: + * if set to false, it throws a ViewAlreadyExistException, if set to true, do nothing. + * @throws ViewAlreadyExistException if view already exists and ignoreIfExists is false + * @throws DatabaseNotExistException if the database in identifier doesn't exist + */ + default void createView(Identifier identifier, View view, boolean ignoreIfExists) + throws ViewAlreadyExistException, DatabaseNotExistException { + throw new UnsupportedOperationException(); + } + + /** + * Get names of all views under this database. An empty list is returned if none exists. + * + * @return a list of the names of all views in this database + * @throws DatabaseNotExistException if the database does not exist + */ + default List listViews(String databaseName) throws DatabaseNotExistException { + return Collections.emptyList(); + } + /** Return a boolean that indicates whether this catalog allow upper case. */ boolean allowUpperCase(); @@ -532,6 +596,48 @@ public String column() { } } + /** Exception for trying to create a view that already exists. */ + class ViewAlreadyExistException extends Exception { + + private static final String MSG = "View %s already exists."; + + private final Identifier identifier; + + public ViewAlreadyExistException(Identifier identifier) { + this(identifier, null); + } + + public ViewAlreadyExistException(Identifier identifier, Throwable cause) { + super(String.format(MSG, identifier.getFullName()), cause); + this.identifier = identifier; + } + + public Identifier identifier() { + return identifier; + } + } + + /** Exception for trying to operate on a view that doesn't exist. */ + class ViewNotExistException extends Exception { + + private static final String MSG = "View %s does not exist."; + + private final Identifier identifier; + + public ViewNotExistException(Identifier identifier) { + this(identifier, null); + } + + public ViewNotExistException(Identifier identifier, Throwable cause) { + super(String.format(MSG, identifier.getFullName()), cause); + this.identifier = identifier; + } + + public Identifier identifier() { + return identifier; + } + } + /** Loader of {@link Catalog}. */ @FunctionalInterface interface Loader extends Serializable { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 814de16d6e4c..d05e100b24e4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -24,6 +24,7 @@ import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.Table; +import org.apache.paimon.view.View; import java.util.List; import java.util.Map; @@ -135,6 +136,38 @@ public Table getTable(Identifier identifier) throws TableNotExistException { return wrapped.getTable(identifier); } + @Override + public boolean tableExists(Identifier identifier) { + return wrapped.tableExists(identifier); + } + + @Override + public boolean viewExists(Identifier identifier) { + return wrapped.viewExists(identifier); + } + + @Override + public View getView(Identifier identifier) throws ViewNotExistException { + return wrapped.getView(identifier); + } + + @Override + public void dropView(Identifier identifier, boolean ignoreIfNotExists) + throws ViewNotExistException { + wrapped.dropView(identifier, ignoreIfNotExists); + } + + @Override + public void createView(Identifier identifier, View view, boolean ignoreIfExists) + throws ViewAlreadyExistException, DatabaseNotExistException { + wrapped.createView(identifier, view, ignoreIfExists); + } + + @Override + public List listViews(String databaseName) throws DatabaseNotExistException { + return wrapped.listViews(databaseName); + } + @Override public Path getTableLocation(Identifier identifier) { return wrapped.getTableLocation(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/view/View.java b/paimon-core/src/main/java/org/apache/paimon/view/View.java new file mode 100644 index 000000000000..87f56764244b --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/view/View.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.view; + +import org.apache.paimon.types.RowType; + +import java.util.Map; +import java.util.Optional; + +/** Interface for view definition. */ +public interface View { + + /** A name to identify this view. */ + String name(); + + /** Full name (including database) to identify this view. */ + String fullName(); + + /** Returns the row type of this view. */ + RowType rowType(); + + /** Returns the view representation. */ + String query(); + + /** Optional comment of this view. */ + Optional comment(); + + /** Options of this view. */ + Map options(); + + /** Copy this view with adding dynamic options. */ + View copy(Map dynamicOptions); +} diff --git a/paimon-core/src/main/java/org/apache/paimon/view/ViewImpl.java b/paimon-core/src/main/java/org/apache/paimon/view/ViewImpl.java new file mode 100644 index 000000000000..1cd48d4ce445 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/view/ViewImpl.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.view; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** Implementation of {@link View}. */ +public class ViewImpl implements View { + + private final Identifier identifier; + private final RowType rowType; + private final String query; + @Nullable private final String comment; + private final Map options; + + public ViewImpl( + Identifier identifier, + RowType rowType, + String query, + @Nullable String comment, + Map options) { + this.identifier = identifier; + this.rowType = rowType; + this.query = query; + this.comment = comment; + this.options = options; + } + + @Override + public String name() { + return identifier.getObjectName(); + } + + @Override + public String fullName() { + return identifier.getFullName(); + } + + @Override + public RowType rowType() { + return rowType; + } + + @Override + public String query() { + return query; + } + + @Override + public Optional comment() { + return Optional.ofNullable(comment); + } + + @Override + public Map options() { + return options; + } + + @Override + public View copy(Map dynamicOptions) { + Map newOptions = new HashMap<>(options); + newOptions.putAll(dynamicOptions); + return new ViewImpl(identifier, rowType, query, comment, newOptions); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ViewImpl view = (ViewImpl) o; + return Objects.equals(identifier, view.identifier) + && Objects.equals(rowType, view.rowType) + && Objects.equals(query, view.query) + && Objects.equals(comment, view.comment) + && Objects.equals(options, view.options); + } + + @Override + public int hashCode() { + return Objects.hash(identifier, rowType, query, comment, options); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index c4470b2283f3..9bb643c9e60b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -29,6 +29,8 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewImpl; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; @@ -40,7 +42,9 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; @@ -831,4 +835,64 @@ public void testAlterTableUpdateComment() throws Exception { table = catalog.getTable(identifier); assertThat(table.comment().isPresent()).isFalse(); } + + protected boolean supportsView() { + return false; + } + + @Test + public void testView() throws Exception { + if (!supportsView()) { + return; + } + + Identifier identifier = new Identifier("view_db", "my_view"); + RowType rowType = + RowType.builder() + .field("str", DataTypes.STRING()) + .field("int", DataTypes.INT()) + .build(); + String query = "SELECT * FROM OTHER_TABLE"; + String comment = "it is my view"; + Map options = new HashMap<>(); + options.put("key1", "v1"); + options.put("key2", "v2"); + View view = new ViewImpl(identifier, rowType, query, comment, options); + + assertThatThrownBy(() -> catalog.createView(identifier, view, false)) + .isInstanceOf(Catalog.DatabaseNotExistException.class); + + assertThatThrownBy(() -> catalog.listViews(identifier.getDatabaseName())) + .isInstanceOf(Catalog.DatabaseNotExistException.class); + + catalog.createDatabase(identifier.getDatabaseName(), false); + + assertThatThrownBy(() -> catalog.getView(identifier)) + .isInstanceOf(Catalog.ViewNotExistException.class); + + catalog.createView(identifier, view, false); + + assertThat(catalog.viewExists(identifier)).isTrue(); + + View catalogView = catalog.getView(identifier); + assertThat(catalogView.fullName()).isEqualTo(view.fullName()); + assertThat(catalogView.rowType()).isEqualTo(view.rowType()); + assertThat(catalogView.query()).isEqualTo(view.query()); + assertThat(catalogView.comment()).isEqualTo(view.comment()); + assertThat(catalogView.options()).containsAllEntriesOf(view.options()); + + List views = catalog.listViews(identifier.getDatabaseName()); + assertThat(views).containsOnly(identifier.getObjectName()); + + catalog.createView(identifier, view, true); + assertThatThrownBy(() -> catalog.createView(identifier, view, false)) + .isInstanceOf(Catalog.ViewAlreadyExistException.class); + + catalog.dropView(identifier, false); + assertThat(catalog.viewExists(identifier)).isFalse(); + + catalog.dropView(identifier, true); + assertThatThrownBy(() -> catalog.dropView(identifier, false)) + .isInstanceOf(Catalog.ViewNotExistException.class); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index c1e7db6b2633..ad624b5602e3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -41,6 +41,8 @@ import org.apache.paimon.utils.InternalRowPartitionComputer; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.StringUtils; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewImpl; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.catalog.AbstractCatalog; @@ -53,10 +55,12 @@ import org.apache.flink.table.catalog.CatalogPartitionImpl; import org.apache.flink.table.catalog.CatalogPartitionSpec; import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.IntervalFreshness; import org.apache.flink.table.catalog.ObjectPath; import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogView; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.TableChange; import org.apache.flink.table.catalog.TableChange.AddColumn; @@ -284,6 +288,11 @@ private CatalogBaseTable getTable(ObjectPath tablePath, @Nullable Long timestamp try { table = catalog.getTable(toIdentifier(tablePath)); } catch (Catalog.TableNotExistException e) { + Optional view = getView(tablePath, timestamp); + if (view.isPresent()) { + return view.get(); + } + throw new TableNotExistException(getName(), tablePath); } @@ -309,17 +318,53 @@ private CatalogBaseTable getTable(ObjectPath tablePath, @Nullable Long timestamp } } + private Optional getView(ObjectPath tablePath, @Nullable Long timestamp) { + View view; + try { + view = catalog.getView(toIdentifier(tablePath)); + } catch (Catalog.ViewNotExistException e) { + return Optional.empty(); + } + + if (timestamp != null) { + throw new UnsupportedOperationException( + String.format("View %s does not support time travel.", tablePath)); + } + + org.apache.flink.table.api.Schema schema = + org.apache.flink.table.api.Schema.newBuilder() + .fromRowDataType(fromLogicalToDataType(toLogicalType(view.rowType()))) + .build(); + return Optional.of( + CatalogView.of( + schema, + view.comment().orElse(null), + view.query(), + view.query(), + view.options())); + } + @Override public boolean tableExists(ObjectPath tablePath) throws CatalogException { - return catalog.tableExists(toIdentifier(tablePath)); + Identifier identifier = toIdentifier(tablePath); + return catalog.tableExists(identifier) || catalog.viewExists(identifier); } @Override public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException { Identifier identifier = toIdentifier(tablePath); - Table table = null; + if (catalog.viewExists(identifier)) { + try { + catalog.dropView(identifier, ignoreIfNotExists); + return; + } catch (Catalog.ViewNotExistException e) { + throw new RuntimeException("Unexpected exception.", e); + } + } + try { + Table table = null; if (logStoreAutoRegister && catalog.tableExists(identifier)) { table = catalog.getTable(identifier); } @@ -335,18 +380,17 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) @Override public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { - if (!(table instanceof CatalogTable || table instanceof CatalogMaterializedTable)) { - throw new UnsupportedOperationException( - "Only support CatalogTable and CatalogMaterializedTable, but is: " - + table.getClass()); - } - if (Objects.equals(getDefaultDatabase(), tablePath.getDatabaseName()) && disableCreateTableInDefaultDatabase) { throw new UnsupportedOperationException( "Creating table in default database is disabled, please specify a database name."); } + if (table instanceof CatalogView) { + createView(tablePath, (ResolvedCatalogView) table, ignoreIfExists); + return; + } + Identifier identifier = toIdentifier(tablePath); // the returned value of "table.getOptions" may be unmodifiable (for example from // TableDescriptor) @@ -372,6 +416,34 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig } } + private void createView(ObjectPath tablePath, ResolvedCatalogView table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException { + Identifier identifier = toIdentifier(tablePath); + org.apache.paimon.types.RowType.Builder builder = org.apache.paimon.types.RowType.builder(); + table.getResolvedSchema() + .getColumns() + .forEach( + column -> + builder.field( + column.getName(), + toDataType(column.getDataType().getLogicalType()), + column.getComment().orElse(null))); + View view = + new ViewImpl( + identifier, + builder.build(), + table.getOriginalQuery(), + table.getComment(), + table.getOptions()); + try { + catalog.createView(identifier, view, ignoreIfExists); + } catch (Catalog.ViewAlreadyExistException e) { + throw new TableAlreadyExistException(getName(), tablePath); + } catch (Catalog.DatabaseNotExistException e) { + throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName()); + } + } + private static void fillOptionsForMaterializedTable( CatalogMaterializedTable mt, Map options) { Options mtOptions = new Options(); @@ -1025,8 +1097,13 @@ public final void renameTable( } @Override - public final List listViews(String databaseName) throws CatalogException { - return Collections.emptyList(); + public final List listViews(String databaseName) + throws DatabaseNotExistException, CatalogException { + try { + return catalog.listViews(databaseName); + } catch (Catalog.DatabaseNotExistException e) { + throw new DatabaseNotExistException(getName(), databaseName); + } } @Override diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index d4f886247822..657bce8a50cd 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -46,12 +46,15 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewImpl; import org.apache.flink.table.hive.LegacyHiveClasses; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; @@ -440,7 +443,7 @@ && tableSchemaInFileSystem( if (formatTableEnabled()) { try { - HiveFormatTableUtils.convertToFormatTable(table); + HiveTableUtils.convertToFormatTable(table); return true; } catch (UnsupportedOperationException e) { return false; @@ -468,6 +471,139 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis .orElseThrow(() -> new TableNotExistException(identifier)); } + @Override + public View getView(Identifier identifier) throws ViewNotExistException { + Table table; + try { + table = + clients.run( + client -> + client.getTable( + identifier.getDatabaseName(), + identifier.getTableName())); + } catch (NoSuchObjectException e) { + throw new ViewNotExistException(identifier); + } catch (TException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + + if (!isView(table)) { + throw new ViewNotExistException(identifier); + } + + RowType rowType = HiveTableUtils.createRowType(table); + Map options = new HashMap<>(table.getParameters()); + String comment = options.remove(COMMENT_PROP); + return new ViewImpl(identifier, rowType, table.getViewExpandedText(), comment, options); + } + + @Override + public void createView(Identifier identifier, View view, boolean ignoreIfExists) + throws ViewAlreadyExistException, DatabaseNotExistException { + if (!databaseExists(identifier.getDatabaseName())) { + throw new DatabaseNotExistException(identifier.getDatabaseName()); + } + + try { + getView(identifier); + if (ignoreIfExists) { + return; + } + throw new ViewAlreadyExistException(identifier); + } catch (ViewNotExistException ignored) { + } + + Table hiveTable = + org.apache.hadoop.hive.ql.metadata.Table.getEmptyTable( + identifier.getDatabaseName(), identifier.getObjectName()); + hiveTable.setCreateTime((int) (System.currentTimeMillis() / 1000)); + + Map properties = new HashMap<>(view.options()); + // Table comment + if (view.comment().isPresent()) { + properties.put(COMMENT_PROP, view.comment().get()); + } + hiveTable.setParameters(properties); + hiveTable.setPartitionKeys(new ArrayList<>()); + hiveTable.setViewOriginalText(view.query()); + hiveTable.setViewExpandedText(view.query()); + hiveTable.setTableType(TableType.VIRTUAL_VIEW.name()); + + StorageDescriptor sd = hiveTable.getSd(); + List columns = + view.rowType().getFields().stream() + .map(this::convertToFieldSchema) + .collect(Collectors.toList()); + sd.setCols(columns); + + try { + clients.execute(client -> client.createTable(hiveTable)); + } catch (Exception e) { + // we don't need to delete directories since HMS will roll back db and fs if failed. + throw new RuntimeException("Failed to create table " + identifier.getFullName(), e); + } + } + + @Override + public void dropView(Identifier identifier, boolean ignoreIfNotExists) + throws ViewNotExistException { + try { + getView(identifier); + } catch (ViewNotExistException e) { + if (ignoreIfNotExists) { + return; + } + throw e; + } + + try { + clients.execute( + client -> + client.dropTable( + identifier.getDatabaseName(), + identifier.getTableName(), + false, + false, + false)); + } catch (TException e) { + throw new RuntimeException("Failed to drop view " + identifier.getFullName(), e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted in call to drop view " + identifier.getFullName(), e); + } + } + + @Override + public List listViews(String databaseName) throws DatabaseNotExistException { + if (isSystemDatabase(databaseName)) { + return Collections.emptyList(); + } + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(databaseName); + } + + try { + List tables = clients.run(client -> client.getAllTables(databaseName)); + List views = new ArrayList<>(); + for (String tableName : tables) { + Table table = clients.run(client -> client.getTable(databaseName, tableName)); + if (isView(table)) { + views.add(tableName); + } + } + return views; + } catch (TException e) { + throw new RuntimeException("Failed to list all tables in database " + databaseName, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in call to listTables " + databaseName, e); + } + } + @Override public FormatTable getFormatTable(Identifier identifier) throws TableNotExistException { if (!formatTableEnabled()) { @@ -491,12 +627,13 @@ public FormatTable getFormatTable(Identifier identifier) throws TableNotExistExc throw new RuntimeException(e); } try { - return HiveFormatTableUtils.convertToFormatTable(table); + return HiveTableUtils.convertToFormatTable(table); } catch (UnsupportedOperationException e) { throw new TableNotExistException(identifier); } } + @Override public void createFormatTable(Identifier identifier, Schema schema) { if (!formatTableEnabled()) { throw new UnsupportedOperationException( @@ -1134,4 +1271,8 @@ public static HiveConf createHiveConf(CatalogContext context) { public static String possibleHiveConfPath() { return System.getenv("HIVE_CONF_DIR"); } + + private static boolean isView(Table table) { + return TableType.valueOf(table.getTableType()) == TableType.VIRTUAL_VIEW; + } } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java similarity index 93% rename from paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java rename to paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java index dcd0c3c450c6..0c88107c0849 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveFormatTableUtils.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveTableUtils.java @@ -44,7 +44,7 @@ import static org.apache.paimon.catalog.Catalog.COMMENT_PROP; import static org.apache.paimon.table.FormatTableOptions.FIELD_DELIMITER; -class HiveFormatTableUtils { +class HiveTableUtils { public static FormatTable convertToFormatTable(Table hiveTable) { if (TableType.valueOf(hiveTable.getTableType()) == TableType.VIRTUAL_VIEW) { @@ -54,7 +54,7 @@ public static FormatTable convertToFormatTable(Table hiveTable) { Identifier identifier = new Identifier(hiveTable.getDbName(), hiveTable.getTableName()); Map options = new HashMap<>(hiveTable.getParameters()); List partitionKeys = getFieldNames(hiveTable.getPartitionKeys()); - RowType rowType = createRowType(hiveTable.getSd().getCols(), hiveTable.getPartitionKeys()); + RowType rowType = createRowType(hiveTable); String comment = options.remove(COMMENT_PROP); String location = hiveTable.getSd().getLocation(); Format format; @@ -104,10 +104,9 @@ private static List getFieldNames(List fieldSchemas) { } /** Create a Paimon's Schema from Hive table's columns and partition keys. */ - private static RowType createRowType( - List nonPartCols, List partitionKeys) { - List allCols = new ArrayList<>(nonPartCols); - allCols.addAll(partitionKeys); + public static RowType createRowType(Table table) { + List allCols = new ArrayList<>(table.getSd().getCols()); + allCols.addAll(table.getPartitionKeys()); Pair columnInformation = extractColumnInformation(allCols); return RowType.builder() .fields(columnInformation.getRight(), columnInformation.getLeft()) diff --git a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java index 6b13a80e801a..cb902605cf90 100644 --- a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java +++ b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java @@ -268,4 +268,9 @@ public void testAlterHiveTableParameters() { fail("Test failed due to exception: " + e.getMessage()); } } + + @Override + protected boolean supportsView() { + return true; + } } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java index 246822600014..cca044f0438a 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java @@ -1911,6 +1911,33 @@ public void testExpiredPartitionsSyncToMetastore() throws Exception { .containsExactlyInAnyOrder("dt=9998-06-15", "dt=9999-06-15"); } + @Test + public void testView() throws Exception { + tEnv.executeSql("CREATE TABLE t ( a INT, b STRING ) WITH ( 'file.format' = 'avro' )") + .await(); + tEnv.executeSql("INSERT INTO t VALUES (1, 'Hi'), (2, 'Hello')").await(); + + // test flink view + tEnv.executeSql("CREATE VIEW flink_v AS SELECT a + 1, b FROM t").await(); + assertThat(collect("SELECT * FROM flink_v")) + .containsExactlyInAnyOrder(Row.of(2, "Hi"), Row.of(3, "Hello")); + assertThat(hiveShell.executeQuery("SELECT * FROM flink_v")) + .containsExactlyInAnyOrder("2\tHi", "3\tHello"); + + // test hive view + hiveShell.executeQuery("CREATE VIEW hive_v AS SELECT a + 1, b FROM t"); + assertThat(collect("SELECT * FROM hive_v")) + .containsExactlyInAnyOrder(Row.of(2, "Hi"), Row.of(3, "Hello")); + assertThat(hiveShell.executeQuery("SELECT * FROM hive_v")) + .containsExactlyInAnyOrder("2\tHi", "3\tHello"); + + assertThat(collect("SHOW VIEWS")) + .containsExactlyInAnyOrder(Row.of("flink_v"), Row.of("hive_v")); + + collect("DROP VIEW flink_v"); + collect("DROP VIEW hive_v"); + } + /** Prepare to update a paimon table with a custom path in the paimon file system. */ private void alterTableInFileSystem(TableEnvironment tEnv) throws Exception { tEnv.executeSql( @@ -1974,14 +2001,4 @@ protected List collect(String sql) throws Exception { } return result; } - - private List collectString(String sql) throws Exception { - List result = new ArrayList<>(); - try (CloseableIterator it = tEnv.executeSql(sql).collect()) { - while (it.hasNext()) { - result.add(it.next().toString()); - } - } - return result; - } }