From 6f97de2544d8c1ea329e57e615a28abeef3e4649 Mon Sep 17 00:00:00 2001 From: "tianzhu.wen" Date: Fri, 17 May 2024 11:30:00 +0800 Subject: [PATCH] [core] AddColumn move support before and last. --- .../apache/paimon/schema/SchemaChange.java | 12 +- .../apache/paimon/schema/SchemaManager.java | 94 ++++++++---- .../paimon/schema/SchemaManagerTest.java | 144 ++++++++++++++++++ 3 files changed, 220 insertions(+), 30 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java index 711b09bbe668..20000389dccd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java @@ -411,7 +411,9 @@ class Move implements Serializable { public enum MoveType { FIRST, - AFTER + AFTER, + BEFORE, + LAST } public static Move first(String fieldName) { @@ -422,6 +424,14 @@ public static Move after(String fieldName, String referenceFieldName) { return new Move(fieldName, referenceFieldName, MoveType.AFTER); } + public static Move before(String fieldName, String referenceFieldName) { + return new Move(fieldName, referenceFieldName, MoveType.BEFORE); + } + + public static Move last(String fieldName) { + return new Move(fieldName, null, MoveType.LAST); + } + private static final long serialVersionUID = 1L; private final String fieldName; diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 680ee6b8e73f..366f285fc0c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -339,28 +339,7 @@ public TableSchema commitChanges(List changes) } else if (change instanceof UpdateColumnPosition) { UpdateColumnPosition update = (UpdateColumnPosition) change; SchemaChange.Move move = update.move(); - - // key: name ; value : index - Map map = new HashMap<>(); - for (int i = 0; i < newFields.size(); i++) { - map.put(newFields.get(i).name(), i); - } - - int fieldIndex = map.get(move.fieldName()); - int refIndex = 0; - if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) { - checkMoveIndexEqual(move, fieldIndex, refIndex); - newFields.add(refIndex, newFields.remove(fieldIndex)); - } else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) { - refIndex = map.get(move.referenceFieldName()); - checkMoveIndexEqual(move, fieldIndex, refIndex); - if (fieldIndex > refIndex) { - newFields.add(refIndex + 1, newFields.remove(fieldIndex)); - } else { - newFields.add(refIndex, newFields.remove(fieldIndex)); - } - } - + applyMove(newFields, move); } else { throw new UnsupportedOperationException( "Unsupported change: " + change.getClass()); @@ -388,6 +367,70 @@ public TableSchema commitChanges(List changes) } } + public void applyMove(List newFields, SchemaChange.Move move) { + Map map = new HashMap<>(); + for (int i = 0; i < newFields.size(); i++) { + map.put(newFields.get(i).name(), i); + } + + int fieldIndex = map.getOrDefault(move.fieldName(), -1); + if (fieldIndex == -1) { + throw new IllegalArgumentException("Field name not found: " + move.fieldName()); + } + + // Handling FIRST and LAST cases directly since they don't need refIndex + switch (move.type()) { + case FIRST: + checkMoveIndexEqual(move, fieldIndex, 0); + moveField(newFields, fieldIndex, 0); + return; + case LAST: + checkMoveIndexEqual(move, fieldIndex, newFields.size() - 1); + moveField(newFields, fieldIndex, newFields.size() - 1); + return; + } + + Integer refIndex = map.getOrDefault(move.referenceFieldName(), -1); + if (refIndex == -1) { + throw new IllegalArgumentException( + "Reference field name not found: " + move.referenceFieldName()); + } + + checkMoveIndexEqual(move, fieldIndex, refIndex); + + // For AFTER and BEFORE, adjust the target index based on current and reference positions + int targetIndex = refIndex; + if (move.type() == SchemaChange.Move.MoveType.AFTER && fieldIndex > refIndex) { + targetIndex++; + } + // Ensure adjustments for moving element forwards or backwards + if (move.type() == SchemaChange.Move.MoveType.BEFORE && fieldIndex < refIndex) { + targetIndex--; + } + + if (targetIndex > (newFields.size() - 1)) { + targetIndex = newFields.size() - 1; + } + + moveField(newFields, fieldIndex, targetIndex); + } + + // Utility method to move a field within the list, handling range checks + private void moveField(List newFields, int fromIndex, int toIndex) { + if (fromIndex < 0 || fromIndex >= newFields.size() || toIndex < 0) { + return; + } + DataField fieldToMove = newFields.remove(fromIndex); + newFields.add(toIndex, fieldToMove); + } + + private static void checkMoveIndexEqual(SchemaChange.Move move, int fieldIndex, int refIndex) { + if (refIndex == fieldIndex) { + throw new UnsupportedOperationException( + String.format("Cannot move itself for column %s", move.fieldName())); + } + } + public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { TableSchema current = latest().orElseThrow( @@ -406,13 +449,6 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { } } - private static void checkMoveIndexEqual(SchemaChange.Move move, int fieldIndex, int refIndex) { - if (refIndex == fieldIndex) { - throw new UnsupportedOperationException( - String.format("Cannot move itself for column %s", move.fieldName())); - } - } - private void validateNotPrimaryAndPartitionKey(TableSchema schema, String fieldName) { /// TODO support partition and primary keys schema evolution if (schema.partitionKeys().contains(fieldName)) { diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java index d912c0ce9265..742e2188f2b0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaManagerTest.java @@ -24,6 +24,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.types.BigIntType; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.DoubleType; import org.apache.paimon.types.IntType; import org.apache.paimon.types.MapType; @@ -32,6 +33,7 @@ import org.apache.paimon.utils.FailingFileIO; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -42,6 +44,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -297,4 +300,145 @@ public void testDeleteSchemaWithSchemaId() throws Exception { manager.deleteSchema(manager.latest().get().id()); assertThat(manager.latest().get().toString()).isEqualTo(schemaContent); } + + @Test + public void testApplyMoveFirstAndLast() { + // Create the initial list of fields + List fields = new LinkedList<>(); + fields.add(new DataField(0, "f0", DataTypes.INT())); + fields.add(new DataField(1, "f1", DataTypes.BIGINT())); + fields.add(new DataField(2, "f2", DataTypes.STRING())); + fields.add(new DataField(3, "f3", DataTypes.SMALLINT())); + + // Use factory methods to create Move objects + SchemaChange.Move moveFirst = SchemaChange.Move.first("f2"); + SchemaChange.Move moveLast = SchemaChange.Move.last("f0"); + + // Test FIRST operation + manager.applyMove(fields, moveFirst); + Assertions.assertEquals( + 2, + fields.get(0).id(), + "The field id should remain as 2 after moving f2 to the first position"); + Assertions.assertEquals( + fields.get(0).name(), "f2", "f2 should be moved to the first position"); + + // Reset fields to initial state + fields = new LinkedList<>(); + fields.add(new DataField(0, "f0", DataTypes.INT())); + fields.add(new DataField(1, "f1", DataTypes.BIGINT())); + fields.add(new DataField(2, "f2", DataTypes.STRING())); + fields.add(new DataField(3, "f3", DataTypes.SMALLINT())); + + // Test LAST operation + manager.applyMove(fields, moveLast); + Assertions.assertEquals( + 0, + fields.get(fields.size() - 1).id(), + "The field id should remain as 0 after moving f0 to the last position"); + Assertions.assertEquals( + "f0", + fields.get(fields.size() - 1).name(), + "f0 should be moved to the last position"); + } + + @Test + public void testMoveAfter() { + // Create the initial list of fields + List fields = new LinkedList<>(); + fields.add(new DataField(0, "f0", DataTypes.INT())); + fields.add(new DataField(1, "f1", DataTypes.BIGINT())); + fields.add(new DataField(2, "f2", DataTypes.STRING())); + fields.add(new DataField(3, "f3", DataTypes.SMALLINT())); + + // Test AFTER operation + SchemaChange.Move moveAfter = SchemaChange.Move.after("f1", "f2"); + manager.applyMove(fields, moveAfter); + Assertions.assertEquals( + 1, fields.get(2).id(), "The field id should remain as 1 after moving f1 after f2"); + Assertions.assertEquals("f1", fields.get(2).name(), "f1 should be after f2"); + + // Reset fields to initial state + fields = new LinkedList<>(); + fields.add(new DataField(0, "f0", DataTypes.INT())); + fields.add(new DataField(1, "f1", DataTypes.BIGINT())); + fields.add(new DataField(2, "f2", DataTypes.STRING())); + fields.add(new DataField(3, "f3", DataTypes.SMALLINT())); + + moveAfter = SchemaChange.Move.after("f3", "f1"); + // Test AFTER operation + manager.applyMove(fields, moveAfter); + Assertions.assertEquals( + 3, fields.get(2).id(), "The field id should remain as 3 after moving f3 after f1"); + Assertions.assertEquals("f3", fields.get(2).name(), "f3 should be after f1"); + + // Reset fields to initial state + fields = new LinkedList<>(); + fields.add(new DataField(0, "f0", DataTypes.INT())); + fields.add(new DataField(1, "f1", DataTypes.BIGINT())); + fields.add(new DataField(2, "f2", DataTypes.STRING())); + fields.add(new DataField(3, "f3", DataTypes.SMALLINT())); + + moveAfter = SchemaChange.Move.after("f0", "f2"); + // Test move column after last column + manager.applyMove(fields, moveAfter); + Assertions.assertEquals( + 0, fields.get(2).id(), "The field id should remain as 0 after moving f0 after f2"); + Assertions.assertEquals("f0", fields.get(2).name(), "f0 should be after f2"); + + // Reset fields to initial state + fields = new LinkedList<>(); + fields.add(new DataField(0, "f0", DataTypes.INT())); + fields.add(new DataField(1, "f1", DataTypes.BIGINT())); + fields.add(new DataField(2, "f2", DataTypes.STRING())); + fields.add(new DataField(3, "f3", DataTypes.SMALLINT())); + + moveAfter = SchemaChange.Move.after("f0", "f3"); + // Test move column after last column + manager.applyMove(fields, moveAfter); + Assertions.assertEquals( + 0, fields.get(3).id(), "The field id should remain as 0 after moving f0 after f3"); + Assertions.assertEquals("f0", fields.get(3).name(), "f0 should be after f3"); + } + + @Test + public void testMoveBefore() { + // Create the initial list of fields + List fields = new LinkedList<>(); + fields.add(new DataField(0, "f0", DataTypes.INT())); + fields.add(new DataField(1, "f1", DataTypes.BIGINT())); + fields.add(new DataField(2, "f2", DataTypes.STRING())); + fields.add(new DataField(3, "f3", DataTypes.SMALLINT())); + + SchemaChange.Move moveBefore = SchemaChange.Move.before("f2", "f1"); + manager.applyMove(fields, moveBefore); + Assertions.assertEquals( + 2, fields.get(1).id(), "The field id should remain as 2 after moving f2 before f1"); + Assertions.assertEquals("f2", fields.get(1).name(), "f2 should be before f1"); + + // Reset fields to initial state + fields = new LinkedList<>(); + fields.add(new DataField(0, "f0", DataTypes.INT())); + fields.add(new DataField(1, "f1", DataTypes.BIGINT())); + fields.add(new DataField(2, "f2", DataTypes.STRING())); + fields.add(new DataField(3, "f3", DataTypes.SMALLINT())); + + moveBefore = SchemaChange.Move.before("f1", "f3"); + manager.applyMove(fields, moveBefore); + Assertions.assertEquals( + 1, fields.get(2).id(), "The field id should remain as 1 after moving f1 before f3"); + Assertions.assertEquals("f1", fields.get(2).name(), "f1 should be before f3"); + + // Reset fields to initial state + fields = new LinkedList<>(); + fields.add(new DataField(0, "f0", DataTypes.INT())); + fields.add(new DataField(1, "f1", DataTypes.BIGINT())); + fields.add(new DataField(2, "f2", DataTypes.STRING())); + fields.add(new DataField(3, "f3", DataTypes.SMALLINT())); + + moveBefore = SchemaChange.Move.before("f2", "f0"); + manager.applyMove(fields, moveBefore); + Assertions.assertEquals( + 2, fields.get(0).id(), "The field id should remain as 2 after moving f2 before f0"); + } }