Skip to content

Commit

Permalink
[Fix] fix arrow read timestamp bug (apache#446)
Browse files Browse the repository at this point in the history
  • Loading branch information
JNSimba authored Aug 2, 2024
1 parent 130c0a0 commit 27bf18e
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.VarBinaryVector;
Expand Down Expand Up @@ -334,10 +334,6 @@ public boolean doConvert(
}
break;
case "DATETIME":
if (!minorType.equals(Types.MinorType.TIMESTAMPMICRO)
&& !minorType.equals(Types.MinorType.VARCHAR)) {
return false;
}
if (minorType.equals(Types.MinorType.VARCHAR)) {
VarCharVector varCharVector = (VarCharVector) fieldVector;
if (varCharVector.isNull(rowIndex)) {
Expand All @@ -347,16 +343,18 @@ public boolean doConvert(
String stringValue = new String(varCharVector.get(rowIndex));
LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeFormatter);
addValueToRow(rowIndex, parse);
} else {
} else if (fieldVector instanceof TimeStampVector) {
LocalDateTime dateTime = getDateTime(rowIndex, fieldVector);
addValueToRow(rowIndex, dateTime);
} else {
logger.error(
"Unsupported type for DATETIME, minorType {}, class is {}",
minorType.name(),
fieldVector == null ? null : fieldVector.getClass());
return false;
}
break;
case "DATETIMEV2":
if (!minorType.equals(Types.MinorType.TIMESTAMPMICRO)
&& !minorType.equals(Types.MinorType.VARCHAR)) {
return false;
}
if (minorType.equals(Types.MinorType.VARCHAR)) {
VarCharVector varCharVector = (VarCharVector) fieldVector;
if (varCharVector.isNull(rowIndex)) {
Expand All @@ -367,9 +365,15 @@ public boolean doConvert(
stringValue = completeMilliseconds(stringValue);
LocalDateTime parse = LocalDateTime.parse(stringValue, dateTimeV2Formatter);
addValueToRow(rowIndex, parse);
} else {
} else if (fieldVector instanceof TimeStampVector) {
LocalDateTime dateTime = getDateTime(rowIndex, fieldVector);
addValueToRow(rowIndex, dateTime);
} else {
logger.error(
"Unsupported type for DATETIMEV2, minorType {}, class is {}",
minorType.name(),
fieldVector == null ? null : fieldVector.getClass());
return false;
}
break;
case "LARGEINT":
Expand Down Expand Up @@ -498,10 +502,12 @@ private Object handleMapFieldReader(FieldReader reader) {

@VisibleForTesting
public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) {
TimeStampMicroVector vector = (TimeStampMicroVector) fieldVector;
TimeStampVector vector = (TimeStampVector) fieldVector;
if (vector.isNull(rowIndex)) {
return null;
}
// todo: Currently, the scale of doris's arrow datetimev2 is hardcoded to 6,
// and there is also a time zone problem in arrow, so use timestamp to convert first
long time = vector.get(rowIndex);
return longToLocalDateTime(time);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampMilliVector;
import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.VarBinaryVector;
Expand Down Expand Up @@ -1331,9 +1333,23 @@ public void testDoConvert() throws Exception {
flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "DATETIME", null);
Assert.assertFalse(flag);

flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIME", null);
Assert.assertFalse(flag);

IntVector intVector1 = new IntVector("test", new RootAllocator(Integer.MAX_VALUE));
intVector1.setNull(0);
flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIME", intVector1);
Assert.assertFalse(flag);

flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "DATETIMEV2", null);
Assert.assertFalse(flag);

flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIMEV2", null);
Assert.assertFalse(flag);

flag = rowBatch.doConvert(1, 1, Types.MinorType.TIMESTAMPSEC, "DATETIMEV2", intVector1);
Assert.assertFalse(flag);

flag = rowBatch.doConvert(1, 1, Types.MinorType.INT, "LARGEINT", null);
Assert.assertFalse(flag);

Expand Down Expand Up @@ -1658,4 +1674,97 @@ public void longToLocalDateTimeTest() {

Assert.assertArrayEquals(expectArray, resultArray);
}

@Test
public void timestampVector() throws IOException, DorisException {
List<Field> childrenBuilder = new ArrayList<>();
childrenBuilder.add(
new Field(
"k0",
FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
null));
childrenBuilder.add(
new Field(
"k1",
FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND, null)),
null));
childrenBuilder.add(
new Field(
"k2",
FieldType.nullable(new ArrowType.Timestamp(TimeUnit.SECOND, null)),
null));

VectorSchemaRoot root =
VectorSchemaRoot.create(
new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null),
new RootAllocator(Integer.MAX_VALUE));
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ArrowStreamWriter arrowStreamWriter =
new ArrowStreamWriter(
root, new DictionaryProvider.MapDictionaryProvider(), outputStream);

arrowStreamWriter.start();
root.setRowCount(1);

FieldVector vector = root.getVector("k0");
TimeStampMicroVector mircoVec = (TimeStampMicroVector) vector;
mircoVec.allocateNew(1);
mircoVec.setIndexDefined(0);
mircoVec.setSafe(0, 1721892143586123L);
vector.setValueCount(1);

vector = root.getVector("k1");
TimeStampMilliVector milliVector = (TimeStampMilliVector) vector;
milliVector.allocateNew(1);
milliVector.setIndexDefined(0);
milliVector.setSafe(0, 1721892143586L);
vector.setValueCount(1);

vector = root.getVector("k2");
TimeStampSecVector secVector = (TimeStampSecVector) vector;
secVector.allocateNew(1);
secVector.setIndexDefined(0);
secVector.setSafe(0, 1721892143L);
vector.setValueCount(1);

arrowStreamWriter.writeBatch();

arrowStreamWriter.end();
arrowStreamWriter.close();

TStatus status = new TStatus();
status.setStatusCode(TStatusCode.OK);
TScanBatchResult scanBatchResult = new TScanBatchResult();
scanBatchResult.setStatus(status);
scanBatchResult.setEos(false);
scanBatchResult.setRows(outputStream.toByteArray());

String schemaStr =
"{\"properties\":[{\"type\":\"DATETIME\",\"name\":\"k0\",\"comment\":\"\"}, {\"type\":\"DATETIME\",\"name\":\"k1\",\"comment\":\"\"}, {\"type\":\"DATETIME\",\"name\":\"k2\",\"comment\":\"\"}],"
+ "\"status\":200}";

Schema schema = RestService.parseSchema(schemaStr, logger);

RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
List<Object> next = rowBatch.next();
Assert.assertEquals(next.size(), 3);
Assert.assertEquals(
next.get(0),
LocalDateTime.of(2024, 7, 25, 15, 22, 23, 586123000)
.atZone(ZoneId.of("UTC+8"))
.withZoneSameInstant(ZoneId.systemDefault())
.toLocalDateTime());
Assert.assertEquals(
next.get(1),
LocalDateTime.of(2024, 7, 25, 15, 22, 23, 586000000)
.atZone(ZoneId.of("UTC+8"))
.withZoneSameInstant(ZoneId.systemDefault())
.toLocalDateTime());
Assert.assertEquals(
next.get(2),
LocalDateTime.of(2024, 7, 25, 15, 22, 23, 0)
.atZone(ZoneId.of("UTC+8"))
.withZoneSameInstant(ZoneId.systemDefault())
.toLocalDateTime());
}
}

0 comments on commit 27bf18e

Please sign in to comment.