[common][flink] Add support for complex types in kafka debezium avro cdc action (apache#4246)
AshishKhatkar authored Oct 9, 2024
1 parent 20a3967 commit b2641ad
Showing 6 changed files with 306 additions and 28 deletions.
16 changes: 16 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/types/DataField.java
@@ -170,4 +170,20 @@ public int hashCode() {
public String toString() {
return asSQLString();
}

/**
* The same field may be assigned a different ID when it appears at a different position in
* the schema, so the comparison should not include the ID.
*/
public static boolean dataFieldEqualsIgnoreId(DataField dataField1, DataField dataField2) {
if (dataField1 == dataField2) {
return true;
} else if (dataField1 != null && dataField2 != null) {
return Objects.equals(dataField1.name(), dataField2.name())
&& Objects.equals(dataField1.type(), dataField2.type())
&& Objects.equals(dataField1.description(), dataField2.description());
} else {
return false;
}
}
}
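
A hedged usage sketch (not part of the commit): two fields that differ only in their ID now compare equal through the new helper. The DataField(id, name, type, description) constructor shape is assumed here from the surrounding class.

    DataField a = new DataField(0, "amount", new IntType(), "order amount");
    DataField b = new DataField(7, "amount", new IntType(), "order amount");
    boolean sameIgnoringId = DataField.dataFieldEqualsIgnoreId(a, b); // true: the ID is ignored
    boolean sameWithId = a.equals(b); // false: the regular equals still compares IDs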
12 changes: 11 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -205,7 +205,17 @@ public boolean equals(Object o) {
return false;
}
RowType rowType = (RowType) o;
-        return fields.equals(rowType.fields);
+        // For nested RowTypes (e.g. a DataField whose type is itself a RowType), field IDs can
+        // differ, so compare fields while ignoring IDs.
+        if (fields.size() != rowType.fields.size()) {
+            return false;
+        }
+        for (int i = 0; i < fields.size(); ++i) {
+            if (!DataField.dataFieldEqualsIgnoreId(fields.get(i), rowType.fields.get(i))) {
+                return false;
+            }
+        }
+        return true;
}

@Override
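
A hedged sketch (not part of the commit) of the relaxed RowType equality; it assumes the RowType(List&lt;DataField&gt;) constructor and the three-argument DataField(id, name, type) constructor.

    RowType left = new RowType(Arrays.asList(new DataField(0, "k", new IntType())));
    RowType right = new RowType(Arrays.asList(new DataField(5, "k", new IntType())));
    boolean equal = left.equals(right); // true after this change: field IDs are ignored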
@@ -306,6 +306,10 @@ public static int toTime(BinaryString input) throws DateTimeException {
/** Used by {@code CAST(x as TIMESTAMP)}. */
public static Timestamp toTimestamp(BinaryString input, int precision)
throws DateTimeException {
if (StringUtils.isNumeric(input.toString())) {
long epoch = toLong(input);
return fromMillisToTimestamp(epoch, precision);
}
return DateTimeUtils.parseTimestampData(input.toString(), precision);
}

@@ -315,6 +319,35 @@ public static Timestamp toTimestamp(BinaryString input, int precision, TimeZone timeZone)
return DateTimeUtils.parseTimestampData(input.toString(), precision, timeZone);
}

// Helper method to convert epoch to Timestamp with the provided precision.
private static Timestamp fromMillisToTimestamp(long epoch, int precision) {
// Calculate milliseconds and nanoseconds from epoch based on precision
long millis;
int nanosOfMillis;

switch (precision) {
case 0: // seconds
millis = epoch * 1000;
nanosOfMillis = 0;
break;
case 3: // milliseconds
millis = epoch;
nanosOfMillis = 0;
break;
case 6: // microseconds
millis = epoch / 1000;
nanosOfMillis = (int) ((epoch % 1000) * 1000);
break;
case 9: // nanoseconds
millis = epoch / 1_000_000;
nanosOfMillis = (int) (epoch % 1_000_000);
break;
default:
throw new RuntimeException("Unsupported precision: " + precision);
}
return Timestamp.fromEpochMillis(millis, nanosOfMillis);
}

public static BinaryString toCharacterString(BinaryString strData, DataType type) {
final boolean targetCharType = type.getTypeRoot() == CHAR;
final int targetLength = DataTypeChecks.getLength(type);
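
A hedged worked example (not part of the commit) of the precision mapping above for a microsecond epoch value; Timestamp.fromEpochMillis(millis, nanosOfMillis) is the same call used in fromMillisToTimestamp.

    long epoch = 1_700_000_000_123_456L; // microseconds since the epoch, precision 6
    long millis = epoch / 1000; // 1_700_000_000_123
    int nanosOfMillis = (int) ((epoch % 1000) * 1000); // 456_000
    Timestamp ts = Timestamp.fromEpochMillis(millis, nanosOfMillis);
    // A plain numeric string such as "1700000000123456" now takes this path in toTimestamp(...)
    // instead of going through DateTimeUtils.parseTimestampData.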
127 changes: 115 additions & 12 deletions paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
@@ -21,23 +21,36 @@
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.stream.Collectors;

@@ -47,6 +60,8 @@

/** Type related helper functions. */
public class TypeUtils {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(TypeUtils.class);

public static RowType concat(RowType left, RowType right) {
RowType.Builder builder = RowType.builder();
@@ -152,21 +167,109 @@ public static Object castFromStringInternal(String s, DataType type, boolean isCdcValue) {
            case ARRAY:
                ArrayType arrayType = (ArrayType) type;
                DataType elementType = arrayType.getElementType();
-                if (elementType instanceof VarCharType) {
-                    if (s.startsWith("[")) {
-                        s = s.substring(1);
-                    }
-                    if (s.endsWith("]")) {
-                        s = s.substring(0, s.length() - 1);
-                    }
-                    String[] ss = s.split(",");
-                    BinaryString[] binaryStrings = new BinaryString[ss.length];
-                    for (int i = 0; i < ss.length; i++) {
-                        binaryStrings[i] = BinaryString.fromString(ss[i]);
-                    }
-                    return new GenericArray(binaryStrings);
-                } else {
-                    throw new UnsupportedOperationException("Unsupported type " + type);
-                }
+                try {
+                    JsonNode arrayNode = OBJECT_MAPPER.readTree(s);
+                    List<Object> resultList = new ArrayList<>();
+                    for (JsonNode elementNode : arrayNode) {
+                        if (!elementNode.isNull()) {
+                            String elementJson = elementNode.toString();
+                            Object elementObject =
+                                    castFromStringInternal(elementJson, elementType, isCdcValue);
+                            resultList.add(elementObject);
+                        } else {
+                            resultList.add(null);
+                        }
+                    }
+                    return new GenericArray(resultList.toArray());
+                } catch (JsonProcessingException e) {
+                    LOG.info(
+                            String.format(
+                                    "Failed to parse ARRAY for type %s with value %s", type, s),
+                            e);
+                    // try existing code flow
+                    if (elementType instanceof VarCharType) {
+                        if (s.startsWith("[")) {
+                            s = s.substring(1);
+                        }
+                        if (s.endsWith("]")) {
+                            s = s.substring(0, s.length() - 1);
+                        }
+                        String[] ss = s.split(",");
+                        BinaryString[] binaryStrings = new BinaryString[ss.length];
+                        for (int i = 0; i < ss.length; i++) {
+                            binaryStrings[i] = BinaryString.fromString(ss[i]);
+                        }
+                        return new GenericArray(binaryStrings);
+                    } else {
+                        throw new UnsupportedOperationException("Unsupported type " + type);
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(
+                            String.format("Failed to parse Json String %s", s), e);
+                }
+            case MAP:
+                MapType mapType = (MapType) type;
+                DataType keyType = mapType.getKeyType();
+                DataType valueType = mapType.getValueType();
+                try {
+                    JsonNode mapNode = OBJECT_MAPPER.readTree(s);
+                    Map<Object, Object> resultMap = new HashMap<>();
+                    mapNode.fields()
+                            .forEachRemaining(
+                                    entry -> {
+                                        Object key =
+                                                castFromStringInternal(
+                                                        entry.getKey(), keyType, isCdcValue);
+                                        Object value = null;
+                                        if (!entry.getValue().isNull()) {
+                                            value =
+                                                    castFromStringInternal(
+                                                            entry.getValue().toString(),
+                                                            valueType,
+                                                            isCdcValue);
+                                        }
+                                        resultMap.put(key, value);
+                                    });
+                    return new GenericMap(resultMap);
+                } catch (JsonProcessingException e) {
+                    LOG.info(
+                            String.format("Failed to parse MAP for type %s with value %s", type, s),
+                            e);
+                    return new GenericMap(Collections.emptyMap());
+                } catch (Exception e) {
+                    throw new RuntimeException(
+                            String.format("Failed to parse Json String %s", s), e);
+                }
+            case ROW:
+                RowType rowType = (RowType) type;
+                try {
+                    JsonNode rowNode = OBJECT_MAPPER.readTree(s);
+                    GenericRow genericRow =
+                            new GenericRow(
+                                    rowType.getFields()
+                                            .size()); // TODO: What about RowKind? always +I?
+                    for (int pos = 0; pos < rowType.getFields().size(); ++pos) {
+                        DataField field = rowType.getFields().get(pos);
+                        JsonNode fieldNode = rowNode.get(field.name());
+                        if (fieldNode != null && !fieldNode.isNull()) {
+                            String fieldJson = fieldNode.toString();
+                            Object fieldObject =
+                                    castFromStringInternal(fieldJson, field.type(), isCdcValue);
+                            genericRow.setField(pos, fieldObject);
+                        } else {
+                            genericRow.setField(pos, null); // Handle null fields
+                        }
+                    }
+                    return genericRow;
+                } catch (JsonProcessingException e) {
+                    LOG.info(
+                            String.format(
+                                    "Failed to parse ROW for type %s with value %s", type, s),
+                            e);
+                    return new GenericRow(0);
+                } catch (Exception e) {
+                    throw new RuntimeException(
+                            String.format("Failed to parse Json String %s", s), e);
+                }
            default:
                throw new UnsupportedOperationException("Unsupported type " + type);
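
A hedged end-to-end sketch (not part of the commit) of the new JSON casting path. RowType.builder() appears in the surrounding file, while the builder's field(name, type)/build() shape, DataTypes.INT(), and the ArrayType(elementType) constructor are assumed from org.apache.paimon.types.

    RowType pointType =
            RowType.builder()
                    .field("x", DataTypes.INT())
                    .field("y", DataTypes.INT())
                    .build();
    Object point = TypeUtils.castFromStringInternal("{\"x\": 3, \"y\": 4}", pointType, true);
    // Expected: a GenericRow holding 3 and 4 in field order.

    Object scores =
            TypeUtils.castFromStringInternal("[1, 2, 3]", new ArrayType(DataTypes.INT()), true);
    // Expected: a GenericArray of the integers 1, 2, 3. Malformed JSON is logged and falls back
    // to the legacy VarCharType handling (for arrays) or an empty/zero-arity result (for maps
    // and rows) rather than failing the whole record.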