diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java index feb9a4f0..742d1809 100644 --- a/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java +++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/serialization/RowBatch.java @@ -35,7 +35,7 @@ import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.SmallIntVector; -import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.VarBinaryVector; @@ -47,6 +47,7 @@ import org.apache.arrow.vector.complex.impl.UnionMapReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.Types.MinorType; import org.apache.commons.lang3.ArrayUtils; import org.apache.spark.sql.types.Decimal; import org.slf4j.Logger; @@ -78,37 +79,24 @@ */ public class RowBatch { private static final Logger logger = LoggerFactory.getLogger(RowBatch.class); + private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault(); - private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder() - .appendPattern("yyyy-MM-dd HH:mm:ss") - .appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true) - .toFormatter(); - - public static class Row { - private final List cols; - - Row(int colCount) { - this.cols = new ArrayList<>(colCount); - } - - List getCols() { - return cols; - } - - public void put(Object o) { - cols.add(o); - } - } - + private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss"; + private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS"; + private final DateTimeFormatter dateTimeFormatter = + DateTimeFormatter.ofPattern(DATETIME_PATTERN); + private final DateTimeFormatter dateTimeV2Formatter = + DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN); + private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private final List rowBatch = new ArrayList<>(); + private final ArrowStreamReader arrowStreamReader; + private final RootAllocator rootAllocator; + private final Schema schema; // offset for iterate the rowBatch private int offsetInRowBatch = 0; private int rowCountInOneBatch = 0; private int readRowCount = 0; - private final List rowBatch = new ArrayList<>(); - private final ArrowStreamReader arrowStreamReader; private List fieldVectors; - private final RootAllocator rootAllocator; - private final Schema schema; public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException { this.schema = schema; @@ -146,6 +134,19 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisExceptio } } + public static LocalDateTime longToLocalDateTime(long time) { + Instant instant; + // Determine the timestamp accuracy and process it + if (time < 10_000_000_000L) { // Second timestamp + instant = Instant.ofEpochSecond(time); + } else if (time < 10_000_000_000_000L) { // milli second + instant = Instant.ofEpochMilli(time); + } else { // micro second + instant = Instant.ofEpochSecond(time / 1_000_000, (time % 1_000_000) * 1_000); + } + return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID); + } + public boolean hasNext() { if (offsetInRowBatch >= readRowCount) { rowBatch.clear(); @@ -358,9 +359,13 @@ public void convertArrowToRowBatch() throws DorisException { break; case "DATETIME": case "DATETIMEV2": - Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR) - || mt.equals(Types.MinorType.TIMESTAMPMICRO), + + Preconditions.checkArgument( + mt.equals(Types.MinorType.TIMESTAMPMICRO) || mt.equals(MinorType.VARCHAR) || + mt.equals(MinorType.TIMESTAMPMILLI) || mt.equals(MinorType.TIMESTAMPSEC), typeMismatchMessage(currentType, mt)); + typeMismatchMessage(currentType, mt); + if (mt.equals(Types.MinorType.VARCHAR)) { VarCharVector varCharVector = (VarCharVector) curFieldVector; for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { @@ -369,28 +374,23 @@ public void convertArrowToRowBatch() throws DorisException { continue; } String value = new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8); - addValueToRow(rowIndex, value); + value = completeMilliseconds(value); + LocalDateTime parse = LocalDateTime.parse(value, dateTimeV2Formatter); + addValueToRow(rowIndex, parse); } - } else { - TimeStampMicroVector vector = (TimeStampMicroVector) curFieldVector; + } else if (curFieldVector instanceof TimeStampVector) { + TimeStampVector timeStampVector = (TimeStampVector) curFieldVector; + for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) { - if (vector.isNull(rowIndex)) { + if (timeStampVector.isNull(rowIndex)) { + addValueToRow(rowIndex, null); continue; } - long time = vector.get(rowIndex); - Instant instant; - if (time / 10000000000L == 0) { // datetime(0) - instant = Instant.ofEpochSecond(time); - } else if (time / 10000000000000L == 0) { // datetime(3) - instant = Instant.ofEpochMilli(time); - } else { // datetime(6) - instant = Instant.ofEpochSecond(time / 1000000, time % 1000000 * 1000); - } - LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); - String formatted = DATE_TIME_FORMATTER.format(dateTime); - addValueToRow(rowIndex, formatted); + LocalDateTime dateTime = getDateTime(rowIndex, timeStampVector); + addValueToRow(rowIndex, dateTime); } + } break; case "CHAR": @@ -511,4 +511,50 @@ public void close() { // do nothing } } + + public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) { + TimeStampVector vector = (TimeStampVector) fieldVector; + if (vector.isNull(rowIndex)) { + return null; + } + // todo: Currently, the scale of doris's arrow datetimev2 is hardcoded to 6, + // and there is also a time zone problem in arrow, so use timestamp to convert first + long time = vector.get(rowIndex); + return longToLocalDateTime(time); + } + + public static String completeMilliseconds(String stringValue) { + if (stringValue.length() == DATETIMEV2_PATTERN.length()) { + return stringValue; + } + + if (stringValue.length() < DATETIME_PATTERN.length()) { + return stringValue; + } + + StringBuilder sb = new StringBuilder(stringValue); + if (stringValue.length() == DATETIME_PATTERN.length()) { + sb.append("."); + } + while (sb.toString().length() < DATETIMEV2_PATTERN.length()) { + sb.append(0); + } + return sb.toString(); + } + + public static class Row { + private final List cols; + + Row(int colCount) { + this.cols = new ArrayList<>(colCount); + } + + List getCols() { + return cols; + } + + public void put(Object o) { + cols.add(o); + } + } } diff --git a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java index 0c830500..0320cd86 100644 --- a/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java +++ b/spark-doris-connector/src/test/java/org/apache/doris/spark/serialization/TestRowBatch.java @@ -273,7 +273,7 @@ public void testRowBatch() throws Exception { (float) 1.1, (double) 1.1, Date.valueOf("2008-08-08"), - "2008-08-08 00:00:00", + LocalDateTime.of(2008, 8, 8, 0, 0, 0), Decimal.apply(1234L, 4, 2), "char1" ); @@ -287,7 +287,7 @@ public void testRowBatch() throws Exception { (float) 2.2, (double) 2.2, Date.valueOf("1900-08-08"), - "1900-08-08 00:00:00", + LocalDateTime.of(1900, 8, 8, 0, 0, 0), Decimal.apply(8888L, 4, 2), "char2" ); @@ -301,7 +301,7 @@ public void testRowBatch() throws Exception { (float) 3.3, (double) 3.3, Date.valueOf("2100-08-08"), - "2100-08-08 00:00:00", + LocalDateTime.of(2100, 8, 8, 0, 0, 0), Decimal.apply(10L, 2, 0), "char3" ); @@ -832,16 +832,17 @@ public void testDateTime() throws IOException, DorisException { Assert.assertTrue(rowBatch.hasNext()); List actualRow0 = rowBatch.next(); - Assert.assertEquals("2024-03-20 00:00:00", actualRow0.get(0)); - Assert.assertEquals("2024-03-20 00:00:00", actualRow0.get(1)); + Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), actualRow0.get(0)); + Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), actualRow0.get(1)); List actualRow1 = rowBatch.next(); - Assert.assertEquals("2024-03-20 00:00:01", actualRow1.get(0)); - Assert.assertEquals("2024-03-20 00:00:00.123", actualRow1.get(1)); + Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 1), actualRow1.get(0)); + Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123000000), actualRow1.get(1)); List actualRow2 = rowBatch.next(); - Assert.assertEquals("2024-03-20 00:00:02", actualRow2.get(0)); - Assert.assertEquals("2024-03-20 00:00:00.123456", actualRow2.get(1)); + Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 2), actualRow2.get(0)); + Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123456000), actualRow2.get(1)); + Assert.assertFalse(rowBatch.hasNext()); thrown.expect(NoSuchElementException.class); @@ -906,6 +907,7 @@ public void testVariant() throws DorisException, IOException { Assert.assertTrue(rowBatch.hasNext()); List actualRow0 = rowBatch.next(); + Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0)); List actualRow1 = rowBatch.next();