diff --git a/arrow_chunk.go b/arrow_chunk.go
index 9bf3cb948..a0efafb66 100644
--- a/arrow_chunk.go
+++ b/arrow_chunk.go
@@ -50,14 +50,14 @@ func (arc *arrowResultChunk) decodeArrowChunk(rowType []execResponseRowType, hig
 	return chunkRows, arc.reader.Err()
 }
 
-func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader, originalTimestamp bool) (*[]arrow.Record, error) {
+func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader, useOriginalTimestamp bool) (*[]arrow.Record, error) {
 	var records []arrow.Record
 	defer arc.reader.Release()
 
 	for arc.reader.Next() {
 		rawRecord := arc.reader.Record()
 
-		record, err := arrowToRecord(rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc, originalTimestamp)
+		record, err := arrowToRecord(rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc, useOriginalTimestamp)
 		if err != nil {
 			return nil, err
 		}
diff --git a/chunk_downloader.go b/chunk_downloader.go
index f4eec02ce..6224d42d6 100644
--- a/chunk_downloader.go
+++ b/chunk_downloader.go
@@ -283,8 +283,8 @@ func (scd *snowflakeChunkDownloader) startArrowBatches() error {
 	}
 	// decode first chunk if possible
 	if firstArrowChunk.allocator != nil {
-		originalTimestamp := originalTimestampEnabled(scd.ctx)
-		scd.FirstBatch.rec, err = firstArrowChunk.decodeArrowBatch(scd, originalTimestamp)
+		useOriginalTimestamp := originalTimestampEnabled(scd.ctx)
+		scd.FirstBatch.rec, err = firstArrowChunk.decodeArrowBatch(scd, useOriginalTimestamp)
 		if err != nil {
 			return err
 		}
@@ -443,8 +443,8 @@ func decodeChunk(scd *snowflakeChunkDownloader, idx int, bufStream *bufio.Reader
 			scd.pool,
 		}
 		if usesArrowBatches(scd.ctx) {
-			originalTimestamp := originalTimestampEnabled(scd.ctx)
-			if scd.ArrowBatches[idx].rec, err = arc.decodeArrowBatch(scd, originalTimestamp); err != nil {
+			useOriginalTimestamp := originalTimestampEnabled(scd.ctx)
+			if scd.ArrowBatches[idx].rec, err = arc.decodeArrowBatch(scd, useOriginalTimestamp); err != nil {
 				return err
 			}
 			// updating metadata
diff --git a/cmd/arrow/batches/arrow_batches.go b/cmd/arrow/batches/arrow_batches.go
index bc4e5b031..1b4ca1f5e 100644
--- a/cmd/arrow/batches/arrow_batches.go
+++ b/cmd/arrow/batches/arrow_batches.go
@@ -39,8 +39,8 @@ func main() {
 		{Name: "Host", EnvName: "SNOWFLAKE_TEST_HOST", FailOnMissing: false},
 		{Name: "Port", EnvName: "SNOWFLAKE_TEST_PORT", FailOnMissing: false},
 		{Name: "Protocol", EnvName: "SNOWFLAKE_TEST_PROTOCOL", FailOnMissing: false},
-		{Name: "Database", EnvName: "SNOWFLAKE_TEST_DATABASE", FailOnMissing: true},
-		{Name: "Schema", EnvName: "SNOWFLAKE_TEST_SCHEMA", FailOnMissing: true},
+		{Name: "Database", EnvName: "SNOWFLAKE_TEST_DATABASE", FailOnMissing: false},
+		{Name: "Schema", EnvName: "SNOWFLAKE_TEST_SCHEMA", FailOnMissing: false},
 		{Name: "Role", EnvName: "SNOWFLAKE_TEST_ROLE", FailOnMissing: false},
 	})
 	if err != nil {
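For context, the chunk-downloader changes above are driven entirely by context flags, and the example under cmd/arrow/batches exercises the same path end to end. A minimal sketch of that consumer flow, assuming the driver's exported `WithArrowBatches`/`WithOriginalTimestamp` context helpers (which set the flags read by `usesArrowBatches` and `originalTimestampEnabled`) and the `SnowflakeRows.GetArrowBatches` API; the DSN is a placeholder:

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"log"

	sf "github.com/snowflakedb/gosnowflake"
)

func main() {
	// Both options travel on the context: the first routes decoding through
	// decodeArrowBatch, the second is what originalTimestampEnabled reads.
	ctx := sf.WithOriginalTimestamp(sf.WithArrowBatches(context.Background()))

	db, err := sql.Open("snowflake", "user:pass@account/db/schema") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	conn, err := db.Conn(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Arrow batches are only reachable through the raw driver connection,
	// not through the database/sql rows abstraction.
	err = conn.Raw(func(x any) error {
		rows, err := x.(driver.QueryerContext).QueryContext(ctx, "SELECT CURRENT_TIMESTAMP()", nil)
		if err != nil {
			return err
		}
		defer rows.Close()

		batches, err := rows.(sf.SnowflakeRows).GetArrowBatches()
		if err != nil {
			return err
		}
		for _, batch := range batches {
			records, err := batch.Fetch() // *[]arrow.Record, as produced by decodeArrowBatch
			if err != nil {
				return err
			}
			for _, rec := range *records {
				log.Println(rec.NumRows(), "rows")
				rec.Release()
			}
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}
```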
diff --git a/converter.go b/converter.go
index 58b368989..c9adcb404 100644
--- a/converter.go
+++ b/converter.go
@@ -354,7 +354,7 @@ func decimalToBigFloat(num decimal128.Num, scale int64) *big.Float {
 	return new(big.Float).Quo(f, s)
 }
 
-// ArrowSnowflakeTimestampToTime converts original timestamp returned by Snowflake to time.TIme
+// ArrowSnowflakeTimestampToTime converts original timestamp returned by Snowflake to time.Time
 func ArrowSnowflakeTimestampToTime(rec arrow.Record, rb *ArrowBatch, colIdx int, recIdx int) *time.Time {
 	scale := int(rb.scd.RowSet.RowType[colIdx].Scale)
 	dbType := strings.ToUpper(rb.scd.RowSet.RowType[colIdx].Type)
@@ -382,8 +382,8 @@ func arrowSnowflakeTimestampToTime(
 	} else {
 		intData := column.(*array.Int64)
 		value := intData.Value(recIdx)
-		epoch := value / int64(math.Pow10(scale))
-		fraction := (value % int64(math.Pow10(scale))) * int64(math.Pow10(9-scale))
+		epoch := extractEpoch(value, scale)
+		fraction := extractFraction(value, scale)
 		ret = time.Unix(epoch, fraction).UTC()
 	}
 	return &ret
@@ -396,8 +396,8 @@ func arrowSnowflakeTimestampToTime(
 	} else {
 		intData := column.(*array.Int64)
 		value := intData.Value(recIdx)
-		epoch := value / int64(math.Pow10(scale))
-		fraction := (value % int64(math.Pow10(scale))) * int64(math.Pow10(9-scale))
+		epoch := extractEpoch(value, scale)
+		fraction := extractFraction(value, scale)
 		ret = time.Unix(epoch, fraction).In(loc)
 	}
 	return &ret
@@ -406,9 +406,9 @@ func arrowSnowflakeTimestampToTime(
 	if structData.NumField() == 2 {
 		value := structData.Field(0).(*array.Int64).Int64Values()
 		timezone := structData.Field(1).(*array.Int32).Int32Values()
+		epoch := extractEpoch(value[recIdx], scale)
+		fraction := extractFraction(value[recIdx], scale)
 		loc := Location(int(timezone[recIdx]) - 1440)
-		epoch := value[recIdx] / int64(math.Pow10(scale))
-		fraction := value[recIdx] % int64(math.Pow10(scale)) * int64(math.Pow10(9-scale))
 		ret = time.Unix(epoch, fraction).In(loc)
 	} else {
 		epoch := structData.Field(0).(*array.Int64).Int64Values()
@@ -422,6 +422,14 @@ func arrowSnowflakeTimestampToTime(
 	return nil
 }
 
+func extractEpoch(value int64, scale int) int64 {
+	return value / int64(math.Pow10(scale))
+}
+
+func extractFraction(value int64, scale int) int64 {
+	return (value % int64(math.Pow10(scale))) * int64(math.Pow10(9-scale))
+}
+
 // Arrow Interface (Column) converter. This is called when Arrow chunks are
 // downloaded to convert to the corresponding row type.
 func arrowToValue(
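The two new helpers name the epoch/fraction split that previously appeared inline three times: a timestamp of scale s arrives as a single int64 holding seconds × 10^s, so the quotient is the epoch second and the remainder, rescaled by 10^(9−s), is the nanosecond fraction. A self-contained sketch with copies of the helpers (the input value is illustrative):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// Standalone copies of the helpers added above, for illustration only.
func extractEpoch(value int64, scale int) int64 {
	return value / int64(math.Pow10(scale))
}

func extractFraction(value int64, scale int) int64 {
	return (value % int64(math.Pow10(scale))) * int64(math.Pow10(9-scale))
}

func main() {
	// A scale-3 TIMESTAMP arrives as milliseconds since the epoch in one int64.
	value, scale := int64(1696161600123), 3

	epoch := extractEpoch(value, scale)       // 1696161600 (whole seconds)
	fraction := extractFraction(value, scale) // 123_000_000 (remainder rescaled to ns)

	fmt.Println(time.Unix(epoch, fraction).UTC()) // 2023-10-01 12:00:00.123 +0000 UTC
}
```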
@@ -972,8 +980,8 @@ func originalTimestampEnabled(ctx context.Context) bool {
 	return ok && d
 }
 
-func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execResponseRowType, loc *time.Location, originalTimestamp bool) (arrow.Record, error) {
-	s, err := recordToSchema(record.Schema(), rowType, loc, originalTimestamp)
+func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execResponseRowType, loc *time.Location, useOriginalTimestamp bool) (arrow.Record, error) {
+	s, err := recordToSchema(record.Schema(), rowType, loc, useOriginalTimestamp)
 	if err != nil {
 		return nil, err
 	}
@@ -1022,7 +1030,7 @@ func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execRes
 			}
 			defer newCol.Release()
 		case timestampNtzType, timestampLtzType, timestampTzType:
-			if originalTimestamp {
+			if useOriginalTimestamp {
 				// do nothing - return timestamp as is
 			} else {
 				var tb *array.TimestampBuilder
@@ -1058,7 +1066,7 @@ func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execRes
 	return array.NewRecord(s, cols, numRows), nil
 }
 
-func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.Location, originalTimestamp bool) (*arrow.Schema, error) {
+func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.Location, useOriginalTimestamp bool) (*arrow.Schema, error) {
 	var fields []arrow.Field
 	for i := 0; i < len(sc.Fields()); i++ {
 		f := sc.Field(i)
@@ -1085,14 +1093,14 @@ func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.L
 		case timeType:
 			t = &arrow.Time64Type{Unit: arrow.Nanosecond}
 		case timestampNtzType, timestampTzType:
-			if originalTimestamp {
+			if useOriginalTimestamp {
 				// do nothing - return timestamp as is
 				converted = false
 			} else {
 				t = &arrow.TimestampType{Unit: arrow.Nanosecond}
 			}
 		case timestampLtzType:
-			if originalTimestamp {
+			if useOriginalTimestamp {
 				// do nothing - return timestamp as is
 				converted = false
 			} else {
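With useOriginalTimestamp set, arrowToRecord and recordToSchema leave timestamp columns in Snowflake's physical layout (int64 or struct, depending on scale), and consumers convert individual cells on demand with the exported ArrowSnowflakeTimestampToTime from the converter.go hunks above. A sketch of that consumption, assuming the arrow module path/version from the driver's go.mod and the batch/records values from the earlier sketch:

```go
import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow" // module path/version per the driver's go.mod
	sf "github.com/snowflakedb/gosnowflake"
)

// printTimestamps converts column colIdx of each record back to time.Time.
// The *sf.ArrowBatch is required because the converter reads the column's
// scale and database type from the batch's result-set metadata.
func printTimestamps(batch *sf.ArrowBatch, records []arrow.Record, colIdx int) {
	for _, rec := range records {
		for row := 0; row < int(rec.NumRows()); row++ {
			// Returns nil for SQL NULL; handles both int64 and struct encodings.
			if ts := sf.ArrowSnowflakeTimestampToTime(rec, batch, colIdx, row); ts != nil {
				fmt.Println(ts.UTC())
			}
		}
	}
}
```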
23:59:59.999999999", // max + "0001-01-01 00:00:00.000000000"} // min types := [3]string{"TIMESTAMP_NTZ", "TIMESTAMP_LTZ", "TIMESTAMP_TZ"} expectedError := "Cannot convert timestamp" @@ -1678,8 +1678,8 @@ func TestTimestampConversionWithArrowBatchesFailsForDistantDates(t *testing.T) { func TestTimestampConversionWithArrowBatchesAndWithOriginalTimestamp(t *testing.T) { timestamps := [3]string{ "2000-10-10 10:10:10.123456789", // neutral - "9999-12-12 23:59:59.123456789", // max - "0001-01-01 00:00:00.123456789"} // min + "9999-12-12 23:59:59.999999999", // max + "0001-01-01 00:00:00.000000000"} // min types := [3]string{"TIMESTAMP_NTZ", "TIMESTAMP_LTZ", "TIMESTAMP_TZ"} runSnowflakeConnTest(t, func(sct *SCTest) {
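The new boundary values make the motivation for the original-timestamp mode concrete: without it, arrowToRecord re-encodes every timestamp as a nanosecond-unit arrow.Timestamp, an int64 nanosecond count since the Unix epoch whose representable range only covers roughly 1677..2262, so both 0001-01-01 and 9999-12-12 must fail with "Cannot convert timestamp". A quick check of those bounds:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// The full range of an int64 nanosecond count since the Unix epoch, i.e.
	// what a nanosecond-unit arrow.Timestamp can hold. Both test boundary
	// values (years 0001 and 9999) fall well outside it.
	fmt.Println(time.Unix(0, math.MinInt64).UTC()) // 1677-09-21 00:12:43.145224192 +0000 UTC
	fmt.Println(time.Unix(0, math.MaxInt64).UTC()) // 2262-04-11 23:47:16.854775807 +0000 UTC
}
```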