Skip to content

Commit

Permalink
[core] AddColumn move support before and last.
Browse files Browse the repository at this point in the history
  • Loading branch information
joyCurry30 committed May 30, 2024
1 parent 22c7c61 commit 3bffa1d
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ class Move implements Serializable {

public enum MoveType {
FIRST,
AFTER
AFTER,
BEFORE,
LAST
}

public static Move first(String fieldName) {
Expand All @@ -422,6 +424,14 @@ public static Move after(String fieldName, String referenceFieldName) {
return new Move(fieldName, referenceFieldName, MoveType.AFTER);
}

public static Move before(String fieldName, String referenceFieldName) {
return new Move(fieldName, referenceFieldName, MoveType.BEFORE);
}

public static Move last(String fieldName) {
return new Move(fieldName, null, MoveType.LAST);
}

private static final long serialVersionUID = 1L;

private final String fieldName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,28 +339,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
} else if (change instanceof UpdateColumnPosition) {
UpdateColumnPosition update = (UpdateColumnPosition) change;
SchemaChange.Move move = update.move();

// key: name ; value : index
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < newFields.size(); i++) {
map.put(newFields.get(i).name(), i);
}

int fieldIndex = map.get(move.fieldName());
int refIndex = 0;
if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
checkMoveIndexEqual(move, fieldIndex, refIndex);
newFields.add(refIndex, newFields.remove(fieldIndex));
} else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
refIndex = map.get(move.referenceFieldName());
checkMoveIndexEqual(move, fieldIndex, refIndex);
if (fieldIndex > refIndex) {
newFields.add(refIndex + 1, newFields.remove(fieldIndex));
} else {
newFields.add(refIndex, newFields.remove(fieldIndex));
}
}

applyMove(newFields, move);
} else {
throw new UnsupportedOperationException(
"Unsupported change: " + change.getClass());
Expand Down Expand Up @@ -388,6 +367,61 @@ public TableSchema commitChanges(List<SchemaChange> changes)
}
}

public void applyMove(List<DataField> newFields, SchemaChange.Move move) {
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < newFields.size(); i++) {
map.put(newFields.get(i).name(), i);
}

int fieldIndex = map.getOrDefault(move.fieldName(), -1);
if (fieldIndex == -1) {
throw new IllegalArgumentException("Field name not found: " + move.fieldName());
}

// Get the reference index if the move is not FIRST or LAST
int refIndex =
(move.type() == SchemaChange.Move.MoveType.FIRST
|| move.type() == SchemaChange.Move.MoveType.LAST)
? 0
: map.getOrDefault(move.referenceFieldName(), -1);
if (refIndex == -1) {
throw new IllegalArgumentException(
"Reference field name not found: " + move.referenceFieldName());
}

// Check if the field to move is the same as the reference field
checkMoveIndexEqual(move, fieldIndex, refIndex);

// Handle the move operations
switch (move.type()) {
case FIRST:
moveField(newFields, fieldIndex, 0);
break;
case LAST:
moveField(newFields, fieldIndex, newFields.size() - 1);
break;
case AFTER:
case BEFORE:
int adjustment = move.type() == SchemaChange.Move.MoveType.AFTER ? 1 : 0;
int targetIndex =
fieldIndex < refIndex ? refIndex - (1 - adjustment) : refIndex + adjustment;
moveField(newFields, fieldIndex, targetIndex);
break;
}
}

// Utility method to move a field within the list, handling range checks
private void moveField(List<DataField> newFields, int fromIndex, int toIndex) {
if (fromIndex < 0
|| fromIndex >= newFields.size()
|| toIndex < 0
|| toIndex > newFields.size() - 1) {
return;
}
DataField fieldToMove = newFields.remove(fromIndex);
newFields.add(toIndex, fieldToMove);
}

public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
TableSchema current =
latest().orElseThrow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.MapType;
Expand All @@ -32,6 +33,7 @@
import org.apache.paimon.utils.FailingFileIO;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand Down Expand Up @@ -297,4 +299,77 @@ public void testDeleteSchemaWithSchemaId() throws Exception {
manager.deleteSchema(manager.latest().get().id());
assertThat(manager.latest().get().toString()).isEqualTo(schemaContent);
}

@Test
public void testApplyMove() {
// Create the initial list of fields
List<DataField> fields =
new ArrayList<>(
Arrays.asList(
new DataField(0, "f0", DataTypes.INT()),
new DataField(1, "f1", DataTypes.BIGINT()),
new DataField(2, "f2", DataTypes.STRING())));

// Use factory methods to create Move objects
SchemaChange.Move moveFirst = SchemaChange.Move.first("f2");
SchemaChange.Move moveLast = SchemaChange.Move.last("f0");
SchemaChange.Move moveAfter = SchemaChange.Move.after("f0", "f1");
SchemaChange.Move moveBefore = SchemaChange.Move.before("f2", "f1");

// Test FIRST operation
manager.applyMove(fields, moveFirst);
Assertions.assertEquals(
2,
fields.get(0).id(),
"The field id should remain as 2 after moving f2 to the first position");
Assertions.assertEquals(
fields.get(0).name(), "f2", "f2 should be moved to the first position");

// Reset fields to initial state
fields =
new ArrayList<>(
Arrays.asList(
new DataField(0, "f0", DataTypes.INT()),
new DataField(1, "f1", DataTypes.BIGINT()),
new DataField(2, "f2", DataTypes.STRING())));

// Test LAST operation
manager.applyMove(fields, moveLast);
Assertions.assertEquals(
0,
fields.get(fields.size() - 1).id(),
"The field id should remain as 0 after moving f0 to the last position");
Assertions.assertEquals(
"f0",
fields.get(fields.size() - 1).name(),
"f0 should be moved to the last position");

// Reset fields to initial state
fields =
new ArrayList<>(
Arrays.asList(
new DataField(0, "f0", DataTypes.INT()),
new DataField(1, "f1", DataTypes.BIGINT()),
new DataField(2, "f2", DataTypes.STRING())));

// Test AFTER operation
manager.applyMove(fields, moveAfter);
Assertions.assertEquals(
0, fields.get(2).id(), "The field id should remain as 0 after moving f0 after f1");
Assertions.assertEquals("f0", fields.get(2).name(), "f0 should be after f1");

// Reset fields to initial state
fields =
new ArrayList<>(
Arrays.asList(
new DataField(0, "f0", DataTypes.INT()),
new DataField(1, "f1", DataTypes.BIGINT()),
new DataField(2, "f2", DataTypes.STRING())));

// Test BEFORE operation
manager.applyMove(fields, moveBefore);
Assertions.assertEquals(
2, fields.get(1).id(), "The field id should remain as 2 after moving f2 before f1");
Assertions.assertEquals("f2", fields.get(1).name(), "f2 should be before f1");
}
}

0 comments on commit 3bffa1d

Please sign in to comment.