Skip to content

Commit

Permalink
[core] Validate table before commit in SchemaManager
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Oct 31, 2024
1 parent 27e1ba3 commit 5e99066
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,12 @@ protected Table getDataOrFormatTable(Identifier identifier) throws TableNotExist
fileIO,
getTableLocation(identifier),
getDataTableSchema(identifier),
catalogEnvironment(identifier));
new CatalogEnvironment(
identifier,
Lock.factory(
lockFactory().orElse(null), lockContext().orElse(null), identifier),
metastoreClientFactory(identifier).orElse(null),
lineageMetaFactory));
}

protected CatalogEnvironment catalogEnvironment(Identifier identifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,12 @@ public void createTableImpl(Identifier identifier, Schema schema) {
uncheck(() -> schemaManager(identifier).createTable(schema));
}

private SchemaManager schemaManager(Identifier identifier) throws TableNotExistException {
private SchemaManager schemaManager(Identifier identifier) {
Path path = getTableLocation(identifier);
CatalogLock catalogLock =
lockFactory().map(fac -> fac.createLock(assertGetLockContext())).orElse(null);
return new SchemaManager(fileIO, path, identifier.getBranchNameOrDefault())
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier))
.withCatalogEnvironment(catalogEnvironment(identifier));
.withLock(catalogLock == null ? null : Lock.fromCatalog(catalogLock, identifier));
}

private CatalogLockContext assertGetLockContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,10 +351,8 @@ public void close() throws Exception {
connections.close();
}

private SchemaManager getSchemaManager(Identifier identifier) throws TableNotExistException {
return new SchemaManager(fileIO, getTableLocation(identifier))
.withLock(lock(identifier))
.withCatalogEnvironment(catalogEnvironment(identifier));
private SchemaManager getSchemaManager(Identifier identifier) {
return new SchemaManager(fileIO, getTableLocation(identifier)).withLock(lock(identifier));
}

private Map<String, String> fetchProperties(String databaseName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition;
import org.apache.paimon.schema.SchemaChange.UpdateColumnType;
import org.apache.paimon.schema.SchemaChange.UpdateComment;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
Expand Down Expand Up @@ -92,7 +91,6 @@ public class SchemaManager implements Serializable {
private final Path tableRoot;

@Nullable private transient Lock lock;
private transient CatalogEnvironment catalogEnvironment = CatalogEnvironment.empty();

private final String branch;

Expand All @@ -116,12 +114,6 @@ public SchemaManager withLock(@Nullable Lock lock) {
return this;
}

public SchemaManager withCatalogEnvironment(@Nullable CatalogEnvironment catalogEnvironment) {
this.catalogEnvironment =
catalogEnvironment == null ? CatalogEnvironment.empty() : catalogEnvironment;
return this;
}

public Optional<TableSchema> latest() {
try {
return listVersionedFiles(fileIO, schemaDirectory(), SCHEMA_PREFIX)
Expand Down Expand Up @@ -229,15 +221,11 @@ public TableSchema createTable(Schema schema, boolean ignoreIfExistsSame) throws
options,
schema.comment());

// validate table from creating table
FileStoreTableFactory.create(fileIO, tableRoot, newSchema).store();

boolean success = commit(newSchema);
if (success) {
try {
FileStoreTableFactory.create(fileIO, tableRoot, newSchema, catalogEnvironment)
.store();
} catch (Exception e) {
fileIO.deleteQuietly(tableRoot);
throw new RuntimeException("create table failed", e);
}
return newSchema;
}
}
Expand Down

0 comments on commit 5e99066

Please sign in to comment.