Skip to content

Commit

Permalink
[core] Identifier can now recognize branch and system tables (#3862)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper authored Aug 1, 2024
1 parent eda5c40 commit 7df8880
Show file tree
Hide file tree
Showing 15 changed files with 243 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public static String randomNumericString(int len) {
* @return an array of parsed Strings, {@code null} if null String input
*/
public static String[] split(final String str, final String separatorChars) {
return splitWorker(str, separatorChars, -1, false);
return split(str, separatorChars, -1, false);
}

/**
Expand All @@ -388,7 +388,7 @@ public static String[] split(final String str, final String separatorChars) {
* separators; if {@code false}, adjacent separators are treated as one separator.
* @return an array of parsed Strings, {@code null} if null String input
*/
private static String[] splitWorker(
public static String[] split(
final String str,
final String separatorChars,
final int max,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.Preconditions;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -156,6 +155,7 @@ protected abstract Map<String, String> loadDatabasePropertiesImpl(String name)
@Override
public void dropPartition(Identifier identifier, Map<String, String> partitionSpec)
throws TableNotExistException {
checkNotSystemTable(identifier, "dropPartition");
Table table = getTable(identifier);
FileStoreTable fileStoreTable = (FileStoreTable) table;
try (FileStoreCommit commit =
Expand All @@ -181,7 +181,7 @@ public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade
throw new DatabaseNotExistException(name);
}

if (!cascade && listTables(name).size() > 0) {
if (!cascade && !listTables(name).isEmpty()) {
throw new DatabaseNotEmptyException(name);
}

Expand Down Expand Up @@ -282,26 +282,17 @@ public void alterTable(
validateIdentifierNameCaseInsensitive(identifier);
validateFieldNameCaseInsensitiveInSchemaChange(changes);

Optional<Pair<Identifier, String>> optionalBranchName =
getOriginalIdentifierAndBranch(identifier);
String branchName = DEFAULT_MAIN_BRANCH;
if (optionalBranchName.isPresent()) {
identifier = optionalBranchName.get().getLeft();
branchName = optionalBranchName.get().getRight();
}

if (!tableExists(identifier)) {
if (ignoreIfNotExists) {
return;
}
throw new TableNotExistException(identifier);
}

alterTableImpl(identifier, branchName, changes);
alterTableImpl(identifier, changes);
}

protected abstract void alterTableImpl(
Identifier identifier, String branchName, List<SchemaChange> changes)
protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
throws TableNotExistException, ColumnAlreadyExistException, ColumnNotExistException;

@Nullable
Expand All @@ -317,7 +308,7 @@ private LineageMetaFactory findAndCreateLineageMeta(Options options, ClassLoader
@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
if (isSystemDatabase(identifier.getDatabaseName())) {
String tableName = identifier.getObjectName();
String tableName = identifier.getTableName();
Table table =
SystemTableLoader.loadGlobal(
tableName,
Expand All @@ -330,12 +321,17 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}
return table;
} else if (isSpecifiedSystemTable(identifier)) {
String[] splits = tableAndSystemName(identifier);
String tableName = splits[0];
String type = splits[1];
FileStoreTable originTable =
getDataTable(new Identifier(identifier.getDatabaseName(), tableName));
Table table = SystemTableLoader.load(type, originTable);
getDataTable(
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName(),
identifier.getBranchName(),
null));
Table table =
SystemTableLoader.load(
Preconditions.checkNotNull(identifier.getSystemTableName()),
originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
Expand All @@ -346,15 +342,8 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}

private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistException {
Optional<Pair<Identifier, String>> optionalBranchName =
getOriginalIdentifierAndBranch(identifier);
String branch = DEFAULT_MAIN_BRANCH;
if (optionalBranchName.isPresent()) {
identifier = optionalBranchName.get().getLeft();
branch = optionalBranchName.get().getRight();
}

TableSchema tableSchema = getDataTableSchema(identifier, branch);
Preconditions.checkArgument(identifier.getSystemTableName() == null);
TableSchema tableSchema = getDataTableSchema(identifier);
return FileStoreTableFactory.create(
fileIO,
getDataTableLocation(identifier),
Expand Down Expand Up @@ -394,35 +383,16 @@ public Map<String, Map<String, Path>> allTablePaths() {
}
}

protected abstract TableSchema getDataTableSchema(Identifier identifier, String branchName)
protected abstract TableSchema getDataTableSchema(Identifier identifier)
throws TableNotExistException;

@VisibleForTesting
public Path getDataTableLocation(Identifier identifier) {
return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getObjectName());
}

private static Optional<Pair<Identifier, String>> getOriginalIdentifierAndBranch(
Identifier identifier) {
String tableName = identifier.getObjectName();
if (tableName.contains(BRANCH_PREFIX)) {
int idx = tableName.indexOf(BRANCH_PREFIX);
String branchName = tableName.substring(idx + BRANCH_PREFIX.length());
if (StringUtils.isNullOrWhitespaceOnly(branchName)) {
return Optional.empty();
} else {
return Optional.of(
Pair.of(
Identifier.create(
identifier.getDatabaseName(), tableName.substring(0, idx)),
branchName));
}
}
return Optional.empty();
return new Path(newDatabasePath(identifier.getDatabaseName()), identifier.getTableName());
}

protected void checkNotBranch(Identifier identifier, String method) {
if (getOriginalIdentifierAndBranch(identifier).isPresent()) {
protected static void checkNotBranch(Identifier identifier, String method) {
if (identifier.getBranchName() != null) {
throw new IllegalArgumentException(
String.format(
"Cannot '%s' for branch table '%s', "
Expand All @@ -431,23 +401,23 @@ protected void checkNotBranch(Identifier identifier, String method) {
}
}

protected void assertMainBranch(String branchName) {
if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
protected void assertMainBranch(Identifier identifier) {
if (identifier.getBranchName() != null
&& !DEFAULT_MAIN_BRANCH.equals(identifier.getBranchName())) {
throw new UnsupportedOperationException(
this.getClass().getName() + " currently does not support table branches");
}
}

public static boolean isSpecifiedSystemTable(Identifier identifier) {
return identifier.getObjectName().contains(SYSTEM_TABLE_SPLITTER)
&& !getOriginalIdentifierAndBranch(identifier).isPresent();
return identifier.getSystemTableName() != null;
}

protected static boolean isSystemTable(Identifier identifier) {
return isSystemDatabase(identifier.getDatabaseName()) || isSpecifiedSystemTable(identifier);
}

protected void checkNotSystemTable(Identifier identifier, String method) {
protected static void checkNotSystemTable(Identifier identifier, String method) {
if (isSystemTable(identifier)) {
throw new IllegalArgumentException(
String.format(
Expand All @@ -460,26 +430,12 @@ private void copyTableDefaultOptions(Map<String, String> options) {
tableDefaultOptions.forEach(options::putIfAbsent);
}

public static String[] tableAndSystemName(Identifier identifier) {
String[] splits = StringUtils.split(identifier.getObjectName(), SYSTEM_TABLE_SPLITTER);
if (splits.length != 2) {
throw new IllegalArgumentException(
"System table can only contain one '$' separator, but this is: "
+ identifier.getObjectName());
}
return splits;
}

public static Path newTableLocation(String warehouse, Identifier identifier) {
if (isSpecifiedSystemTable(identifier)) {
throw new IllegalArgumentException(
String.format(
"Table name[%s] cannot contain '%s' separator",
identifier.getObjectName(), SYSTEM_TABLE_SPLITTER));
}
checkNotBranch(identifier, "newTableLocation");
checkNotSystemTable(identifier, "newTableLocation");
return new Path(
newDatabasePath(warehouse, identifier.getDatabaseName()),
identifier.getObjectName());
identifier.getTableName());
}

public static Path newDatabasePath(String warehouse, String database) {
Expand Down Expand Up @@ -548,18 +504,29 @@ protected List<String> listDatabasesInFileSystem(Path warehouse) throws IOExcept
protected List<String> listTablesInFileSystem(Path databasePath) throws IOException {
List<String> tables = new ArrayList<>();
for (FileStatus status : fileIO.listDirectories(databasePath)) {
if (status.isDir() && tableExistsInFileSystem(status.getPath())) {
if (status.isDir() && tableExistsInFileSystem(status.getPath(), DEFAULT_MAIN_BRANCH)) {
tables.add(status.getPath().getName());
}
}
return tables;
}

protected boolean tableExistsInFileSystem(Path tablePath) {
return !new SchemaManager(fileIO, tablePath).listAllIds().isEmpty();
protected boolean tableExistsInFileSystem(Path tablePath, String branchName) {
return !new SchemaManager(fileIO, tablePath, branchName).listAllIds().isEmpty();
}

public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath) {
return new SchemaManager(fileIO, tablePath).latest();
public Optional<TableSchema> tableSchemaInFileSystem(Path tablePath, String branchName) {
return new SchemaManager(fileIO, tablePath, branchName)
.latest()
.map(
s -> {
if (!DEFAULT_MAIN_BRANCH.equals(branchName)) {
Options branchOptions = new Options(s.options());
branchOptions.set(CoreOptions.BRANCH, branchName);
return s.copy(branchOptions.toMap());
} else {
return s;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine;
Expand All @@ -39,7 +40,6 @@
import java.util.Map;

import static org.apache.paimon.catalog.AbstractCatalog.isSpecifiedSystemTable;
import static org.apache.paimon.catalog.AbstractCatalog.tableAndSystemName;
import static org.apache.paimon.options.CatalogOptions.CACHE_EXPIRATION_INTERVAL_MS;
import static org.apache.paimon.table.system.SystemTableLoader.SYSTEM_TABLES;

Expand Down Expand Up @@ -142,18 +142,21 @@ public Table getTable(Identifier identifier) throws TableNotExistException {
}

if (isSpecifiedSystemTable(identifier)) {
String[] splits = tableAndSystemName(identifier);
String tableName = splits[0];
String type = splits[1];

Identifier originIdentifier =
Identifier.create(identifier.getDatabaseName(), tableName);
new Identifier(
identifier.getDatabaseName(),
identifier.getTableName(),
identifier.getBranchName(),
null);
Table originTable = tableCache.getIfPresent(originIdentifier);
if (originTable == null) {
originTable = wrapped.getTable(originIdentifier);
tableCache.put(originIdentifier, originTable);
}
table = SystemTableLoader.load(type, (FileStoreTable) originTable);
table =
SystemTableLoader.load(
Preconditions.checkNotNull(identifier.getSystemTableName()),
(FileStoreTable) originTable);
if (table == null) {
throw new TableNotExistException(identifier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public interface Catalog extends AutoCloseable {

String DEFAULT_DATABASE = "default";

String BRANCH_PREFIX = "$branch_";
String SYSTEM_TABLE_SPLITTER = "$";
String SYSTEM_DATABASE_NAME = "sys";
String SYSTEM_BRANCH_PREFIX = "branch_";
String COMMENT_PROP = "comment";
String TABLE_DEFAULT_OPTION_PREFIX = "table-default.";
String DB_LOCATION_PROP = "location";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ public static String warehouse(String path) {
}

public static String database(Path path) {
return SchemaManager.fromPath(path.toString(), false).getDatabaseName();
return SchemaManager.identifierFromPath(path.toString(), false).getDatabaseName();
}

public static String database(String path) {
return SchemaManager.fromPath(path, false).getDatabaseName();
return SchemaManager.identifierFromPath(path, false).getDatabaseName();
}

public static String table(Path path) {
return SchemaManager.fromPath(path.toString(), false).getObjectName();
return SchemaManager.identifierFromPath(path.toString(), false).getObjectName();
}

public static String table(String path) {
return SchemaManager.fromPath(path, false).getObjectName();
return SchemaManager.identifierFromPath(path, false).getObjectName();
}
}
Loading

0 comments on commit 7df8880

Please sign in to comment.