Skip to content

Commit

Permalink
[core] Introduce withReadType in ReadBuilder (#4214)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy authored Sep 25, 2024
1 parent 51f2488 commit cba8447
Show file tree
Hide file tree
Showing 56 changed files with 733 additions and 669 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.VarCharType;

import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -42,8 +43,13 @@ public class SystemFields {
public static final DataField LEVEL =
new DataField(Integer.MAX_VALUE - 3, "_LEVEL", DataTypes.INT().notNull());

// only used by AuditLogTable
public static final DataField ROW_KIND =
new DataField(
Integer.MAX_VALUE - 4, "rowkind", new VarCharType(VarCharType.MAX_LENGTH));

public static final Set<String> SYSTEM_FIELD_NAMES =
Stream.of(SEQUENCE_NUMBER.name(), VALUE_KIND.name(), LEVEL.name())
Stream.of(SEQUENCE_NUMBER.name(), VALUE_KIND.name(), LEVEL.name(), ROW_KIND.name())
.collect(Collectors.toSet());

public static boolean isSystemField(int fieldId) {
Expand Down
17 changes: 17 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/types/DataField.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ public DataField newName(String newName) {
return new DataField(id, newName, type, description);
}

public DataField newType(DataType newType) {
return new DataField(id, name, newType, description);
}

public DataField newDescription(String newDescription) {
return new DataField(id, name, type, newDescription);
}
Expand Down Expand Up @@ -144,6 +148,19 @@ public boolean equals(Object o) {
&& Objects.equals(description, field.description);
}

public boolean isPrunedFrom(DataField field) {
if (this == field) {
return true;
}
if (field == null) {
return false;
}
return Objects.equals(id, field.id)
&& Objects.equals(name, field.name)
&& type.isPrunedFrom(field.type)
&& Objects.equals(description, field.description);
}

@Override
public int hashCode() {
return Objects.hash(id, name, type, description);
Expand Down
10 changes: 10 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/types/DataType.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,16 @@ public boolean equals(Object o) {
return isNullable == that.isNullable && typeRoot == that.typeRoot;
}

/**
* Determine whether the current type is the result of the target type after pruning (e.g.
* select some fields from a nested type) or just the same.
*
* @param o the target data type
*/
public boolean isPrunedFrom(Object o) {
return equals(o);
}

@Override
public int hashCode() {
return Objects.hash(isNullable, typeRoot);
Expand Down
46 changes: 46 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/types/RowType.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public RowType(List<DataField> fields) {
this(true, fields);
}

public RowType copy(List<DataField> newFields) {
return new RowType(isNullable(), newFields);
}

public List<DataField> getFields() {
return fields;
}
Expand Down Expand Up @@ -132,6 +136,15 @@ public DataField getField(String fieldName) {
throw new RuntimeException("Cannot find field: " + fieldName);
}

public int getFieldIndexByFieldId(int fieldId) {
for (int i = 0; i < fields.size(); i++) {
if (fields.get(i).id() == fieldId) {
return i;
}
}
throw new RuntimeException("Cannot find field index by FieldId " + fieldId);
}

@Override
public int defaultSize() {
return fields.stream().mapToInt(f -> f.type().defaultSize()).sum();
Expand Down Expand Up @@ -177,6 +190,26 @@ public boolean equals(Object o) {
return fields.equals(rowType.fields);
}

@Override
public boolean isPrunedFrom(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
RowType rowType = (RowType) o;
for (DataField field : fields) {
if (!field.isPrunedFrom(rowType.getField(field.name()))) {
return false;
}
}
return true;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), fields);
Expand Down Expand Up @@ -238,6 +271,19 @@ public RowType project(List<String> names) {
.collect(Collectors.toList()));
}

public RowType project(String... names) {
return project(Arrays.asList(names));
}

public static RowType of() {
return new RowType(true, Collections.emptyList());
}

public static RowType of(DataField... fields) {
final List<DataField> fs = new ArrayList<>(Arrays.asList(fields));
return new RowType(true, fs);
}

public static RowType of(DataType... types) {
final List<DataField> fields = new ArrayList<>();
for (int i = 0; i < types.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;

import java.util.Arrays;

Expand Down Expand Up @@ -215,6 +216,13 @@ public static ProjectedRow from(int[] projection) {
return new ProjectedRow(projection);
}

public static ProjectedRow from(RowType readType, RowType tableType) {
return new ProjectedRow(
readType.getFields().stream()
.mapToInt(field -> tableType.getFieldIndexByFieldId(field.id()))
.toArray());
}

/**
* Create an empty {@link ProjectedRow} starting from a {@link Projection}.
*
Expand Down
Loading

0 comments on commit cba8447

Please sign in to comment.