Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] AddColumn move support before and last. #3442

Merged
merged 1 commit into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ class Move implements Serializable {

public enum MoveType {
FIRST,
AFTER
AFTER,
BEFORE,
LAST
}

public static Move first(String fieldName) {
Expand All @@ -422,6 +424,14 @@ public static Move after(String fieldName, String referenceFieldName) {
return new Move(fieldName, referenceFieldName, MoveType.AFTER);
}

public static Move before(String fieldName, String referenceFieldName) {
return new Move(fieldName, referenceFieldName, MoveType.BEFORE);
}

public static Move last(String fieldName) {
return new Move(fieldName, null, MoveType.LAST);
}

private static final long serialVersionUID = 1L;

private final String fieldName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,28 +339,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
} else if (change instanceof UpdateColumnPosition) {
UpdateColumnPosition update = (UpdateColumnPosition) change;
SchemaChange.Move move = update.move();

// key: name ; value : index
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < newFields.size(); i++) {
map.put(newFields.get(i).name(), i);
}

int fieldIndex = map.get(move.fieldName());
int refIndex = 0;
if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
checkMoveIndexEqual(move, fieldIndex, refIndex);
newFields.add(refIndex, newFields.remove(fieldIndex));
} else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
refIndex = map.get(move.referenceFieldName());
checkMoveIndexEqual(move, fieldIndex, refIndex);
if (fieldIndex > refIndex) {
newFields.add(refIndex + 1, newFields.remove(fieldIndex));
} else {
newFields.add(refIndex, newFields.remove(fieldIndex));
}
}

applyMove(newFields, move);
} else {
throw new UnsupportedOperationException(
"Unsupported change: " + change.getClass());
Expand Down Expand Up @@ -388,6 +367,70 @@ public TableSchema commitChanges(List<SchemaChange> changes)
}
}

public void applyMove(List<DataField> newFields, SchemaChange.Move move) {
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < newFields.size(); i++) {
map.put(newFields.get(i).name(), i);
}

int fieldIndex = map.getOrDefault(move.fieldName(), -1);
if (fieldIndex == -1) {
throw new IllegalArgumentException("Field name not found: " + move.fieldName());
}

// Handling FIRST and LAST cases directly since they don't need refIndex
switch (move.type()) {
case FIRST:
checkMoveIndexEqual(move, fieldIndex, 0);
moveField(newFields, fieldIndex, 0);
return;
case LAST:
checkMoveIndexEqual(move, fieldIndex, newFields.size() - 1);
moveField(newFields, fieldIndex, newFields.size() - 1);
return;
}

Integer refIndex = map.getOrDefault(move.referenceFieldName(), -1);
if (refIndex == -1) {
throw new IllegalArgumentException(
"Reference field name not found: " + move.referenceFieldName());
}

checkMoveIndexEqual(move, fieldIndex, refIndex);

// For AFTER and BEFORE, adjust the target index based on current and reference positions
int targetIndex = refIndex;
if (move.type() == SchemaChange.Move.MoveType.AFTER && fieldIndex > refIndex) {
targetIndex++;
}
// Ensure adjustments for moving element forwards or backwards
if (move.type() == SchemaChange.Move.MoveType.BEFORE && fieldIndex < refIndex) {
targetIndex--;
}

if (targetIndex > (newFields.size() - 1)) {
targetIndex = newFields.size() - 1;
}

moveField(newFields, fieldIndex, targetIndex);
}

// Utility method to move a field within the list, handling range checks
private void moveField(List<DataField> newFields, int fromIndex, int toIndex) {
if (fromIndex < 0 || fromIndex >= newFields.size() || toIndex < 0) {
return;
}
DataField fieldToMove = newFields.remove(fromIndex);
newFields.add(toIndex, fieldToMove);
}

private static void checkMoveIndexEqual(SchemaChange.Move move, int fieldIndex, int refIndex) {
if (refIndex == fieldIndex) {
throw new UnsupportedOperationException(
String.format("Cannot move itself for column %s", move.fieldName()));
}
}

public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
TableSchema current =
latest().orElseThrow(
Expand All @@ -406,13 +449,6 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
}
}

private static void checkMoveIndexEqual(SchemaChange.Move move, int fieldIndex, int refIndex) {
if (refIndex == fieldIndex) {
throw new UnsupportedOperationException(
String.format("Cannot move itself for column %s", move.fieldName()));
}
}

private void validateNotPrimaryAndPartitionKey(TableSchema schema, String fieldName) {
/// TODO support partition and primary keys schema evolution
if (schema.partitionKeys().contains(fieldName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.MapType;
Expand All @@ -32,6 +33,7 @@
import org.apache.paimon.utils.FailingFileIO;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -42,6 +44,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -297,4 +300,145 @@ public void testDeleteSchemaWithSchemaId() throws Exception {
manager.deleteSchema(manager.latest().get().id());
assertThat(manager.latest().get().toString()).isEqualTo(schemaContent);
}

@Test
public void testApplyMoveFirstAndLast() {
// Create the initial list of fields
List<DataField> fields = new LinkedList<>();
fields.add(new DataField(0, "f0", DataTypes.INT()));
fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
fields.add(new DataField(2, "f2", DataTypes.STRING()));
fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

// Use factory methods to create Move objects
SchemaChange.Move moveFirst = SchemaChange.Move.first("f2");
SchemaChange.Move moveLast = SchemaChange.Move.last("f0");

// Test FIRST operation
manager.applyMove(fields, moveFirst);
Assertions.assertEquals(
2,
fields.get(0).id(),
"The field id should remain as 2 after moving f2 to the first position");
Assertions.assertEquals(
fields.get(0).name(), "f2", "f2 should be moved to the first position");

// Reset fields to initial state
fields = new LinkedList<>();
fields.add(new DataField(0, "f0", DataTypes.INT()));
fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
fields.add(new DataField(2, "f2", DataTypes.STRING()));
fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

// Test LAST operation
manager.applyMove(fields, moveLast);
Assertions.assertEquals(
0,
fields.get(fields.size() - 1).id(),
"The field id should remain as 0 after moving f0 to the last position");
Assertions.assertEquals(
"f0",
fields.get(fields.size() - 1).name(),
"f0 should be moved to the last position");
}

@Test
public void testMoveAfter() {
// Create the initial list of fields
List<DataField> fields = new LinkedList<>();
fields.add(new DataField(0, "f0", DataTypes.INT()));
fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
fields.add(new DataField(2, "f2", DataTypes.STRING()));
fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

// Test AFTER operation
SchemaChange.Move moveAfter = SchemaChange.Move.after("f1", "f2");
manager.applyMove(fields, moveAfter);
Assertions.assertEquals(
1, fields.get(2).id(), "The field id should remain as 1 after moving f1 after f2");
Assertions.assertEquals("f1", fields.get(2).name(), "f1 should be after f2");

// Reset fields to initial state
fields = new LinkedList<>();
fields.add(new DataField(0, "f0", DataTypes.INT()));
fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
fields.add(new DataField(2, "f2", DataTypes.STRING()));
fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

moveAfter = SchemaChange.Move.after("f3", "f1");
// Test AFTER operation
manager.applyMove(fields, moveAfter);
Assertions.assertEquals(
3, fields.get(2).id(), "The field id should remain as 3 after moving f3 after f1");
Assertions.assertEquals("f3", fields.get(2).name(), "f3 should be after f1");

// Reset fields to initial state
fields = new LinkedList<>();
fields.add(new DataField(0, "f0", DataTypes.INT()));
fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
fields.add(new DataField(2, "f2", DataTypes.STRING()));
fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

moveAfter = SchemaChange.Move.after("f0", "f2");
// Test move column after last column
manager.applyMove(fields, moveAfter);
Assertions.assertEquals(
0, fields.get(2).id(), "The field id should remain as 0 after moving f0 after f2");
Assertions.assertEquals("f0", fields.get(2).name(), "f0 should be after f2");

// Reset fields to initial state
fields = new LinkedList<>();
fields.add(new DataField(0, "f0", DataTypes.INT()));
fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
fields.add(new DataField(2, "f2", DataTypes.STRING()));
fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

moveAfter = SchemaChange.Move.after("f0", "f3");
// Test move column after last column
manager.applyMove(fields, moveAfter);
Assertions.assertEquals(
0, fields.get(3).id(), "The field id should remain as 0 after moving f0 after f3");
Assertions.assertEquals("f0", fields.get(3).name(), "f0 should be after f3");
}

@Test
public void testMoveBefore() {
// Create the initial list of fields
List<DataField> fields = new LinkedList<>();
fields.add(new DataField(0, "f0", DataTypes.INT()));
fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
fields.add(new DataField(2, "f2", DataTypes.STRING()));
fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

SchemaChange.Move moveBefore = SchemaChange.Move.before("f2", "f1");
manager.applyMove(fields, moveBefore);
Assertions.assertEquals(
2, fields.get(1).id(), "The field id should remain as 2 after moving f2 before f1");
Assertions.assertEquals("f2", fields.get(1).name(), "f2 should be before f1");

// Reset fields to initial state
fields = new LinkedList<>();
fields.add(new DataField(0, "f0", DataTypes.INT()));
fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
fields.add(new DataField(2, "f2", DataTypes.STRING()));
fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

moveBefore = SchemaChange.Move.before("f1", "f3");
manager.applyMove(fields, moveBefore);
Assertions.assertEquals(
1, fields.get(2).id(), "The field id should remain as 1 after moving f1 before f3");
Assertions.assertEquals("f1", fields.get(2).name(), "f1 should be before f3");

// Reset fields to initial state
fields = new LinkedList<>();
fields.add(new DataField(0, "f0", DataTypes.INT()));
fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
fields.add(new DataField(2, "f2", DataTypes.STRING()));
fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

moveBefore = SchemaChange.Move.before("f2", "f0");
manager.applyMove(fields, moveBefore);
Assertions.assertEquals(
2, fields.get(0).id(), "The field id should remain as 2 after moving f2 before f0");
}
}
Loading