From 5e9906617c216470ba7ab927cb67a4c4356debc9 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Thu, 31 Oct 2024 21:39:32 +0800 Subject: [PATCH] [core] Validate table before commit in SchemaManager --- .../apache/paimon/catalog/AbstractCatalog.java | 7 ++++++- .../paimon/catalog/FileSystemCatalog.java | 5 ++--- .../org/apache/paimon/jdbc/JdbcCatalog.java | 6 ++---- .../apache/paimon/schema/SchemaManager.java | 18 +++--------------- 4 files changed, 13 insertions(+), 23 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java index d3eef7edddfb..1b390bc66ad2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java @@ -387,7 +387,12 @@ protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExist fileIO, getTableLocation(identifier), getDataTableSchema(identifier), - catalogEnvironment(identifier)); + new CatalogEnvironment( + identifier, + Lock.factory( + lockFactory().orElse(null), lockContext().orElse(null), identifier), + metastoreClientFactory(identifier).orElse(null), + lineageMetaFactory)); } protected CatalogEnvironment catalogEnvironment(Identifier identifier) diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java index 92e20810002f..450f78873dc7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java @@ -127,13 +127,12 @@ public void createTableImpl(Identifier identifier, Schema schema) { uncheck(() -> schemaManager(identifier).createTable(schema)); } - private SchemaManager schemaManager(Identifier identifier) throws TableNotExistException { + private SchemaManager schemaManager(Identifier identifier) { Path path = getTableLocation(identifier); CatalogLock catalogLock = lockFactory().map(fac -> fac.createLock(assertGetLockContext())).orElse(null); return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault()) - .withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier)) - .withCatalogEnvironment(catalogEnvironment(identifier)); + .withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier)); } private CatalogLockContext assertGetLockContext() { diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 7612b9a9efc9..e52bc9d6b173 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -351,10 +351,8 @@ public void close() throws Exception { connections.close(); } - private SchemaManager getSchemaManager(Identifier identifier) throws TableNotExistException { - return new SchemaManager(fileIO, getTableLocation(identifier)) - .withLock(lock(identifier)) - .withCatalogEnvironment(catalogEnvironment(identifier)); + private SchemaManager getSchemaManager(Identifier identifier) { + return new SchemaManager(fileIO, getTableLocation(identifier)).withLock(lock(identifier)); } private Map fetchProperties(String databaseName) { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index d166a782be42..d1efbcfe1e6c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -36,7 +36,6 @@ import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition; import org.apache.paimon.schema.SchemaChange.UpdateColumnType; import org.apache.paimon.schema.SchemaChange.UpdateComment; -import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.FileStoreTableFactory; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -92,7 +91,6 @@ public class SchemaManager implements Serializable { private final Path tableRoot; @Nullable private transient Lock lock; - private transient CatalogEnvironment catalogEnvironment = CatalogEnvironment.empty(); private final String branch; @@ -116,12 +114,6 @@ public SchemaManager withLock(@Nullable Lock lock) { return this; } - public SchemaManager withCatalogEnvironment(@Nullable CatalogEnvironment catalogEnvironment) { - this.catalogEnvironment = - catalogEnvironment == null ? CatalogEnvironment.empty() : catalogEnvironment; - return this; - } - public Optional latest() { try { return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX) @@ -229,15 +221,11 @@ public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws options, schema.comment()); + // validate table from creating table + FileStoreTableFactory.create(fileIO, tableRoot, newSchema).store(); + boolean success = commit(newSchema); if (success) { - try { - FileStoreTableFactory.create(fileIO, tableRoot, newSchema, catalogEnvironment) - .store(); - } catch (Exception e) { - fileIO.deleteQuietly(tableRoot); - throw new RuntimeException("create table failed", e); - } return newSchema; } }