Skip to content

Commit

Permalink
[core] Extract loadTable in CatalogUtils (apache#4904)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jan 14, 2025
1 parent 5de4d95 commit 107dfa0
Show file tree
Hide file tree
Showing 15 changed files with 261 additions and 307 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ public synchronized Options removePrefix(String prefix) {
return new Options(convertToPropertiesPrefixKey(data, prefix));
}

public synchronized void remove(String key) {
data.remove(key);
public synchronized String remove(String key) {
return data.remove(key);
}

public synchronized void remove(ConfigOption<?> option) {
Expand Down
122 changes: 20 additions & 102 deletions paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.catalog;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.TableType;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
Expand All @@ -32,18 +31,13 @@
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -55,21 +49,19 @@
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.OBJECT_LOCATION;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.catalog.CatalogUtils.buildFormatTableByTableSchema;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
import static org.apache.paimon.catalog.CatalogUtils.getTableType;
import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSystem;
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Common implementation of {@link Catalog}. */
public abstract class AbstractCatalog implements Catalog {
Expand Down Expand Up @@ -100,6 +92,11 @@ public FileIO fileIO() {
return fileIO;
}

@Override
public FileIO fileIO(Path path) {
return fileIO;
}

public Optional<CatalogLockFactory> lockFactory() {
if (!lockEnabled()) {
return Optional.empty();
Expand Down Expand Up @@ -370,67 +367,9 @@ protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange>

@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
if (isSystemDatabase(identifier.getDatabaseName())) {
return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
} else if (identifier.isSystemTable()) {
Table originTable =
getDataOrFormatTable(
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName(),
identifier.getBranchName(),
null));
return CatalogUtils.createSystemTable(identifier, originTable);
} else {
return getDataOrFormatTable(identifier);
}
}

// hive override this method.
protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExistException {
Preconditions.checkArgument(identifier.getSystemTableName() == null);
TableMeta tableMeta = getDataTableMeta(identifier);
TableType tableType = getTableType(tableMeta.schema().options());
if (tableType == TableType.FORMAT_TABLE) {
TableSchema schema = tableMeta.schema();
return buildFormatTableByTableSchema(
identifier,
schema.options(),
schema.logicalRowType(),
schema.partitionKeys(),
schema.comment());
}
FileStoreTable table =
FileStoreTableFactory.create(
fileIO,
getTableLocation(identifier),
tableMeta.schema,
new CatalogEnvironment(
identifier,
tableMeta.uuid,
Lock.factory(
lockFactory().orElse(null),
lockContext().orElse(null),
identifier),
catalogLoader()));
if (tableType == TableType.OBJECT_TABLE) {
String objectLocation = table.coreOptions().objectLocation();
checkNotNull(objectLocation, "Object location should not be null for object table.");
table =
ObjectTable.builder()
.underlyingTable(table)
.objectLocation(objectLocation)
.objectFileIO(objectFileIO(objectLocation))
.build();
}
return table;
}

/**
* Catalog implementation may override this method to provide {@link FileIO} to object table.
*/
protected FileIO objectFileIO(String objectLocation) {
return fileIO;
Lock.Factory lockFactory =
Lock.factory(lockFactory().orElse(null), lockContext().orElse(null), identifier);
return CatalogUtils.loadTable(this, identifier, this::loadTableMetadata, lockFactory);
}

/**
Expand All @@ -455,11 +394,11 @@ public Path newDatabasePath(String database) {
return newDatabasePath(warehouse(), database);
}

protected TableMeta getDataTableMeta(Identifier identifier) throws TableNotExistException {
return new TableMeta(getDataTableSchema(identifier), null);
protected TableMetadata loadTableMetadata(Identifier identifier) throws TableNotExistException {
return new TableMetadata(loadTableSchema(identifier), null);
}

protected abstract TableSchema getDataTableSchema(Identifier identifier)
protected abstract TableSchema loadTableSchema(Identifier identifier)
throws TableNotExistException;

public Path getTableLocation(Identifier identifier) {
Expand Down Expand Up @@ -537,38 +476,17 @@ protected boolean tableExistsInFileSystem(Path tablePath, String branchName) {
}

public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath, String branchName) {
return new SchemaManager(fileIO, tablePath, branchName)
.latest()
.map(
s -> {
if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
Optional<TableSchema> schema = new SchemaManager(fileIO, tablePath, branchName).latest();
if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
schema =
schema.map(
s -> {
Options branchOptions = new Options(s.options());
branchOptions.set(CoreOptions.BRANCH, branchName);
return s.copy(branchOptions.toMap());
} else {
return s;
}
});
}

/** Table metadata. */
protected static class TableMeta {

private final TableSchema schema;
@Nullable private final String uuid;

public TableMeta(TableSchema schema, @Nullable String uuid) {
this.schema = schema;
this.uuid = uuid;
}

public TableSchema schema() {
return schema;
}

@Nullable
public String uuid() {
return uuid;
});
}
schema.ifPresent(s -> s.options().put(PATH.key(), tablePath.toString()));
return schema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.annotation.Public;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
Expand Down Expand Up @@ -374,6 +375,9 @@ default void repairTable(Identifier identifier) throws TableNotExistException {
/** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
FileIO fileIO();

/** {@link FileIO} of this catalog. */
FileIO fileIO(Path path);

/** Catalog options for re-creating this catalog. */
Map<String, String> options();

Expand Down
Loading

0 comments on commit 107dfa0

Please sign in to comment.