Skip to content

Commit

Permalink
Revert "[hotfix] add column 'already exists' and 'does not exist' exc…
Browse files Browse the repository at this point in the history
…eption type (apache#902)"

This reverts commit b9e6df6
  • Loading branch information
tsreaper committed Apr 17, 2023
1 parent f44345b commit a0a3c65
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 123 deletions.
54 changes: 0 additions & 54 deletions paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -283,58 +283,4 @@ public Identifier identifier() {
return identifier;
}
}

/** Exception for trying to alter a column that already exists. */
class ColumnAlreadyExistException extends Exception {

private static final String MSG = "Column %s already exists in the %s table.";

private final Identifier identifier;
private final String column;

public ColumnAlreadyExistException(Identifier identifier, String column) {
this(identifier, column, null);
}

public ColumnAlreadyExistException(Identifier identifier, String column, Throwable cause) {
super(String.format(MSG, column, identifier.getFullName()), cause);
this.identifier = identifier;
this.column = column;
}

public Identifier identifier() {
return identifier;
}

public String column() {
return column;
}
}

/** Exception for trying to operate on a table that doesn't exist. */
class ColumnNotExistException extends Exception {

private static final String MSG = "Column %s does not exist in the %s table.";

private final Identifier identifier;
private final String column;

public ColumnNotExistException(Identifier identifier, String column) {
this(identifier, column, null);
}

public ColumnNotExistException(Identifier identifier, String column, Throwable cause) {
super(String.format(MSG, column, identifier.getFullName()), cause);
this.identifier = identifier;
this.column = column;
}

public Identifier identifier() {
return identifier;
}

public String column() {
return column;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.Lock;
Expand Down Expand Up @@ -70,14 +68,12 @@ public class SchemaManager implements Serializable {

private final FileIO fileIO;
private final Path tableRoot;
private final Identifier identifier;

@Nullable private transient Lock lock;

public SchemaManager(FileIO fileIO, Path tableRoot) {
this.fileIO = fileIO;
this.tableRoot = tableRoot;
this.identifier = Identifier.fromPath(tableRoot);
}

public SchemaManager withLock(@Nullable Lock lock) {
Expand Down Expand Up @@ -190,7 +186,8 @@ public TableSchema commitChanges(SchemaChange... changes) throws Exception {
public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
while (true) {
TableSchema schema =
latest().orElseThrow(() -> new Catalog.TableNotExistException(identifier));
latest().orElseThrow(
() -> new RuntimeException("Table not exists: " + tableRoot));
Map<String, String> newOptions = new HashMap<>(schema.options());
List<DataField> newFields = new ArrayList<>(schema.fields());
AtomicInteger highestFieldId = new AtomicInteger(schema.highestFieldId());
Expand All @@ -207,8 +204,10 @@ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
AddColumn addColumn = (AddColumn) change;
SchemaChange.Move move = addColumn.move();
if (newFields.stream().anyMatch(f -> f.name().equals(addColumn.fieldName()))) {
throw new Catalog.ColumnAlreadyExistException(
identifier, addColumn.fieldName());
throw new IllegalArgumentException(
String.format(
"The column [%s] exists in the table[%s].",
addColumn.fieldName(), tableRoot));
}
Preconditions.checkArgument(
addColumn.dataType().isNullable(),
Expand Down Expand Up @@ -242,7 +241,10 @@ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
RenameColumn rename = (RenameColumn) change;
validateNotPrimaryAndPartitionKey(schema, rename.fieldName());
if (newFields.stream().anyMatch(f -> f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(identifier, rename.newName());
throw new IllegalArgumentException(
String.format(
"The column [%s] exists in the table[%s].",
rename.newName(), tableRoot));
}

updateNestedColumn(
Expand All @@ -260,7 +262,10 @@ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
validateNotPrimaryAndPartitionKey(schema, drop.fieldName());
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn) change).fieldName()))) {
throw new Catalog.ColumnNotExistException(identifier, drop.fieldName());
throw new IllegalArgumentException(
String.format(
"The column [%s] doesn't exist in the table[%s].",
drop.fieldName(), tableRoot));
}
if (newFields.isEmpty()) {
throw new IllegalArgumentException("Cannot drop all fields in table");
Expand All @@ -270,8 +275,8 @@ public TableSchema commitChanges(List<SchemaChange> changes) throws Exception {
if (schema.partitionKeys().contains(update.fieldName())) {
throw new IllegalArgumentException(
String.format(
"Cannot update partition column [%s] type in the table [%s].",
update.fieldName(), identifier.getFullName()));
"Cannot update partition column [%s] type in the table[%s].",
update.fieldName(), tableRoot));
}
updateColumn(
newFields,
Expand Down Expand Up @@ -398,8 +403,7 @@ private void updateNestedColumn(
List<DataField> newFields,
String[] updateFieldNames,
int index,
Function<DataField, DataField> updateFunc)
throws Catalog.ColumnNotExistException {
Function<DataField, DataField> updateFunc) {
boolean found = false;
for (int i = 0; i < newFields.size(); i++) {
DataField field = newFields.get(i);
Expand All @@ -425,16 +429,14 @@ private void updateNestedColumn(
}
}
if (!found) {
throw new Catalog.ColumnNotExistException(
identifier, Arrays.toString(updateFieldNames));
throw new RuntimeException("Can not find column: " + Arrays.asList(updateFieldNames));
}
}

private void updateColumn(
List<DataField> newFields,
String updateFieldName,
Function<DataField, DataField> updateFunc)
throws Catalog.ColumnNotExistException {
Function<DataField, DataField> updateFunc) {
updateNestedColumn(newFields, new String[] {updateFieldName}, 0, updateFunc);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,16 +439,16 @@ public void testAlterTable() throws Exception {
false))
.withMessage("Table test_db.non_existing_table does not exist.");

// Alter table adds a column throws ColumnAlreadyExistException when column already exists
// Alter table adds a column throws Exception when column already exists
assertThatThrownBy(
() ->
catalog.alterTable(
identifier,
Lists.newArrayList(
SchemaChange.addColumn("col1", DataTypes.INT())),
false))
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
.hasRootCauseMessage("Column col1 already exists in the test_db.test_table table.");
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("The column [col1] exists in the table");
}

@Test
Expand Down Expand Up @@ -476,20 +476,18 @@ public void testAlterTableRenameColumn() throws Exception {
assertThat(table.rowType().getFieldIndex("col1")).isLessThan(0);
assertThat(table.rowType().getFieldIndex("new_col1")).isEqualTo(0);

// Alter table renames a new column throws ColumnAlreadyExistException when column already
// exists
// Alter table renames a new column throws Exception when column already exists
assertThatThrownBy(
() ->
catalog.alterTable(
identifier,
Lists.newArrayList(
SchemaChange.renameColumn("col1", "new_col1")),
false))
.hasRootCauseInstanceOf(Catalog.ColumnAlreadyExistException.class)
.hasRootCauseMessage(
"Column new_col1 already exists in the test_db.test_table table.");
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("The column [new_col1] exists in the table");

// Alter table renames a column throws ColumnNotExistException when column does not exist
// Alter table renames a column throws Exception when column does not exist
assertThatThrownBy(
() ->
catalog.alterTable(
Expand All @@ -498,9 +496,7 @@ public void testAlterTableRenameColumn() throws Exception {
SchemaChange.renameColumn(
"non_existing_col", "new_col2")),
false))
.hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
.hasRootCauseMessage(
"Column [non_existing_col] does not exist in the test_db.test_table table.");
.hasMessageContaining("Can not find column: [non_existing_col]");
}

@Test
Expand Down Expand Up @@ -536,17 +532,15 @@ public void testAlterTableDropColumn() throws Exception {
.hasRootCauseInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(" Cannot drop all fields in table");

// Alter table drop a column ColumnNotExistException when column does not exist
// Alter table drop a column throws Exception when column does not exist
assertThatThrownBy(
() ->
catalog.alterTable(
identifier,
Lists.newArrayList(
SchemaChange.dropColumn("non_existing_col")),
false))
.hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
.hasRootCauseMessage(
"Column non_existing_col does not exist in the test_db.test_table table.");
.hasMessageContaining("The column [non_existing_col] doesn't exist in the table");
}

@Test
Expand Down Expand Up @@ -589,8 +583,7 @@ public void testAlterTableUpdateColumnType() throws Exception {
.hasMessageContaining(
"Column type col1[DOUBLE] cannot be converted to STRING without loosing information.");

// Alter table update a column type throws ColumnNotExistException when column does not
// exist
// Alter table update a column type throws Exception when column does not exist
assertThatThrownBy(
() ->
catalog.alterTable(
Expand All @@ -599,9 +592,7 @@ public void testAlterTableUpdateColumnType() throws Exception {
SchemaChange.updateColumnType(
"non_existing_col", DataTypes.INT())),
false))
.hasRootCauseInstanceOf(Catalog.ColumnNotExistException.class)
.hasRootCauseMessage(
"Column [non_existing_col] does not exist in the test_db.test_table table.");
.hasMessageContaining("Can not find column: [non_existing_col]");

// Alter table update a column type throws Exception when column is partition columns
assertThatThrownBy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.paimon.table;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
Expand Down Expand Up @@ -66,14 +64,12 @@ public class SchemaEvolutionTest {
private Path tablePath;
private SchemaManager schemaManager;
private String commitUser;
private String tableFullName;

@BeforeEach
public void beforeEach() {
tablePath = new Path(tempDir.toUri().toString(), "test_db/test_table");
tablePath = new Path(tempDir.toUri());
schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
commitUser = UUID.randomUUID().toString();
tableFullName = Identifier.fromPath(tablePath).getFullName();
}

@Test
Expand Down Expand Up @@ -157,8 +153,8 @@ public void testAddDuplicateField() throws Exception {
Collections.singletonList(
SchemaChange.addColumn(
columnName, new FloatType()))))
.isInstanceOf(Catalog.ColumnAlreadyExistException.class)
.hasMessage("Column %s already exists in the %s table.", columnName, tableFullName);
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("The column [%s] exists in the table[%s].", columnName, tablePath);
}

@Test
Expand Down Expand Up @@ -214,10 +210,9 @@ public void testRenameField() throws Exception {
schemaManager.commitChanges(
Collections.singletonList(
SchemaChange.renameColumn("f0", "f1"))))
.isInstanceOf(Catalog.ColumnAlreadyExistException.class)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
String.format(
"Column %s already exists in the %s table.", "f1", tableFullName));
String.format("The column [%s] exists in the table[%s].", "f1", tablePath));
}

@Test
Expand Down Expand Up @@ -271,11 +266,11 @@ public void testDropAllFields() throws Exception {
() ->
schemaManager.commitChanges(
Collections.singletonList(SchemaChange.dropColumn("f100"))))
.isInstanceOf(Catalog.ColumnNotExistException.class)
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(
String.format(
"Column %s does not exist in the %s table.",
"f100", tableFullName));
"The column [%s] doesn't exist in the table[%s].",
"f100", tablePath));

assertThatThrownBy(
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
Expand Down Expand Up @@ -108,7 +107,7 @@ private void applySchemaChange(SchemaChange schemaChange) throws Exception {
if (schemaChange instanceof SchemaChange.AddColumn) {
try {
schemaManager.commitChanges(schemaChange);
} catch (Catalog.ColumnAlreadyExistException e) {
} catch (IllegalArgumentException e) {
// This is normal. For example when a table is split into multiple database tables,
// all these tables will be added the same column. However schemaManager can't
// handle duplicated column adds, so we just catch the exception and log it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.CatalogUtils;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.FileIO;
Expand Down Expand Up @@ -56,8 +55,6 @@
public class FlinkCdcSyncTableSinkITCase extends AbstractTestBase {

@TempDir java.nio.file.Path tempDir;
private static final String DATABASE_NAME = "test";
private static final String TABLE_NAME = "test_tbl";

@Test
@Timeout(120)
Expand All @@ -72,26 +69,16 @@ public void testRandomCdcEvents() throws Exception {
boolean enableFailure = random.nextBoolean();

TestTable testTable =
new TestTable(TABLE_NAME, numEvents, numSchemaChanges, numPartitions, numKeys);
new TestTable("test_tbl", numEvents, numSchemaChanges, numPartitions, numKeys);

Path tablePath;
FileIO fileIO;
String failingName = UUID.randomUUID().toString();

if (enableFailure) {
tablePath =
new Path(
FailingFileIO.getFailingPath(
failingName,
CatalogUtils.stringifyPath(
tempDir.toString(), DATABASE_NAME, TABLE_NAME)));
tablePath = new Path(FailingFileIO.getFailingPath(failingName, tempDir.toString()));
fileIO = new FailingFileIO();
} else {
tablePath =
new Path(
TraceableFileIO.SCHEME + "://",
CatalogUtils.stringifyPath(
tempDir.toString(), DATABASE_NAME, TABLE_NAME));
tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
fileIO = LocalFileIO.create();
}

Expand Down

0 comments on commit a0a3c65

Please sign in to comment.