diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java index 711b09bbe668..20000389dccd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java @@ -411,7 +411,9 @@ class Move implements Serializable { public enum MoveType { FIRST, - AFTER + AFTER, + BEFORE, + LAST } public static Move first(String fieldName) { @@ -422,6 +424,14 @@ public static Move after(String fieldName, String referenceFieldName) { return new Move(fieldName, referenceFieldName, MoveType.AFTER); } + public static Move before(String fieldName, String referenceFieldName) { + return new Move(fieldName, referenceFieldName, MoveType.BEFORE); + } + + public static Move last(String fieldName) { + return new Move(fieldName, null, MoveType.LAST); + } + private static final long serialVersionUID = 1L; private final String fieldName; diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index cb478d7bad28..3ad0cd5bad02 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -339,28 +339,7 @@ public TableSchema commitChanges(List changes) } else if (change instanceof UpdateColumnPosition) { UpdateColumnPosition update = (UpdateColumnPosition) change; SchemaChange.Move move = update.move(); - - // key: name ; value : index - Map map = new HashMap<>(); - for (int i = 0; i < newFields.size(); i++) { - map.put(newFields.get(i).name(), i); - } - - int fieldIndex = map.get(move.fieldName()); - int refIndex = 0; - if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) { - checkMoveIndexEqual(move, fieldIndex, refIndex); - newFields.add(refIndex, newFields.remove(fieldIndex)); - } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) { - refIndex = map.get(move.referenceFieldName()); - checkMoveIndexEqual(move, fieldIndex, refIndex); - if (fieldIndex > refIndex) { - newFields.add(refIndex + 1, newFields.remove(fieldIndex)); - } else { - newFields.add(refIndex, newFields.remove(fieldIndex)); - } - } - + applyMove(newFields, move); } else { throw new UnsupportedOperationException( "Unsupported change: " + change.getClass()); @@ -388,6 +367,61 @@ public TableSchema commitChanges(List changes) } } + public void applyMove(List newFields, SchemaChange.Move move) { + Map map = new HashMap<>(); + for (int i = 0; i < newFields.size(); i++) { + map.put(newFields.get(i).name(), i); + } + + int fieldIndex = map.getOrDefault(move.fieldName(), -1); + if (fieldIndex == -1) { + throw new IllegalArgumentException("Field name not found: " + move.fieldName()); + } + + // Get the reference index if the move is not FIRST or LAST + int refIndex = + (move.type() == SchemaChange.Move.MoveType.FIRST + || move.type() == SchemaChange.Move.MoveType.LAST) + ? 0 + : map.getOrDefault(move.referenceFieldName(), -1); + if (refIndex == -1) { + throw new IllegalArgumentException( + "Reference field name not found: " + move.referenceFieldName()); + } + + // Check if the field to move is the same as the reference field + checkMoveIndexEqual(move, fieldIndex, refIndex); + + // Handle the move operations + switch (move.type()) { + case FIRST: + moveField(newFields, fieldIndex, 0); + break; + case LAST: + moveField(newFields, fieldIndex, newFields.size() - 1); + break; + case AFTER: + case BEFORE: + int adjustment = move.type() == SchemaChange.Move.MoveType.AFTER ? 1 : 0; + int targetIndex = + fieldIndex < refIndex ? refIndex - (1 - adjustment) : refIndex + adjustment; + moveField(newFields, fieldIndex, targetIndex); + break; + } + } + + // Utility method to move a field within the list, handling range checks + private void moveField(List newFields, int fromIndex, int toIndex) { + if (fromIndex < 0 + || fromIndex >= newFields.size() + || toIndex < 0 + || toIndex > newFields.size() - 1) { + return; + } + DataField fieldToMove = newFields.remove(fromIndex); + newFields.add(toIndex, fieldToMove); + } + public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { TableSchema current = latest().orElseThrow( diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index d912c0ce9265..9fc54b812296 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -24,6 +24,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.DoubleType; import org.apache.paimon.types.IntType; import org.apache.paimon.types.MapType; @@ -32,6 +33,7 @@ import org.apache.paimon.utils.FailingFileIO; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -297,4 +299,77 @@ public void testDeleteSchemaWithSchemaId() throws Exception { manager.deleteSchema(manager.latest().get().id()); assertThat(manager.latest().get().toString()).isEqualTo(schemaContent); } + + @Test + public void testApplyMove() { + // Create the initial list of fields + List fields = + new ArrayList<>( + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.BIGINT()), + new DataField(2, "f2", DataTypes.STRING()))); + + // Use factory methods to create Move objects + SchemaChange.Move moveFirst = SchemaChange.Move.first("f2"); + SchemaChange.Move moveLast = SchemaChange.Move.last("f0"); + SchemaChange.Move moveAfter = SchemaChange.Move.after("f0", "f1"); + SchemaChange.Move moveBefore = SchemaChange.Move.before("f2", "f1"); + + // Test FIRST operation + manager.applyMove(fields, moveFirst); + Assertions.assertEquals( + 2, + fields.get(0).id(), + "The field id should remain as 2 after moving f2 to the first position"); + Assertions.assertEquals( + fields.get(0).name(), "f2", "f2 should be moved to the first position"); + + // Reset fields to initial state + fields = + new ArrayList<>( + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.BIGINT()), + new DataField(2, "f2", DataTypes.STRING()))); + + // Test LAST operation + manager.applyMove(fields, moveLast); + Assertions.assertEquals( + 0, + fields.get(fields.size() - 1).id(), + "The field id should remain as 0 after moving f0 to the last position"); + Assertions.assertEquals( + "f0", + fields.get(fields.size() - 1).name(), + "f0 should be moved to the last position"); + + // Reset fields to initial state + fields = + new ArrayList<>( + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.BIGINT()), + new DataField(2, "f2", DataTypes.STRING()))); + + // Test AFTER operation + manager.applyMove(fields, moveAfter); + Assertions.assertEquals( + 0, fields.get(2).id(), "The field id should remain as 0 after moving f0 after f1"); + Assertions.assertEquals("f0", fields.get(2).name(), "f0 should be after f1"); + + // Reset fields to initial state + fields = + new ArrayList<>( + Arrays.asList( + new DataField(0, "f0", DataTypes.INT()), + new DataField(1, "f1", DataTypes.BIGINT()), + new DataField(2, "f2", DataTypes.STRING()))); + + // Test BEFORE operation + manager.applyMove(fields, moveBefore); + Assertions.assertEquals( + 2, fields.get(1).id(), "The field id should remain as 2 after moving f2 before f1"); + Assertions.assertEquals("f2", fields.get(1).name(), "f2 should be before f1"); + } }