diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java index 754e0e86a4aa..e9052684f33f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java @@ -170,4 +170,20 @@ public int hashCode() { public String toString() { return asSQLString(); } + + /** + * When the order of the same field is different, its ID may also be different, so the + * comparison should not include the ID. + */ + public static boolean dataFieldEqualsIgnoreId(DataField dataField1, DataField dataField2) { + if (dataField1 == dataField2) { + return true; + } else if (dataField1 != null && dataField2 != null) { + return Objects.equals(dataField1.name(), dataField2.name()) + && Objects.equals(dataField1.type(), dataField2.type()) + && Objects.equals(dataField1.description(), dataField2.description()); + } else { + return false; + } + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java index b31937f90658..9dc89117f331 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java @@ -205,7 +205,17 @@ public boolean equals(Object o) { return false; } RowType rowType = (RowType) o; - return fields.equals(rowType.fields); + // For nested RowTypes e.g. DataField.dataType = RowType we need to ignoreIds as they can be + // different + if (fields.size() != rowType.fields.size()) { + return false; + } + for (int i = 0; i < fields.size(); ++i) { + if (!DataField.dataFieldEqualsIgnoreId(fields.get(i), rowType.fields.get(i))) { + return false; + } + } + return true; } @Override diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java index a5e12e344f3b..299823d35d9c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java @@ -306,6 +306,10 @@ public static int toTime(BinaryString input) throws DateTimeException { /** Used by {@code CAST(x as TIMESTAMP)}. */ public static Timestamp toTimestamp(BinaryString input, int precision) throws DateTimeException { + if (StringUtils.isNumeric(input.toString())) { + long epoch = toLong(input); + return fromMillisToTimestamp(epoch, precision); + } return DateTimeUtils.parseTimestampData(input.toString(), precision); } @@ -315,6 +319,35 @@ public static Timestamp toTimestamp(BinaryString input, int precision, TimeZone return DateTimeUtils.parseTimestampData(input.toString(), precision, timeZone); } + // Helper method to convert epoch to Timestamp with the provided precision. + private static Timestamp fromMillisToTimestamp(long epoch, int precision) { + // Calculate milliseconds and nanoseconds from epoch based on precision + long millis; + int nanosOfMillis; + + switch (precision) { + case 0: // seconds + millis = epoch * 1000; + nanosOfMillis = 0; + break; + case 3: // milliseconds + millis = epoch; + nanosOfMillis = 0; + break; + case 6: // microseconds + millis = epoch / 1000; + nanosOfMillis = (int) ((epoch % 1000) * 1000); + break; + case 9: // nanoseconds + millis = epoch / 1_000_000; + nanosOfMillis = (int) (epoch % 1_000_000); + break; + default: + throw new RuntimeException("Unsupported precision: " + precision); + } + return Timestamp.fromEpochMillis(millis, nanosOfMillis); + } + public static BinaryString toCharacterString(BinaryString strData, DataType type) { final boolean targetCharType = type.getTypeRoot() == CHAR; final int targetLength = DataTypeChecks.getLength(type); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java index 0f4490521646..8d8d84953da7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java @@ -21,6 +21,8 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -28,16 +30,27 @@ import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.TimeZone; import java.util.stream.Collectors; @@ -47,6 +60,8 @@ /** Type related helper functions. */ public class TypeUtils { + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Logger LOG = LoggerFactory.getLogger(TypeUtils.class); public static RowType concat(RowType left, RowType right) { RowType.Builder builder = RowType.builder(); @@ -152,21 +167,109 @@ public static Object castFromStringInternal(String s, DataType type, boolean isC case ARRAY: ArrayType arrayType = (ArrayType) type; DataType elementType = arrayType.getElementType(); - if (elementType instanceof VarCharType) { - if (s.startsWith("[")) { - s = s.substring(1); + try { + JsonNode arrayNode = OBJECT_MAPPER.readTree(s); + List resultList = new ArrayList<>(); + for (JsonNode elementNode : arrayNode) { + if (!elementNode.isNull()) { + String elementJson = elementNode.toString(); + Object elementObject = + castFromStringInternal(elementJson, elementType, isCdcValue); + resultList.add(elementObject); + } else { + resultList.add(null); + } } - if (s.endsWith("]")) { - s = s.substring(0, s.length() - 1); + return new GenericArray(resultList.toArray()); + } catch (JsonProcessingException e) { + LOG.info( + String.format( + "Failed to parse ARRAY for type %s with value %s", type, s), + e); + // try existing code flow + if (elementType instanceof VarCharType) { + if (s.startsWith("[")) { + s = s.substring(1); + } + if (s.endsWith("]")) { + s = s.substring(0, s.length() - 1); + } + String[] ss = s.split(","); + BinaryString[] binaryStrings = new BinaryString[ss.length]; + for (int i = 0; i < ss.length; i++) { + binaryStrings[i] = BinaryString.fromString(ss[i]); + } + return new GenericArray(binaryStrings); + } else { + throw new UnsupportedOperationException("Unsupported type " + type); } - String[] ss = s.split(","); - BinaryString[] binaryStrings = new BinaryString[ss.length]; - for (int i = 0; i < ss.length; i++) { - binaryStrings[i] = BinaryString.fromString(ss[i]); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to parse Json String %s", s), e); + } + case MAP: + MapType mapType = (MapType) type; + DataType keyType = mapType.getKeyType(); + DataType valueType = mapType.getValueType(); + try { + JsonNode mapNode = OBJECT_MAPPER.readTree(s); + Map resultMap = new HashMap<>(); + mapNode.fields() + .forEachRemaining( + entry -> { + Object key = + castFromStringInternal( + entry.getKey(), keyType, isCdcValue); + Object value = null; + if (!entry.getValue().isNull()) { + value = + castFromStringInternal( + entry.getValue().toString(), + valueType, + isCdcValue); + } + resultMap.put(key, value); + }); + return new GenericMap(resultMap); + } catch (JsonProcessingException e) { + LOG.info( + String.format("Failed to parse MAP for type %s with value %s", type, s), + e); + return new GenericMap(Collections.emptyMap()); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to parse Json String %s", s), e); + } + case ROW: + RowType rowType = (RowType) type; + try { + JsonNode rowNode = OBJECT_MAPPER.readTree(s); + GenericRow genericRow = + new GenericRow( + rowType.getFields() + .size()); // TODO: What about RowKind? always +I? + for (int pos = 0; pos < rowType.getFields().size(); ++pos) { + DataField field = rowType.getFields().get(pos); + JsonNode fieldNode = rowNode.get(field.name()); + if (fieldNode != null && !fieldNode.isNull()) { + String fieldJson = fieldNode.toString(); + Object fieldObject = + castFromStringInternal(fieldJson, field.type(), isCdcValue); + genericRow.setField(pos, fieldObject); + } else { + genericRow.setField(pos, null); // Handle null fields + } } - return new GenericArray(binaryStrings); - } else { - throw new UnsupportedOperationException("Unsupported type " + type); + return genericRow; + } catch (JsonProcessingException e) { + LOG.info( + String.format( + "Failed to parse ROW for type %s with value %s", type, s), + e); + return new GenericRow(0); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to parse Json String %s", s), e); } default: throw new UnsupportedOperationException("Unsupported type " + type); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java index b705bc9e1d88..114344badeed 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java @@ -20,12 +20,14 @@ import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.DecimalType; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.StringUtils; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; import io.debezium.data.Bits; @@ -36,8 +38,12 @@ import io.debezium.time.MicroTimestamp; import io.debezium.time.Timestamp; import io.debezium.time.ZonedTimestamp; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; import org.apache.kafka.connect.json.JsonConverterConfig; import javax.annotation.Nullable; @@ -49,13 +55,16 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.Base64; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Supplier; import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING; +import static org.apache.paimon.utils.TypeUtils.OBJECT_MAPPER; /** * Utils to handle 'schema' field in debezium Json. TODO: The methods have many duplicate codes with @@ -101,6 +110,7 @@ public static String transformRawValue( e); } }, + origin, serverTimeZone); } @@ -122,6 +132,7 @@ public static String transformAvroRawValue( className, typeMapping, () -> (ByteBuffer) ((GenericRecord) origin).get(Geometry.WKB_FIELD), + origin, serverTimeZone); } @@ -132,6 +143,7 @@ public static String transformRawValue( @Nullable String className, TypeMapping typeMapping, Supplier geometryGetter, + Object origin, ZoneId serverTimeZone) { if (rawValue == null) { return null; @@ -232,11 +244,73 @@ else if (Date.SCHEMA_NAME.equals(className)) { throw new IllegalArgumentException( String.format("Failed to convert %s to geometry JSON.", rawValue), e); } + } else if ((origin instanceof GenericData.Record) + || (origin instanceof GenericData.Array) + || (origin instanceof Map) + || (origin instanceof List)) { + Object convertedObject = convertAvroObjectToJsonCompatible(origin); + try { + transformed = OBJECT_MAPPER.writer().writeValueAsString(convertedObject); + } catch (JsonProcessingException e) { + throw new RuntimeException( + String.format("Failed to convert %s to JSON.", origin), e); + } } return transformed; } + public static Object convertAvroObjectToJsonCompatible(Object avroObject) { + if (avroObject instanceof GenericData.Record) { + return convertRecord((GenericData.Record) avroObject); + } else if (avroObject instanceof GenericData.Array) { + return convertArray((GenericData.Array) avroObject); + } else if (avroObject instanceof Utf8) { + return avroObject.toString(); + } else if (avroObject instanceof Map) { + return convertMap((Map) avroObject); + } else if (avroObject instanceof List) { + return convertList((List) avroObject); + } else { + return avroObject; + } + } + + private static Map convertMap(Map map) { + Map newMap = new HashMap<>(); + for (Map.Entry entry : map.entrySet()) { + Object key = convertAvroObjectToJsonCompatible(entry.getKey()); + Object value = convertAvroObjectToJsonCompatible(entry.getValue()); + newMap.put(key, value); + } + return newMap; + } + + private static List convertList(List list) { + List newList = new ArrayList<>(); + for (Object element : list) { + newList.add(convertAvroObjectToJsonCompatible(element)); + } + return newList; + } + + private static Map convertRecord(GenericData.Record record) { + Map map = new HashMap<>(); + for (Schema.Field field : record.getSchema().getFields()) { + Object value = record.get(field.pos()); + map.put(field.name(), convertAvroObjectToJsonCompatible(value)); + } + return map; + } + + private static List convertArray(GenericData.Array array) { + List list = new ArrayList<>(); + for (Object element : array) { + list.add(convertAvroObjectToJsonCompatible(element)); + } + return list; + } + public static DataType toDataType( String debeziumType, @Nullable String className, Map parameters) { if (className == null) { @@ -362,6 +436,30 @@ public static DataType avroToPaimonDataType(Schema schema) { } private static DataType fromDebeziumAvroType(Schema schema) { + LogicalType logicalType = schema.getLogicalType(); + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Date) { + return DataTypes.DATE(); + } else if (logicalType instanceof LogicalTypes.TimestampMillis) { + return DataTypes.TIMESTAMP_MILLIS(); + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + return DataTypes.TIMESTAMP(); + } else if (logicalType instanceof LogicalTypes.Decimal) { + LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType; + return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()); + } else if (logicalType instanceof LogicalTypes.TimeMillis) { + return DataTypes.TIME(3); + } else if (logicalType instanceof LogicalTypes.TimeMicros) { + return DataTypes.TIME(6); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) { + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) { + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3); + } else { + throw new UnsupportedOperationException( + String.format("Don't support logical avro type '%s' yet.", logicalType)); + } + } Schema.Type avroType = schema.getType(); switch (avroType) { case BOOLEAN: @@ -378,8 +476,39 @@ private static DataType fromDebeziumAvroType(Schema schema) { case LONG: return DataTypes.BIGINT(); case STRING: - case RECORD: return DataTypes.STRING(); + case RECORD: + List fields = new ArrayList<>(); + for (Schema.Field field : schema.getFields()) { + DataType fieldType = fromDebeziumAvroType(field.schema()); + fields.add(DataTypes.FIELD(field.pos(), field.name(), fieldType, field.doc())); + } + return DataTypes.ROW(fields.toArray(new DataField[0])); + case ARRAY: + Schema elementSchema = schema.getElementType(); + DataType elementType = fromDebeziumAvroType(elementSchema); + return DataTypes.ARRAY(elementType); + case MAP: + DataType valueType = fromDebeziumAvroType(schema.getValueType()); + return DataTypes.MAP(DataTypes.STRING(), valueType); + case UNION: + List unionTypes = schema.getTypes(); + // Check if it's a nullable type union + if (unionTypes.size() == 2 + && unionTypes.contains(Schema.create(Schema.Type.NULL))) { + Schema actualSchema = + unionTypes.stream() + .filter(s -> s.getType() != Schema.Type.NULL) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "Union type does not contain a non-null type")); + return fromDebeziumAvroType(actualSchema) + .copy(true); // Return nullable version of the non-null type + } + // Handle generic unions or throw an exception + throw new UnsupportedOperationException("Generic unions are not supported"); default: throw new UnsupportedOperationException( String.format("Don't support avro type '%s' yet.", avroType)); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java index 10dbdcc8dc8c..01a002214e80 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; -import java.util.Objects; /** A {@link EventParser} for {@link RichCdcRecord}. */ public class RichEventParser implements EventParser { @@ -48,7 +47,7 @@ public List parseSchemaChange() { // When the order of the same field is different, its ID may also be // different, // so the comparison should not include the ID. - if (!dataFieldEqualsIgnoreId(previous, dataField)) { + if (!DataField.dataFieldEqualsIgnoreId(previous, dataField)) { previousDataFields.put(dataField.name(), dataField); change.add(dataField); } @@ -56,18 +55,6 @@ public List parseSchemaChange() { return change; } - private boolean dataFieldEqualsIgnoreId(DataField dataField1, DataField dataField2) { - if (dataField1 == dataField2) { - return true; - } else if (dataField1 != null && dataField2 != null) { - return Objects.equals(dataField1.name(), dataField2.name()) - && Objects.equals(dataField1.type(), dataField2.type()) - && Objects.equals(dataField1.description(), dataField2.description()); - } else { - return false; - } - } - @Override public List parseRecords() { if (record.hasPayload()) {