diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index d7447c37dd79..ef6c0e33485e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -20,7 +20,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.TableType; -import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; @@ -59,8 +58,11 @@ import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.CoreOptions.createCommitUser; -import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; -import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; +import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch; +import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; +import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; +import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; +import static org.apache.paimon.catalog.CatalogUtils.lockFactory; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -94,31 +96,16 @@ public FileIO fileIO() { return fileIO; } - public Optional lockFactory() { - if (!lockEnabled()) { - return Optional.empty(); - } - - String lock = catalogOptions.get(LOCK_TYPE); - if (lock == null) { - return defaultLockFactory(); - } - - return Optional.of( - FactoryUtil.discoverFactory( - AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock)); - } - public Optional defaultLockFactory() { return Optional.empty(); } public Optional lockContext() { - return Optional.of(CatalogLockContext.fromOptions(catalogOptions)); + return CatalogUtils.lockContext(catalogOptions); } protected boolean lockEnabled() { - return catalogOptions.getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore()); + return CatalogUtils.lockEnabled(catalogOptions, fileIO); } protected boolean allowCustomTablePath() { @@ -397,20 +384,7 @@ public Table getTable(Identifier identifier) throws TableNotExistException { identifier.getTableName(), identifier.getBranchName(), null)); - if (!(originTable instanceof FileStoreTable)) { - throw new UnsupportedOperationException( - String.format( - "Only data table support system tables, but this table %s is %s.", - identifier, originTable.getClass())); - } - Table table = - SystemTableLoader.load( - Preconditions.checkNotNull(identifier.getSystemTableName()), - (FileStoreTable) originTable); - if (table == null) { - throw new TableNotExistException(identifier); - } - return table; + return CatalogUtils.getSystemTable(identifier, originTable); } else { return getDataOrFormatTable(identifier); } @@ -428,7 +402,8 @@ protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExist identifier, tableMeta.uuid, Lock.factory( - lockFactory().orElse(null), + lockFactory(catalogOptions, fileIO(), defaultLockFactory()) + .orElse(null), lockContext().orElse(null), identifier), metastoreClientFactory(identifier).orElse(null))); @@ -472,7 +447,7 @@ public void createFormatTable(Identifier identifier, Schema schema) { * @return The warehouse path for the database */ public Path newDatabasePath(String database) { - return newDatabasePath(warehouse(), database); + return CatalogUtils.newDatabasePath(warehouse(), database); } public Map> allTablePaths() { @@ -507,16 +482,6 @@ public Path getTableLocation(Identifier identifier) { return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName()); } - protected static void checkNotBranch(Identifier identifier, String method) { - if (identifier.getBranchName() != null) { - throw new IllegalArgumentException( - String.format( - "Cannot '%s' for branch table '%s', " - + "please modify the table with the default branch.", - method, identifier)); - } - } - protected void assertMainBranch(Identifier identifier) { if (identifier.getBranchName() != null && !DEFAULT_MAIN_BRANCH.equals(identifier.getBranchName())) { @@ -525,46 +490,10 @@ protected void assertMainBranch(Identifier identifier) { } } - protected static boolean isTableInSystemDatabase(Identifier identifier) { - return isSystemDatabase(identifier.getDatabaseName()) || identifier.isSystemTable(); - } - - protected static void checkNotSystemTable(Identifier identifier, String method) { - if (isTableInSystemDatabase(identifier)) { - throw new IllegalArgumentException( - String.format( - "Cannot '%s' for system table '%s', please use data table.", - method, identifier)); - } - } - private void copyTableDefaultOptions(Map options) { tableDefaultOptions.forEach(options::putIfAbsent); } - public static Path newTableLocation(String warehouse, Identifier identifier) { - checkNotBranch(identifier, "newTableLocation"); - checkNotSystemTable(identifier, "newTableLocation"); - return new Path( - newDatabasePath(warehouse, identifier.getDatabaseName()), - identifier.getTableName()); - } - - public static Path newDatabasePath(String warehouse, String database) { - return new Path(warehouse, database + DB_SUFFIX); - } - - public static boolean isSystemDatabase(String database) { - return SYSTEM_DATABASE_NAME.equals(database); - } - - /** Validate database cannot be a system database. */ - protected void checkNotSystemDatabase(String database) { - if (isSystemDatabase(database)) { - throw new ProcessSystemDatabaseException(); - } - } - private void validateAutoCreateClose(Map options) { checkArgument( !Boolean.parseBoolean( diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index 34e53f32f267..e5977168ae86 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -210,7 +210,7 @@ public void dropTable(Identifier identifier, boolean ignoreIfNotExists) @Override public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException { + throws TableNotExistException, TableAlreadyExistException, TableNoPermissionException { super.renameTable(fromTable, toTable, ignoreIfNotExists); invalidateTable(fromTable); } @@ -218,13 +218,15 @@ public void renameTable(Identifier fromTable, Identifier toTable, boolean ignore @Override public void alterTable( Identifier identifier, List changes, boolean ignoreIfNotExists) - throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException, + TableNoPermissionException { super.alterTable(identifier, changes, ignoreIfNotExists); invalidateTable(identifier); } @Override - public Table getTable(Identifier identifier) throws TableNotExistException { + public Table getTable(Identifier identifier) + throws TableNotExistException, TableNoPermissionException { Table table = tableCache.getIfPresent(identifier); if (table != null) { return table; diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index 37ea6fa5e203..09e2f9fc4fdc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -148,7 +148,7 @@ void alterDatabase(String name, List changes, boolean ignoreIfNo * @return The requested table * @throws TableNotExistException if the target does not exist */ - Table getTable(Identifier identifier) throws TableNotExistException; + Table getTable(Identifier identifier) throws TableNotExistException, TableNoPermissionException; /** * Get names of all tables under this database. An empty list is returned if none exists. @@ -204,7 +204,7 @@ void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists) * @throws TableAlreadyExistException if the toTable already exists */ void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException; + throws TableNotExistException, TableAlreadyExistException, TableNoPermissionException; /** * Modify an existing table from {@link SchemaChange}s. @@ -218,7 +218,8 @@ void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotEx * @throws TableNotExistException if the table does not exist */ void alterTable(Identifier identifier, List changes, boolean ignoreIfNotExists) - throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException; + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException, + TableNoPermissionException; /** * Invalidate cached table metadata for an {@link Identifier identifier}. @@ -274,7 +275,8 @@ void dropPartition(Identifier identifier, Map partitions) * @throws TableNotExistException if the table does not exist */ default void alterTable(Identifier identifier, SchemaChange change, boolean ignoreIfNotExists) - throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException, + TableNoPermissionException { alterTable(identifier, Collections.singletonList(change), ignoreIfNotExists); } @@ -433,6 +435,27 @@ public ProcessSystemDatabaseException() { } } + /** Exception for trying to operate on a table that doesn't have permission. */ + class TableNoPermissionException extends Exception { + + private static final String MSG = "No permission for Table %s."; + + private final Identifier identifier; + + public TableNoPermissionException(Identifier identifier) { + this(identifier, null); + } + + public TableNoPermissionException(Identifier identifier, Throwable cause) { + super(String.format(MSG, identifier.getFullName()), cause); + this.identifier = identifier; + } + + public Identifier identifier() { + return identifier; + } + } + /** Exception for trying to create a table that already exists. */ class TableAlreadyExistException extends Exception { diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java index 043da0504d7f..826e2c084722 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java @@ -18,12 +18,24 @@ package org.apache.paimon.catalog; +import org.apache.paimon.factories.FactoryUtil; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.system.SystemTableLoader; +import org.apache.paimon.utils.Preconditions; import java.util.Map; +import java.util.Optional; +import static org.apache.paimon.catalog.Catalog.DB_SUFFIX; +import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX; +import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED; +import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; /** Utils for {@link Catalog}. */ @@ -60,4 +72,93 @@ public static String table(String path) { public static Map tableDefaultOptions(Map options) { return convertToPropertiesPrefixKey(options, TABLE_DEFAULT_OPTION_PREFIX); } + + public static boolean isSystemDatabase(String database) { + return SYSTEM_DATABASE_NAME.equals(database); + } + + /** Validate database cannot be a system database. */ + public static void checkNotSystemDatabase(String database) { + if (isSystemDatabase(database)) { + throw new Catalog.ProcessSystemDatabaseException(); + } + } + + public static boolean isTableInSystemDatabase(Identifier identifier) { + return isSystemDatabase(identifier.getDatabaseName()) || identifier.isSystemTable(); + } + + public static void checkNotSystemTable(Identifier identifier, String method) { + if (isTableInSystemDatabase(identifier)) { + throw new IllegalArgumentException( + String.format( + "Cannot '%s' for system table '%s', please use data table.", + method, identifier)); + } + } + + public static Path newDatabasePath(String warehouse, String database) { + return new Path(warehouse, database + DB_SUFFIX); + } + + public static Path newTableLocation(String warehouse, Identifier identifier) { + checkNotBranch(identifier, "newTableLocation"); + checkNotSystemTable(identifier, "newTableLocation"); + return new Path( + newDatabasePath(warehouse, identifier.getDatabaseName()), + identifier.getTableName()); + } + + public static void checkNotBranch(Identifier identifier, String method) { + if (identifier.getBranchName() != null) { + throw new IllegalArgumentException( + String.format( + "Cannot '%s' for branch table '%s', " + + "please modify the table with the default branch.", + method, identifier)); + } + } + + public static Optional lockFactory( + Options options, FileIO fileIO, Optional defaultLockFactoryOpt) { + boolean lockEnabled = lockEnabled(options, fileIO); + if (!lockEnabled) { + return Optional.empty(); + } + + String lock = options.get(LOCK_TYPE); + if (lock == null) { + return defaultLockFactoryOpt; + } + + return Optional.of( + FactoryUtil.discoverFactory( + AbstractCatalog.class.getClassLoader(), CatalogLockFactory.class, lock)); + } + + public static Optional lockContext(Options options) { + return Optional.of(CatalogLockContext.fromOptions(options)); + } + + public static boolean lockEnabled(Options options, FileIO fileIO) { + return options.getOptional(LOCK_ENABLED).orElse(fileIO != null && fileIO.isObjectStore()); + } + + public static Table getSystemTable(Identifier identifier, Table originTable) + throws Catalog.TableNotExistException { + if (!(originTable instanceof FileStoreTable)) { + throw new UnsupportedOperationException( + String.format( + "Only data table support system tables, but this table %s is %s.", + identifier, originTable.getClass())); + } + Table table = + SystemTableLoader.load( + Preconditions.checkNotNull(identifier.getSystemTableName()), + (FileStoreTable) originTable); + if (table == null) { + throw new Catalog.TableNotExistException(identifier); + } + return table; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java index 968f00cfcae5..7bc9e505ee4b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java @@ -108,19 +108,21 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx @Override public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException { + throws TableNotExistException, TableAlreadyExistException, TableNoPermissionException { wrapped.renameTable(fromTable, toTable, ignoreIfNotExists); } @Override public void alterTable( Identifier identifier, List changes, boolean ignoreIfNotExists) - throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException, + TableNoPermissionException { wrapped.alterTable(identifier, changes, ignoreIfNotExists); } @Override - public Table getTable(Identifier identifier) throws TableNotExistException { + public Table getTable(Identifier identifier) + throws TableNotExistException, TableNoPermissionException { return wrapped.getTable(identifier); } diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index cb0c358259f8..577dd9674ec8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.concurrent.Callable; +import static org.apache.paimon.catalog.CatalogUtils.lockFactory; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; /** A catalog implementation for {@link FileIO}. */ @@ -123,7 +124,9 @@ public void createTableImpl(Identifier identifier, Schema schema) { private SchemaManager schemaManager(Identifier identifier) { Path path = getTableLocation(identifier); CatalogLock catalogLock = - lockFactory().map(fac -> fac.createLock(assertGetLockContext())).orElse(null); + lockFactory(catalogOptions, fileIO(), defaultLockFactory()) + .map(fac -> fac.createLock(assertGetLockContext())) + .orElse(null); return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault()) .withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 6a4276662468..1bf2e359303b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -212,7 +212,8 @@ public static List createOrphanFilesCleans( long olderThanMillis, SerializableConsumer fileCleaner, @Nullable Integer parallelism) - throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { + throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException, + Catalog.TableNoPermissionException { List tableNames = Collections.singletonList(tableName); if (tableName == null || "*".equals(tableName)) { tableNames = catalog.listTables(databaseName); @@ -253,7 +254,8 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles( long olderThanMillis, SerializableConsumer fileCleaner, @Nullable Integer parallelism) - throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { + throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException, + Catalog.TableNoPermissionException { List tableCleans = createOrphanFilesCleans( catalog, diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java index 35822471a2d6..023d201dec2d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java @@ -107,7 +107,7 @@ public void createTable(Identifier identifier, Schema schema, boolean ignoreIfEx @Override public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException { + throws TableNotExistException, TableAlreadyExistException, TableNoPermissionException { privilegeManager.getPrivilegeChecker().assertCanAlterTable(fromTable); wrapped.renameTable(fromTable, toTable, ignoreIfNotExists); @@ -126,13 +126,15 @@ public void renameTable(Identifier fromTable, Identifier toTable, boolean ignore @Override public void alterTable( Identifier identifier, List changes, boolean ignoreIfNotExists) - throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException, + TableNoPermissionException { privilegeManager.getPrivilegeChecker().assertCanAlterTable(identifier); wrapped.alterTable(identifier, changes, ignoreIfNotExists); } @Override - public Table getTable(Identifier identifier) throws TableNotExistException { + public Table getTable(Identifier identifier) + throws TableNotExistException, TableNoPermissionException { Table table = wrapped.getTable(identifier); if (table instanceof FileStoreTable) { return PrivilegedFileStoreTable.wrap( @@ -186,6 +188,8 @@ public void grantPrivilegeOnTable(String user, Identifier identifier, PrivilegeT getTable(identifier); } catch (TableNotExistException e) { throw new IllegalArgumentException("Table " + identifier + " does not exist"); + } catch (TableNoPermissionException e) { + throw new IllegalArgumentException("Table " + identifier + " has no permission"); } privilegeManager.grant(user, identifier.getFullName(), privilege); } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java index 8b53bef8486b..a767c8af98fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java @@ -18,46 +18,72 @@ package org.apache.paimon.rest; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.TableType; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.PartitionEntry; +import org.apache.paimon.operation.Lock; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.rest.auth.AuthSession; import org.apache.paimon.rest.auth.CredentialsProvider; import org.apache.paimon.rest.auth.CredentialsProviderFactory; import org.apache.paimon.rest.exceptions.AlreadyExistsException; +import org.apache.paimon.rest.exceptions.ForbiddenException; import org.apache.paimon.rest.exceptions.NoSuchResourceException; import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; +import org.apache.paimon.rest.requests.CreateTableRequest; +import org.apache.paimon.rest.requests.UpdateTableRequest; import org.apache.paimon.rest.responses.AlterDatabaseResponse; import org.apache.paimon.rest.responses.ConfigResponse; import org.apache.paimon.rest.responses.CreateDatabaseResponse; -import org.apache.paimon.rest.responses.DatabaseName; import org.apache.paimon.rest.responses.GetDatabaseResponse; +import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; +import org.apache.paimon.rest.responses.ListTablesResponse; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.CatalogEnvironment; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.table.Table; +import org.apache.paimon.table.object.ObjectTable; +import org.apache.paimon.table.system.SystemTableLoader; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.Preconditions; import org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.UncheckedIOException; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledExecutorService; -import java.util.stream.Collectors; +import java.util.function.Supplier; +import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; +import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; +import static org.apache.paimon.catalog.CatalogUtils.lockContext; +import static org.apache.paimon.catalog.CatalogUtils.lockFactory; +import static org.apache.paimon.catalog.CatalogUtils.newTableLocation; import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE; +import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool; /** A catalog implementation for REST. */ @@ -67,21 +93,24 @@ public class RESTCatalog implements Catalog { private final RESTClient client; private final ResourcePaths resourcePaths; - private final Options options; private final Map baseHeader; private final AuthSession catalogAuth; + private final CatalogContext context; + private final FileIO fileIO; private volatile ScheduledExecutorService refreshExecutor = null; - public RESTCatalog(Options options) { - if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) { + public RESTCatalog(CatalogContext catalogContext) { + Options catalogOptions = catalogContext.options(); + if (catalogOptions.getOptional(CatalogOptions.WAREHOUSE).isPresent()) { throw new IllegalArgumentException("Can not config warehouse in RESTCatalog."); } - String uri = options.get(RESTCatalogOptions.URI); + String uri = catalogOptions.get(RESTCatalogOptions.URI); Optional connectTimeout = - options.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT); - Optional readTimeout = options.getOptional(RESTCatalogOptions.READ_TIMEOUT); - Integer threadPoolSize = options.get(RESTCatalogOptions.THREAD_POOL_SIZE); + catalogOptions.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT); + Optional readTimeout = + catalogOptions.getOptional(RESTCatalogOptions.READ_TIMEOUT); + Integer threadPoolSize = catalogOptions.get(RESTCatalogOptions.THREAD_POOL_SIZE); HttpClientOptions httpClientOptions = new HttpClientOptions( uri, @@ -91,10 +120,10 @@ public RESTCatalog(Options options) { threadPoolSize, DefaultErrorHandler.getInstance()); this.client = new HttpClient(httpClientOptions); - this.baseHeader = configHeaders(options.toMap()); + this.baseHeader = configHeaders(catalogOptions.toMap()); CredentialsProvider credentialsProvider = CredentialsProviderFactory.createCredentialsProvider( - options, RESTCatalog.class.getClassLoader()); + catalogOptions, RESTCatalog.class.getClassLoader()); if (credentialsProvider.keepRefreshed()) { this.catalogAuth = AuthSession.fromRefreshCredentialsProvider( @@ -104,26 +133,47 @@ public RESTCatalog(Options options) { this.catalogAuth = new AuthSession(this.baseHeader, credentialsProvider); } Map initHeaders = - RESTUtil.merge(configHeaders(options.toMap()), this.catalogAuth.getHeaders()); - this.options = new Options(fetchOptionsFromServer(initHeaders, options.toMap())); + RESTUtil.merge( + configHeaders(catalogOptions.toMap()), this.catalogAuth.getHeaders()); + Options options = new Options(fetchOptionsFromServer(initHeaders, initHeaders)); + this.context = + CatalogContext.create( + options, catalogContext.preferIO(), catalogContext.fallbackIO()); this.resourcePaths = - ResourcePaths.forCatalogProperties( - this.options.get(RESTCatalogInternalOptions.PREFIX)); + ResourcePaths.forCatalogProperties(options.get(RESTCatalogInternalOptions.PREFIX)); + this.fileIO = getFileIOFromOptions(context); + } + + // todo: whether it's ok + private static FileIO getFileIOFromOptions(CatalogContext context) { + Options options = context.options(); + String warehouseStr = options.get(CatalogOptions.WAREHOUSE); + Path warehousePath = new Path(warehouseStr); + FileIO fileIO; + CatalogContext contextWithNewOptions = + CatalogContext.create(options, context.preferIO(), context.fallbackIO()); + try { + fileIO = FileIO.get(warehousePath, contextWithNewOptions); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return fileIO; } @Override public String warehouse() { - throw new UnsupportedOperationException(); + return context.options().get(CatalogOptions.WAREHOUSE); } @Override public Map options() { - return this.options.toMap(); + return context.options().toMap(); } @Override public FileIO fileIO() { - throw new UnsupportedOperationException(); + // todo: storage token need support refresh + return this.fileIO; } @Override @@ -131,9 +181,7 @@ public List listDatabases() { ListDatabasesResponse response = client.get(resourcePaths.databases(), ListDatabasesResponse.class, headers()); if (response.getDatabases() != null) { - return response.getDatabases().stream() - .map(DatabaseName::getName) - .collect(Collectors.toList()); + return response.getDatabases(); } return ImmutableList.of(); } @@ -141,6 +189,7 @@ public List listDatabases() { @Override public void createDatabase(String name, boolean ignoreIfExists, Map properties) throws DatabaseAlreadyExistException { + checkNotSystemDatabase(name); CreateDatabaseRequest request = new CreateDatabaseRequest(name, properties); try { client.post( @@ -154,6 +203,9 @@ public void createDatabase(String name, boolean ignoreIfExists, Map changes, boolean ignoreIfNotExists) throws DatabaseNotExistException { + checkNotSystemDatabase(name); try { Pair, Set> setPropertiesToRemoveKeys = PropertyChange.getSetPropertiesToRemoveKeys(changes); @@ -205,39 +259,87 @@ public void alterDatabase(String name, List changes, boolean ign } } - @Override - public Table getTable(Identifier identifier) throws TableNotExistException { - throw new UnsupportedOperationException(); - } - @Override public List listTables(String databaseName) throws DatabaseNotExistException { - return new ArrayList(); + ListTablesResponse response = + client.get(resourcePaths.tables(databaseName), ListTablesResponse.class, headers()); + if (response.getTables() != null) { + return response.getTables(); + } + return ImmutableList.of(); } @Override - public void dropTable(Identifier identifier, boolean ignoreIfNotExists) - throws TableNotExistException { - throw new UnsupportedOperationException(); + public Table getTable(Identifier identifier) + throws TableNotExistException, TableNoPermissionException { + if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) { + return getAllInSystemDatabase(identifier); + } else if (identifier.isSystemTable()) { + return getSystemTable(identifier); + } else { + return getDataOrFormatTable(identifier); + } } @Override public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException { - throw new UnsupportedOperationException(); + try { + CreateTableRequest request = new CreateTableRequest(identifier, schema); + client.post( + resourcePaths.tables(identifier.getDatabaseName()), + request, + GetTableResponse.class, + headers()); + } catch (AlreadyExistsException e) { + throw new TableAlreadyExistException(identifier); + } } @Override public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) - throws TableNotExistException, TableAlreadyExistException { - throw new UnsupportedOperationException(); + throws TableNotExistException, TableAlreadyExistException, TableNoPermissionException { + try { + updateTable(fromTable, toTable, new ArrayList<>()); + } catch (NoSuchResourceException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(fromTable); + } + } catch (ForbiddenException e) { + throw new TableNoPermissionException(fromTable); + } catch (AlreadyExistsException e) { + throw new TableAlreadyExistException(toTable); + } } @Override public void alterTable( Identifier identifier, List changes, boolean ignoreIfNotExists) - throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException { - throw new UnsupportedOperationException(); + throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException, + TableNoPermissionException { + try { + updateTable(identifier, null, changes); + } catch (NoSuchResourceException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(identifier); + } + } catch (ForbiddenException e) { + throw new TableNoPermissionException(identifier); + } + } + + @Override + public void dropTable(Identifier identifier, boolean ignoreIfNotExists) + throws TableNotExistException { + try { + client.delete( + resourcePaths.table(identifier.getDatabaseName(), identifier.getTableName()), + headers()); + } catch (NoSuchResourceException e) { + if (!ignoreIfNotExists) { + throw new TableNotExistException(identifier); + } + } } @Override @@ -258,7 +360,7 @@ public List listPartitions(Identifier identifier) @Override public boolean caseSensitive() { - return options.getOptional(CASE_SENSITIVE).orElse(true); + return context.options().getOptional(CASE_SENSITIVE).orElse(true); } @Override @@ -287,6 +389,111 @@ private Map headers() { return catalogAuth.getHeaders(); } + private TableSchema getDataTableSchema(Identifier identifier) + throws TableNotExistException, TableNoPermissionException { + try { + GetTableResponse response = + client.get( + resourcePaths.table( + identifier.getDatabaseName(), identifier.getTableName()), + GetTableResponse.class, + headers()); + return response.getSchema(); + } catch (NoSuchResourceException e) { + throw new TableNotExistException(identifier); + } catch (ForbiddenException e) { + throw new TableNoPermissionException(identifier); + } + } + + // todo: how know which exception to throw + private void updateTable(Identifier fromTable, Identifier toTable, List changes) { + UpdateTableRequest request = new UpdateTableRequest(fromTable, toTable, changes); + client.post( + resourcePaths.table(fromTable.getDatabaseName(), fromTable.getTableName()), + request, + GetTableResponse.class, + headers()); + } + + private Table getAllInSystemDatabase(Identifier identifier) throws TableNotExistException { + String tableName = identifier.getTableName(); + Supplier>> getAllTablePathsFunction = + () -> { + try { + Map> allPaths = new HashMap<>(); + for (String database : listDatabases()) { + Map tableMap = + allPaths.computeIfAbsent(database, d -> new HashMap<>()); + for (String table : listTables(database)) { + Path tableLocation = + newTableLocation( + warehouse(), Identifier.create(database, table)); + tableMap.put(table, tableLocation); + } + } + return allPaths; + } catch (DatabaseNotExistException e) { + throw new RuntimeException("Database is deleted while listing", e); + } + }; + Table table = + SystemTableLoader.loadGlobal( + tableName, fileIO(), getAllTablePathsFunction, context.options()); + if (table == null) { + throw new TableNotExistException(identifier); + } + return table; + } + + private Table getSystemTable(Identifier identifier) + throws TableNotExistException, TableNoPermissionException { + Table originTable = + getDataOrFormatTable( + new Identifier( + identifier.getDatabaseName(), + identifier.getTableName(), + identifier.getBranchName(), + null)); + return CatalogUtils.getSystemTable(identifier, originTable); + } + + private Table getDataOrFormatTable(Identifier identifier) + throws TableNotExistException, TableNoPermissionException { + Preconditions.checkArgument(identifier.getSystemTableName() == null); + TableSchema tableSchema = getDataTableSchema(identifier); + String uuid = null; + FileStoreTable table = + FileStoreTableFactory.create( + this.fileIO(), + newTableLocation(warehouse(), identifier), + tableSchema, + new CatalogEnvironment( + identifier, + uuid, + Lock.factory( + lockFactory( + context.options(), + this.fileIO(), + Optional.empty()) + .orElse(null), + lockContext(context.options()).orElse(null), + identifier), + null)); // todo: whether need MetastoreClient.Factory + CoreOptions options = table.coreOptions(); + if (options.type() == TableType.OBJECT_TABLE) { + String objectLocation = options.objectLocation(); + checkNotNull(objectLocation, "Object location should not be null for object table."); + table = + ObjectTable.builder() + .underlyingTable(table) + .objectLocation(objectLocation) + .objectFileIO(this.fileIO()) + .build(); + } + return table; + } + private ScheduledExecutorService tokenRefreshExecutor() { if (refreshExecutor == null) { synchronized (this) { diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java index a5c773cb4bd5..b03c9ca4248d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java @@ -21,6 +21,8 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.options.CatalogOptions; +import org.apache.paimon.options.Options; /** Factory to create {@link RESTCatalog}. */ public class RESTCatalogFactory implements CatalogFactory { @@ -33,6 +35,10 @@ public String identifier() { @Override public Catalog create(CatalogContext context) { - return new RESTCatalog(context.options()); + Options options = context.options(); + if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) { + throw new IllegalArgumentException("Can not config warehouse in RESTCatalog."); + } + return new RESTCatalog(context); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java index 51277454ffb0..567dfea49046 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java @@ -52,4 +52,23 @@ public String databaseProperties(String databaseName) { .add("properties") .toString(); } + + public String tables(String databaseName) { + return SLASH.add("v1") + .add(prefix) + .add("databases") + .add(databaseName) + .add("tables") + .toString(); + } + + public String table(String databaseName, String tableName) { + return SLASH.add("v1") + .add(prefix) + .add("databases") + .add(databaseName) + .add("tables") + .add(tableName) + .toString(); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreateTableRequest.java b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreateTableRequest.java new file mode 100644 index 000000000000..1e152d7f1f56 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreateTableRequest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.requests; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.rest.RESTRequest; +import org.apache.paimon.schema.Schema; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Request for creating table. */ +public class CreateTableRequest implements RESTRequest { + + private static final String FIELD_IDENTIFIER = "identifier"; + private static final String FIELD_SCHEMA = "schema"; + + @JsonProperty(FIELD_IDENTIFIER) + private Identifier identifier; + + @JsonProperty(FIELD_SCHEMA) + private Schema schema; + + @JsonCreator + public CreateTableRequest( + @JsonProperty(FIELD_IDENTIFIER) Identifier identifier, + @JsonProperty(FIELD_SCHEMA) Schema schema) { + this.identifier = identifier; + this.schema = schema; + } + + @JsonGetter(FIELD_IDENTIFIER) + public Identifier getIdentifier() { + return identifier; + } + + @JsonGetter(FIELD_SCHEMA) + public Schema getSchema() { + return schema; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/requests/UpdateTableRequest.java b/paimon-core/src/main/java/org/apache/paimon/rest/requests/UpdateTableRequest.java new file mode 100644 index 000000000000..ca12a1152455 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/requests/UpdateTableRequest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.requests; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.rest.RESTRequest; +import org.apache.paimon.schema.SchemaChange; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** Request for updating table. */ +public class UpdateTableRequest implements RESTRequest { + + private static final String FIELD_FROM_IDENTIFIER = "from"; + private static final String FIELD_TO_IDENTIFIER = "to"; + private static final String FIELD_SCHEMA_CHANGES = "changes"; + + @JsonProperty(FIELD_FROM_IDENTIFIER) + private Identifier fromIdentifier; + + @JsonProperty(FIELD_TO_IDENTIFIER) + private Identifier toIdentifier; + + @JsonProperty(FIELD_SCHEMA_CHANGES) + private List changes; + + @JsonCreator + public UpdateTableRequest( + @JsonProperty(FIELD_FROM_IDENTIFIER) Identifier fromIdentifier, + @JsonProperty(FIELD_TO_IDENTIFIER) Identifier toIdentifier, + @JsonProperty(FIELD_SCHEMA_CHANGES) List changes) { + this.fromIdentifier = fromIdentifier; + this.toIdentifier = toIdentifier; + this.changes = changes; + } + + @JsonGetter(FIELD_FROM_IDENTIFIER) + public Identifier getFromIdentifier() { + return fromIdentifier; + } + + @JsonGetter(FIELD_TO_IDENTIFIER) + public Identifier getToIdentifier() { + return toIdentifier; + } + + @JsonGetter(FIELD_SCHEMA_CHANGES) + public List getChanges() { + return changes; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableResponse.java new file mode 100644 index 000000000000..671c50cac5a1 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableResponse.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.rest.responses; + +import org.apache.paimon.rest.RESTResponse; +import org.apache.paimon.schema.TableSchema; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Response for getting table. */ +public class GetTableResponse implements RESTResponse { + + private static final String FIELD_LOCATION = "location"; + private static final String FIELD_SCHEMA = "schema"; + + @JsonProperty(FIELD_LOCATION) + private final String location; + + @JsonProperty(FIELD_SCHEMA) + private final TableSchema schema; + + @JsonCreator + public GetTableResponse( + @JsonProperty(FIELD_LOCATION) String location, + @JsonProperty(FIELD_SCHEMA) TableSchema schema) { + this.location = location; + this.schema = schema; + } + + @JsonGetter(FIELD_LOCATION) + public String getLocation() { + return this.location; + } + + @JsonGetter(FIELD_SCHEMA) + public TableSchema getSchema() { + return this.schema; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListDatabasesResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListDatabasesResponse.java index 38773f354b77..64a17a6be7e6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListDatabasesResponse.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListDatabasesResponse.java @@ -31,15 +31,15 @@ public class ListDatabasesResponse implements RESTResponse { private static final String FIELD_DATABASES = "databases"; @JsonProperty(FIELD_DATABASES) - private List databases; + private List databases; @JsonCreator - public ListDatabasesResponse(@JsonProperty(FIELD_DATABASES) List databases) { + public ListDatabasesResponse(@JsonProperty(FIELD_DATABASES) List databases) { this.databases = databases; } @JsonGetter(FIELD_DATABASES) - public List getDatabases() { + public List getDatabases() { return this.databases; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTablesResponse.java similarity index 70% rename from paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java rename to paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTablesResponse.java index 9a93b2fd1e3d..bccaa48438e2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java +++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTablesResponse.java @@ -18,27 +18,28 @@ package org.apache.paimon.rest.responses; -import org.apache.paimon.rest.RESTMessage; +import org.apache.paimon.rest.RESTResponse; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; -/** Class for Database entity. */ -public class DatabaseName implements RESTMessage { +import java.util.List; - private static final String FIELD_NAME = "name"; +/** Response for listing tables. */ +public class ListTablesResponse implements RESTResponse { + private static final String FIELD_TABLES = "tables"; - @JsonProperty(FIELD_NAME) - private String name; + @JsonProperty(FIELD_TABLES) + private List tables; @JsonCreator - public DatabaseName(@JsonProperty(FIELD_NAME) String name) { - this.name = name; + public ListTablesResponse(@JsonProperty(FIELD_TABLES) List tables) { + this.tables = tables; } - @JsonGetter(FIELD_NAME) - public String getName() { - return this.name; + @JsonGetter(FIELD_TABLES) + public List getTables() { + return this.tables; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java index fee6d1433143..53433b10a19d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CachingCatalogTest.java @@ -284,7 +284,8 @@ public void testDeadlock() throws Exception { identifier -> { try { return underlyCatalog.getTable(identifier); - } catch (Catalog.TableNotExistException e) { + } catch (Catalog.TableNotExistException + | Catalog.TableNoPermissionException e) { throw new RuntimeException(e); } }); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java index 821257a0e10e..5f40b15f4f9d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java @@ -22,10 +22,10 @@ import org.apache.paimon.rest.requests.CreateDatabaseRequest; import org.apache.paimon.rest.responses.AlterDatabaseResponse; import org.apache.paimon.rest.responses.CreateDatabaseResponse; -import org.apache.paimon.rest.responses.DatabaseName; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; +import org.apache.paimon.rest.responses.ListTablesResponse; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; @@ -63,9 +63,8 @@ public static GetDatabaseResponse getDatabaseResponse(String name) { } public static ListDatabasesResponse listDatabasesResponse(String name) { - DatabaseName databaseName = new DatabaseName(name); - List databaseNameList = new ArrayList<>(); - databaseNameList.add(databaseName); + List databaseNameList = new ArrayList<>(); + databaseNameList.add(name); return new ListDatabasesResponse(databaseNameList); } @@ -83,4 +82,12 @@ public static AlterDatabaseResponse alterDatabaseResponse() { return new AlterDatabaseResponse( Lists.newArrayList("remove"), Lists.newArrayList("add"), new ArrayList<>()); } + + public static ListTablesResponse listTablesResponse() { + return new ListTablesResponse(Lists.newArrayList("table")); + } + + public static ListTablesResponse listTablesEmptyResponse() { + return new ListTablesResponse(Lists.newArrayList()); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java index 9b1582929560..7fe3255f43a9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.rest; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Database; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; @@ -27,6 +28,7 @@ import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; +import org.apache.paimon.rest.responses.ListTablesResponse; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; @@ -35,7 +37,9 @@ import okhttp3.mockwebserver.MockWebServer; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import java.io.IOException; import java.util.ArrayList; @@ -50,7 +54,6 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** Test for REST Catalog. */ public class RESTCatalogTest { @@ -59,6 +62,9 @@ public class RESTCatalogTest { private MockWebServer mockWebServer; private RESTCatalog restCatalog; private RESTCatalog mockRestCatalog; + private CatalogContext context; + private String warehouseStr; + @Rule public TemporaryFolder folder = new TemporaryFolder(); @Before public void setUp() throws IOException { @@ -70,12 +76,17 @@ public void setUp() throws IOException { String initToken = "init_token"; options.set(RESTCatalogOptions.TOKEN, initToken); options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1); + warehouseStr = folder.getRoot().getPath(); String mockResponse = String.format( - "{\"defaults\": {\"%s\": \"%s\"}}", - RESTCatalogInternalOptions.PREFIX.key(), "prefix"); + "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}", + RESTCatalogInternalOptions.PREFIX.key(), + "prefix", + CatalogOptions.WAREHOUSE.key(), + warehouseStr); mockResponse(mockResponse, 200); - restCatalog = new RESTCatalog(options); + context = CatalogContext.create(options); + restCatalog = new RESTCatalog(context); mockRestCatalog = spy(restCatalog); } @@ -87,8 +98,10 @@ public void tearDown() throws IOException { @Test public void testInitFailWhenDefineWarehouse() { Options options = new Options(); - options.set(CatalogOptions.WAREHOUSE, "/a/b/c"); - assertThrows(IllegalArgumentException.class, () -> new RESTCatalog(options)); + options.set(CatalogOptions.WAREHOUSE, warehouseStr); + assertThrows( + IllegalArgumentException.class, + () -> new RESTCatalog(CatalogContext.create(options))); } @Test @@ -164,8 +177,9 @@ public void testDropDatabaseWhenNoExistAndIgnoreIfNotExistsIsTrue() throws Excep public void testDropDatabaseWhenCascadeIsFalseAndNoTables() throws Exception { String name = MockRESTMessage.databaseName(); boolean cascade = false; + ListTablesResponse response = MockRESTMessage.listTablesEmptyResponse(); + mockResponse(mapper.writeValueAsString(response), 200); mockResponse("", 200); - when(mockRestCatalog.listTables(name)).thenReturn(new ArrayList<>()); assertDoesNotThrow(() -> mockRestCatalog.dropDatabase(name, false, cascade)); verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(false), eq(cascade)); verify(mockRestCatalog, times(1)).listTables(eq(name)); @@ -175,10 +189,8 @@ public void testDropDatabaseWhenCascadeIsFalseAndNoTables() throws Exception { public void testDropDatabaseWhenCascadeIsFalseAndTablesExist() throws Exception { String name = MockRESTMessage.databaseName(); boolean cascade = false; - mockResponse("", 200); - List tables = new ArrayList<>(); - tables.add("t1"); - when(mockRestCatalog.listTables(name)).thenReturn(tables); + ListTablesResponse response = MockRESTMessage.listTablesResponse(); + mockResponse(mapper.writeValueAsString(response), 200); assertThrows( Catalog.DatabaseNotEmptyException.class, () -> mockRestCatalog.dropDatabase(name, false, cascade)); diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java index 0e5a71be39c0..1cf382a9f438 100644 --- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java @@ -103,7 +103,7 @@ public void listDatabaseResponseParseTest() throws Exception { ListDatabasesResponse parseData = mapper.readValue(responseStr, ListDatabasesResponse.class); assertEquals(response.getDatabases().size(), parseData.getDatabases().size()); - assertEquals(name, parseData.getDatabases().get(0).getName()); + assertEquals(name, parseData.getDatabases().get(0)); } @Test diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java index d70cccf6ba25..7f1577f9d14b 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java @@ -49,7 +49,7 @@ public class MarkPartitionDoneProcedure extends ProcedureBase { public String[] call( ProcedureContext procedureContext, String tableId, String... partitionStrings) - throws Catalog.TableNotExistException, IOException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException, IOException { checkArgument( partitionStrings.length > 0, "mark_partition_done procedure must specify partitions."); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java index ce2766b9410b..41b07a29b6cc 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java @@ -207,7 +207,8 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable catalog.alterTable(identifier, optionChanges, false); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException - | Catalog.ColumnNotExistException e) { + | Catalog.ColumnNotExistException + | Catalog.TableNoPermissionException e) { throw new RuntimeException("This is unexpected.", e); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java index a4b4e8284043..002bff10b057 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java @@ -178,7 +178,8 @@ public void processElement(StreamRecord element) throws Exce } } - private FileStoreTable getTable(Identifier tableId) throws InterruptedException { + private FileStoreTable getTable(Identifier tableId) + throws InterruptedException, Catalog.TableNoPermissionException { FileStoreTable table = tables.get(tableId); if (table == null) { while (true) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java index 6b10dbb84bf4..60b4b011a83b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java @@ -213,7 +213,7 @@ Table buildPaimonTable(DynamicTableFactory.Context context) { context.getObjectIdentifier().getObjectName()); try { fileStoreTable = (FileStoreTable) flinkCatalog.catalog().getTable(identifier); - } catch (Catalog.TableNotExistException e) { + } catch (Catalog.TableNotExistException | Catalog.TableNoPermissionException e) { throw new RuntimeException(e); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index ec3c4a47a69d..1a12442c6b79 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -306,6 +306,8 @@ private CatalogBaseTable getTable(ObjectPath tablePath, @Nullable Long timestamp } throw new TableNotExistException(getName(), tablePath); + } catch (Catalog.TableNoPermissionException e) { + throw new CatalogException(e); } if (table instanceof FormatTable) { @@ -369,6 +371,8 @@ public boolean tableExists(ObjectPath tablePath) throws CatalogException { } catch (Catalog.ViewNotExistException ex) { return false; } + } catch (Catalog.TableNoPermissionException e) { + throw new CatalogException(e); } } @@ -401,6 +405,8 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) } } catch (Catalog.TableNotExistException e) { throw new TableNotExistException(getName(), tablePath); + } catch (Catalog.TableNoPermissionException e) { + throw new CatalogException(e); } } @@ -865,7 +871,9 @@ public void alterTable( catalog.alterTable(toIdentifier(tablePath), changes, ignoreIfNotExists); } catch (Catalog.TableNotExistException e) { throw new TableNotExistException(getName(), tablePath); - } catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { + } catch (Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException + | Catalog.TableNoPermissionException e) { throw new CatalogException(e); } } @@ -886,6 +894,8 @@ public void alterTable( table = catalog.getTable(toIdentifier(tablePath)); } catch (Catalog.TableNotExistException e) { throw new TableNotExistException(getName(), tablePath); + } catch (Catalog.TableNoPermissionException e) { + throw new CatalogException(e); } checkArgument( @@ -934,7 +944,8 @@ public void alterTable( catalog.alterTable(toIdentifier(tablePath), changes, ignoreIfNotExists); } catch (Catalog.TableNotExistException | Catalog.ColumnAlreadyExistException - | Catalog.ColumnNotExistException e) { + | Catalog.ColumnNotExistException + | Catalog.TableNoPermissionException e) { throw new CatalogException(e); } } @@ -1249,6 +1260,8 @@ public final void renameTable( } } catch (Catalog.TableAlreadyExistException e) { throw new TableAlreadyExistException(getName(), toTable); + } catch (Catalog.TableNoPermissionException e) { + throw new CatalogException(e); } } @@ -1313,6 +1326,8 @@ private Table getPaimonTable(ObjectPath tablePath) throws TableNotExistException return catalog.getTable(identifier); } catch (Catalog.TableNotExistException e) { throw new TableNotExistException(getName(), tablePath); + } catch (Catalog.TableNoPermissionException e) { + throw new CatalogException(e); } } @@ -1564,6 +1579,8 @@ private void alterTableStatisticsInternal( if (!ignoreIfNotExists) { throw new TableNotExistException(getName(), tablePath); } + } catch (Catalog.TableNoPermissionException e) { + throw new CatalogException(e); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java index 457e04bfd8ff..fc3109d9f349 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java @@ -177,7 +177,9 @@ private void buildForDividedMode() { } } } - } catch (Catalog.DatabaseNotExistException | Catalog.TableNotExistException e) { + } catch (Catalog.DatabaseNotExistException + | Catalog.TableNotExistException + | Catalog.TableNoPermissionException e) { throw new RuntimeException(e); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java index 870d22012bf2..a02494163973 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java @@ -46,7 +46,7 @@ public abstract class TableActionBase extends ActionBase { identifier = new Identifier(databaseName, tableName); try { table = catalog.getTable(identifier); - } catch (Catalog.TableNotExistException e) { + } catch (Catalog.TableNotExistException | Catalog.TableNoPermissionException e) { throw new RuntimeException(e); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java index 8bcaa2a2071f..1a6be73f3886 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java @@ -82,7 +82,8 @@ public void processElement(StreamRecord streamRecord) throws Exce try { return pathOfTable( sourceCatalog.getTable(Identifier.fromString(key))); - } catch (Catalog.TableNotExistException e) { + } catch (Catalog.TableNotExistException + | Catalog.TableNoPermissionException e) { throw new RuntimeException(e); } }); @@ -93,7 +94,8 @@ public void processElement(StreamRecord streamRecord) throws Exce try { return pathOfTable( targetCatalog.getTable(Identifier.fromString(key))); - } catch (Catalog.TableNotExistException e) { + } catch (Catalog.TableNotExistException + | Catalog.TableNoPermissionException e) { throw new RuntimeException(e); } }); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java index 805e8da0a417..b75441197126 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java @@ -74,7 +74,8 @@ public MultiTableScanBase( } protected void updateTableMap() - throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { + throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException, + Catalog.TableNoPermissionException { List databases = catalog.listDatabases(); for (String databaseName : databases) { @@ -102,7 +103,8 @@ protected void updateTableMap() } public ScanResult scanTable(ReaderOutput ctx) - throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException { + throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException, + Catalog.TableNoPermissionException { try { updateTableMap(); List tasks = doScan(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java index ad501e204ce6..03ea5f73fda7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/log/LogStoreRegister.java @@ -49,7 +49,7 @@ static void registerLogSystem( if (!tableOptions.get(LOG_SYSTEM).equalsIgnoreCase(NONE)) { try { catalog.getTable(identifier); - } catch (Catalog.TableNotExistException e) { + } catch (Catalog.TableNotExistException | Catalog.TableNoPermissionException e) { LogStoreRegister logStoreRegister = getLogStoreRegister(identifier, classLoader, tableOptions, logStore); options.putAll(logStoreRegister.registerTopic()); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java index 23bbbc9b609c..782d0e10cc46 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java @@ -323,7 +323,8 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles( @Nullable Integer parallelism, String databaseName, @Nullable String tableName) - throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { + throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException, + Catalog.TableNoPermissionException { List tableNames = Collections.singletonList(tableName); if (tableName == null || "*".equals(tableName)) { tableNames = catalog.listTables(databaseName); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java index 74cf85a4c5c2..9c58569428f7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateBranchProcedure.java @@ -52,7 +52,7 @@ public String identifier() { }) public String[] call( ProcedureContext procedureContext, String tableId, String branchName, String tagName) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { Table table = catalog.getTable(Identifier.fromString(tableId)); if (!StringUtils.isBlank(tagName)) { table.createBranch(branchName, tagName); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java index dba9d46636e6..cedebde98a1c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateOrReplaceTagBaseProcedure.java @@ -54,7 +54,7 @@ public String[] call( String tagName, @Nullable Long snapshotId, @Nullable String timeRetained) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { Table table = catalog.getTable(Identifier.fromString(tableId)); createOrReplaceTag(table, tagName, snapshotId, toDuration(timeRetained)); return new String[] {"Success"}; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedure.java index 1a7a0de3befb..0029c4b47ba6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedure.java @@ -58,7 +58,7 @@ public Row[] call( String tagName, Long timestamp, @Nullable String timeRetained) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { FileStoreTable fileStoreTable = (FileStoreTable) table(tableId); SnapshotManager snapshotManager = fileStoreTable.snapshotManager(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedure.java index 8f7bdbdeea62..dff9c98cf810 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedure.java @@ -58,7 +58,7 @@ public Row[] call( String tagName, Long watermark, @Nullable String timeRetained) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { FileStoreTable fileStoreTable = (FileStoreTable) table(tableId); SnapshotManager snapshotManager = fileStoreTable.snapshotManager(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java index 56c649028650..843675edc409 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteBranchProcedure.java @@ -48,7 +48,7 @@ public String identifier() { @ArgumentHint(name = "branch", type = @DataTypeHint("STRING")) }) public String[] call(ProcedureContext procedureContext, String tableId, String branchStr) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { Identifier identifier = Identifier.fromString(tableId); catalog.getTable(identifier).deleteBranches(branchStr); catalog.invalidateTable( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java index 2ae104c9d191..3b8f9d729457 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java @@ -44,7 +44,7 @@ public class DeleteTagProcedure extends ProcedureBase { @ArgumentHint(name = "tag", type = @DataTypeHint("STRING")) }) public String[] call(ProcedureContext procedureContext, String tableId, String tagNameStr) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { Table table = catalog.getTable(Identifier.fromString(tableId)); table.deleteTags(tagNameStr); return new String[] {"Success"}; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java index 3a231758f476..6d18b706bb99 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java @@ -46,7 +46,7 @@ public class DropPartitionProcedure extends ProcedureBase { public String[] call( ProcedureContext procedureContext, String tableId, String... partitionStrings) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { checkArgument( partitionStrings.length > 0, "drop-partition procedure must specify partitions."); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index ce282c6800cc..0ef11044da73 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -76,7 +76,7 @@ public String identifier() { String timestampPattern, String expireStrategy, Integer maxExpires) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { FileStoreTable fileStoreTable = (FileStoreTable) table(tableId); FileStore fileStore = fileStoreTable.store(); Map map = new HashMap<>(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java index 9b662fc36907..48290356e466 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java @@ -66,7 +66,7 @@ public String[] call( Integer retainMin, String olderThanStr, Integer maxDeletes) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { ExpireSnapshots expireSnapshots = table(tableId).newExpireSnapshots(); ExpireConfig.Builder builder = ExpireConfig.builder(); if (retainMax != null) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java index 3d8af1de70cc..109793216167 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireTagsProcedure.java @@ -50,7 +50,7 @@ public class ExpireTagsProcedure extends ProcedureBase { }) public @DataTypeHint("ROW") Row[] call( ProcedureContext procedureContext, String tableId, @Nullable String olderThanStr) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { FileStoreTable fileStoreTable = (FileStoreTable) table(tableId); TagTimeExpire tagTimeExpire = fileStoreTable.store().newTagCreationManager().getTagTimeExpire(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FastForwardProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FastForwardProcedure.java index f707d74c3f12..4ec397f8bfe3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FastForwardProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/FastForwardProcedure.java @@ -49,12 +49,12 @@ public String identifier() { @ArgumentHint(name = "branch", type = @DataTypeHint("STRING")) }) public String[] call(ProcedureContext procedureContext, String tableId, String branchName) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { return innerCall(tableId, branchName); } private String[] innerCall(String tableId, String branchName) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { Table table = catalog.getTable(Identifier.fromString(tableId)); table.fastForward(branchName); return new String[] {"Success"}; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java index f0a89a0bb32b..81573722e459 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java @@ -56,7 +56,7 @@ public class MarkPartitionDoneProcedure extends ProcedureBase { @ArgumentHint(name = "partitions", type = @DataTypeHint("STRING")) }) public String[] call(ProcedureContext procedureContext, String tableId, String partitions) - throws Catalog.TableNotExistException, IOException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException, IOException { String[] partitionStrings = partitions.split(";"); checkArgument( partitionStrings.length > 0, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java index efb6aa50c219..d1ecf6b4c1f1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java @@ -52,7 +52,8 @@ public ProcedureBase withCatalog(Catalog catalog) { return this; } - protected Table table(String tableId) throws Catalog.TableNotExistException { + protected Table table(String tableId) + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { return catalog.getTable(Identifier.fromString(tableId)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java index 7ee2a3610402..cb7a711cbf78 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java @@ -47,7 +47,7 @@ public class PurgeFilesProcedure extends ProcedureBase { @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))}) public String[] call(ProcedureContext procedureContext, String tableId) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { Table table = catalog.getTable(Identifier.fromString(tableId)); FileStoreTable fileStoreTable = (FileStoreTable) table; FileIO fileIO = fileStoreTable.fileIO(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java index 97eb3095f094..1ff56cecd736 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RefreshObjectTableProcedure.java @@ -41,7 +41,7 @@ public class RefreshObjectTableProcedure extends ProcedureBase { @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))}) public @DataTypeHint("ROW") Row[] call( ProcedureContext procedureContext, String tableId) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { ObjectTable table = (ObjectTable) table(tableId); long fileNumber = table.refresh(); return new Row[] {Row.of(fileNumber)}; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RenameTagProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RenameTagProcedure.java index 5476e807be71..3c8c0f70c863 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RenameTagProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RenameTagProcedure.java @@ -45,7 +45,7 @@ public class RenameTagProcedure extends ProcedureBase { }) public String[] call( ProcedureContext procedureContext, String tableId, String tagName, String targetTagName) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { Table table = catalog.getTable(Identifier.fromString(tableId)); table.renameTag(tagName, targetTagName); String ret = String.format("Rename [%s] to [%s] successfully.", tagName, targetTagName); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java index 934ce182a09c..e1a0e866b16b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java @@ -58,7 +58,7 @@ public String[] call( String tableId, String consumerId, Long nextSnapshotId) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); ConsumerManager consumerManager = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java index 9bca6505c99e..3f4c885e9607 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java @@ -54,7 +54,7 @@ public class RollbackToProcedure extends ProcedureBase { }) public String[] call( ProcedureContext procedureContext, String tableId, String tagName, Long snapshotId) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { Table table = catalog.getTable(Identifier.fromString(tableId)); if (!StringUtils.isNullOrWhitespaceOnly(tagName)) { table.rollbackTo(tagName); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java index f84dab8eab89..08389aac8c0e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToTimestampProcedure.java @@ -48,7 +48,7 @@ public class RollbackToTimestampProcedure extends ProcedureBase { @ArgumentHint(name = "timestamp", type = @DataTypeHint("BIGINT")) }) public String[] call(ProcedureContext procedureContext, String tableId, Long timestamp) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { Table table = catalog.getTable(Identifier.fromString(tableId)); FileStoreTable fileStoreTable = (FileStoreTable) table; Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualTimeMills(timestamp); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java index ab1ea8080de9..dbb280076b6c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToWatermarkProcedure.java @@ -48,7 +48,7 @@ public class RollbackToWatermarkProcedure extends ProcedureBase { @ArgumentHint(name = "watermark", type = @DataTypeHint("BIGINT")) }) public String[] call(ProcedureContext procedureContext, String tableId, Long watermark) - throws Catalog.TableNotExistException { + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { Table table = catalog.getTable(Identifier.fromString(tableId)); FileStoreTable fileStoreTable = (FileStoreTable) table; Snapshot snapshot = fileStoreTable.snapshotManager().earlierOrEqualWatermark(watermark); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java index 07ec7d165e3a..f7148498eea6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java @@ -122,7 +122,7 @@ private UnawareBucketCompactor compactor(Identifier tableId) { commitUser, this::workerExecutor, getMetricGroup()); - } catch (Catalog.TableNotExistException e) { + } catch (Catalog.TableNotExistException | Catalog.TableNoPermissionException e) { throw new RuntimeException(e); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java index 02a7e6c1b3c8..93bb0aac1606 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java @@ -241,7 +241,7 @@ private FileStoreTable getTable(Identifier tableId) throws InterruptedException table = table.copy(dynamicOptions); tables.put(tableId, table); break; - } catch (Catalog.TableNotExistException e) { + } catch (Catalog.TableNotExistException | Catalog.TableNoPermissionException e) { // table not found, waiting until table is created by // upstream operators } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java index 01acddb9ad99..d18b6b26dec7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java @@ -191,7 +191,7 @@ private StoreCommitter getStoreCommitter(Identifier tableId) { FileStoreTable table; try { table = (FileStoreTable) catalog.getTable(tableId).copy(dynamicOptions); - } catch (Catalog.TableNotExistException e) { + } catch (Catalog.TableNotExistException | Catalog.TableNoPermissionException e) { throw new RuntimeException( String.format( "Failed to get committer for table %s", tableId.getFullName()), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java index 5c0d9c42dd29..779144caeb6d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSource.java @@ -178,7 +178,9 @@ private static Long getPartitionInfo( PartitionEntry::lastFileCreationTime)); multiTablesPartitionInfo.put(tableIdentifier, partitionInfo); } catch (Catalog.TableNotExistException e) { - LOGGER.error(String.format("table: %s not found.", tableIdentifier.getFullName())); + LOGGER.error("table: {} not found.", tableIdentifier.getFullName()); + } catch (Catalog.TableNoPermissionException e) { + LOGGER.error("table: {} no permission.", tableIdentifier.getFullName()); } } return partitionInfo.get(partition); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java index ae3099ec0628..a6666dfaad8e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java @@ -150,6 +150,8 @@ private TableRead getTableRead(Identifier tableId) { readsMap.put(tableId, table.newReadBuilder().newRead().withIOManager(ioManager)); } catch (Catalog.TableNotExistException e) { LOG.error(String.format("table: %s not found.", tableId.getFullName())); + } catch (Catalog.TableNoPermissionException e) { + LOG.error(String.format("table: %s no permission.", tableId.getFullName())); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java index 15fde93755fd..c5b14267e5fa 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.java @@ -97,7 +97,9 @@ private FileStoreTable getTable(Identifier tableId) { table = (FileStoreTable) newTable; tablesMap.put(tableId, table); } catch (Catalog.TableNotExistException e) { - LOG.error(String.format("table: %s not found.", tableId.getFullName())); + LOG.error("table: {} not found.", tableId.getFullName()); + } catch (Catalog.TableNoPermissionException e) { + LOG.error("table: {} no permission.", tableId.getFullName()); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java index 19aa6d5d7439..188e29c146be 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java @@ -179,7 +179,8 @@ protected CatalogTable table(String tableName) throws TableNotExistException { } protected FileStoreTable paimonTable(String tableName) - throws org.apache.paimon.catalog.Catalog.TableNotExistException { + throws org.apache.paimon.catalog.Catalog.TableNotExistException, + org.apache.paimon.catalog.Catalog.TableNoPermissionException { org.apache.paimon.catalog.Catalog catalog = flinkCatalog().catalog(); return (FileStoreTable) catalog.getTable(Identifier.create(tEnv.getCurrentDatabase(), tableName)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java index 0cd969707cfa..42dcfc26b672 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java @@ -80,7 +80,8 @@ public class OperatorSourceTest { @BeforeEach public void before() throws Catalog.TableAlreadyExistException, Catalog.DatabaseNotExistException, - Catalog.TableNotExistException, Catalog.DatabaseAlreadyExistException { + Catalog.TableNotExistException, Catalog.DatabaseAlreadyExistException, + Catalog.TableNoPermissionException { Catalog catalog = CatalogFactory.createCatalog( CatalogContext.create(new org.apache.paimon.fs.Path(tempDir.toUri()))); diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java index 0be872a58cbf..fd22ca20323e 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java @@ -100,6 +100,11 @@ import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME; import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.TableType.FORMAT_TABLE; +import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch; +import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; +import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; +import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase; +import static org.apache.paimon.catalog.CatalogUtils.lockFactory; import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout; import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep; import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR; @@ -632,7 +637,8 @@ public org.apache.paimon.table.Table getDataOrFormatTable(Identifier identifier) identifier, tableMeta.uuid(), Lock.factory( - lockFactory().orElse(null), + lockFactory(catalogOptions, fileIO(), defaultLockFactory()) + .orElse(null), lockContext().orElse(null), identifier), metastoreClientFactory(identifier).orElse(null))); diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java index 5cc826b554bf..38fa4dfe4d39 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java @@ -19,8 +19,8 @@ package org.apache.paimon.hive; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -87,7 +87,7 @@ public void preCreateTable(Table table) throws MetaException { org.apache.hadoop.fs.Path hadoopPath = getDnsPath(new org.apache.hadoop.fs.Path(warehouse), conf); warehouse = hadoopPath.toUri().toString(); - location = AbstractCatalog.newTableLocation(warehouse, identifier).toUri().toString(); + location = CatalogUtils.newTableLocation(warehouse, identifier).toUri().toString(); table.getSd().setLocation(location); } diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java index 15856c3c06cd..c3a200f1801a 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java @@ -18,10 +18,10 @@ package org.apache.paimon.hive; -import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -94,7 +94,7 @@ public void testCreateExternalTableWithPaimonTable() throws Exception { Maps.newHashMap(), ""); Identifier identifier = Identifier.create(DATABASE_TEST, tableName); - Path tablePath = AbstractCatalog.newTableLocation(path, identifier); + Path tablePath = CatalogUtils.newTableLocation(path, identifier); new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema); // Create hive external table @@ -189,7 +189,7 @@ public void testCreateTableUsePartitionedBy() { // check the paimon table schema Identifier identifier = Identifier.create(DATABASE_TEST, tableName); - Path tablePath = AbstractCatalog.newTableLocation(path, identifier); + Path tablePath = CatalogUtils.newTableLocation(path, identifier); Optional tableSchema = new SchemaManager(LocalFileIO.create(), tablePath).latest(); assertThat(tableSchema).isPresent(); @@ -200,7 +200,8 @@ public void testCreateTableUsePartitionedBy() { } @Test - public void testLowerTableName() throws Catalog.TableNotExistException { + public void testLowerTableName() + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { // Use `partitioned by` to create hive partition table String tableName = "UPPER_NAME"; hiveShell.execute("SET hive.metastore.warehouse.dir=" + path); @@ -245,7 +246,7 @@ public void testLowerTableName() throws Catalog.TableNotExistException { } // check the paimon table name and schema Identifier identifier = Identifier.create(DATABASE_TEST, tableName.toLowerCase()); - Path tablePath = AbstractCatalog.newTableLocation(path, identifier); + Path tablePath = CatalogUtils.newTableLocation(path, identifier); Options conf = new Options(); conf.set(CatalogOptions.WAREHOUSE, path); CatalogContext catalogContext = CatalogContext.create(conf); @@ -258,7 +259,8 @@ public void testLowerTableName() throws Catalog.TableNotExistException { } @Test - public void testLowerDBName() throws Catalog.TableNotExistException { + public void testLowerDBName() + throws Catalog.TableNotExistException, Catalog.TableNoPermissionException { String upperDB = "UPPER_DB"; hiveShell.execute(String.format("create database %s", upperDB)); @@ -310,7 +312,7 @@ public void testLowerDBName() throws Catalog.TableNotExistException { // check the paimon db name态table name and schema Identifier identifier = Identifier.create(upperDB.toLowerCase(), tableName.toLowerCase()); - Path tablePath = AbstractCatalog.newTableLocation(path, identifier); + Path tablePath = CatalogUtils.newTableLocation(path, identifier); Options conf = new Options(); conf.set(CatalogOptions.WAREHOUSE, path); CatalogContext catalogContext = CatalogContext.create(conf); @@ -355,7 +357,7 @@ public void testCreateTableWithPrimaryKey() { // check the paimon table schema Identifier identifier = Identifier.create(DATABASE_TEST, tableName); - Path tablePath = AbstractCatalog.newTableLocation(path, identifier); + Path tablePath = CatalogUtils.newTableLocation(path, identifier); Optional tableSchema = new SchemaManager(LocalFileIO.create(), tablePath).latest(); assertThat(tableSchema).isPresent(); @@ -397,7 +399,7 @@ public void testCreateTableWithPartition() { // check the paimon table schema Identifier identifier = Identifier.create(DATABASE_TEST, tableName); - Path tablePath = AbstractCatalog.newTableLocation(path, identifier); + Path tablePath = CatalogUtils.newTableLocation(path, identifier); Optional tableSchema = new SchemaManager(LocalFileIO.create(), tablePath).latest(); assertThat(tableSchema).isPresent(); @@ -441,7 +443,7 @@ public void testCreateTableSpecifyProperties() { // check the paimon table schema Identifier identifier = Identifier.create(DATABASE_TEST, tableName); - Path tablePath = AbstractCatalog.newTableLocation(path, identifier); + Path tablePath = CatalogUtils.newTableLocation(path, identifier); Optional tableSchema = new SchemaManager(LocalFileIO.create(), tablePath).latest(); assertThat(tableSchema).isPresent(); @@ -489,7 +491,7 @@ public void testCreateTableFailing() throws Exception { Maps.newHashMap(), ""); Identifier identifier = Identifier.create(DATABASE_TEST, tableName); - Path tablePath = AbstractCatalog.newTableLocation(path, identifier); + Path tablePath = CatalogUtils.newTableLocation(path, identifier); new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema); String hiveSql = @@ -533,7 +535,7 @@ public void testCreateTableFailing() throws Exception { } catch (Exception ignore) { } finally { Identifier identifier = Identifier.create(DATABASE_TEST, tableName); - Path tablePath = AbstractCatalog.newTableLocation(path, identifier); + Path tablePath = CatalogUtils.newTableLocation(path, identifier); boolean isPresent = new SchemaManager(LocalFileIO.create(), tablePath).latest().isPresent(); Assertions.assertThat(isPresent).isFalse(); diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java index f3fe03fbba6d..7e52b892791b 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java @@ -18,8 +18,8 @@ package org.apache.paimon.hive; -import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -257,7 +257,7 @@ public void testRWIT() { Identifier identifier = Identifier.create(dbName, tableName); String location = - AbstractCatalog.newTableLocation(warehouse, identifier).toUri().toString(); + CatalogUtils.newTableLocation(warehouse, identifier).toUri().toString(); String createTableSqlStr = getCreateTableSqlStr(tableName, location, locationInProperties); diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveReadITCaseBase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveReadITCaseBase.java index 4b16788ee716..882215f7c0cd 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveReadITCaseBase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveReadITCaseBase.java @@ -19,7 +19,7 @@ package org.apache.paimon.hive; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.AbstractCatalog; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; @@ -975,7 +975,7 @@ public void testReadExternalTableWithEmptyDataAndIgnoreCase() throws Exception { Maps.newHashMap(), ""); Identifier identifier = Identifier.create(DATABASE_TEST, tableName); - Path tablePath = AbstractCatalog.newTableLocation(path, identifier); + Path tablePath = CatalogUtils.newTableLocation(path, identifier); new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema); // Create hive external table @@ -1057,7 +1057,7 @@ public void testReadExternalTableWithDataAndIgnoreCase() throws Exception { commit.close(); // add column, do some ddl which will generate a new version schema-n file. - Path tablePath = AbstractCatalog.newTableLocation(path, identifier); + Path tablePath = CatalogUtils.newTableLocation(path, identifier); SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath); schemaManager.commitChanges(SchemaChange.addColumn("N1", DataTypes.STRING())); diff --git a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java index 5331b65d71b6..21a5ec7a5fec 100644 --- a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java +++ b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java @@ -21,13 +21,17 @@ import org.apache.paimon.rest.ResourcePaths; import org.apache.paimon.rest.requests.AlterDatabaseRequest; import org.apache.paimon.rest.requests.CreateDatabaseRequest; +import org.apache.paimon.rest.requests.CreateTableRequest; +import org.apache.paimon.rest.requests.UpdateTableRequest; import org.apache.paimon.rest.responses.AlterDatabaseResponse; import org.apache.paimon.rest.responses.ConfigResponse; import org.apache.paimon.rest.responses.CreateDatabaseResponse; -import org.apache.paimon.rest.responses.DatabaseName; import org.apache.paimon.rest.responses.ErrorResponse; import org.apache.paimon.rest.responses.GetDatabaseResponse; +import org.apache.paimon.rest.responses.GetTableResponse; import org.apache.paimon.rest.responses.ListDatabasesResponse; +import org.apache.paimon.rest.responses.ListTablesResponse; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; @@ -48,7 +52,7 @@ import java.util.HashMap; import java.util.Map; -/** * RESTCatalog management APIs. */ +/** RESTCatalog management APIs. */ @CrossOrigin(origins = "http://localhost:8081") @RestController public class RESTCatalogController { @@ -86,7 +90,7 @@ public ConfigResponse getConfig() { }) @GetMapping("/v1/{prefix}/databases") public ListDatabasesResponse listDatabases(@PathVariable String prefix) { - return new ListDatabasesResponse(ImmutableList.of(new DatabaseName("account"))); + return new ListDatabasesResponse(ImmutableList.of("account")); } @Operation( @@ -181,4 +185,134 @@ public AlterDatabaseResponse alterDatabase( Lists.newArrayList("add"), Lists.newArrayList("missing")); } + + @Operation( + summary = "List tables", + tags = {"table"}) + @ApiResponses({ + @ApiResponse( + responseCode = "200", + content = {@Content(schema = @Schema(implementation = ListTablesResponse.class))}), + @ApiResponse( + responseCode = "500", + content = {@Content(schema = @Schema())}) + }) + @GetMapping("/v1/{prefix}/databases/{database}") + public ListTablesResponse listTables( + @PathVariable String prefix, @PathVariable String database) { + return new ListTablesResponse(ImmutableList.of("user")); + } + + @Operation( + summary = "Get table", + tags = {"table"}) + @ApiResponses({ + @ApiResponse( + responseCode = "200", + content = {@Content(schema = @Schema(implementation = ListTablesResponse.class))}), + @ApiResponse( + responseCode = "404", + description = "Resource not found", + content = {@Content(schema = @Schema(implementation = ErrorResponse.class))}), + @ApiResponse( + responseCode = "500", + content = {@Content(schema = @Schema())}) + }) + @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}") + public GetTableResponse getTable( + @PathVariable String prefix, + @PathVariable String database, + @PathVariable String table) { + return new GetTableResponse( + "location", + new TableSchema( + 1, + 1, + ImmutableList.of(), + 1, + ImmutableList.of(), + ImmutableList.of(), + new HashMap<>(), + "comment", + 1L)); + } + + @Operation( + summary = "Create table", + tags = {"table"}) + @ApiResponses({ + @ApiResponse( + responseCode = "200", + content = {@Content(schema = @Schema(implementation = GetTableResponse.class))}), + @ApiResponse( + responseCode = "500", + content = {@Content(schema = @Schema())}) + }) + @PostMapping("/v1/{prefix}/databases/{database}/tables") + public GetTableResponse createTable( + @PathVariable String prefix, + @PathVariable String database, + @RequestBody CreateTableRequest request) { + return new GetTableResponse( + "location", + new TableSchema( + 1, + 1, + ImmutableList.of(), + 1, + ImmutableList.of(), + ImmutableList.of(), + new HashMap<>(), + "comment", + 1L)); + } + + @Operation( + summary = "Update table", + tags = {"table"}) + @ApiResponses({ + @ApiResponse( + responseCode = "200", + content = {@Content(schema = @Schema(implementation = GetTableResponse.class))}), + @ApiResponse( + responseCode = "500", + content = {@Content(schema = @Schema())}) + }) + @PostMapping("/v1/{prefix}/databases/{database}/tables/table") + public GetTableResponse updateTable( + @PathVariable String prefix, + @PathVariable String database, + @PathVariable String table, + @RequestBody UpdateTableRequest request) { + return new GetTableResponse( + "location", + new TableSchema( + 1, + 1, + ImmutableList.of(), + 1, + ImmutableList.of(), + ImmutableList.of(), + new HashMap<>(), + "comment", + 1L)); + } + + @Operation( + summary = "Update table", + tags = {"table"}) + @ApiResponses({ + @ApiResponse( + responseCode = "404", + description = "Resource not found", + content = {@Content(schema = @Schema(implementation = ErrorResponse.class))}), + @ApiResponse( + responseCode = "500", + content = {@Content(schema = @Schema())}) + }) + @DeleteMapping("/v1/{prefix}/databases/{database}/tables/table") + public void dropTable( + @PathVariable String prefix, + @PathVariable String database, + @PathVariable String table) {} } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index f32b87603f44..8482a9107735 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -279,7 +279,9 @@ public org.apache.spark.sql.connector.catalog.Table alterTable( return loadTable(ident); } catch (Catalog.TableNotExistException e) { throw new NoSuchTableException(ident); - } catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException e) { + } catch (Catalog.ColumnAlreadyExistException + | Catalog.ColumnNotExistException + | Catalog.TableNoPermissionException e) { throw new RuntimeException(e); } } @@ -432,6 +434,8 @@ public void renameTable(Identifier oldIdent, Identifier newIdent) throw new NoSuchTableException(oldIdent); } catch (Catalog.TableAlreadyExistException e) { throw new TableAlreadyExistsException(newIdent); + } catch (Catalog.TableNoPermissionException e) { + throw new RuntimeException(String.format("Table %s no permission", oldIdent)); } } @@ -450,6 +454,8 @@ protected org.apache.spark.sql.connector.catalog.Table loadSparkTable( } } catch (Catalog.TableNotExistException e) { throw new NoSuchTableException(ident); + } catch (Catalog.TableNoPermissionException e) { + throw new RuntimeException(e); } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java index 95d55df01178..576c733144ff 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MigrateFileProcedure.java @@ -93,6 +93,8 @@ public InternalRow[] call(InternalRow args) { } catch (Catalog.TableNotExistException e) { throw new IllegalArgumentException( "Target paimon table does not exist: " + targetTable); + } catch (Catalog.TableNoPermissionException e) { + throw new IllegalArgumentException("Target paimon table no permission: " + targetTable); } try {