diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index 1a42b2b90..38c63b779 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -31,7 +31,7 @@ import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.SmallIntVector; -import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.VarBinaryVector; @@ -334,10 +334,6 @@ public boolean doConvert( } break; case "DATETIME": - if (!minorType.equals(Types.MinorType.TIMESTAMPMICRO) - && !minorType.equals(Types.MinorType.VARCHAR)) { - return false; - } if (minorType.equals(Types.MinorType.VARCHAR)) { VarCharVector varCharVector = (VarCharVector) fieldVector; if (varCharVector.isNull(rowIndex)) { @@ -347,16 +343,18 @@ public boolean doConvert( String stringValue = new String(varCharVector.get(rowIndex)); LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeFormatter); addValueToRow(rowIndex, parse); - } else { + } else if (fieldVector instanceof TimeStampVector) { LocalDateTime dateTime = getDateTime(rowIndex, fieldVector); addValueToRow(rowIndex, dateTime); + } else { + logger.error( + "Unsupported type for DATETIME, minorType {}, class is {}", + minorType.name(), + fieldVector == null ? null : fieldVector.getClass()); + return false; } break; case "DATETIMEV2": - if (!minorType.equals(Types.MinorType.TIMESTAMPMICRO) - && !minorType.equals(Types.MinorType.VARCHAR)) { - return false; - } if (minorType.equals(Types.MinorType.VARCHAR)) { VarCharVector varCharVector = (VarCharVector) fieldVector; if (varCharVector.isNull(rowIndex)) { @@ -367,9 +365,15 @@ public boolean doConvert( stringValue = completeMilliseconds(stringValue); LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeV2Formatter); addValueToRow(rowIndex, parse); - } else { + } else if (fieldVector instanceof TimeStampVector) { LocalDateTime dateTime = getDateTime(rowIndex, fieldVector); addValueToRow(rowIndex, dateTime); + } else { + logger.error( + "Unsupported type for DATETIMEV2, minorType {}, class is {}", + minorType.name(), + fieldVector == null ? null : fieldVector.getClass()); + return false; } break; case "LARGEINT": @@ -498,10 +502,12 @@ private Object handleMapFieldReader(FieldReader reader) { @VisibleForTesting public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) { - TimeStampMicroVector vector = (TimeStampMicroVector) fieldVector; + TimeStampVector vector = (TimeStampVector) fieldVector; if (vector.isNull(rowIndex)) { return null; } + // todo: Currently, the scale of doris's arrow datetimev2 is hardcoded to 6, + // and there is also a time zone problem in arrow, so use timestamp to convert first long time = vector.get(rowIndex); return longToLocalDateTime(time); } diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java index e1f7b6e5e..3d9ac2617 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java @@ -34,6 +34,8 @@ import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampSecVector; import org.apache.arrow.vector.TinyIntVector; import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.VarBinaryVector; @@ -1331,9 +1333,23 @@ public void testDoConvert() throws Exception { flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "DATETIME", null); Assert.assertFalse(flag); + flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIME", null); + Assert.assertFalse(flag); + + IntVector intVector1 = new IntVector("test", new RootAllocator(Integer.MAX_VALUE)); + intVector1.setNull(0); + flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIME", intVector1); + Assert.assertFalse(flag); + flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "DATETIMEV2", null); Assert.assertFalse(flag); + flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIMEV2", null); + Assert.assertFalse(flag); + + flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIMEV2", intVector1); + Assert.assertFalse(flag); + flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "LARGEINT", null); Assert.assertFalse(flag); @@ -1658,4 +1674,97 @@ public void longToLocalDateTimeTest() { Assert.assertArrayEquals(expectArray, resultArray); } + + @Test + public void timestampVector() throws IOException, DorisException { + List childrenBuilder = new ArrayList<>(); + childrenBuilder.add( + new Field( + "k0", + FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)), + null)); + childrenBuilder.add( + new Field( + "k1", + FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)), + null)); + childrenBuilder.add( + new Field( + "k2", + FieldType.nullable(new ArrowType.Timestamp(TimeUnit.SECOND, null)), + null)); + + VectorSchemaRoot root = + VectorSchemaRoot.create( + new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null), + new RootAllocator(Integer.MAX_VALUE)); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + ArrowStreamWriter arrowStreamWriter = + new ArrowStreamWriter( + root, new DictionaryProvider.MapDictionaryProvider(), outputStream); + + arrowStreamWriter.start(); + root.setRowCount(1); + + FieldVector vector = root.getVector("k0"); + TimeStampMicroVector mircoVec = (TimeStampMicroVector) vector; + mircoVec.allocateNew(1); + mircoVec.setIndexDefined(0); + mircoVec.setSafe(0, 1721892143586123L); + vector.setValueCount(1); + + vector = root.getVector("k1"); + TimeStampMilliVector milliVector = (TimeStampMilliVector) vector; + milliVector.allocateNew(1); + milliVector.setIndexDefined(0); + milliVector.setSafe(0, 1721892143586L); + vector.setValueCount(1); + + vector = root.getVector("k2"); + TimeStampSecVector secVector = (TimeStampSecVector) vector; + secVector.allocateNew(1); + secVector.setIndexDefined(0); + secVector.setSafe(0, 1721892143L); + vector.setValueCount(1); + + arrowStreamWriter.writeBatch(); + + arrowStreamWriter.end(); + arrowStreamWriter.close(); + + TStatus status = new TStatus(); + status.setStatusCode(TStatusCode.OK); + TScanBatchResult scanBatchResult = new TScanBatchResult(); + scanBatchResult.setStatus(status); + scanBatchResult.setEos(false); + scanBatchResult.setRows(outputStream.toByteArray()); + + String schemaStr = + "{\"properties\":[{\"type\":\"DATETIME\",\"name\":\"k0\",\"comment\":\"\"}, {\"type\":\"DATETIME\",\"name\":\"k1\",\"comment\":\"\"}, {\"type\":\"DATETIME\",\"name\":\"k2\",\"comment\":\"\"}]," + + "\"status\":200}"; + + Schema schema = RestService.parseSchema(schemaStr, logger); + + RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); + List next = rowBatch.next(); + Assert.assertEquals(next.size(), 3); + Assert.assertEquals( + next.get(0), + LocalDateTime.of(2024, 7, 25, 15, 22, 23, 586123000) + .atZone(ZoneId.of("UTC+8")) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime()); + Assert.assertEquals( + next.get(1), + LocalDateTime.of(2024, 7, 25, 15, 22, 23, 586000000) + .atZone(ZoneId.of("UTC+8")) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime()); + Assert.assertEquals( + next.get(2), + LocalDateTime.of(2024, 7, 25, 15, 22, 23, 0) + .atZone(ZoneId.of("UTC+8")) + .withZoneSameInstant(ZoneId.systemDefault()) + .toLocalDateTime()); + } }