diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java index c8d7931db775..45a828b79694 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java @@ -283,58 +283,4 @@ public Identifier identifier() { return identifier; } } - - /** Exception for trying to alter a column that already exists. */ - class ColumnAlreadyExistException extends Exception { - - private static final String MSG = "Column %s already exists in the %s table."; - - private final Identifier identifier; - private final String column; - - public ColumnAlreadyExistException(Identifier identifier, String column) { - this(identifier, column, null); - } - - public ColumnAlreadyExistException(Identifier identifier, String column, Throwable cause) { - super(String.format(MSG, column, identifier.getFullName()), cause); - this.identifier = identifier; - this.column = column; - } - - public Identifier identifier() { - return identifier; - } - - public String column() { - return column; - } - } - - /** Exception for trying to operate on a table that doesn't exist. */ - class ColumnNotExistException extends Exception { - - private static final String MSG = "Column %s does not exist in the %s table."; - - private final Identifier identifier; - private final String column; - - public ColumnNotExistException(Identifier identifier, String column) { - this(identifier, column, null); - } - - public ColumnNotExistException(Identifier identifier, String column, Throwable cause) { - super(String.format(MSG, column, identifier.getFullName()), cause); - this.identifier = identifier; - this.column = column; - } - - public Identifier identifier() { - return identifier; - } - - public String column() { - return column; - } - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index a9213668b15d..9eb6d349e98e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -21,8 +21,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.casting.CastExecutors; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.operation.Lock; @@ -70,14 +68,12 @@ public class SchemaManager implements Serializable { private final FileIO fileIO; private final Path tableRoot; - private final Identifier identifier; @Nullable private transient Lock lock; public SchemaManager(FileIO fileIO, Path tableRoot) { this.fileIO = fileIO; this.tableRoot = tableRoot; - this.identifier = Identifier.fromPath(tableRoot); } public SchemaManager withLock(@Nullable Lock lock) { @@ -190,7 +186,8 @@ public TableSchema commitChanges(SchemaChange... changes) throws Exception { public TableSchema commitChanges(List changes) throws Exception { while (true) { TableSchema schema = - latest().orElseThrow(() -> new Catalog.TableNotExistException(identifier)); + latest().orElseThrow( + () -> new RuntimeException("Table not exists: " + tableRoot)); Map newOptions = new HashMap<>(schema.options()); List newFields = new ArrayList<>(schema.fields()); AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId()); @@ -207,8 +204,10 @@ public TableSchema commitChanges(List changes) throws Exception { AddColumn addColumn = (AddColumn) change; SchemaChange.Move move = addColumn.move(); if (newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()))) { - throw new Catalog.ColumnAlreadyExistException( - identifier, addColumn.fieldName()); + throw new IllegalArgumentException( + String.format( + "The column [%s] exists in the table[%s].", + addColumn.fieldName(), tableRoot)); } Preconditions.checkArgument( addColumn.dataType().isNullable(), @@ -242,7 +241,10 @@ public TableSchema commitChanges(List changes) throws Exception { RenameColumn rename = (RenameColumn) change; validateNotPrimaryAndPartitionKey(schema, rename.fieldName()); if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) { - throw new Catalog.ColumnAlreadyExistException(identifier, rename.newName()); + throw new IllegalArgumentException( + String.format( + "The column [%s] exists in the table[%s].", + rename.newName(), tableRoot)); } updateNestedColumn( @@ -260,7 +262,10 @@ public TableSchema commitChanges(List changes) throws Exception { validateNotPrimaryAndPartitionKey(schema, drop.fieldName()); if (!newFields.removeIf( f -> f.name().equals(((DropColumn) change).fieldName()))) { - throw new Catalog.ColumnNotExistException(identifier, drop.fieldName()); + throw new IllegalArgumentException( + String.format( + "The column [%s] doesn't exist in the table[%s].", + drop.fieldName(), tableRoot)); } if (newFields.isEmpty()) { throw new IllegalArgumentException("Cannot drop all fields in table"); @@ -270,8 +275,8 @@ public TableSchema commitChanges(List changes) throws Exception { if (schema.partitionKeys().contains(update.fieldName())) { throw new IllegalArgumentException( String.format( - "Cannot update partition column [%s] type in the table [%s].", - update.fieldName(), identifier.getFullName())); + "Cannot update partition column [%s] type in the table[%s].", + update.fieldName(), tableRoot)); } updateColumn( newFields, @@ -398,8 +403,7 @@ private void updateNestedColumn( List newFields, String[] updateFieldNames, int index, - Function updateFunc) - throws Catalog.ColumnNotExistException { + Function updateFunc) { boolean found = false; for (int i = 0; i < newFields.size(); i++) { DataField field = newFields.get(i); @@ -425,16 +429,14 @@ private void updateNestedColumn( } } if (!found) { - throw new Catalog.ColumnNotExistException( - identifier, Arrays.toString(updateFieldNames)); + throw new RuntimeException("Can not find column: " + Arrays.asList(updateFieldNames)); } } private void updateColumn( List newFields, String updateFieldName, - Function updateFunc) - throws Catalog.ColumnNotExistException { + Function updateFunc) { updateNestedColumn(newFields, new String[] {updateFieldName}, 0, updateFunc); } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index deb134dec246..321cbbc283a7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -439,7 +439,7 @@ public void testAlterTable() throws Exception { false)) .withMessage("Table test_db.non_existing_table does not exist."); - // Alter table adds a column throws ColumnAlreadyExistException when column already exists + // Alter table adds a column throws Exception when column already exists assertThatThrownBy( () -> catalog.alterTable( @@ -447,8 +447,8 @@ public void testAlterTable() throws Exception { Lists.newArrayList( SchemaChange.addColumn("col1", DataTypes.INT())), false)) - .hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class) - .hasRootCauseMessage("Column col1 already exists in the test_db.test_table table."); + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("The column [col1] exists in the table"); } @Test @@ -476,8 +476,7 @@ public void testAlterTableRenameColumn() throws Exception { assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0); assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0); - // Alter table renames a new column throws ColumnAlreadyExistException when column already - // exists + // Alter table renames a new column throws Exception when column already exists assertThatThrownBy( () -> catalog.alterTable( @@ -485,11 +484,10 @@ public void testAlterTableRenameColumn() throws Exception { Lists.newArrayList( SchemaChange.renameColumn("col1", "new_col1")), false)) - .hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class) - .hasRootCauseMessage( - "Column new_col1 already exists in the test_db.test_table table."); + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("The column [new_col1] exists in the table"); - // Alter table renames a column throws ColumnNotExistException when column does not exist + // Alter table renames a column throws Exception when column does not exist assertThatThrownBy( () -> catalog.alterTable( @@ -498,9 +496,7 @@ public void testAlterTableRenameColumn() throws Exception { SchemaChange.renameColumn( "non_existing_col", "new_col2")), false)) - .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class) - .hasRootCauseMessage( - "Column [non_existing_col] does not exist in the test_db.test_table table."); + .hasMessageContaining("Can not find column: [non_existing_col]"); } @Test @@ -536,7 +532,7 @@ public void testAlterTableDropColumn() throws Exception { .hasRootCauseInstanceOf(IllegalArgumentException.class) .hasMessageContaining(" Cannot drop all fields in table"); - // Alter table drop a column ColumnNotExistException when column does not exist + // Alter table drop a column throws Exception when column does not exist assertThatThrownBy( () -> catalog.alterTable( @@ -544,9 +540,7 @@ public void testAlterTableDropColumn() throws Exception { Lists.newArrayList( SchemaChange.dropColumn("non_existing_col")), false)) - .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class) - .hasRootCauseMessage( - "Column non_existing_col does not exist in the test_db.test_table table."); + .hasMessageContaining("The column [non_existing_col] doesn't exist in the table"); } @Test @@ -589,8 +583,7 @@ public void testAlterTableUpdateColumnType() throws Exception { .hasMessageContaining( "Column type col1[DOUBLE] cannot be converted to STRING without loosing information."); - // Alter table update a column type throws ColumnNotExistException when column does not - // exist + // Alter table update a column type throws Exception when column does not exist assertThatThrownBy( () -> catalog.alterTable( @@ -599,9 +592,7 @@ public void testAlterTableUpdateColumnType() throws Exception { SchemaChange.updateColumnType( "non_existing_col", DataTypes.INT())), false)) - .hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class) - .hasRootCauseMessage( - "Column [non_existing_col] does not exist in the test_db.test_table table."); + .hasMessageContaining("Can not find column: [non_existing_col]"); // Alter table update a column type throws Exception when column is partition columns assertThatThrownBy( diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index ad6f514cce6b..f2578c15c7c1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -18,8 +18,6 @@ package org.apache.paimon.table; -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.DataFormatTestUtil; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -66,14 +64,12 @@ public class SchemaEvolutionTest { private Path tablePath; private SchemaManager schemaManager; private String commitUser; - private String tableFullName; @BeforeEach public void beforeEach() { - tablePath = new Path(tempDir.toUri().toString(), "test_db/test_table"); + tablePath = new Path(tempDir.toUri()); schemaManager = new SchemaManager(LocalFileIO.create(), tablePath); commitUser = UUID.randomUUID().toString(); - tableFullName = Identifier.fromPath(tablePath).getFullName(); } @Test @@ -157,8 +153,8 @@ public void testAddDuplicateField() throws Exception { Collections.singletonList( SchemaChange.addColumn( columnName, new FloatType())))) - .isInstanceOf(Catalog.ColumnAlreadyExistException.class) - .hasMessage("Column %s already exists in the %s table.", columnName, tableFullName); + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("The column [%s] exists in the table[%s].", columnName, tablePath); } @Test @@ -214,10 +210,9 @@ public void testRenameField() throws Exception { schemaManager.commitChanges( Collections.singletonList( SchemaChange.renameColumn("f0", "f1")))) - .isInstanceOf(Catalog.ColumnAlreadyExistException.class) + .isInstanceOf(IllegalArgumentException.class) .hasMessage( - String.format( - "Column %s already exists in the %s table.", "f1", tableFullName)); + String.format("The column [%s] exists in the table[%s].", "f1", tablePath)); } @Test @@ -271,11 +266,11 @@ public void testDropAllFields() throws Exception { () -> schemaManager.commitChanges( Collections.singletonList(SchemaChange.dropColumn("f100")))) - .isInstanceOf(Catalog.ColumnNotExistException.class) + .isInstanceOf(IllegalArgumentException.class) .hasMessage( String.format( - "Column %s does not exist in the %s table.", - "f100", tableFullName)); + "The column [%s] doesn't exist in the table[%s].", + "f100", tablePath)); assertThatThrownBy( () -> diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java index cce1ee7e1a55..44f748dfeb2f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/UpdatedDataFieldsProcessFunction.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.sink.cdc; -import org.apache.paimon.catalog.Catalog; import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -108,7 +107,7 @@ private void applySchemaChange(SchemaChange schemaChange) throws Exception { if (schemaChange instanceof SchemaChange.AddColumn) { try { schemaManager.commitChanges(schemaChange); - } catch (Catalog.ColumnAlreadyExistException e) { + } catch (IllegalArgumentException e) { // This is normal. For example when a table is split into multiple database tables, // all these tables will be added the same column. However schemaManager can't // handle duplicated column adds, so we just catch the exception and log it. diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java index 7d132b4da066..45d6b34e515c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/cdc/FlinkCdcSyncTableSinkITCase.java @@ -19,7 +19,6 @@ package org.apache.paimon.flink.sink.cdc; import org.apache.paimon.CoreOptions; -import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.FileIO; @@ -56,8 +55,6 @@ public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase { @TempDir java.nio.file.Path tempDir; - private static final String DATABASE_NAME = "test"; - private static final String TABLE_NAME = "test_tbl"; @Test @Timeout(120) @@ -72,26 +69,16 @@ public void testRandomCdcEvents() throws Exception { boolean enableFailure = random.nextBoolean(); TestTable testTable = - new TestTable(TABLE_NAME, numEvents, numSchemaChanges, numPartitions, numKeys); + new TestTable("test_tbl", numEvents, numSchemaChanges, numPartitions, numKeys); Path tablePath; FileIO fileIO; String failingName = UUID.randomUUID().toString(); - if (enableFailure) { - tablePath = - new Path( - FailingFileIO.getFailingPath( - failingName, - CatalogUtils.stringifyPath( - tempDir.toString(), DATABASE_NAME, TABLE_NAME))); + tablePath = new Path(FailingFileIO.getFailingPath(failingName, tempDir.toString())); fileIO = new FailingFileIO(); } else { - tablePath = - new Path( - TraceableFileIO.SCHEME + "://", - CatalogUtils.stringifyPath( - tempDir.toString(), DATABASE_NAME, TABLE_NAME)); + tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString()); fileIO = LocalFileIO.create(); }