Skip to content

Commit

Permalink
[core][spark] Support adding and dropping nested columns in Spark (#4483)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsreaper authored Nov 8, 2024
1 parent ddf10d4 commit 5b07745
Show file tree
Hide file tree
Showing 9 changed files with 375 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ private void validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> c
for (SchemaChange change : changes) {
if (change instanceof SchemaChange.AddColumn) {
SchemaChange.AddColumn addColumn = (SchemaChange.AddColumn) change;
fieldNames.add(addColumn.fieldName());
fieldNames.addAll(addColumn.fieldNames());
} else if (change instanceof SchemaChange.RenameColumn) {
SchemaChange.RenameColumn rename = (SchemaChange.RenameColumn) change;
fieldNames.add(rename.newName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
Expand Down Expand Up @@ -52,19 +54,28 @@ static SchemaChange addColumn(String fieldName, DataType dataType) {
}

static SchemaChange addColumn(String fieldName, DataType dataType, String comment) {
return new AddColumn(fieldName, dataType, comment, null);
return new AddColumn(Collections.singletonList(fieldName), dataType, comment, null);
}

static SchemaChange addColumn(String fieldName, DataType dataType, String comment, Move move) {
return new AddColumn(fieldName, dataType, comment, move);
return new AddColumn(Collections.singletonList(fieldName), dataType, comment, move);
}

static SchemaChange addColumn(
List<String> fieldNames, DataType dataType, String comment, Move move) {
return new AddColumn(fieldNames, dataType, comment, move);
}

static SchemaChange renameColumn(String fieldName, String newName) {
return new RenameColumn(fieldName, newName);
}

static SchemaChange dropColumn(String fieldName) {
return new DropColumn(fieldName);
return new DropColumn(Collections.singletonList(fieldName));
}

static SchemaChange dropColumn(List<String> fieldNames) {
return new DropColumn(fieldNames);
}

static SchemaChange updateColumnType(String fieldName, DataType newDataType) {
Expand Down Expand Up @@ -207,20 +218,21 @@ final class AddColumn implements SchemaChange {

private static final long serialVersionUID = 1L;

private final String fieldName;
private final List<String> fieldNames;
private final DataType dataType;
private final String description;
private final Move move;

private AddColumn(String fieldName, DataType dataType, String description, Move move) {
this.fieldName = fieldName;
private AddColumn(
List<String> fieldNames, DataType dataType, String description, Move move) {
this.fieldNames = fieldNames;
this.dataType = dataType;
this.description = description;
this.move = move;
}

public String fieldName() {
return fieldName;
public List<String> fieldNames() {
return fieldNames;
}

public DataType dataType() {
Expand All @@ -246,7 +258,7 @@ public boolean equals(Object o) {
return false;
}
AddColumn addColumn = (AddColumn) o;
return Objects.equals(fieldName, addColumn.fieldName)
return Objects.equals(fieldNames, addColumn.fieldNames)
&& dataType.equals(addColumn.dataType)
&& Objects.equals(description, addColumn.description)
&& move.equals(addColumn.move);
Expand All @@ -255,7 +267,7 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
int result = Objects.hash(dataType, description);
result = 31 * result + Objects.hashCode(fieldName);
result = 31 * result + Objects.hashCode(fieldNames);
result = 31 * result + Objects.hashCode(move);
return result;
}
Expand Down Expand Up @@ -308,14 +320,14 @@ final class DropColumn implements SchemaChange {

private static final long serialVersionUID = 1L;

private final String fieldName;
private final List<String> fieldNames;

private DropColumn(String fieldName) {
this.fieldName = fieldName;
private DropColumn(List<String> fieldNames) {
this.fieldNames = fieldNames;
}

public String fieldName() {
return fieldName;
public List<String> fieldNames() {
return fieldNames;
}

@Override
Expand All @@ -327,12 +339,12 @@ public boolean equals(Object o) {
return false;
}
DropColumn that = (DropColumn) o;
return Objects.equals(fieldName, that.fieldName);
return Objects.equals(fieldNames, that.fieldNames);
}

@Override
public int hashCode() {
return Objects.hashCode(fieldName);
return Objects.hashCode(fieldNames);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.casting.CastFieldGetter;
import org.apache.paimon.casting.CastedRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Predicate;
Expand Down Expand Up @@ -67,8 +68,6 @@ public class SchemaEvolutionUtil {
* data fields, -1 is the index of 6->b in data fields and 1 is the index of 3->a in data
* fields.
*
* <p>/// TODO should support nest index mapping when nest schema evolution is supported.
*
* @param tableFields the fields of table
* @param dataFields the fields of underlying data
* @return the index mapping
Expand Down Expand Up @@ -394,23 +393,57 @@ private static CastFieldGetter[] createCastFieldGetterMapping(
checkState(
!(tableField.type() instanceof MapType
|| dataField.type() instanceof ArrayType
|| dataField.type() instanceof MultisetType
|| dataField.type() instanceof RowType),
"Only support column type evolution in atomic data type.");
|| dataField.type() instanceof MultisetType),
"Only support column type evolution in atomic and row data type.");

CastExecutor<?, ?> castExecutor;
if (tableField.type() instanceof RowType
&& dataField.type() instanceof RowType) {
castExecutor =
createRowCastExecutor(
(RowType) dataField.type(), (RowType) tableField.type());
} else {
castExecutor = CastExecutors.resolve(dataField.type(), tableField.type());
}
checkNotNull(
castExecutor,
"Cannot cast from type "
+ dataField.type()
+ " to type "
+ tableField.type());

// Create getter with index i and projected row data will convert to underlying
// data
converterMapping[i] =
new CastFieldGetter(
InternalRowUtils.createNullCheckingFieldGetter(
dataField.type(), i),
checkNotNull(
CastExecutors.resolve(
dataField.type(), tableField.type())));
castExecutor);
castExist = true;
}
}
}

return castExist ? converterMapping : null;
}

/**
 * Builds a cast executor that converts rows of {@code inputType} into rows of
 * {@code targetType} during schema evolution.
 *
 * <p>Conversion happens in up to two stages: a projection step (presumably reordering /
 * selecting fields when the index mapping is non-null — confirm against
 * {@code createIndexMapping}) followed by a per-field casting step (when any field type
 * differs). Each stage is skipped entirely when its mapping is null.
 *
 * @param inputType row type of the underlying data
 * @param targetType row type expected by the table schema
 * @return an executor mapping an input row to a row matching {@code targetType}
 */
private static CastExecutor<InternalRow, InternalRow> createRowCastExecutor(
        RowType inputType, RowType targetType) {
    int[] mapping = createIndexMapping(targetType.getFields(), inputType.getFields());
    CastFieldGetter[] getters =
            createCastFieldGetterMapping(targetType.getFields(), inputType.getFields(), mapping);

    // Build the reusable wrapper rows once; the returned lambda applies them per row.
    ProjectedRow projection = mapping != null ? ProjectedRow.from(mapping) : null;
    CastedRow casting = getters != null ? CastedRow.from(getters) : null;
    return row -> {
        InternalRow result = row;
        if (projection != null) {
            result = projection.replaceRow(result);
        }
        if (casting != null) {
            result = casting.replaceRow(result);
        }
        return result;
    };
}
}
Loading

0 comments on commit 5b07745

Please sign in to comment.