Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[common][flink] Add support for complex types in kafka debezium avro cdc action #4246

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/types/DataField.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,20 @@ public int hashCode() {
public String toString() {
return asSQLString();
}

/**
 * Compares two fields for equality while deliberately ignoring their IDs. The same logical
 * field can be assigned a different ID when field ordering differs, so the ID must not take
 * part in the comparison; name, type and description are compared instead.
 */
public static boolean dataFieldEqualsIgnoreId(DataField dataField1, DataField dataField2) {
    // Identical reference (covers both-null as well).
    if (dataField1 == dataField2) {
        return true;
    }
    // Exactly one side null -> not equal.
    if (dataField1 == null || dataField2 == null) {
        return false;
    }
    boolean sameName = Objects.equals(dataField1.name(), dataField2.name());
    boolean sameType = Objects.equals(dataField1.type(), dataField2.type());
    boolean sameDescription = Objects.equals(dataField1.description(), dataField2.description());
    return sameName && sameType && sameDescription;
}
}
12 changes: 11 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/types/RowType.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,17 @@ public boolean equals(Object o) {
return false;
}
RowType rowType = (RowType) o;
return fields.equals(rowType.fields);
// For nested RowTypes e.g. DataField.dataType = RowType we need to ignoreIds as they can be
// different
if (fields.size() != rowType.fields.size()) {
return false;
}
for (int i = 0; i < fields.size(); ++i) {
if (!DataField.dataFieldEqualsIgnoreId(fields.get(i), rowType.fields.get(i))) {
return false;
}
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ public static int toTime(BinaryString input) throws DateTimeException {
/** Used by {@code CAST(x as TIMESTAMP)}. */
public static Timestamp toTimestamp(BinaryString input, int precision)
        throws DateTimeException {
    String str = input.toString();
    // Purely numeric input is treated as a raw epoch value whose unit is derived from the
    // target precision; anything else goes through regular timestamp-string parsing.
    if (StringUtils.isNumeric(str)) {
        return fromMillisToTimestamp(toLong(input), precision);
    }
    return DateTimeUtils.parseTimestampData(str, precision);
}

Expand All @@ -315,6 +319,35 @@ public static Timestamp toTimestamp(BinaryString input, int precision, TimeZone
return DateTimeUtils.parseTimestampData(input.toString(), precision, timeZone);
}

/**
 * Converts a raw epoch value to a {@link Timestamp}, interpreting the unit of {@code epoch}
 * from the target precision: 0 = seconds, 3 = milliseconds, 6 = microseconds,
 * 9 = nanoseconds.
 *
 * @param epoch the epoch value; may be negative for instants before 1970-01-01
 * @param precision the timestamp precision; must be 0, 3, 6 or 9
 * @return the timestamp corresponding to the epoch value
 * @throws IllegalArgumentException if {@code precision} is not one of the supported values
 */
private static Timestamp fromMillisToTimestamp(long epoch, int precision) {
    long millis;
    int nanosOfMillis;

    switch (precision) {
        case 0: // epoch is in seconds
            millis = epoch * 1000;
            nanosOfMillis = 0;
            break;
        case 3: // epoch is in milliseconds
            millis = epoch;
            nanosOfMillis = 0;
            break;
        case 6: // epoch is in microseconds
            // Use floorDiv/floorMod so the sub-millisecond part stays non-negative for
            // negative (pre-1970) epochs; plain "/" and "%" truncate toward zero and would
            // produce a negative nano-of-milli component and an off-by-one millisecond.
            millis = Math.floorDiv(epoch, 1000);
            nanosOfMillis = (int) (Math.floorMod(epoch, 1000) * 1000);
            break;
        case 9: // epoch is in nanoseconds
            millis = Math.floorDiv(epoch, 1_000_000);
            nanosOfMillis = (int) Math.floorMod(epoch, 1_000_000);
            break;
        default:
            // IllegalArgumentException (a RuntimeException) is the idiomatic type for a
            // bad argument, so existing catch-RuntimeException callers still work.
            throw new IllegalArgumentException("Unsupported precision: " + precision);
    }
    return Timestamp.fromEpochMillis(millis, nanosOfMillis);
}

public static BinaryString toCharacterString(BinaryString strData, DataType type) {
final boolean targetCharType = type.getTypeRoot() == CHAR;
final int targetLength = DataTypeChecks.getLength(type);
Expand Down
127 changes: 115 additions & 12 deletions paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,36 @@
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.stream.Collectors;

Expand All @@ -47,6 +60,8 @@

/** Type related helper functions. */
public class TypeUtils {
public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger LOG = LoggerFactory.getLogger(TypeUtils.class);

public static RowType concat(RowType left, RowType right) {
RowType.Builder builder = RowType.builder();
Expand Down Expand Up @@ -152,21 +167,109 @@ public static Object castFromStringInternal(String s, DataType type, boolean isC
case ARRAY:
ArrayType arrayType = (ArrayType) type;
DataType elementType = arrayType.getElementType();
if (elementType instanceof VarCharType) {
if (s.startsWith("[")) {
s = s.substring(1);
try {
JsonNode arrayNode = OBJECT_MAPPER.readTree(s);
List<Object> resultList = new ArrayList<>();
for (JsonNode elementNode : arrayNode) {
if (!elementNode.isNull()) {
String elementJson = elementNode.toString();
Object elementObject =
castFromStringInternal(elementJson, elementType, isCdcValue);
resultList.add(elementObject);
} else {
resultList.add(null);
}
}
if (s.endsWith("]")) {
s = s.substring(0, s.length() - 1);
return new GenericArray(resultList.toArray());
} catch (JsonProcessingException e) {
LOG.info(
String.format(
"Failed to parse ARRAY for type %s with value %s", type, s),
e);
// try existing code flow
if (elementType instanceof VarCharType) {
if (s.startsWith("[")) {
s = s.substring(1);
}
if (s.endsWith("]")) {
s = s.substring(0, s.length() - 1);
}
String[] ss = s.split(",");
BinaryString[] binaryStrings = new BinaryString[ss.length];
for (int i = 0; i < ss.length; i++) {
binaryStrings[i] = BinaryString.fromString(ss[i]);
}
return new GenericArray(binaryStrings);
} else {
throw new UnsupportedOperationException("Unsupported type " + type);
}
String[] ss = s.split(",");
BinaryString[] binaryStrings = new BinaryString[ss.length];
for (int i = 0; i < ss.length; i++) {
binaryStrings[i] = BinaryString.fromString(ss[i]);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to parse Json String %s", s), e);
}
case MAP:
MapType mapType = (MapType) type;
DataType keyType = mapType.getKeyType();
DataType valueType = mapType.getValueType();
try {
JsonNode mapNode = OBJECT_MAPPER.readTree(s);
Map<Object, Object> resultMap = new HashMap<>();
mapNode.fields()
.forEachRemaining(
entry -> {
Object key =
castFromStringInternal(
entry.getKey(), keyType, isCdcValue);
Object value = null;
if (!entry.getValue().isNull()) {
value =
castFromStringInternal(
entry.getValue().toString(),
valueType,
isCdcValue);
}
resultMap.put(key, value);
});
return new GenericMap(resultMap);
} catch (JsonProcessingException e) {
LOG.info(
String.format("Failed to parse MAP for type %s with value %s", type, s),
e);
return new GenericMap(Collections.emptyMap());
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to parse Json String %s", s), e);
}
case ROW:
RowType rowType = (RowType) type;
try {
JsonNode rowNode = OBJECT_MAPPER.readTree(s);
GenericRow genericRow =
new GenericRow(
rowType.getFields()
.size()); // TODO: What about RowKind? always +I?
for (int pos = 0; pos < rowType.getFields().size(); ++pos) {
DataField field = rowType.getFields().get(pos);
JsonNode fieldNode = rowNode.get(field.name());
if (fieldNode != null && !fieldNode.isNull()) {
String fieldJson = fieldNode.toString();
Object fieldObject =
castFromStringInternal(fieldJson, field.type(), isCdcValue);
genericRow.setField(pos, fieldObject);
} else {
genericRow.setField(pos, null); // Handle null fields
}
}
return new GenericArray(binaryStrings);
} else {
throw new UnsupportedOperationException("Unsupported type " + type);
return genericRow;
} catch (JsonProcessingException e) {
LOG.info(
String.format(
"Failed to parse ROW for type %s with value %s", type, s),
e);
return new GenericRow(0);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to parse Json String %s", s), e);
}
default:
throw new UnsupportedOperationException("Unsupported type " + type);
Expand Down
Loading
Loading