From 0a414f3b081d2e209bd9d4dcc0fea1471c9a506d Mon Sep 17 00:00:00 2001 From: Ashish Khatkar Date: Tue, 24 Sep 2024 09:58:59 +0100 Subject: [PATCH 1/6] [common][flink] Add support for complex types in kafka debezium avro cdc action --- .../org/apache/paimon/types/DataField.java | 16 +++ .../java/org/apache/paimon/types/RowType.java | 12 +- .../paimon/utils/BinaryStringUtils.java | 26 ++++ .../org/apache/paimon/utils/TypeUtils.java | 113 +++++++++++++-- .../format/debezium/DebeziumSchemaUtils.java | 131 +++++++++++++++++- .../flink/sink/cdc/RichEventParser.java | 15 +- 6 files changed, 284 insertions(+), 29 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java index baffbcf3f997..a35af1094583 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java @@ -153,4 +153,20 @@ public int hashCode() { public String toString() { return asSQLString(); } + + /** + * When the order of the same field is different, its ID may also be different, so the + * comparison should not include the ID. + */ + public static boolean dataFieldEqualsIgnoreId(DataField dataField1, DataField dataField2) { + if (dataField1 == dataField2) { + return true; + } else if (dataField1 != null && dataField2 != null) { + return Objects.equals(dataField1.name(), dataField2.name()) + && Objects.equals(dataField1.type(), dataField2.type()) + && Objects.equals(dataField1.description(), dataField2.description()); + } else { + return false; + } + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java index dc7c7ae32cfa..3aeca82b3081 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java @@ -174,7 +174,17 @@ public boolean equals(Object o) { return false; } RowType rowType = (RowType) o; - return fields.equals(rowType.fields); + // For nested RowTypes e.g. DataField.dataType = RowType we need to ignoreIds as they can be + // different + if (fields.size() != rowType.fields.size()) { + return false; + } + for (int i = 0; i < fields.size(); ++i) { + if (!DataField.dataFieldEqualsIgnoreId(fields.get(i), rowType.fields.get(i))) { + return false; + } + } + return true; } @Override diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java index a5e12e344f3b..99dd92482dea 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java @@ -25,6 +25,8 @@ import org.apache.paimon.types.DataTypeChecks; import java.time.DateTimeException; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.Arrays; import java.util.List; import java.util.TimeZone; @@ -32,6 +34,7 @@ import java.util.stream.Stream; import static org.apache.paimon.data.BinaryString.fromString; +import static org.apache.paimon.data.Timestamp.fromLocalDateTime; import static org.apache.paimon.types.DataTypeRoot.BINARY; import static org.apache.paimon.types.DataTypeRoot.CHAR; @@ -306,6 +309,10 @@ public static int toTime(BinaryString input) throws DateTimeException { /** Used by {@code CAST(x as TIMESTAMP)}. */ public static Timestamp toTimestamp(BinaryString input, int precision) throws DateTimeException { + if (StringUtils.isNumeric(input.toString())) { + long millis = toLong(input); + return fromMillisToTimestamp(millis, precision); + } return DateTimeUtils.parseTimestampData(input.toString(), precision); } @@ -315,6 +322,25 @@ public static Timestamp toTimestamp(BinaryString input, int precision, TimeZone return DateTimeUtils.parseTimestampData(input.toString(), precision, timeZone); } + // Helper method to convert milliseconds to Timestamp with the provided precision. + private static Timestamp fromMillisToTimestamp(long millis, int precision) { + // Calculate seconds and nanoseconds from the millis + long seconds = millis / 1000; + int nanos = (int) ((millis % 1000) * 1_000_000); + + // Adjust the nanoseconds to the specified precision + if (precision < 9) { + nanos = + (int) + (Math.floor(nanos / Math.pow(10, 9 - precision)) + * Math.pow(10, 9 - precision)); + } + + // Create a LocalDateTime from the seconds and nanoseconds + LocalDateTime dateTime = LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC); + return fromLocalDateTime(dateTime); + } + public static BinaryString toCharacterString(BinaryString strData, DataType type) { final boolean targetCharType = type.getTypeRoot() == CHAR; final int targetLength = DataTypeChecks.getLength(type); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java index 0f4490521646..0e99439f33a4 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java @@ -21,6 +21,8 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -28,16 +30,27 @@ import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.DecimalType; import org.apache.paimon.types.LocalZonedTimestampType; +import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.TimeZone; import java.util.stream.Collectors; @@ -47,6 +60,8 @@ /** Type related helper functions. */ public class TypeUtils { + public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final Logger LOG = LoggerFactory.getLogger(TypeUtils.class); public static RowType concat(RowType left, RowType right) { RowType.Builder builder = RowType.builder(); @@ -152,21 +167,93 @@ public static Object castFromStringInternal(String s, DataType type, boolean isC case ARRAY: ArrayType arrayType = (ArrayType) type; DataType elementType = arrayType.getElementType(); - if (elementType instanceof VarCharType) { - if (s.startsWith("[")) { - s = s.substring(1); - } - if (s.endsWith("]")) { - s = s.substring(0, s.length() - 1); + try { + JsonNode arrayNode = OBJECT_MAPPER.readTree(s); + List resultList = new ArrayList<>(); + for (JsonNode elementNode : arrayNode) { + if (!elementNode.isNull()) { + String elementJson = elementNode.toString(); + Object elementObject = + castFromStringInternal(elementJson, elementType, isCdcValue); + resultList.add(elementObject); + } else { + resultList.add(null); + } } - String[] ss = s.split(","); - BinaryString[] binaryStrings = new BinaryString[ss.length]; - for (int i = 0; i < ss.length; i++) { - binaryStrings[i] = BinaryString.fromString(ss[i]); + return new GenericArray(resultList.toArray()); + } catch (JsonProcessingException e) { + LOG.info( + String.format( + "Failed to parse ARRAY for type %s with value %s", type, s), + e); + return new GenericArray(new int[] {}); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to parse Json String %s", s), e); + } + case MAP: + MapType mapType = (MapType) type; + DataType keyType = mapType.getKeyType(); + DataType valueType = mapType.getValueType(); + try { + JsonNode mapNode = OBJECT_MAPPER.readTree(s); + Map resultMap = new HashMap<>(); + mapNode.fields() + .forEachRemaining( + entry -> { + Object key = + castFromStringInternal( + entry.getKey(), keyType, isCdcValue); + Object value = null; + if (!entry.getValue().isNull()) { + value = + castFromStringInternal( + entry.getValue().toString(), + valueType, + isCdcValue); + } + resultMap.put(key, value); + }); + return new GenericMap(resultMap); + } catch (JsonProcessingException e) { + LOG.info( + String.format("Failed to parse MAP for type %s with value %s", type, s), + e); + return new GenericMap(Collections.emptyMap()); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to parse Json String %s", s), e); + } + case ROW: + RowType rowType = (RowType) type; + try { + JsonNode rowNode = OBJECT_MAPPER.readTree(s); + GenericRow genericRow = + new GenericRow( + rowType.getFields() + .size()); // TODO: What about RowKind? always +I? + for (int pos = 0; pos < rowType.getFields().size(); ++pos) { + DataField field = rowType.getFields().get(pos); + JsonNode fieldNode = rowNode.get(field.name()); + if (fieldNode != null && !fieldNode.isNull()) { + String fieldJson = fieldNode.toString(); + Object fieldObject = + castFromStringInternal(fieldJson, field.type(), isCdcValue); + genericRow.setField(pos, fieldObject); + } else { + genericRow.setField(pos, null); // Handle null fields + } } - return new GenericArray(binaryStrings); - } else { - throw new UnsupportedOperationException("Unsupported type " + type); + return genericRow; + } catch (JsonProcessingException e) { + LOG.info( + String.format( + "Failed to parse ROW for type %s with value %s", type, s), + e); + return new GenericRow(0); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to parse Json String %s", s), e); } default: throw new UnsupportedOperationException("Unsupported type " + type); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java index b705bc9e1d88..59afcf7dce36 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java @@ -20,12 +20,14 @@ import org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.DecimalType; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.StringUtils; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; import io.debezium.data.Bits; @@ -36,8 +38,12 @@ import io.debezium.time.MicroTimestamp; import io.debezium.time.Timestamp; import io.debezium.time.ZonedTimestamp; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; import org.apache.kafka.connect.json.JsonConverterConfig; import javax.annotation.Nullable; @@ -49,13 +55,16 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.time.ZoneOffset; +import java.util.ArrayList; import java.util.Base64; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.Supplier; import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING; +import static org.apache.paimon.utils.TypeUtils.OBJECT_MAPPER; /** * Utils to handle 'schema' field in debezium Json. TODO: The methods have many duplicate codes with @@ -101,6 +110,8 @@ public static String transformRawValue( e); } }, + origin, + false, serverTimeZone); } @@ -122,6 +133,8 @@ public static String transformAvroRawValue( className, typeMapping, () -> (ByteBuffer) ((GenericRecord) origin).get(Geometry.WKB_FIELD), + origin, + true, serverTimeZone); } @@ -132,6 +145,8 @@ public static String transformRawValue( @Nullable String className, TypeMapping typeMapping, Supplier geometryGetter, + Object origin, + boolean isAvro, ZoneId serverTimeZone) { if (rawValue == null) { return null; @@ -232,11 +247,70 @@ else if (Date.SCHEMA_NAME.equals(className)) { throw new IllegalArgumentException( String.format("Failed to convert %s to geometry JSON.", rawValue), e); } + } else if (isAvro) { + Object convertedObject = convertAvroObjectToJsonCompatible(origin); + try { + transformed = OBJECT_MAPPER.writer().writeValueAsString(convertedObject); + } catch (JsonProcessingException e) { + throw new RuntimeException( + String.format("Failed to convert %s to JSON.", origin), e); + } } return transformed; } + public static Object convertAvroObjectToJsonCompatible(Object avroObject) { + if (avroObject instanceof GenericData.Record) { + return convertRecord((GenericData.Record) avroObject); + } else if (avroObject instanceof GenericData.Array) { + return convertArray((GenericData.Array) avroObject); + } else if (avroObject instanceof Utf8) { + return avroObject.toString(); + } else if (avroObject instanceof Map) { + return convertMap((Map) avroObject); + } else if (avroObject instanceof List) { + return convertList((List) avroObject); + } else { + return avroObject; + } + } + + private static Map convertMap(Map map) { + Map newMap = new HashMap<>(); + for (Map.Entry entry : map.entrySet()) { + Object key = convertAvroObjectToJsonCompatible(entry.getKey()); + Object value = convertAvroObjectToJsonCompatible(entry.getValue()); + newMap.put(key, value); + } + return newMap; + } + + private static List convertList(List list) { + List newList = new ArrayList<>(); + for (Object element : list) { + newList.add(convertAvroObjectToJsonCompatible(element)); + } + return newList; + } + + private static Map convertRecord(GenericData.Record record) { + Map map = new HashMap<>(); + for (Schema.Field field : record.getSchema().getFields()) { + Object value = record.get(field.pos()); + map.put(field.name(), convertAvroObjectToJsonCompatible(value)); + } + return map; + } + + private static List convertArray(GenericData.Array array) { + List list = new ArrayList<>(); + for (Object element : array) { + list.add(convertAvroObjectToJsonCompatible(element)); + } + return list; + } + public static DataType toDataType( String debeziumType, @Nullable String className, Map parameters) { if (className == null) { @@ -362,6 +436,30 @@ public static DataType avroToPaimonDataType(Schema schema) { } private static DataType fromDebeziumAvroType(Schema schema) { + LogicalType logicalType = schema.getLogicalType(); + if (logicalType != null) { + if (logicalType instanceof LogicalTypes.Date) { + return DataTypes.DATE(); + } else if (logicalType instanceof LogicalTypes.TimestampMillis) { + return DataTypes.TIMESTAMP_MILLIS(); + } else if (logicalType instanceof LogicalTypes.TimestampMicros) { + return DataTypes.TIMESTAMP(); + } else if (logicalType instanceof LogicalTypes.Decimal) { + LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType; + return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()); + } else if (logicalType instanceof LogicalTypes.TimeMillis) { + return DataTypes.TIME(3); + } else if (logicalType instanceof LogicalTypes.TimeMicros) { + return DataTypes.TIME(6); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) { + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(); + } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) { + return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3); + } else { + throw new UnsupportedOperationException( + String.format("Don't support logical avro type '%s' yet.", logicalType)); + } + } Schema.Type avroType = schema.getType(); switch (avroType) { case BOOLEAN: @@ -378,8 +476,39 @@ private static DataType fromDebeziumAvroType(Schema schema) { case LONG: return DataTypes.BIGINT(); case STRING: - case RECORD: return DataTypes.STRING(); + case RECORD: + List fields = new ArrayList<>(); + for (Schema.Field field : schema.getFields()) { + DataType fieldType = fromDebeziumAvroType(field.schema()); + fields.add(DataTypes.FIELD(field.pos(), field.name(), fieldType, field.doc())); + } + return DataTypes.ROW(fields.toArray(new DataField[0])); + case ARRAY: + Schema elementSchema = schema.getElementType(); + DataType elementType = fromDebeziumAvroType(elementSchema); + return DataTypes.ARRAY(elementType); + case MAP: + DataType valueType = fromDebeziumAvroType(schema.getValueType()); + return DataTypes.MAP(DataTypes.STRING(), valueType); + case UNION: + List unionTypes = schema.getTypes(); + // Check if it's a nullable type union + if (unionTypes.size() == 2 + && unionTypes.contains(Schema.create(Schema.Type.NULL))) { + Schema actualSchema = + unionTypes.stream() + .filter(s -> s.getType() != Schema.Type.NULL) + .findFirst() + .orElseThrow( + () -> + new IllegalStateException( + "Union type does not contain a non-null type")); + return fromDebeziumAvroType(actualSchema) + .copy(true); // Return nullable version of the non-null type + } + // Handle generic unions or throw an exception + throw new UnsupportedOperationException("Generic unions are not supported"); default: throw new UnsupportedOperationException( String.format("Don't support avro type '%s' yet.", avroType)); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java index 10dbdcc8dc8c..01a002214e80 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichEventParser.java @@ -24,7 +24,6 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; -import java.util.Objects; /** A {@link EventParser} for {@link RichCdcRecord}. */ public class RichEventParser implements EventParser { @@ -48,7 +47,7 @@ public List parseSchemaChange() { // When the order of the same field is different, its ID may also be // different, // so the comparison should not include the ID. - if (!dataFieldEqualsIgnoreId(previous, dataField)) { + if (!DataField.dataFieldEqualsIgnoreId(previous, dataField)) { previousDataFields.put(dataField.name(), dataField); change.add(dataField); } @@ -56,18 +55,6 @@ public List parseSchemaChange() { return change; } - private boolean dataFieldEqualsIgnoreId(DataField dataField1, DataField dataField2) { - if (dataField1 == dataField2) { - return true; - } else if (dataField1 != null && dataField2 != null) { - return Objects.equals(dataField1.name(), dataField2.name()) - && Objects.equals(dataField1.type(), dataField2.type()) - && Objects.equals(dataField1.description(), dataField2.description()); - } else { - return false; - } - } - @Override public List parseRecords() { if (record.hasPayload()) { From 44e1386b5f47215b69fa256242f248ca215b42e3 Mon Sep 17 00:00:00 2001 From: Ashish Khatkar Date: Tue, 24 Sep 2024 18:47:58 +0100 Subject: [PATCH 2/6] Fix itest and separate the code flow for complex types --- .../org/apache/paimon/utils/TypeUtils.java | 24 +++++++++++++++---- .../format/debezium/DebeziumSchemaUtils.java | 8 +++---- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java index 0e99439f33a4..8906af3e2a30 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java @@ -172,7 +172,7 @@ public static Object castFromStringInternal(String s, DataType type, boolean isC List resultList = new ArrayList<>(); for (JsonNode elementNode : arrayNode) { if (!elementNode.isNull()) { - String elementJson = elementNode.toString(); + String elementJson = elementNode.asText(); Object elementObject = castFromStringInternal(elementJson, elementType, isCdcValue); resultList.add(elementObject); @@ -186,7 +186,23 @@ public static Object castFromStringInternal(String s, DataType type, boolean isC String.format( "Failed to parse ARRAY for type %s with value %s", type, s), e); - return new GenericArray(new int[] {}); + // try existing code flow + if (elementType instanceof VarCharType) { + if (s.startsWith("[")) { + s = s.substring(1); + } + if (s.endsWith("]")) { + s = s.substring(0, s.length() - 1); + } + String[] ss = s.split(","); + BinaryString[] binaryStrings = new BinaryString[ss.length]; + for (int i = 0; i < ss.length; i++) { + binaryStrings[i] = BinaryString.fromString(ss[i]); + } + return new GenericArray(binaryStrings); + } else { + throw new UnsupportedOperationException("Unsupported type " + type); + } } catch (Exception e) { throw new RuntimeException( String.format("Failed to parse Json String %s", s), e); @@ -208,7 +224,7 @@ public static Object castFromStringInternal(String s, DataType type, boolean isC if (!entry.getValue().isNull()) { value = castFromStringInternal( - entry.getValue().toString(), + entry.getValue().asText(), valueType, isCdcValue); } @@ -236,7 +252,7 @@ public static Object castFromStringInternal(String s, DataType type, boolean isC DataField field = rowType.getFields().get(pos); JsonNode fieldNode = rowNode.get(field.name()); if (fieldNode != null && !fieldNode.isNull()) { - String fieldJson = fieldNode.toString(); + String fieldJson = fieldNode.asText(); Object fieldObject = castFromStringInternal(fieldJson, field.type(), isCdcValue); genericRow.setField(pos, fieldObject); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java index 59afcf7dce36..114344badeed 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java @@ -111,7 +111,6 @@ public static String transformRawValue( } }, origin, - false, serverTimeZone); } @@ -134,7 +133,6 @@ public static String transformAvroRawValue( typeMapping, () -> (ByteBuffer) ((GenericRecord) origin).get(Geometry.WKB_FIELD), origin, - true, serverTimeZone); } @@ -146,7 +144,6 @@ public static String transformRawValue( TypeMapping typeMapping, Supplier geometryGetter, Object origin, - boolean isAvro, ZoneId serverTimeZone) { if (rawValue == null) { return null; @@ -247,7 +244,10 @@ else if (Date.SCHEMA_NAME.equals(className)) { throw new IllegalArgumentException( String.format("Failed to convert %s to geometry JSON.", rawValue), e); } - } else if (isAvro) { + } else if ((origin instanceof GenericData.Record) + || (origin instanceof GenericData.Array) + || (origin instanceof Map) + || (origin instanceof List)) { Object convertedObject = convertAvroObjectToJsonCompatible(origin); try { transformed = OBJECT_MAPPER.writer().writeValueAsString(convertedObject); From e699a3afbc76e1fb7616b7c388ccb23536d3553b Mon Sep 17 00:00:00 2001 From: Ashish Khatkar Date: Tue, 24 Sep 2024 19:30:48 +0100 Subject: [PATCH 3/6] Use toString instead of asText --- .../src/main/java/org/apache/paimon/utils/TypeUtils.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java index 8906af3e2a30..8d8d84953da7 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/TypeUtils.java @@ -172,7 +172,7 @@ public static Object castFromStringInternal(String s, DataType type, boolean isC List resultList = new ArrayList<>(); for (JsonNode elementNode : arrayNode) { if (!elementNode.isNull()) { - String elementJson = elementNode.asText(); + String elementJson = elementNode.toString(); Object elementObject = castFromStringInternal(elementJson, elementType, isCdcValue); resultList.add(elementObject); @@ -224,7 +224,7 @@ public static Object castFromStringInternal(String s, DataType type, boolean isC if (!entry.getValue().isNull()) { value = castFromStringInternal( - entry.getValue().asText(), + entry.getValue().toString(), valueType, isCdcValue); } @@ -252,7 +252,7 @@ public static Object castFromStringInternal(String s, DataType type, boolean isC DataField field = rowType.getFields().get(pos); JsonNode fieldNode = rowNode.get(field.name()); if (fieldNode != null && !fieldNode.isNull()) { - String fieldJson = fieldNode.asText(); + String fieldJson = fieldNode.toString(); Object fieldObject = castFromStringInternal(fieldJson, field.type(), isCdcValue); genericRow.setField(pos, fieldObject); From c627b3c47e40977699d433f35ab8f5d5cade6526 Mon Sep 17 00:00:00 2001 From: Ashish Khatkar Date: Tue, 8 Oct 2024 18:03:16 +0100 Subject: [PATCH 4/6] Use Timestamp.fromEpochMillis when parsing epoch string --- .../paimon/utils/BinaryStringUtils.java | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java index 99dd92482dea..9a1e1ce07778 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java @@ -25,8 +25,6 @@ import org.apache.paimon.types.DataTypeChecks; import java.time.DateTimeException; -import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.util.Arrays; import java.util.List; import java.util.TimeZone; @@ -34,7 +32,6 @@ import java.util.stream.Stream; import static org.apache.paimon.data.BinaryString.fromString; -import static org.apache.paimon.data.Timestamp.fromLocalDateTime; import static org.apache.paimon.types.DataTypeRoot.BINARY; import static org.apache.paimon.types.DataTypeRoot.CHAR; @@ -310,8 +307,8 @@ public static int toTime(BinaryString input) throws DateTimeException { public static Timestamp toTimestamp(BinaryString input, int precision) throws DateTimeException { if (StringUtils.isNumeric(input.toString())) { - long millis = toLong(input); - return fromMillisToTimestamp(millis, precision); + long epoch = toLong(input); + return fromMillisToTimestamp(epoch, precision); } return DateTimeUtils.parseTimestampData(input.toString(), precision); } @@ -323,22 +320,32 @@ public static Timestamp toTimestamp(BinaryString input, int precision, TimeZone } // Helper method to convert milliseconds to Timestamp with the provided precision. - private static Timestamp fromMillisToTimestamp(long millis, int precision) { - // Calculate seconds and nanoseconds from the millis - long seconds = millis / 1000; - int nanos = (int) ((millis % 1000) * 1_000_000); - - // Adjust the nanoseconds to the specified precision - if (precision < 9) { - nanos = - (int) - (Math.floor(nanos / Math.pow(10, 9 - precision)) - * Math.pow(10, 9 - precision)); + private static Timestamp fromMillisToTimestamp(long epoch, int precision) { + // Calculate milliseconds and nanoseconds from epoch based on precision + long millis; + int nanosOfMillis; + + switch (precision) { + case 0: // seconds + millis = epoch * 1000; + nanosOfMillis = 0; + break; + case 3: // milliseconds + millis = epoch; + nanosOfMillis = 0; + break; + case 6: // microseconds + millis = epoch / 1000; + nanosOfMillis = (int)((epoch % 1000) * 1000); + break; + case 9: // nanoseconds + millis = epoch / 1_000_000; + nanosOfMillis = (int)(epoch % 1_000_000); + break; + default: + throw new RuntimeException("Unsupported precision: " + precision); } - - // Create a LocalDateTime from the seconds and nanoseconds - LocalDateTime dateTime = LocalDateTime.ofEpochSecond(seconds, nanos, ZoneOffset.UTC); - return fromLocalDateTime(dateTime); + return Timestamp.fromEpochMillis(millis, nanosOfMillis); } public static BinaryString toCharacterString(BinaryString strData, DataType type) { From 95155e673c30ba0c3e6bde1670d7e6b23b410d98 Mon Sep 17 00:00:00 2001 From: Ashish Khatkar Date: Tue, 8 Oct 2024 18:03:55 +0100 Subject: [PATCH 5/6] nit: fix comment --- .../main/java/org/apache/paimon/utils/BinaryStringUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java index 9a1e1ce07778..5385a79f05bb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java @@ -319,7 +319,7 @@ public static Timestamp toTimestamp(BinaryString input, int precision, TimeZone return DateTimeUtils.parseTimestampData(input.toString(), precision, timeZone); } - // Helper method to convert milliseconds to Timestamp with the provided precision. + // Helper method to convert epoch to Timestamp with the provided precision. private static Timestamp fromMillisToTimestamp(long epoch, int precision) { // Calculate milliseconds and nanoseconds from epoch based on precision long millis; From 3d6d9e3753392d3df6ca86195e8d6b698d0243b1 Mon Sep 17 00:00:00 2001 From: Ashish Khatkar Date: Tue, 8 Oct 2024 18:16:36 +0100 Subject: [PATCH 6/6] Apply spotless --- .../main/java/org/apache/paimon/utils/BinaryStringUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java index 5385a79f05bb..299823d35d9c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java @@ -336,11 +336,11 @@ private static Timestamp fromMillisToTimestamp(long epoch, int precision) { break; case 6: // microseconds millis = epoch / 1000; - nanosOfMillis = (int)((epoch % 1000) * 1000); + nanosOfMillis = (int) ((epoch % 1000) * 1000); break; case 9: // nanoseconds millis = epoch / 1_000_000; - nanosOfMillis = (int)(epoch % 1_000_000); + nanosOfMillis = (int) (epoch % 1_000_000); break; default: throw new RuntimeException("Unsupported precision: " + precision);