Skip to content

Commit

Permalink
[Fix] fix arrow read timestamp bug (apache#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx authored Aug 5, 2024
1 parent 3901055 commit 7df4c3f
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.VarBinaryVector;
Expand All @@ -47,6 +47,7 @@
import org.apache.arrow.vector.complex.impl.UnionMapReader;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.Types.MinorType;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.spark.sql.types.Decimal;
import org.slf4j.Logger;
Expand Down Expand Up @@ -78,37 +79,24 @@
*/
public class RowBatch {
private static final Logger logger = LoggerFactory.getLogger(RowBatch.class);
private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();

private static final DateTimeFormatter DATE_TIME_FORMATTER = new DateTimeFormatterBuilder()
.appendPattern("yyyy-MM-dd HH:mm:ss")
.appendFraction(ChronoField.MICRO_OF_SECOND, 0, 6, true)
.toFormatter();

/**
 * One row of converted values, stored in the order in which they were added.
 */
public static class Row {
    /** Column values of this row, appended through {@link #put(Object)}. */
    private final List<Object> cols;

    /**
     * Creates an empty row.
     *
     * @param colCount initial capacity of the backing list
     */
    Row(int colCount) {
        cols = new ArrayList<>(colCount);
    }

    /**
     * Returns the backing list itself (not a copy).
     *
     * @return the column values added so far
     */
    List<Object> getCols() {
        return this.cols;
    }

    /**
     * Appends a value as the next column of this row.
     *
     * @param o the value to append; {@code null} is stored as-is
     */
    public void put(Object o) {
        this.cols.add(o);
    }
}

private static final String DATETIME_PATTERN = "yyyy-MM-dd HH:mm:ss";
private static final String DATETIMEV2_PATTERN = "yyyy-MM-dd HH:mm:ss.SSSSSS";
private final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern(DATETIME_PATTERN);
private final DateTimeFormatter dateTimeV2Formatter =
DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private final List<Row> rowBatch = new ArrayList<>();
private final ArrowStreamReader arrowStreamReader;
private final RootAllocator rootAllocator;
private final Schema schema;
// offset used to iterate over the rowBatch
private int offsetInRowBatch = 0;
private int rowCountInOneBatch = 0;
private int readRowCount = 0;
private final List<Row> rowBatch = new ArrayList<>();
private final ArrowStreamReader arrowStreamReader;
private List<FieldVector> fieldVectors;
private final RootAllocator rootAllocator;
private final Schema schema;

public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException {
this.schema = schema;
Expand Down Expand Up @@ -146,6 +134,19 @@ public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisExceptio
}
}

/**
 * Converts an epoch timestamp of unknown precision into a {@link LocalDateTime} in the
 * zone held by {@code DEFAULT_ZONE_ID}.
 *
 * <p>The precision is guessed from the magnitude of the value: below 10^10 it is read as
 * seconds, below 10^13 as milliseconds, and anything larger as microseconds. The magnitude
 * (not the signed value) is compared so that instants before 1970-01-01, which are encoded
 * as negative numbers, are classified like their positive counterparts instead of always
 * being read as seconds.
 *
 * @param time epoch timestamp in seconds, milliseconds or microseconds
 * @return the corresponding local date-time
 */
public static LocalDateTime longToLocalDateTime(long time) {
    // Math.abs(Long.MIN_VALUE) overflows and stays negative, so clamp that single value.
    long magnitude = time == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(time);
    Instant instant;
    if (magnitude < 10_000_000_000L) { // second precision
        instant = Instant.ofEpochSecond(time);
    } else if (magnitude < 10_000_000_000_000L) { // millisecond precision
        instant = Instant.ofEpochMilli(time);
    } else { // microsecond precision
        // floorDiv/floorMod keep the sub-second part non-negative for negative input.
        long epochSecond = Math.floorDiv(time, 1_000_000L);
        long nanoAdjustment = Math.floorMod(time, 1_000_000L) * 1_000L;
        instant = Instant.ofEpochSecond(epochSecond, nanoAdjustment);
    }
    return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID);
}

public boolean hasNext() {
if (offsetInRowBatch >= readRowCount) {
rowBatch.clear();
Expand Down Expand Up @@ -358,9 +359,13 @@ public void convertArrowToRowBatch() throws DorisException {
break;
case "DATETIME":
case "DATETIMEV2":
Preconditions.checkArgument(mt.equals(Types.MinorType.VARCHAR)
|| mt.equals(Types.MinorType.TIMESTAMPMICRO),

Preconditions.checkArgument(
mt.equals(Types.MinorType.TIMESTAMPMICRO) || mt.equals(MinorType.VARCHAR) ||
mt.equals(MinorType.TIMESTAMPMILLI) || mt.equals(MinorType.TIMESTAMPSEC),
typeMismatchMessage(currentType, mt));
typeMismatchMessage(currentType, mt);

if (mt.equals(Types.MinorType.VARCHAR)) {
VarCharVector varCharVector = (VarCharVector) curFieldVector;
for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
Expand All @@ -369,28 +374,23 @@ public void convertArrowToRowBatch() throws DorisException {
continue;
}
String value = new String(varCharVector.get(rowIndex), StandardCharsets.UTF_8);
addValueToRow(rowIndex, value);
value = completeMilliseconds(value);
LocalDateTime parse = LocalDateTime.parse(value, dateTimeV2Formatter);
addValueToRow(rowIndex, parse);
}
} else {
TimeStampMicroVector vector = (TimeStampMicroVector) curFieldVector;
} else if (curFieldVector instanceof TimeStampVector) {
TimeStampVector timeStampVector = (TimeStampVector) curFieldVector;

for (int rowIndex = 0; rowIndex < rowCountInOneBatch; rowIndex++) {
if (vector.isNull(rowIndex)) {
if (timeStampVector.isNull(rowIndex)) {

addValueToRow(rowIndex, null);
continue;
}
long time = vector.get(rowIndex);
Instant instant;
if (time / 10000000000L == 0) { // datetime(0)
instant = Instant.ofEpochSecond(time);
} else if (time / 10000000000000L == 0) { // datetime(3)
instant = Instant.ofEpochMilli(time);
} else { // datetime(6)
instant = Instant.ofEpochSecond(time / 1000000, time % 1000000 * 1000);
}
LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
String formatted = DATE_TIME_FORMATTER.format(dateTime);
addValueToRow(rowIndex, formatted);
LocalDateTime dateTime = getDateTime(rowIndex, timeStampVector);
addValueToRow(rowIndex, dateTime);
}

}
break;
case "CHAR":
Expand Down Expand Up @@ -511,4 +511,50 @@ public void close() {
// do nothing
}
}

/**
 * Reads one cell of a timestamp vector as a {@link LocalDateTime}.
 *
 * @param rowIndex index of the row to read
 * @param fieldVector vector to read from; it is cast to {@link TimeStampVector}
 * @return the converted value, or {@code null} when the cell is null
 */
public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) {
    final TimeStampVector timestamps = (TimeStampVector) fieldVector;
    // TODO: the scale of Doris's Arrow datetimev2 is currently hardcoded to 6 and Arrow
    // also has a time zone problem, so the raw long value is converted directly for now.
    return timestamps.isNull(rowIndex)
            ? null
            : longToLocalDateTime(timestamps.get(rowIndex));
}

/**
 * Right-pads a date-time string with a fractional part so that its length matches
 * {@code DATETIMEV2_PATTERN}.
 *
 * <p>Strings that already have that length, or that are shorter than
 * {@code DATETIME_PATTERN}, are returned unchanged. A string exactly as long as
 * {@code DATETIME_PATTERN} first receives a {@code "."}; zeros are then appended until the
 * target length is reached. Strings longer than the target are returned unchanged.
 *
 * @param stringValue the date-time text to pad
 * @return the padded text, or the input itself when no padding applies
 */
public static String completeMilliseconds(String stringValue) {
    final int targetLength = DATETIMEV2_PATTERN.length();
    final int secondsLength = DATETIME_PATTERN.length();
    final int inputLength = stringValue.length();

    if (inputLength == targetLength || inputLength < secondsLength) {
        return stringValue;
    }

    StringBuilder padded = new StringBuilder(stringValue);
    if (inputLength == secondsLength) {
        padded.append(".");
    }
    for (int length = padded.length(); length < targetLength; length++) {
        padded.append(0);
    }
    return padded.toString();
}

/**
 * One row of converted values, stored in the order in which they were added.
 */
public static class Row {
    /** Column values of this row, appended through {@link #put(Object)}. */
    private final List<Object> cols;

    /**
     * Creates an empty row.
     *
     * @param colCount initial capacity of the backing list
     */
    Row(int colCount) {
        cols = new ArrayList<>(colCount);
    }

    /**
     * Returns the backing list itself (not a copy).
     *
     * @return the column values added so far
     */
    List<Object> getCols() {
        return this.cols;
    }

    /**
     * Appends a value as the next column of this row.
     *
     * @param o the value to append; {@code null} is stored as-is
     */
    public void put(Object o) {
        this.cols.add(o);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void testRowBatch() throws Exception {
(float) 1.1,
(double) 1.1,
Date.valueOf("2008-08-08"),
"2008-08-08 00:00:00",
LocalDateTime.of(2008, 8, 8, 0, 0, 0),
Decimal.apply(1234L, 4, 2),
"char1"
);
Expand All @@ -287,7 +287,7 @@ public void testRowBatch() throws Exception {
(float) 2.2,
(double) 2.2,
Date.valueOf("1900-08-08"),
"1900-08-08 00:00:00",
LocalDateTime.of(1900, 8, 8, 0, 0, 0),
Decimal.apply(8888L, 4, 2),
"char2"
);
Expand All @@ -301,7 +301,7 @@ public void testRowBatch() throws Exception {
(float) 3.3,
(double) 3.3,
Date.valueOf("2100-08-08"),
"2100-08-08 00:00:00",
LocalDateTime.of(2100, 8, 8, 0, 0, 0),
Decimal.apply(10L, 2, 0),
"char3"
);
Expand Down Expand Up @@ -832,16 +832,17 @@ public void testDateTime() throws IOException, DorisException {

Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
Assert.assertEquals("2024-03-20 00:00:00", actualRow0.get(0));
Assert.assertEquals("2024-03-20 00:00:00", actualRow0.get(1));
Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), actualRow0.get(0));
Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), actualRow0.get(1));

List<Object> actualRow1 = rowBatch.next();
Assert.assertEquals("2024-03-20 00:00:01", actualRow1.get(0));
Assert.assertEquals("2024-03-20 00:00:00.123", actualRow1.get(1));
Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 1), actualRow1.get(0));
Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123000000), actualRow1.get(1));

List<Object> actualRow2 = rowBatch.next();
Assert.assertEquals("2024-03-20 00:00:02", actualRow2.get(0));
Assert.assertEquals("2024-03-20 00:00:00.123456", actualRow2.get(1));
Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 2), actualRow2.get(0));
Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123456000), actualRow2.get(1));


Assert.assertFalse(rowBatch.hasNext());
thrown.expect(NoSuchElementException.class);
Expand Down Expand Up @@ -906,6 +907,7 @@ public void testVariant() throws DorisException, IOException {

Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();

Assert.assertEquals("{\"id\":\"a\"}", actualRow0.get(0));

List<Object> actualRow1 = rowBatch.next();
Expand Down

0 comments on commit 7df4c3f

Please sign in to comment.