Skip to content

Commit

Permalink
[core] Improve object table for fileIO, Privileged and parent_path
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Nov 22, 2024
1 parent c907544 commit 19398a6
Show file tree
Hide file tree
Showing 8 changed files with 252 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.catalog;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
Expand Down Expand Up @@ -66,6 +67,7 @@
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Common implementation of {@link Catalog}. */
public abstract class AbstractCatalog implements Catalog {
Expand Down Expand Up @@ -430,17 +432,39 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
TableMeta tableMeta = getDataTableMeta(identifier);
return FileStoreTableFactory.create(
fileIO,
getTableLocation(identifier),
tableMeta.schema,
new CatalogEnvironment(
identifier,
tableMeta.uuid,
Lock.factory(
lockFactory().orElse(null), lockContext().orElse(null), identifier),
metastoreClientFactory(identifier, tableMeta.schema).orElse(null),
lineageMetaFactory));
FileStoreTable table =
FileStoreTableFactory.create(
fileIO,
getTableLocation(identifier),
tableMeta.schema,
new CatalogEnvironment(
identifier,
tableMeta.uuid,
Lock.factory(
lockFactory().orElse(null),
lockContext().orElse(null),
identifier),
metastoreClientFactory(identifier, tableMeta.schema).orElse(null),
lineageMetaFactory));
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
checkNotNull(objectLocation, "Object location should not be null for object table.");
table =
ObjectTable.builder()
.underlyingTable(table)
.objectLocation(objectLocation)
.objectFileIO(objectFileIO(objectLocation))
.build();
}
return table;
}

/**
* Catalog implementation may override this method to provide {@link FileIO} to object table.
*/
protected FileIO objectFileIO(String objectLocation) {
return fileIO;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void alterTable(
public Table getTable(Identifier identifier) throws TableNotExistException {
Table table = wrapped.getTable(identifier);
if (table instanceof FileStoreTable) {
return new PrivilegedFileStoreTable(
return PrivilegedFileStoreTable.wrap(
(FileStoreTable) table, privilegeManager.getPrivilegeChecker(), identifier);
} else {
return table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.table.DelegatedFileStoreTable;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.query.LocalTableQuery;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
Expand All @@ -48,10 +49,10 @@
/** {@link FileStoreTable} with privilege checks. */
public class PrivilegedFileStoreTable extends DelegatedFileStoreTable {

private final PrivilegeChecker privilegeChecker;
private final Identifier identifier;
protected final PrivilegeChecker privilegeChecker;
protected final Identifier identifier;

public PrivilegedFileStoreTable(
protected PrivilegedFileStoreTable(
FileStoreTable wrapped, PrivilegeChecker privilegeChecker, Identifier identifier) {
super(wrapped);
this.privilegeChecker = privilegeChecker;
Expand Down Expand Up @@ -106,18 +107,6 @@ public Optional<Statistics> statistics() {
return wrapped.statistics();
}

@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
return new PrivilegedFileStoreTable(
wrapped.copy(dynamicOptions), privilegeChecker, identifier);
}

@Override
public FileStoreTable copy(TableSchema newTableSchema) {
return new PrivilegedFileStoreTable(
wrapped.copy(newTableSchema), privilegeChecker, identifier);
}

@Override
public void rollbackTo(long snapshotId) {
privilegeChecker.assertCanInsert(identifier);
Expand Down Expand Up @@ -202,18 +191,6 @@ public ExpireSnapshots newExpireChangelog() {
return wrapped.newExpireChangelog();
}

@Override
public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
return new PrivilegedFileStoreTable(
wrapped.copyWithoutTimeTravel(dynamicOptions), privilegeChecker, identifier);
}

@Override
public FileStoreTable copyWithLatestSchema() {
return new PrivilegedFileStoreTable(
wrapped.copyWithLatestSchema(), privilegeChecker, identifier);
}

@Override
public DataTableScan newScan() {
privilegeChecker.assertCanSelect(identifier);
Expand Down Expand Up @@ -262,11 +239,7 @@ public LocalTableQuery newLocalTableQuery() {
return wrapped.newLocalTableQuery();
}

@Override
public FileStoreTable switchToBranch(String branchName) {
return new PrivilegedFileStoreTable(
wrapped.switchToBranch(branchName), privilegeChecker, identifier);
}
// ======================= equals ============================

@Override
public boolean equals(Object o) {
Expand All @@ -281,4 +254,45 @@ public boolean equals(Object o) {
&& Objects.equals(privilegeChecker, that.privilegeChecker)
&& Objects.equals(identifier, that.identifier);
}

// ======================= copy ============================

@Override
public PrivilegedFileStoreTable copy(Map<String, String> dynamicOptions) {
return new PrivilegedFileStoreTable(
wrapped.copy(dynamicOptions), privilegeChecker, identifier);
}

@Override
public PrivilegedFileStoreTable copy(TableSchema newTableSchema) {
return new PrivilegedFileStoreTable(
wrapped.copy(newTableSchema), privilegeChecker, identifier);
}

@Override
public PrivilegedFileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
return new PrivilegedFileStoreTable(
wrapped.copyWithoutTimeTravel(dynamicOptions), privilegeChecker, identifier);
}

@Override
public PrivilegedFileStoreTable copyWithLatestSchema() {
return new PrivilegedFileStoreTable(
wrapped.copyWithLatestSchema(), privilegeChecker, identifier);
}

@Override
public PrivilegedFileStoreTable switchToBranch(String branchName) {
return new PrivilegedFileStoreTable(
wrapped.switchToBranch(branchName), privilegeChecker, identifier);
}

public static PrivilegedFileStoreTable wrap(
FileStoreTable table, PrivilegeChecker privilegeChecker, Identifier identifier) {
if (table instanceof ObjectTable) {
return new PrivilegedObjectTable((ObjectTable) table, privilegeChecker, identifier);
} else {
return new PrivilegedFileStoreTable(table, privilegeChecker, identifier);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.privilege;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.object.ObjectTable;

import java.util.Map;

/** A {@link PrivilegedFileStoreTable} for {@link ObjectTable}. */
public class PrivilegedObjectTable extends PrivilegedFileStoreTable implements ObjectTable {

private final ObjectTable objectTable;

protected PrivilegedObjectTable(
ObjectTable wrapped, PrivilegeChecker privilegeChecker, Identifier identifier) {
super(wrapped, privilegeChecker, identifier);
this.objectTable = wrapped;
}

@Override
public String objectLocation() {
return objectTable.objectLocation();
}

@Override
public FileStoreTable underlyingTable() {
return objectTable.underlyingTable();
}

@Override
public FileIO objectFileIO() {
return objectTable.objectFileIO();
}

@Override
public long refresh() {
privilegeChecker.assertCanInsert(identifier);
return objectTable.refresh();
}

// ======================= copy ============================

@Override
public PrivilegedObjectTable copy(Map<String, String> dynamicOptions) {
return new PrivilegedObjectTable(
objectTable.copy(dynamicOptions), privilegeChecker, identifier);
}

@Override
public PrivilegedObjectTable copy(TableSchema newTableSchema) {
return new PrivilegedObjectTable(
objectTable.copy(newTableSchema), privilegeChecker, identifier);
}

@Override
public PrivilegedObjectTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
return new PrivilegedObjectTable(
objectTable.copyWithoutTimeTravel(dynamicOptions), privilegeChecker, identifier);
}

@Override
public PrivilegedObjectTable copyWithLatestSchema() {
return new PrivilegedObjectTable(
objectTable.copyWithLatestSchema(), privilegeChecker, identifier);
}

@Override
public PrivilegedObjectTable switchToBranch(String branchName) {
return new PrivilegedObjectTable(
objectTable.switchToBranch(branchName), privilegeChecker, identifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@
package org.apache.paimon.table;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.utils.StringUtils;

import java.io.IOException;
Expand All @@ -35,7 +33,6 @@

import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Factory to create {@link FileStoreTable}. */
public class FileStoreTableFactory {
Expand Down Expand Up @@ -127,17 +124,6 @@ public static FileStoreTable createWithoutFallbackBranch(
fileIO, tablePath, tableSchema, catalogEnvironment)
: new PrimaryKeyFileStoreTable(
fileIO, tablePath, tableSchema, catalogEnvironment);
table = table.copy(dynamicOptions.toMap());
CoreOptions options = table.coreOptions();
if (options.type() == TableType.OBJECT_TABLE) {
String objectLocation = options.objectLocation();
checkNotNull(objectLocation, "Object location should not be null for object table.");
table =
ObjectTable.builder()
.underlyingTable(table)
.objectLocation(objectLocation)
.build();
}
return table;
return table.copy(dynamicOptions.toMap());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
Expand All @@ -41,13 +40,14 @@ public class ObjectRefresh {

public static long refresh(ObjectTable table) throws Exception {
String location = table.objectLocation();
FileStoreTable underlyingTable = table.underlyingTable();
FileIO fileIO = underlyingTable.fileIO();

// 1. collect all files for object table
List<FileStatus> fileCollector = new ArrayList<>();
listAllFiles(fileIO, new Path(location), fileCollector);
listAllFiles(table.objectFileIO(), new Path(location), fileCollector);

BatchWriteBuilder writeBuilder = underlyingTable.newBatchWriteBuilder().withOverwrite();
// 2. write to underlying table
BatchWriteBuilder writeBuilder =
table.underlyingTable().newBatchWriteBuilder().withOverwrite();
try (BatchTableWrite write = writeBuilder.newWrite();
BatchTableCommit commit = writeBuilder.newCommit()) {
for (FileStatus file : fileCollector) {
Expand Down Expand Up @@ -78,6 +78,7 @@ private static void listAllFiles(FileIO fileIO, Path directory, List<FileStatus>
private static InternalRow toRow(FileStatus file) {
return toRow(
file.getPath().toString(),
file.getPath().getParent().toString(),
file.getPath().getName(),
file.getLen(),
Timestamp.fromEpochMillis(file.getModificationTime()),
Expand Down
Loading

0 comments on commit 19398a6

Please sign in to comment.