Skip to content

Commit

Permalink
SNOW-911238: after CR
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-dstempniak committed Sep 27, 2023
1 parent dd1f4c7 commit 61c19fa
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 33 deletions.
4 changes: 2 additions & 2 deletions arrow_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ func (arc *arrowResultChunk) decodeArrowChunk(rowType []execResponseRowType, hig
return chunkRows, arc.reader.Err()
}

func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader, originalTimestamp bool) (*[]arrow.Record, error) {
func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader, useOriginalTimestamp bool) (*[]arrow.Record, error) {
var records []arrow.Record
defer arc.reader.Release()

for arc.reader.Next() {
rawRecord := arc.reader.Record()

record, err := arrowToRecord(rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc, originalTimestamp)
record, err := arrowToRecord(rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc, useOriginalTimestamp)
if err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions chunk_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ func (scd *snowflakeChunkDownloader) startArrowBatches() error {
}
// decode first chunk if possible
if firstArrowChunk.allocator != nil {
originalTimestamp := originalTimestampEnabled(scd.ctx)
scd.FirstBatch.rec, err = firstArrowChunk.decodeArrowBatch(scd, originalTimestamp)
useOriginalTimestamp := originalTimestampEnabled(scd.ctx)
scd.FirstBatch.rec, err = firstArrowChunk.decodeArrowBatch(scd, useOriginalTimestamp)
if err != nil {
return err
}
Expand Down Expand Up @@ -443,8 +443,8 @@ func decodeChunk(scd *snowflakeChunkDownloader, idx int, bufStream *bufio.Reader
scd.pool,
}
if usesArrowBatches(scd.ctx) {
originalTimestamp := originalTimestampEnabled(scd.ctx)
if scd.ArrowBatches[idx].rec, err = arc.decodeArrowBatch(scd, originalTimestamp); err != nil {
useOriginalTimestamp := originalTimestampEnabled(scd.ctx)
if scd.ArrowBatches[idx].rec, err = arc.decodeArrowBatch(scd, useOriginalTimestamp); err != nil {
return err
}
// updating metadata
Expand Down
4 changes: 2 additions & 2 deletions cmd/arrow/batches/arrow_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func main() {
{Name: "Host", EnvName: "SNOWFLAKE_TEST_HOST", FailOnMissing: false},
{Name: "Port", EnvName: "SNOWFLAKE_TEST_PORT", FailOnMissing: false},
{Name: "Protocol", EnvName: "SNOWFLAKE_TEST_PROTOCOL", FailOnMissing: false},
{Name: "Database", EnvName: "SNOWFLAKE_TEST_DATABASE", FailOnMissing: true},
{Name: "Schema", EnvName: "SNOWFLAKE_TEST_SCHEMA", FailOnMissing: true},
{Name: "Database", EnvName: "SNOWFLAKE_TEST_DATABASE", FailOnMissing: false},
{Name: "Schema", EnvName: "SNOWFLAKE_TEST_SCHEMA", FailOnMissing: false},
{Name: "Role", EnvName: "SNOWFLAKE_TEST_ROLE", FailOnMissing: false},
})
if err != nil {
Expand Down
34 changes: 21 additions & 13 deletions converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func decimalToBigFloat(num decimal128.Num, scale int64) *big.Float {
return new(big.Float).Quo(f, s)
}

// ArrowSnowflakeTimestampToTime converts original timestamp returned by Snowflake to time.TIme
// ArrowSnowflakeTimestampToTime converts original timestamp returned by Snowflake to time.Time
func ArrowSnowflakeTimestampToTime(rec arrow.Record, rb *ArrowBatch, colIdx int, recIdx int) *time.Time {
scale := int(rb.scd.RowSet.RowType[colIdx].Scale)
dbType := strings.ToUpper(rb.scd.RowSet.RowType[colIdx].Type)
Expand Down Expand Up @@ -382,8 +382,8 @@ func arrowSnowflakeTimestampToTime(
} else {
intData := column.(*array.Int64)
value := intData.Value(recIdx)
epoch := value / int64(math.Pow10(scale))
fraction := (value % int64(math.Pow10(scale))) * int64(math.Pow10(9-scale))
epoch := extractEpoch(value, scale)
fraction := extractFraction(value, scale)
ret = time.Unix(epoch, fraction).UTC()
}
return &ret
Expand All @@ -396,8 +396,8 @@ func arrowSnowflakeTimestampToTime(
} else {
intData := column.(*array.Int64)
value := intData.Value(recIdx)
epoch := value / int64(math.Pow10(scale))
fraction := (value % int64(math.Pow10(scale))) * int64(math.Pow10(9-scale))
epoch := extractEpoch(value, scale)
fraction := extractFraction(value, scale)
ret = time.Unix(epoch, fraction).In(loc)
}
return &ret
Expand All @@ -406,9 +406,9 @@ func arrowSnowflakeTimestampToTime(
if structData.NumField() == 2 {
value := structData.Field(0).(*array.Int64).Int64Values()
timezone := structData.Field(1).(*array.Int32).Int32Values()
epoch := extractEpoch(value[recIdx], scale)
fraction := extractFraction(value[recIdx], scale)
loc := Location(int(timezone[recIdx]) - 1440)
epoch := value[recIdx] / int64(math.Pow10(scale))
fraction := value[recIdx] % int64(math.Pow10(scale)) * int64(math.Pow10(9-scale))
ret = time.Unix(epoch, fraction).In(loc)
} else {
epoch := structData.Field(0).(*array.Int64).Int64Values()
Expand All @@ -422,6 +422,14 @@ func arrowSnowflakeTimestampToTime(
return nil
}

// extractEpoch returns the whole-seconds component of a scaled Snowflake
// timestamp, where value encodes seconds*10^scale plus fractional digits.
// Assumes 0 <= scale <= 9 (Snowflake's supported timestamp scales) — TODO confirm at call sites.
func extractEpoch(value int64, scale int) int64 {
	divisor := int64(math.Pow10(scale))
	return value / divisor
}

// extractFraction returns the sub-second component of a scaled Snowflake
// timestamp as nanoseconds. The low `scale` decimal digits of value are the
// fraction; they are rescaled to 9 fractional digits (nanosecond precision).
// Assumes 0 <= scale <= 9 — TODO confirm at call sites.
func extractFraction(value int64, scale int) int64 {
	modulus := int64(math.Pow10(scale))
	fraction := value % modulus
	return fraction * int64(math.Pow10(9-scale))
}

// Arrow Interface (Column) converter. This is called when Arrow chunks are
// downloaded to convert to the corresponding row type.
func arrowToValue(
Expand Down Expand Up @@ -972,8 +980,8 @@ func originalTimestampEnabled(ctx context.Context) bool {
return ok && d
}

func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execResponseRowType, loc *time.Location, originalTimestamp bool) (arrow.Record, error) {
s, err := recordToSchema(record.Schema(), rowType, loc, originalTimestamp)
func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execResponseRowType, loc *time.Location, useOriginalTimestamp bool) (arrow.Record, error) {
s, err := recordToSchema(record.Schema(), rowType, loc, useOriginalTimestamp)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1022,7 +1030,7 @@ func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execRes
}
defer newCol.Release()
case timestampNtzType, timestampLtzType, timestampTzType:
if originalTimestamp {
if useOriginalTimestamp {
// do nothing - return timestamp as is
} else {
var tb *array.TimestampBuilder
Expand Down Expand Up @@ -1058,7 +1066,7 @@ func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execRes
return array.NewRecord(s, cols, numRows), nil
}

func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.Location, originalTimestamp bool) (*arrow.Schema, error) {
func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.Location, useOriginalTimestamp bool) (*arrow.Schema, error) {
var fields []arrow.Field
for i := 0; i < len(sc.Fields()); i++ {
f := sc.Field(i)
Expand All @@ -1085,14 +1093,14 @@ func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.L
case timeType:
t = &arrow.Time64Type{Unit: arrow.Nanosecond}
case timestampNtzType, timestampTzType:
if originalTimestamp {
if useOriginalTimestamp {
// do nothing - return timestamp as is
converted = false
} else {
t = &arrow.TimestampType{Unit: arrow.Nanosecond}
}
case timestampLtzType:
if originalTimestamp {
if useOriginalTimestamp {
// do nothing - return timestamp as is
converted = false
} else {
Expand Down
24 changes: 12 additions & 12 deletions converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,7 +1181,7 @@ func TestArrowToRecord(t *testing.T) {
},
{
logical: "timestamp_ntz",
physical: "int64 with originalTimestamp", // timestamp_ntz with scale 0..3 -> int64
physical: "int64 with original timestamp", // timestamp_ntz with scale 0..3 -> int64
values: []time.Time{time.Now().Truncate(time.Millisecond), localTime.Truncate(time.Millisecond), localTimeFarIntoFuture.Truncate(time.Millisecond)},
orgTS: true,
nrows: 3,
Expand All @@ -1206,7 +1206,7 @@ func TestArrowToRecord(t *testing.T) {
},
{
logical: "timestamp_ntz",
physical: "struct with originalTimestamp", // timestamp_ntz with scale 4..9 -> int64 + int32
physical: "struct with original timestamp", // timestamp_ntz with scale 4..9 -> int64 + int32
values: []time.Time{time.Now(), localTime, localTimeFarIntoFuture},
orgTS: true,
nrows: 3,
Expand Down Expand Up @@ -1301,7 +1301,7 @@ func TestArrowToRecord(t *testing.T) {
},
{
logical: "timestamp_ltz",
physical: "int64 with originalTimestamp", // timestamp_ntz with scale 0..3 -> int64
physical: "int64 with original timestamp", // timestamp_ntz with scale 0..3 -> int64
values: []time.Time{time.Now().Truncate(time.Millisecond), localTime.Truncate(time.Millisecond), localTimeFarIntoFuture.Truncate(time.Millisecond)},
orgTS: true,
nrows: 3,
Expand All @@ -1326,7 +1326,7 @@ func TestArrowToRecord(t *testing.T) {
},
{
logical: "timestamp_ltz",
physical: "struct with originalTimestamp", // timestamp_ntz with scale 4..9 -> int64 + int32
physical: "struct with original timestamp", // timestamp_ntz with scale 4..9 -> int64 + int32
values: []time.Time{time.Now(), localTime, localTimeFarIntoFuture},
orgTS: true,
nrows: 3,
Expand Down Expand Up @@ -1410,7 +1410,7 @@ func TestArrowToRecord(t *testing.T) {
},
{
logical: "timestamp_tz",
physical: "struct2 with originalTimestamp", // timestamp_ntz with scale 0..3 -> int64 + int32
physical: "struct2 with original timestamp", // timestamp_ntz with scale 0..3 -> int64 + int32
values: []time.Time{time.Now().Truncate(time.Millisecond), localTime.Truncate(time.Millisecond), localTimeFarIntoFuture.Truncate(time.Millisecond)},
orgTS: true,
nrows: 3,
Expand Down Expand Up @@ -1439,7 +1439,7 @@ func TestArrowToRecord(t *testing.T) {
},
{
logical: "timestamp_tz",
physical: "struct3 with originalTimestamp", // timestamp_ntz with scale 4..9 -> int64 + int32 + int32
physical: "struct3 with original timestamp", // timestamp_ntz with scale 4..9 -> int64 + int32 + int32
values: []time.Time{time.Now(), localTime, localTimeFarIntoFuture},
orgTS: true,
nrows: 3,
Expand Down Expand Up @@ -1602,8 +1602,8 @@ func TestSmallTimestampBinding(t *testing.T) {
func TestTimestampConversionWithoutArrowBatches(t *testing.T) {
timestamps := [3]string{
"2000-10-10 10:10:10.123456789", // neutral
"9999-12-12 23:59:59.123456789", // max
"0001-01-01 00:00:00.123456789"} // min
"9999-12-12 23:59:59.999999999", // max
"0001-01-01 00:00:00.000000000"} // min
types := [3]string{"TIMESTAMP_NTZ", "TIMESTAMP_LTZ", "TIMESTAMP_TZ"}

runDBTest(t, func(sct *DBTest) {
Expand Down Expand Up @@ -1640,8 +1640,8 @@ func TestTimestampConversionWithoutArrowBatches(t *testing.T) {

func TestTimestampConversionWithArrowBatchesFailsForDistantDates(t *testing.T) {
timestamps := [2]string{
"9999-12-12 23:59:59.123456789", // max
"0001-01-01 00:00:00.123456789"} // min
"9999-12-12 23:59:59.999999999", // max
"0001-01-01 00:00:00.000000000"} // min
types := [3]string{"TIMESTAMP_NTZ", "TIMESTAMP_LTZ", "TIMESTAMP_TZ"}

expectedError := "Cannot convert timestamp"
Expand Down Expand Up @@ -1678,8 +1678,8 @@ func TestTimestampConversionWithArrowBatchesFailsForDistantDates(t *testing.T) {
func TestTimestampConversionWithArrowBatchesAndWithOriginalTimestamp(t *testing.T) {
timestamps := [3]string{
"2000-10-10 10:10:10.123456789", // neutral
"9999-12-12 23:59:59.123456789", // max
"0001-01-01 00:00:00.123456789"} // min
"9999-12-12 23:59:59.999999999", // max
"0001-01-01 00:00:00.000000000"} // min
types := [3]string{"TIMESTAMP_NTZ", "TIMESTAMP_LTZ", "TIMESTAMP_TZ"}

runSnowflakeConnTest(t, func(sct *SCTest) {
Expand Down

0 comments on commit 61c19fa

Please sign in to comment.