diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index fff593aabb62..d3a8d628a2bc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -19,6 +19,7 @@ package org.apache.paimon.catalog; import org.apache.paimon.CoreOptions; +import org.apache.paimon.TableType; import org.apache.paimon.factories.FactoryUtil; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; @@ -66,6 +67,7 @@ import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Common implementation of {@link Catalog}. */ public abstract class AbstractCatalog implements Catalog { @@ -430,17 +432,39 @@ public Table getTable(Identifier identifier) throws TableNotExistException { protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException { Preconditions.checkArgument(identifier.getSystemTableName() == null); TableMeta tableMeta = getDataTableMeta(identifier); - return FileStoreTableFactory.create( - fileIO, - getTableLocation(identifier), - tableMeta.schema, - new CatalogEnvironment( - identifier, - tableMeta.uuid, - Lock.factory( - lockFactory().orElse(null), lockContext().orElse(null), identifier), - metastoreClientFactory(identifier, tableMeta.schema).orElse(null), - lineageMetaFactory)); + FileStoreTable table = + FileStoreTableFactory.create( + fileIO, + getTableLocation(identifier), + tableMeta.schema, + new CatalogEnvironment( + identifier, + tableMeta.uuid, + Lock.factory( + lockFactory().orElse(null), + lockContext().orElse(null), + identifier), + metastoreClientFactory(identifier, tableMeta.schema).orElse(null), + lineageMetaFactory)); + CoreOptions options = table.coreOptions(); + if (options.type() == TableType.OBJECT_TABLE) { + String objectLocation = options.objectLocation(); + checkNotNull(objectLocation, "Object location should not be null for object table."); + table = + ObjectTable.builder() + .underlyingTable(table) + .objectLocation(objectLocation) + .objectFileIO(objectFileIO(objectLocation)) + .build(); + } + return table; + } + + /** + * Catalog implementation may override this method to provide {@link FileIO} to object table. + */ + protected FileIO objectFileIO(String objectLocation) { + return fileIO; } /** diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java index c9b9c21937be..2e88213a24b9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java @@ -127,7 +127,7 @@ public void alterTable( public Table getTable(Identifier identifier) throws TableNotExistException { Table table = wrapped.getTable(identifier); if (table instanceof FileStoreTable) { - return new PrivilegedFileStoreTable( + return PrivilegedFileStoreTable.wrap( (FileStoreTable) table, privilegeManager.getPrivilegeChecker(), identifier); } else { return table; diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java index 37990ed5a1f3..52c806c7c53b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStoreTable.java @@ -27,6 +27,7 @@ import org.apache.paimon.table.DelegatedFileStoreTable; import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.object.ObjectTable; import org.apache.paimon.table.query.LocalTableQuery; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.TableWriteImpl; @@ -48,10 +49,10 @@ /** {@link FileStoreTable} with privilege checks. */ public class PrivilegedFileStoreTable extends DelegatedFileStoreTable { - private final PrivilegeChecker privilegeChecker; - private final Identifier identifier; + protected final PrivilegeChecker privilegeChecker; + protected final Identifier identifier; - public PrivilegedFileStoreTable( + protected PrivilegedFileStoreTable( FileStoreTable wrapped, PrivilegeChecker privilegeChecker, Identifier identifier) { super(wrapped); this.privilegeChecker = privilegeChecker; @@ -106,18 +107,6 @@ public Optional statistics() { return wrapped.statistics(); } - @Override - public FileStoreTable copy(Map dynamicOptions) { - return new PrivilegedFileStoreTable( - wrapped.copy(dynamicOptions), privilegeChecker, identifier); - } - - @Override - public FileStoreTable copy(TableSchema newTableSchema) { - return new PrivilegedFileStoreTable( - wrapped.copy(newTableSchema), privilegeChecker, identifier); - } - @Override public void rollbackTo(long snapshotId) { privilegeChecker.assertCanInsert(identifier); @@ -202,18 +191,6 @@ public ExpireSnapshots newExpireChangelog() { return wrapped.newExpireChangelog(); } - @Override - public FileStoreTable copyWithoutTimeTravel(Map dynamicOptions) { - return new PrivilegedFileStoreTable( - wrapped.copyWithoutTimeTravel(dynamicOptions), privilegeChecker, identifier); - } - - @Override - public FileStoreTable copyWithLatestSchema() { - return new PrivilegedFileStoreTable( - wrapped.copyWithLatestSchema(), privilegeChecker, identifier); - } - @Override public DataTableScan newScan() { privilegeChecker.assertCanSelect(identifier); @@ -262,11 +239,7 @@ public LocalTableQuery newLocalTableQuery() { return wrapped.newLocalTableQuery(); } - @Override - public FileStoreTable switchToBranch(String branchName) { - return new PrivilegedFileStoreTable( - wrapped.switchToBranch(branchName), privilegeChecker, identifier); - } + // ======================= equals ============================ @Override public boolean equals(Object o) { @@ -281,4 +254,45 @@ public boolean equals(Object o) { && Objects.equals(privilegeChecker, that.privilegeChecker) && Objects.equals(identifier, that.identifier); } + + // ======================= copy ============================ + + @Override + public PrivilegedFileStoreTable copy(Map dynamicOptions) { + return new PrivilegedFileStoreTable( + wrapped.copy(dynamicOptions), privilegeChecker, identifier); + } + + @Override + public PrivilegedFileStoreTable copy(TableSchema newTableSchema) { + return new PrivilegedFileStoreTable( + wrapped.copy(newTableSchema), privilegeChecker, identifier); + } + + @Override + public PrivilegedFileStoreTable copyWithoutTimeTravel(Map dynamicOptions) { + return new PrivilegedFileStoreTable( + wrapped.copyWithoutTimeTravel(dynamicOptions), privilegeChecker, identifier); + } + + @Override + public PrivilegedFileStoreTable copyWithLatestSchema() { + return new PrivilegedFileStoreTable( + wrapped.copyWithLatestSchema(), privilegeChecker, identifier); + } + + @Override + public PrivilegedFileStoreTable switchToBranch(String branchName) { + return new PrivilegedFileStoreTable( + wrapped.switchToBranch(branchName), privilegeChecker, identifier); + } + + public static PrivilegedFileStoreTable wrap( + FileStoreTable table, PrivilegeChecker privilegeChecker, Identifier identifier) { + if (table instanceof ObjectTable) { + return new PrivilegedObjectTable((ObjectTable) table, privilegeChecker, identifier); + } else { + return new PrivilegedFileStoreTable(table, privilegeChecker, identifier); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedObjectTable.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedObjectTable.java new file mode 100644 index 000000000000..c5a319c1fedd --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedObjectTable.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.privilege; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.object.ObjectTable; + +import java.util.Map; + +/** A {@link PrivilegedFileStoreTable} for {@link ObjectTable}. */ +public class PrivilegedObjectTable extends PrivilegedFileStoreTable implements ObjectTable { + + private final ObjectTable objectTable; + + protected PrivilegedObjectTable( + ObjectTable wrapped, PrivilegeChecker privilegeChecker, Identifier identifier) { + super(wrapped, privilegeChecker, identifier); + this.objectTable = wrapped; + } + + @Override + public String objectLocation() { + return objectTable.objectLocation(); + } + + @Override + public FileStoreTable underlyingTable() { + return objectTable.underlyingTable(); + } + + @Override + public FileIO objectFileIO() { + return objectTable.objectFileIO(); + } + + @Override + public long refresh() { + privilegeChecker.assertCanInsert(identifier); + return objectTable.refresh(); + } + + // ======================= copy ============================ + + @Override + public PrivilegedObjectTable copy(Map dynamicOptions) { + return new PrivilegedObjectTable( + objectTable.copy(dynamicOptions), privilegeChecker, identifier); + } + + @Override + public PrivilegedObjectTable copy(TableSchema newTableSchema) { + return new PrivilegedObjectTable( + objectTable.copy(newTableSchema), privilegeChecker, identifier); + } + + @Override + public PrivilegedObjectTable copyWithoutTimeTravel(Map dynamicOptions) { + return new PrivilegedObjectTable( + objectTable.copyWithoutTimeTravel(dynamicOptions), privilegeChecker, identifier); + } + + @Override + public PrivilegedObjectTable copyWithLatestSchema() { + return new PrivilegedObjectTable( + objectTable.copyWithLatestSchema(), privilegeChecker, identifier); + } + + @Override + public PrivilegedObjectTable switchToBranch(String branchName) { + return new PrivilegedObjectTable( + objectTable.switchToBranch(branchName), privilegeChecker, identifier); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java index 47d8777241d6..423dc1726319 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java @@ -19,14 +19,12 @@ package org.apache.paimon.table; import org.apache.paimon.CoreOptions; -import org.apache.paimon.TableType; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.options.Options; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.object.ObjectTable; import org.apache.paimon.utils.StringUtils; import java.io.IOException; @@ -35,7 +33,6 @@ import static org.apache.paimon.CoreOptions.PATH; import static org.apache.paimon.utils.Preconditions.checkArgument; -import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Factory to create {@link FileStoreTable}. */ public class FileStoreTableFactory { @@ -127,17 +124,6 @@ public static FileStoreTable createWithoutFallbackBranch( fileIO, tablePath, tableSchema, catalogEnvironment) : new PrimaryKeyFileStoreTable( fileIO, tablePath, tableSchema, catalogEnvironment); - table = table.copy(dynamicOptions.toMap()); - CoreOptions options = table.coreOptions(); - if (options.type() == TableType.OBJECT_TABLE) { - String objectLocation = options.objectLocation(); - checkNotNull(objectLocation, "Object location should not be null for object table."); - table = - ObjectTable.builder() - .underlyingTable(table) - .objectLocation(objectLocation) - .build(); - } - return table; + return table.copy(dynamicOptions.toMap()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java index 326efbc0eac8..b1be840c5153 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectRefresh.java @@ -26,7 +26,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; -import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.BatchTableCommit; import org.apache.paimon.table.sink.BatchTableWrite; import org.apache.paimon.table.sink.BatchWriteBuilder; @@ -41,13 +40,14 @@ public class ObjectRefresh { public static long refresh(ObjectTable table) throws Exception { String location = table.objectLocation(); - FileStoreTable underlyingTable = table.underlyingTable(); - FileIO fileIO = underlyingTable.fileIO(); + // 1. collect all files for object table List fileCollector = new ArrayList<>(); - listAllFiles(fileIO, new Path(location), fileCollector); + listAllFiles(table.objectFileIO(), new Path(location), fileCollector); - BatchWriteBuilder writeBuilder = underlyingTable.newBatchWriteBuilder().withOverwrite(); + // 2. write to underlying table + BatchWriteBuilder writeBuilder = + table.underlyingTable().newBatchWriteBuilder().withOverwrite(); try (BatchTableWrite write = writeBuilder.newWrite(); BatchTableCommit commit = writeBuilder.newCommit()) { for (FileStatus file : fileCollector) { @@ -78,6 +78,7 @@ private static void listAllFiles(FileIO fileIO, Path directory, List private static InternalRow toRow(FileStatus file) { return toRow( file.getPath().toString(), + file.getPath().getParent().toString(), file.getPath().getName(), file.getLen(), Timestamp.fromEpochMillis(file.getModificationTime()), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java index 65689108caae..97acfe7299c5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/object/ObjectTable.java @@ -18,6 +18,7 @@ package org.apache.paimon.table.object; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.DelegatedFileStoreTable; @@ -46,6 +47,7 @@ public interface ObjectTable extends FileStoreTable { RowType SCHEMA = RowType.builder() .field("path", DataTypes.STRING().notNull()) + .field("parent_path", DataTypes.STRING().notNull()) .field("name", DataTypes.STRING().notNull()) .field("length", DataTypes.BIGINT().notNull()) .field("mtime", DataTypes.TIMESTAMP_LTZ_MILLIS()) @@ -66,11 +68,26 @@ public interface ObjectTable extends FileStoreTable { /** Underlying table to store metadata. */ FileStoreTable underlyingTable(); + /** File io for object file system. */ + FileIO objectFileIO(); + long refresh(); @Override ObjectTable copy(Map dynamicOptions); + @Override + ObjectTable copy(TableSchema newTableSchema); + + @Override + ObjectTable copyWithoutTimeTravel(Map dynamicOptions); + + @Override + ObjectTable copyWithLatestSchema(); + + @Override + ObjectTable switchToBranch(String branchName); + /** Create a new builder for {@link ObjectTable}. */ static ObjectTable.Builder builder() { return new ObjectTable.Builder(); @@ -80,6 +97,7 @@ static ObjectTable.Builder builder() { class Builder { private FileStoreTable underlyingTable; + private FileIO objectFileIO; private String objectLocation; public ObjectTable.Builder underlyingTable(FileStoreTable underlyingTable) { @@ -93,23 +111,31 @@ public ObjectTable.Builder underlyingTable(FileStoreTable underlyingTable) { return this; } + public ObjectTable.Builder objectFileIO(FileIO objectFileIO) { + this.objectFileIO = objectFileIO; + return this; + } + public ObjectTable.Builder objectLocation(String objectLocation) { this.objectLocation = objectLocation; return this; } public ObjectTable build() { - return new ObjectTableImpl(underlyingTable, objectLocation); + return new ObjectTableImpl(underlyingTable, objectFileIO, objectLocation); } } /** An implementation for {@link ObjectTable}. */ class ObjectTableImpl extends DelegatedFileStoreTable implements ObjectTable { + private final FileIO objectFileIO; private final String objectLocation; - public ObjectTableImpl(FileStoreTable underlyingTable, String objectLocation) { + public ObjectTableImpl( + FileStoreTable underlyingTable, FileIO objectFileIO, String objectLocation) { super(underlyingTable); + this.objectFileIO = objectFileIO; this.objectLocation = objectLocation; } @@ -148,6 +174,11 @@ public FileStoreTable underlyingTable() { return wrapped; } + @Override + public FileIO objectFileIO() { + return objectFileIO; + } + @Override public long refresh() { try { @@ -159,28 +190,30 @@ public long refresh() { @Override public ObjectTable copy(Map dynamicOptions) { - return new ObjectTableImpl(wrapped.copy(dynamicOptions), objectLocation); + return new ObjectTableImpl(wrapped.copy(dynamicOptions), objectFileIO, objectLocation); } @Override - public FileStoreTable copy(TableSchema newTableSchema) { - return new ObjectTableImpl(wrapped.copy(newTableSchema), objectLocation); + public ObjectTable copy(TableSchema newTableSchema) { + return new ObjectTableImpl(wrapped.copy(newTableSchema), objectFileIO, objectLocation); } @Override - public FileStoreTable copyWithoutTimeTravel(Map dynamicOptions) { + public ObjectTable copyWithoutTimeTravel(Map dynamicOptions) { return new ObjectTableImpl( - wrapped.copyWithoutTimeTravel(dynamicOptions), objectLocation); + wrapped.copyWithoutTimeTravel(dynamicOptions), objectFileIO, objectLocation); } @Override - public FileStoreTable copyWithLatestSchema() { - return new ObjectTableImpl(wrapped.copyWithLatestSchema(), objectLocation); + public ObjectTable copyWithLatestSchema() { + return new ObjectTableImpl( + wrapped.copyWithLatestSchema(), objectFileIO, objectLocation); } @Override - public FileStoreTable switchToBranch(String branchName) { - return new ObjectTableImpl(wrapped.switchToBranch(branchName), objectLocation); + public ObjectTable switchToBranch(String branchName) { + return new ObjectTableImpl( + wrapped.switchToBranch(branchName), objectFileIO, objectLocation); } } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java index b9e30035b093..d3ad1d4a52f4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ObjectTableITCase.java @@ -80,4 +80,31 @@ public void testObjectTableRefresh() throws IOException { .hasMessageContaining("Object table does not support Write."); assertThat(sql("SELECT name, length FROM T")).containsExactlyInAnyOrder(Row.of("f1", 5L)); } + + @Test + public void testObjectTableRefreshInPrivileged() throws IOException { + sql("CALL sys.init_file_based_privilege('root-passwd')"); + + tEnv.executeSql( + String.format( + "CREATE CATALOG rootcat WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '%s',\n" + + " 'user' = 'root',\n" + + " 'password' = 'root-passwd'\n" + + ")", + path)); + tEnv.useCatalog("rootcat"); + + Path objectLocation = new Path(path + "/object-location"); + FileIO fileIO = LocalFileIO.create(); + sql( + "CREATE TABLE T WITH ('type' = 'object-table', 'object-location' = '%s')", + objectLocation); + + // add new file + fileIO.overwriteFileUtf8(new Path(objectLocation, "f0"), "1,2,3"); + sql("CALL sys.refresh_object_table('default.T')"); + assertThat(sql("SELECT name, length FROM T")).containsExactlyInAnyOrder(Row.of("f0", 5L)); + } }