Skip to content

Commit

Permalink
[core] Rename Catalog.getDataTableLocation to Catalog.getTableLocation
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Aug 5, 2024
1 parent b3f532c commit b02acc1
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.catalog;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
Expand Down Expand Up @@ -346,7 +345,7 @@ private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistE
TableSchema tableSchema = getDataTableSchema(identifier);
return FileStoreTableFactory.create(
fileIO,
getDataTableLocation(identifier),
getTableLocation(identifier),
tableSchema,
new CatalogEnvironment(
identifier,
Expand Down Expand Up @@ -374,7 +373,7 @@ public Map<String, Map<String, Path>> allTablePaths() {
Map<String, Path> tableMap =
allPaths.computeIfAbsent(database, d -> new HashMap<>());
for (String table : listTables(database)) {
tableMap.put(table, getDataTableLocation(Identifier.create(database, table)));
tableMap.put(table, getTableLocation(Identifier.create(database, table)));
}
}
return allPaths;
Expand All @@ -386,8 +385,8 @@ public Map<String, Map<String, Path>> allTablePaths() {
protected abstract TableSchema getDataTableSchema(Identifier identifier)
throws TableNotExistException;

@VisibleForTesting
public Path getDataTableLocation(Identifier identifier) {
@Override
public Path getTableLocation(Identifier identifier) {
return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName());
}

Expand Down
15 changes: 8 additions & 7 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,14 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
*/
Table getTable(Identifier identifier) throws TableNotExistException;

/**
* Get the table location in this catalog. If the table exists, return the location of the
* table; If the table does not exist, construct the location for table.
*
* @return the table location
*/
Path getTableLocation(Identifier identifier);

/**
* Get names of all tables under this database. An empty list is returned if none exists.
*
Expand All @@ -170,13 +178,6 @@ void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade)
*/
List<String> listTables(String databaseName) throws DatabaseNotExistException;

/**
* Get the table location in this catalog.
*
* @return the table location
*/
Path getDataTableLocation(Identifier identifier);

/**
* Check if a table exists in this catalog.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}

@Override
public Path getDataTableLocation(Identifier identifier) {
return wrapped.getDataTableLocation(identifier);
public Path getTableLocation(Identifier identifier) {
return wrapped.getTableLocation(identifier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,19 +99,19 @@ public boolean tableExists(Identifier identifier) {
}

return tableExistsInFileSystem(
getDataTableLocation(identifier), identifier.getBranchNameOrDefault());
getTableLocation(identifier), identifier.getBranchNameOrDefault());
}

@Override
public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExistException {
return tableSchemaInFileSystem(
getDataTableLocation(identifier), identifier.getBranchNameOrDefault())
getTableLocation(identifier), identifier.getBranchNameOrDefault())
.orElseThrow(() -> new TableNotExistException(identifier));
}

@Override
protected void dropTableImpl(Identifier identifier) {
Path path = getDataTableLocation(identifier);
Path path = getTableLocation(identifier);
uncheck(() -> fileIO.delete(path, true));
}

Expand All @@ -121,7 +121,7 @@ public void createTableImpl(Identifier identifier, Schema schema) {
}

private SchemaManager schemaManager(Identifier identifier) {
Path path = getDataTableLocation(identifier);
Path path = getTableLocation(identifier);
CatalogLock catalogLock =
lockFactory().map(fac -> fac.createLock(assertGetLockContext())).orElse(null);
return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault())
Expand All @@ -135,8 +135,8 @@ private CatalogLockContext assertGetLockContext() {

@Override
public void renameTableImpl(Identifier fromTable, Identifier toTable) {
Path fromPath = getDataTableLocation(fromTable);
Path toPath = getDataTableLocation(toTable);
Path fromPath = getTableLocation(fromTable);
Path toPath = getTableLocation(toTable);
uncheck(() -> fileIO.rename(fromPath, toPath));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ protected void dropTableImpl(Identifier identifier) {
LOG.info("Skipping drop, table does not exist: {}", identifier);
return;
}
Path path = getDataTableLocation(identifier);
Path path = getTableLocation(identifier);
try {
if (fileIO.exists(path)) {
fileIO.deleteDirectoryQuietly(path);
Expand All @@ -239,7 +239,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
// create table file
getSchemaManager(identifier).createTable(schema);
// Update schema metadata
Path path = getDataTableLocation(identifier);
Path path = getTableLocation(identifier);
int insertRecord =
connections.run(
conn -> {
Expand Down Expand Up @@ -276,11 +276,11 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
// update table metadata info
updateTable(connections, catalogKey, fromTable, toTable);

Path fromPath = getDataTableLocation(fromTable);
Path fromPath = getTableLocation(fromTable);
if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) {
// Rename the file system's table directory. Maintain consistency between tables in
// the file system and tables in the Hive Metastore.
Path toPath = getDataTableLocation(toTable);
Path toPath = getTableLocation(toTable);
try {
fileIO.rename(fromPath, toPath);
} catch (IOException e) {
Expand Down Expand Up @@ -311,7 +311,7 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE
connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName())) {
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
Path tableLocation = getTableLocation(identifier);
return new SchemaManager(fileIO, tableLocation)
.latest()
.orElseThrow(
Expand Down Expand Up @@ -354,8 +354,7 @@ public void close() throws Exception {
}

private SchemaManager getSchemaManager(Identifier identifier) {
return new SchemaManager(fileIO, getDataTableLocation(identifier))
.withLock(lock(identifier));
return new SchemaManager(fileIO, getTableLocation(identifier)).withLock(lock(identifier));
}

private Map<String, String> fetchProperties(String databaseName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exce
FileIO sourceTableFileIO = sourceCatalog.fileIO();
FileIO targetTableFileIO = targetCatalog.fileIO();
Path sourceTableRootPath =
sourceCatalog.getDataTableLocation(
sourceCatalog.getTableLocation(
Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
Path targetTableRootPath =
targetCatalog.getDataTableLocation(
targetCatalog.getTableLocation(
Identifier.fromString(cloneFileInfo.getTargetIdentifier()));

String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public Optional<MetastoreClient.Factory> metastoreClientFactory(Identifier ident
}

@Override
public Path getDataTableLocation(Identifier identifier) {
public Path getTableLocation(Identifier identifier) {
try {
String databaseName = identifier.getDatabaseName();
String tableName = identifier.getTableName();
Expand All @@ -209,7 +209,7 @@ public Path getDataTableLocation(Identifier identifier) {
}
return Optional.empty();
});
return tablePath.orElse(super.getDataTableLocation(identifier));
return tablePath.orElse(super.getTableLocation(identifier));
} catch (TException e) {
throw new RuntimeException("Can not get table " + identifier + " from metastore.", e);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -379,8 +379,7 @@ public boolean tableExists(Identifier identifier) {

return isPaimonTable(table)
&& tableSchemaInFileSystem(
getDataTableLocation(identifier),
identifier.getBranchNameOrDefault())
getTableLocation(identifier), identifier.getBranchNameOrDefault())
.isPresent();
}

Expand All @@ -398,7 +397,7 @@ public TableSchema getDataTableSchema(Identifier identifier) throws TableNotExis
}

return tableSchemaInFileSystem(
getDataTableLocation(identifier), identifier.getBranchNameOrDefault())
getTableLocation(identifier), identifier.getBranchNameOrDefault())
.orElseThrow(() -> new TableNotExistException(identifier));
}

Expand Down Expand Up @@ -431,7 +430,7 @@ protected void dropTableImpl(Identifier identifier) {
// Deletes table directory to avoid schema in filesystem exists after dropping hive
// table successfully to keep the table consistency between which in filesystem and
// which in Hive metastore.
Path path = getDataTableLocation(identifier);
Path path = getTableLocation(identifier);
try {
if (fileIO.exists(path)) {
fileIO.deleteDirectoryQuietly(path);
Expand Down Expand Up @@ -466,7 +465,7 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
try {
clients.execute(client -> client.createTable(createHiveTable(identifier, tableSchema)));
} catch (Exception e) {
Path path = getDataTableLocation(identifier);
Path path = getTableLocation(identifier);
try {
fileIO.deleteDirectoryQuietly(path);
} catch (Exception ee) {
Expand Down Expand Up @@ -495,11 +494,11 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
table.setTableName(toTable.getTableName());
clients.execute(client -> client.alter_table(fromDB, fromTableName, table));

Path fromPath = getDataTableLocation(fromTable);
Path fromPath = getTableLocation(fromTable);
if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) {
// Rename the file system's table directory. Maintain consistency between tables in
// the file system and tables in the Hive Metastore.
Path toPath = getDataTableLocation(toTable);
Path toPath = getTableLocation(toTable);
try {
fileIO.rename(fromPath, toPath);
} catch (IOException e) {
Expand Down Expand Up @@ -616,8 +615,7 @@ public void repairTable(Identifier identifier) throws TableNotExistException {

TableSchema tableSchema =
tableSchemaInFileSystem(
getDataTableLocation(identifier),
identifier.getBranchNameOrDefault())
getTableLocation(identifier), identifier.getBranchNameOrDefault())
.orElseThrow(() -> new TableNotExistException(identifier));
Table newTable = createHiveTable(identifier, tableSchema);
try {
Expand Down Expand Up @@ -754,7 +752,7 @@ private void updateHmsTable(Table table, Identifier identifier, TableSchema sche
}

// update location
locationHelper.specifyTableLocation(table, getDataTableLocation(identifier).toString());
locationHelper.specifyTableLocation(table, getTableLocation(identifier).toString());
}

private void updateHmsTablePars(Table table, TableSchema schema) {
Expand All @@ -779,9 +777,7 @@ private FieldSchema convertToFieldSchema(DataField dataField) {

private SchemaManager schemaManager(Identifier identifier) {
return new SchemaManager(
fileIO,
getDataTableLocation(identifier),
identifier.getBranchNameOrDefault())
fileIO, getTableLocation(identifier), identifier.getBranchNameOrDefault())
.withLock(lock(identifier));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void testReadWriteTableWithBitmapIndex() throws Catalog.TableNotExistExce

protected void foreachIndexReader(Consumer<FileIndexReader> consumer)
throws Catalog.TableNotExistException {
Path tableRoot = fileSystemCatalog.getDataTableLocation(Identifier.create("db", "T"));
Path tableRoot = fileSystemCatalog.getTableLocation(Identifier.create("db", "T"));
SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot);
FileStorePathFactory pathFactory =
new FileStorePathFactory(
Expand Down

0 comments on commit b02acc1

Please sign in to comment.