From 0eaf5dcabbe9811ac5dce0eae76387948f5aea3b Mon Sep 17 00:00:00 2001 From: Dariusz Stempniak Date: Mon, 25 Sep 2023 09:33:05 +0200 Subject: [PATCH] SNOW-911238: Check timestamp overflow --- arrow_chunk.go | 4 +- chunk_downloader.go | 9 +- cmd/arrow/batches/arrow_batches.go | 25 +- connection.go | 2 +- converter.go | 277 ++++++++-------- converter_test.go | 495 ++++++++++++++++++++++++++--- errors.go | 2 + util.go | 28 +- 8 files changed, 623 insertions(+), 219 deletions(-) diff --git a/arrow_chunk.go b/arrow_chunk.go index 344774af8..9bf3cb948 100644 --- a/arrow_chunk.go +++ b/arrow_chunk.go @@ -50,14 +50,14 @@ func (arc *arrowResultChunk) decodeArrowChunk(rowType []execResponseRowType, hig return chunkRows, arc.reader.Err() } -func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader) (*[]arrow.Record, error) { +func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader, originalTimestamp bool) (*[]arrow.Record, error) { var records []arrow.Record defer arc.reader.Release() for arc.reader.Next() { rawRecord := arc.reader.Record() - record, err := arrowToRecord(rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc) + record, err := arrowToRecord(rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc, originalTimestamp) if err != nil { return nil, err } diff --git a/chunk_downloader.go b/chunk_downloader.go index 1a298cf3e..f4eec02ce 100644 --- a/chunk_downloader.go +++ b/chunk_downloader.go @@ -279,10 +279,12 @@ func (scd *snowflakeChunkDownloader) startArrowBatches() error { idx: 0, scd: scd, funcDownloadHelper: scd.FuncDownloadHelper, + loc: loc, } // decode first chunk if possible if firstArrowChunk.allocator != nil { - scd.FirstBatch.rec, err = firstArrowChunk.decodeArrowBatch(scd) + originalTimestamp := originalTimestampEnabled(scd.ctx) + scd.FirstBatch.rec, err = firstArrowChunk.decodeArrowBatch(scd, originalTimestamp) if err != nil { return err } @@ -293,6 +295,7 @@ func (scd *snowflakeChunkDownloader) startArrowBatches() error { idx: i, scd: scd, funcDownloadHelper: scd.FuncDownloadHelper, + loc: loc, } } return nil @@ -440,7 +443,8 @@ func decodeChunk(scd *snowflakeChunkDownloader, idx int, bufStream *bufio.Reader scd.pool, } if usesArrowBatches(scd.ctx) { - if scd.ArrowBatches[idx].rec, err = arc.decodeArrowBatch(scd); err != nil { + originalTimestamp := originalTimestampEnabled(scd.ctx) + if scd.ArrowBatches[idx].rec, err = arc.decodeArrowBatch(scd, originalTimestamp); err != nil { return err } // updating metadata @@ -708,6 +712,7 @@ type ArrowBatch struct { scd *snowflakeChunkDownloader funcDownloadHelper func(context.Context, *snowflakeChunkDownloader, int) error ctx context.Context + loc *time.Location } // WithContext sets the context which will be used for this ArrowBatch. 
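Background note (not part of the patch): the overflow being checked here comes from Arrow's timestamp representation. arrow.Timestamp with nanosecond unit is an int64 count of nanoseconds since the Unix epoch, which only covers roughly the years 1677 through 2262, while Snowflake timestamps range from year 1 to 9999. A minimal standalone sketch of the wrap-around, assuming the same arrow/go/v12 dependency this driver uses (ToTime is the same helper the converter below relies on):

	package main

	import (
		"fmt"
		"time"

		"github.com/apache/arrow/go/v12/arrow"
	)

	func main() {
		ts := time.Date(9999, 12, 12, 23, 59, 59, 0, time.UTC)
		// UnixNano is undefined for dates outside ~1677..2262: the int64 wraps.
		ar := arrow.Timestamp(ts.UnixNano())
		fmt.Println(ts.Year())                          // 9999
		fmt.Println(ar.ToTime(arrow.Nanosecond).Year()) // a wrapped, wrong year
	}

The Year() comparison added to converter.go below detects exactly this wrap-around and reports ErrTooHighTimestampPrecision instead of returning a corrupted value.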
diff --git a/cmd/arrow/batches/arrow_batches.go b/cmd/arrow/batches/arrow_batches.go index 90ab7a698..bc4e5b031 100644 --- a/cmd/arrow/batches/arrow_batches.go +++ b/cmd/arrow/batches/arrow_batches.go @@ -9,10 +9,10 @@ import ( "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" "github.com/apache/arrow/go/v12/arrow/memory" + sf "github.com/snowflakedb/gosnowflake" "log" "sync" - - sf "github.com/snowflakedb/gosnowflake" + "time" ) type sampleRecord struct { @@ -20,10 +20,11 @@ type sampleRecord struct { workerID int number int32 string string + ts *time.Time } func (s sampleRecord) String() string { - return fmt.Sprintf("batchID: %v, workerID: %v, number: %v, string: %v", s.batchID, s.workerID, s.number, s.string) + return fmt.Sprintf("batchID: %v, workerID: %v, number: %v, string: %v, ts: %v", s.batchID, s.workerID, s.number, s.string, s.ts) } func main() { @@ -38,6 +39,9 @@ func main() { {Name: "Host", EnvName: "SNOWFLAKE_TEST_HOST", FailOnMissing: false}, {Name: "Port", EnvName: "SNOWFLAKE_TEST_PORT", FailOnMissing: false}, {Name: "Protocol", EnvName: "SNOWFLAKE_TEST_PROTOCOL", FailOnMissing: false}, + {Name: "Database", EnvName: "SNOWFLAKE_TEST_DATABASE", FailOnMissing: true}, + {Name: "Schema", EnvName: "SNOWFLAKE_TEST_SCHEMA", FailOnMissing: true}, + {Name: "Role", EnvName: "SNOWFLAKE_TEST_ROLE", FailOnMissing: false}, }) if err != nil { log.Fatalf("failed to create Config, err: %v", err) @@ -48,8 +52,14 @@ func main() { log.Fatalf("failed to create DSN from Config: %v, err: %v", cfg, err) } - ctx := sf.WithArrowAllocator(sf.WithArrowBatches(context.Background()), memory.DefaultAllocator) - query := "SELECT SEQ4(), 'example ' || (SEQ4() * 2) FROM TABLE(GENERATOR(ROWCOUNT=>30000))" + ctx := + sf.WithOriginalTimestamp( + sf.WithArrowAllocator( + sf.WithArrowBatches(context.Background()), memory.DefaultAllocator)) + + query := "SELECT SEQ4(), 'example ' || (SEQ4() * 2), " + + " TO_TIMESTAMP_NTZ('9999-01-01 13:13:13.' 
|| LPAD(SEQ4(),9,'0')) ts " +
+		" FROM TABLE(GENERATOR(ROWCOUNT=>30000))"

 	db, err := sql.Open("snowflake", dsn)
 	if err != nil {
@@ -88,7 +98,7 @@ func main() {
 				}
 				sampleRecordsPerBatch[batchID] = make([]sampleRecord, batches[batchID].GetRowCount())
 				totalRowID := 0
-				convertFromColumnsToRows(records, sampleRecordsPerBatch, batchID, workerId, totalRowID)
+				convertFromColumnsToRows(records, sampleRecordsPerBatch, batchID, workerId, totalRowID, batches[batchID])
 			}
 		}(&waitGroup, batchIds, workerID)
 	}
@@ -110,7 +120,7 @@ func main() {
 }

 func convertFromColumnsToRows(records *[]arrow.Record, sampleRecordsPerBatch [][]sampleRecord, batchID int,
-	workerID int, totalRowID int) {
+	workerID int, totalRowID int, batch *sf.ArrowBatch) {
 	for _, record := range *records {
 		for rowID, intColumn := range record.Column(0).(*array.Int32).Int32Values() {
 			sampleRecord := sampleRecord{
@@ -118,6 +128,7 @@ func convertFromColumnsToRows(records *[]arrow.Record, sampleRecordsPerBatch [][
 				workerID: workerID,
 				number:   intColumn,
 				string:   record.Column(1).(*array.String).Value(rowID),
+				ts:       sf.ArrowSnowflakeTimestampToTime(record, batch, 2, rowID),
 			}
 			sampleRecordsPerBatch[batchID][totalRowID] = sampleRecord
 			totalRowID++
diff --git a/connection.go b/connection.go
index 769253d46..c9d760327 100644
--- a/connection.go
+++ b/connection.go
@@ -409,7 +409,7 @@ func (sc *snowflakeConn) queryContextInternal(
 		rows.addDownloader(populateChunkDownloader(ctx, sc, data.Data))
 	}

-	rows.ChunkDownloader.start()
+	err = rows.ChunkDownloader.start()
 	return rows, err
 }

diff --git a/converter.go b/converter.go
index 44c0afbfa..6aa30798b 100644
--- a/converter.go
+++ b/converter.go
@@ -354,6 +354,75 @@ func decimalToBigFloat(num decimal128.Num, scale int64) *big.Float {
 	return new(big.Float).Quo(f, s)
 }

+// ArrowSnowflakeTimestampToTime converts the raw Snowflake timestamp at column colIdx, row recIdx of an Arrow record to *time.Time; it returns nil for NULL values.
+func ArrowSnowflakeTimestampToTime(rec arrow.Record, rb *ArrowBatch, colIdx int, recIdx int) *time.Time {
+	scale := int(rb.scd.RowSet.RowType[colIdx].Scale)
+	dbType := strings.ToUpper(rb.scd.RowSet.RowType[colIdx].Type)
+	return arrowSnowflakeTimestampToTime(rec.Column(colIdx), getSnowflakeType(dbType), scale, recIdx, rb.loc)
+}
+
+func arrowSnowflakeTimestampToTime(
+	column arrow.Array,
+	sfType snowflakeType,
+	scale int,
+	recIdx int,
+	loc *time.Location) *time.Time {
+
+	if column.IsNull(recIdx) {
+		return nil
+	}
+	switch sfType {
+	case timestampNtzType:
+		if column.DataType().ID() == arrow.STRUCT {
+			structData := column.(*array.Struct)
+			epoch := structData.Field(0).(*array.Int64).Int64Values()
+			fraction := structData.Field(1).(*array.Int32).Int32Values()
+			ret := time.Unix(epoch[recIdx], int64(fraction[recIdx])).UTC()
+			return &ret
+		} else {
+			intData := column.(*array.Int64)
+			value := intData.Value(recIdx)
+			epoch := value / int64(math.Pow10(scale))
+			fraction := (value % int64(math.Pow10(scale))) * int64(math.Pow10(9-scale))
+			ret := time.Unix(epoch, fraction).UTC()
+			return &ret
+		}
+	case timestampLtzType:
+		if column.DataType().ID() == arrow.STRUCT {
+			structData := column.(*array.Struct)
+			epoch := structData.Field(0).(*array.Int64).Int64Values()
+			fraction := structData.Field(1).(*array.Int32).Int32Values()
+			ret := time.Unix(epoch[recIdx], int64(fraction[recIdx])).In(loc)
+			return &ret
+		} else {
+			intData := column.(*array.Int64)
+			value := intData.Value(recIdx)
+			epoch := value / int64(math.Pow10(scale))
+			fraction := (value % int64(math.Pow10(scale))) * int64(math.Pow10(9-scale))
+			ret := time.Unix(epoch, fraction).In(loc)
+			return &ret
+		}
+	case timestampTzType:
+		structData := column.(*array.Struct)
+		if 
structData.NumField() == 2 { + value := structData.Field(0).(*array.Int64).Int64Values() + timezone := structData.Field(1).(*array.Int32).Int32Values() + loc := Location(int(timezone[recIdx]) - 1440) + epoch := value[recIdx] / int64(math.Pow10(scale)) + fraction := value[recIdx] % int64(math.Pow10(scale)) * int64(math.Pow10(9-scale)) + ret := time.Unix(epoch, fraction).In(loc) + return &ret + } else { + epoch := structData.Field(0).(*array.Int64).Int64Values() + fraction := structData.Field(1).(*array.Int32).Int32Values() + timezone := structData.Field(2).(*array.Int32).Int32Values() + loc := Location(int(timezone[recIdx]) - 1440) + ret := time.Unix(epoch[recIdx], int64(fraction[recIdx])).In(loc) + return &ret + } + } + return nil +} + // Arrow Interface (Column) converter. This is called when Arrow chunks are // downloaded to convert to the corresponding row type. func arrowToValue( @@ -369,7 +438,8 @@ func arrowToValue( } logger.Debugf("snowflake data type: %v, arrow data type: %v", srcColumnMeta.Type, srcValue.DataType()) - switch getSnowflakeType(strings.ToUpper(srcColumnMeta.Type)) { + snowflakeType := getSnowflakeType(strings.ToUpper(srcColumnMeta.Type)) + switch snowflakeType { case fixedType: // Snowflake data types that are fixed-point numbers will fall into this category // e.g. NUMBER, DECIMAL/NUMERIC, INT/INTEGER @@ -528,69 +598,11 @@ func arrowToValue( } } return err - case timestampNtzType: - if srcValue.DataType().ID() == arrow.STRUCT { - structData := srcValue.(*array.Struct) - epoch := structData.Field(0).(*array.Int64).Int64Values() - fraction := structData.Field(1).(*array.Int32).Int32Values() - for i := range destcol { - if !srcValue.IsNull(i) { - destcol[i] = time.Unix(epoch[i], int64(fraction[i])).UTC() - } - } - } else { - for i, t := range srcValue.(*array.Int64).Int64Values() { - if !srcValue.IsNull(i) { - scale := int(srcColumnMeta.Scale) - epoch := t / int64(math.Pow10(scale)) - fraction := (t % int64(math.Pow10(scale))) * int64(math.Pow10(9-scale)) - destcol[i] = time.Unix(epoch, fraction).UTC() - } - } - } - return err - case timestampLtzType: - if srcValue.DataType().ID() == arrow.STRUCT { - structData := srcValue.(*array.Struct) - epoch := structData.Field(0).(*array.Int64).Int64Values() - fraction := structData.Field(1).(*array.Int32).Int32Values() - for i := range destcol { - if !srcValue.IsNull(i) { - destcol[i] = time.Unix(epoch[i], int64(fraction[i])).In(loc) - } - } - } else { - for i, t := range srcValue.(*array.Int64).Int64Values() { - if !srcValue.IsNull(i) { - q := t / int64(math.Pow10(int(srcColumnMeta.Scale))) - r := t % int64(math.Pow10(int(srcColumnMeta.Scale))) - destcol[i] = time.Unix(q, r).In(loc) - } - } - } - return err - case timestampTzType: - structData := srcValue.(*array.Struct) - if structData.NumField() == 2 { - epoch := structData.Field(0).(*array.Int64).Int64Values() - timezone := structData.Field(1).(*array.Int32).Int32Values() - for i := range destcol { - if !srcValue.IsNull(i) { - loc := Location(int(timezone[i]) - 1440) - tt := time.Unix(epoch[i], 0) - destcol[i] = tt.In(loc) - } - } - } else { - epoch := structData.Field(0).(*array.Int64).Int64Values() - fraction := structData.Field(1).(*array.Int32).Int32Values() - timezone := structData.Field(2).(*array.Int32).Int32Values() - for i := range destcol { - if !srcValue.IsNull(i) { - loc := Location(int(timezone[i]) - 1440) - tt := time.Unix(epoch[i], int64(fraction[i])) - destcol[i] = tt.In(loc) - } + case timestampNtzType, timestampLtzType, timestampTzType: + for i := range 
destcol { + var ts = arrowSnowflakeTimestampToTime(srcValue, snowflakeType, int(srcColumnMeta.Scale), i, loc) + if ts != nil { + destcol[i] = *ts } } return err @@ -952,8 +964,17 @@ func higherPrecisionEnabled(ctx context.Context) bool { return ok && d } -func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execResponseRowType, loc *time.Location) (arrow.Record, error) { - s, err := recordToSchema(record.Schema(), rowType, loc) +func originalTimestampEnabled(ctx context.Context) bool { + v := ctx.Value(enableOriginalTimestamp) + if v == nil { + return false + } + d, ok := v.(bool) + return ok && d +} + +func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execResponseRowType, loc *time.Location, originalTimestamp bool) (arrow.Record, error) { + s, err := recordToSchema(record.Schema(), rowType, loc, originalTimestamp) if err != nil { return nil, err } @@ -967,7 +988,8 @@ func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execRes // TODO: confirm that it is okay to be using higher precision logic for conversions newCol := col - switch getSnowflakeType(strings.ToUpper(srcColumnMeta.Type)) { + snowflakeType := getSnowflakeType(strings.ToUpper(srcColumnMeta.Type)) + switch snowflakeType { case fixedType: var toType arrow.DataType if col.DataType().ID() == arrow.DECIMAL || col.DataType().ID() == arrow.DECIMAL256 { @@ -1000,103 +1022,44 @@ func arrowToRecord(record arrow.Record, pool memory.Allocator, rowType []execRes return nil, err } defer newCol.Release() - case timestampNtzType: - tb := array.NewTimestampBuilder(pool, &arrow.TimestampType{Unit: arrow.Nanosecond}) - if col.DataType().ID() == arrow.STRUCT { - structData := col.(*array.Struct) - epoch := structData.Field(0).(*array.Int64).Int64Values() - fraction := structData.Field(1).(*array.Int32).Int32Values() - for i := 0; i < int(numRows); i++ { - if !col.IsNull(i) { - val := time.Unix(epoch[i], int64(fraction[i])) - tb.Append(arrow.Timestamp(val.UnixNano())) - } else { - tb.AppendNull() - } - } + case timestampNtzType, timestampLtzType, timestampTzType: + if originalTimestamp { + // do nothing - return timestamp as is } else { - for i, t := range col.(*array.Timestamp).TimestampValues() { - if !col.IsNull(i) { - val := time.Unix(0, int64(t)*int64(math.Pow10(9-int(srcColumnMeta.Scale)))).UTC() - tb.Append(arrow.Timestamp(val.UnixNano())) - } else { - tb.AppendNull() - } - } - } - newCol = tb.NewArray() - defer newCol.Release() - tb.Release() - case timestampLtzType: - tb := array.NewTimestampBuilder(pool, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: loc.String()}) - if col.DataType().ID() == arrow.STRUCT { - structData := col.(*array.Struct) - epoch := structData.Field(0).(*array.Int64).Int64Values() - fraction := structData.Field(1).(*array.Int32).Int32Values() - for i := 0; i < int(numRows); i++ { - if !col.IsNull(i) { - val := time.Unix(epoch[i], int64(fraction[i])) - tb.Append(arrow.Timestamp(val.UnixNano())) - } else { - tb.AppendNull() - } - } - } else { - for i, t := range col.(*array.Timestamp).TimestampValues() { - if !col.IsNull(i) { - q := int64(t) / int64(math.Pow10(int(srcColumnMeta.Scale))) - r := int64(t) % int64(math.Pow10(int(srcColumnMeta.Scale))) - val := time.Unix(q, r) - tb.Append(arrow.Timestamp(val.UnixNano())) - } else { - tb.AppendNull() - } - } - } - newCol = tb.NewArray() - defer newCol.Release() - tb.Release() - case timestampTzType: - tb := array.NewTimestampBuilder(pool, &arrow.TimestampType{Unit: arrow.Nanosecond}) - structData := 
col.(*array.Struct) - if structData.NumField() == 2 { - epoch := structData.Field(0).(*array.Int64).Int64Values() - timezone := structData.Field(1).(*array.Int32).Int32Values() - for i := 0; i < int(numRows); i++ { - if !col.IsNull(i) { - loc := Location(int(timezone[i]) - 1440) - tt := time.Unix(epoch[i], 0) - val := tt.In(loc) - tb.Append(arrow.Timestamp(val.UnixNano())) - } else { - tb.AppendNull() - } + var tb *array.TimestampBuilder + if snowflakeType == timestampLtzType { + tb = array.NewTimestampBuilder(pool, &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: loc.String()}) + } else { + tb = array.NewTimestampBuilder(pool, &arrow.TimestampType{Unit: arrow.Nanosecond}) } - } else { - epoch := structData.Field(0).(*array.Int64).Int64Values() - fraction := structData.Field(1).(*array.Int32).Int32Values() - timezone := structData.Field(2).(*array.Int32).Int32Values() for i := 0; i < int(numRows); i++ { - if !col.IsNull(i) { - loc := Location(int(timezone[i]) - 1440) - tt := time.Unix(epoch[i], int64(fraction[i])) - val := tt.In(loc) - tb.Append(arrow.Timestamp(val.UnixNano())) + ts := arrowSnowflakeTimestampToTime(col, snowflakeType, int(srcColumnMeta.Scale), i, loc) + if ts != nil { + ar := arrow.Timestamp(ts.UnixNano()) + if ts.Year() != ar.ToTime(arrow.Nanosecond).Year() { + return nil, &SnowflakeError{ + Number: ErrTooHighTimestampPrecision, + SQLState: SQLStateInvalidDataTimeFormat, + Message: fmt.Sprintf("Cannot convert timestamp %v in column %v to Arrow.Timestamp data type due to too high precision. Please use context with WithOriginalTimestamp.", ts.UTC(), srcColumnMeta.Name), + } + } + tb.Append(ar) } else { tb.AppendNull() } } + + newCol = tb.NewArray() + defer newCol.Release() + tb.Release() } - newCol = tb.NewArray() - defer newCol.Release() - tb.Release() } cols = append(cols, newCol) } return array.NewRecord(s, cols, numRows), nil } -func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.Location) (*arrow.Schema, error) { +func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.Location, originalTimestamp bool) (*arrow.Schema, error) { var fields []arrow.Field for i := 0; i < len(sc.Fields()); i++ { f := sc.Field(i) @@ -1123,9 +1086,19 @@ func recordToSchema(sc *arrow.Schema, rowType []execResponseRowType, loc *time.L case timeType: t = &arrow.Time64Type{Unit: arrow.Nanosecond} case timestampNtzType, timestampTzType: - t = &arrow.TimestampType{Unit: arrow.Nanosecond} + if originalTimestamp { + // do nothing - return timestamp as is + converted = false + } else { + t = &arrow.TimestampType{Unit: arrow.Nanosecond} + } case timestampLtzType: - t = &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: loc.String()} + if originalTimestamp { + // do nothing - return timestamp as is + converted = false + } else { + t = &arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: loc.String()} + } default: converted = false } diff --git a/converter_test.go b/converter_test.go index 6a43404f2..8a58d1978 100644 --- a/converter_test.go +++ b/converter_test.go @@ -12,6 +12,7 @@ import ( "math/big" "math/cmplx" "reflect" + "strconv" "strings" "testing" "time" @@ -828,10 +829,15 @@ func TestArrowToRecord(t *testing.T) { var valids []bool // AppendValues() with an empty valid array adds every value by default localTime := time.Date(2019, 2, 6, 14, 17, 31, 123456789, time.FixedZone("-08:00", -8*3600)) + localTimeFarIntoFuture := time.Date(9000, 2, 6, 14, 17, 31, 123456789, time.FixedZone("-08:00", -8*3600)) - field1 := arrow.Field{Name: 
"epoch", Type: &arrow.Int64Type{}} - field2 := arrow.Field{Name: "timezone", Type: &arrow.Int32Type{}} - tzStruct := arrow.StructOf(field1, field2) + epochField := arrow.Field{Name: "epoch", Type: &arrow.Int64Type{}} + timezoneField := arrow.Field{Name: "timezone", Type: &arrow.Int32Type{}} + fractionField := arrow.Field{Name: "fraction", Type: &arrow.Int32Type{}} + timestampTzStruct2 := arrow.StructOf(epochField, timezoneField) + timestampTzStruct3 := arrow.StructOf(epochField, fractionField, timezoneField) + timestampNtzStruct := arrow.StructOf(epochField, fractionField) + timestampLtzStruct := arrow.StructOf(epochField, fractionField) type testObj struct { field1 int @@ -844,6 +850,8 @@ func TestArrowToRecord(t *testing.T) { sc *arrow.Schema rowType execResponseRowType values interface{} + error string + orgTS bool nrows int builder array.Builder append func(b array.Builder, vs interface{}) @@ -1106,21 +1114,22 @@ func TestArrowToRecord(t *testing.T) { }, }, { - logical: "timestamp_ntz", - values: []time.Time{time.Now(), localTime}, - nrows: 2, - rowType: execResponseRowType{Scale: 9}, - sc: arrow.NewSchema([]arrow.Field{{Type: &arrow.TimestampType{}}}, nil), - builder: array.NewTimestampBuilder(pool, &arrow.TimestampType{}), + logical: "timestamp_ntz", + physical: "int64", // timestamp_ntz with scale 0..3 -> int64 + values: []time.Time{time.Now().Truncate(time.Millisecond), localTime.Truncate(time.Millisecond)}, + nrows: 2, + rowType: execResponseRowType{Scale: 3}, + sc: arrow.NewSchema([]arrow.Field{{Type: &arrow.Int64Type{}}}, nil), + builder: array.NewInt64Builder(pool), append: func(b array.Builder, vs interface{}) { for _, t := range vs.([]time.Time) { - b.(*array.TimestampBuilder).Append(arrow.Timestamp(t.UnixNano())) + b.(*array.Int64Builder).Append(t.UnixMilli()) } }, compare: func(src interface{}, convertedRec arrow.Record) int { srcvs := src.([]time.Time) for i, t := range convertedRec.Column(0).(*array.Timestamp).TimestampValues() { - if srcvs[i].UnixNano() != int64(t) { + if srcvs[i].Compare(t.ToTime(arrow.Nanosecond)) != 0 { return i } } @@ -1128,21 +1137,26 @@ func TestArrowToRecord(t *testing.T) { }, }, { - logical: "timestamp_ltz", - values: []time.Time{time.Now(), localTime}, - nrows: 2, - rowType: execResponseRowType{Scale: 9}, - sc: arrow.NewSchema([]arrow.Field{{Type: &arrow.TimestampType{}}}, nil), - builder: array.NewTimestampBuilder(pool, &arrow.TimestampType{}), + logical: "timestamp_ntz", + physical: "struct", // timestamp_ntz with scale 4..9 -> int64 + int32 + values: []time.Time{time.Now(), localTime}, + nrows: 2, + rowType: execResponseRowType{Scale: 9}, + sc: arrow.NewSchema([]arrow.Field{{Type: timestampNtzStruct}}, nil), + builder: array.NewStructBuilder(pool, timestampNtzStruct), append: func(b array.Builder, vs interface{}) { + sb := b.(*array.StructBuilder) + valids = []bool{true, true} + sb.AppendValues(valids) for _, t := range vs.([]time.Time) { - b.(*array.TimestampBuilder).Append(arrow.Timestamp(t.UnixNano())) + sb.FieldBuilder(0).(*array.Int64Builder).Append(t.Unix()) + sb.FieldBuilder(1).(*array.Int32Builder).Append(int32(t.Nanosecond())) } }, compare: func(src interface{}, convertedRec arrow.Record) int { srcvs := src.([]time.Time) for i, t := range convertedRec.Column(0).(*array.Timestamp).TimestampValues() { - if srcvs[i].UnixNano() != int64(t) { + if srcvs[i].Compare(t.ToTime(arrow.Nanosecond)) != 0 { return i } } @@ -1150,24 +1164,303 @@ func TestArrowToRecord(t *testing.T) { }, }, { - logical: "timestamp_tz", - values: 
[]time.Time{time.Now(), localTime},
-			nrows:    2,
-			sc:       arrow.NewSchema([]arrow.Field{{Type: arrow.StructOf(field1, field2)}}, nil),
-			builder:  array.NewStructBuilder(pool, tzStruct),
+			logical:  "timestamp_ntz",
+			physical: "error",
+			values:   []time.Time{localTimeFarIntoFuture},
+			error:    "Cannot convert timestamp",
+			nrows:    1,
+			rowType:  execResponseRowType{Scale: 3},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: &arrow.Int64Type{}}}, nil),
+			builder:  array.NewInt64Builder(pool),
+			append: func(b array.Builder, vs interface{}) {
+				for _, t := range vs.([]time.Time) {
+					b.(*array.Int64Builder).Append(t.UnixMilli())
+				}
+			},
+			compare: func(src interface{}, convertedRec arrow.Record) int { return 0 },
+		},
+		{
+			logical:  "timestamp_ntz",
+			physical: "int64 with originalTimestamp", // timestamp_ntz with scale 0..3 -> int64
+			values:   []time.Time{time.Now().Truncate(time.Millisecond), localTime.Truncate(time.Millisecond), localTimeFarIntoFuture.Truncate(time.Millisecond)},
+			orgTS:    true,
+			nrows:    3,
+			rowType:  execResponseRowType{Scale: 3},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: &arrow.Int64Type{}}}, nil),
+			builder:  array.NewInt64Builder(pool),
+			append: func(b array.Builder, vs interface{}) {
+				for _, t := range vs.([]time.Time) {
+					b.(*array.Int64Builder).Append(t.UnixMilli())
+				}
+			},
+			compare: func(src interface{}, convertedRec arrow.Record) int {
+				srcvs := src.([]time.Time)
+				for i := 0; i < convertedRec.Column(0).Len(); i++ {
+					ts := arrowSnowflakeTimestampToTime(convertedRec.Column(0), timestampNtzType, 3, i, nil)
+					if srcvs[i].Compare(*ts) != 0 {
+						return i
+					}
+				}
+				return -1
+			},
+		},
+		{
+			logical:  "timestamp_ntz",
+			physical: "struct with originalTimestamp", // timestamp_ntz with scale 4..9 -> int64 + int32
+			values:   []time.Time{time.Now(), localTime, localTimeFarIntoFuture},
+			orgTS:    true,
+			nrows:    3,
+			rowType:  execResponseRowType{Scale: 9},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: timestampNtzStruct}}, nil),
+			builder:  array.NewStructBuilder(pool, timestampNtzStruct),
+			append: func(b array.Builder, vs interface{}) {
+				sb := b.(*array.StructBuilder)
+				valids = []bool{true, true, true}
+				sb.AppendValues(valids)
+				for _, t := range vs.([]time.Time) {
+					sb.FieldBuilder(0).(*array.Int64Builder).Append(t.Unix())
+					sb.FieldBuilder(1).(*array.Int32Builder).Append(int32(t.Nanosecond()))
+				}
+			},
+			compare: func(src interface{}, convertedRec arrow.Record) int {
+				srcvs := src.([]time.Time)
+				for i := 0; i < convertedRec.Column(0).Len(); i++ {
+					ts := arrowSnowflakeTimestampToTime(convertedRec.Column(0), timestampNtzType, 9, i, nil)
+					if srcvs[i].Compare(*ts) != 0 {
+						return i
+					}
+				}
+				return -1
+			},
+		},
+		{
+			logical:  "timestamp_ltz",
+			physical: "int64", // timestamp_ltz with scale 0..3 -> int64
+			values:   []time.Time{time.Now().Truncate(time.Millisecond), localTime.Truncate(time.Millisecond)},
+			nrows:    2,
+			rowType:  execResponseRowType{Scale: 3},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: &arrow.Int64Type{}}}, nil),
+			builder:  array.NewInt64Builder(pool),
+			append: func(b array.Builder, vs interface{}) {
+				for _, t := range vs.([]time.Time) {
+					b.(*array.Int64Builder).Append(t.UnixMilli())
+				}
+			},
+			compare: func(src interface{}, convertedRec arrow.Record) int {
+				srcvs := src.([]time.Time)
+				for i, t := range convertedRec.Column(0).(*array.Timestamp).TimestampValues() {
+					if srcvs[i].UTC() != t.ToTime(arrow.Nanosecond).UTC() {
+						return i
+					}
+				}
+				return -1
+			},
+		},
+		{
+			logical:  "timestamp_ltz",
+			physical: "struct", // timestamp_ltz with scale 4..9 -> int64 + int32
+			values:   
[]time.Time{time.Now(), localTime},
+			nrows:    2,
+			rowType:  execResponseRowType{Scale: 9},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: timestampLtzStruct}}, nil),
+			builder:  array.NewStructBuilder(pool, timestampLtzStruct),
 			append: func(b array.Builder, vs interface{}) {
 				sb := b.(*array.StructBuilder)
 				valids = []bool{true, true}
 				sb.AppendValues(valids)
 				for _, t := range vs.([]time.Time) {
 					sb.FieldBuilder(0).(*array.Int64Builder).Append(t.Unix())
-					sb.FieldBuilder(1).(*array.Int32Builder).Append(int32(t.UnixNano()))
+					sb.FieldBuilder(1).(*array.Int32Builder).Append(int32(t.Nanosecond()))
+				}
+			},
+			compare: func(src interface{}, convertedRec arrow.Record) int {
+				srcvs := src.([]time.Time)
+				for i, t := range convertedRec.Column(0).(*array.Timestamp).TimestampValues() {
+					if srcvs[i].Compare(t.ToTime(arrow.Nanosecond)) != 0 {
+						return i
+					}
+				}
+				return -1
+			},
+		},
+		{
+			logical:  "timestamp_ltz",
+			physical: "error",
+			values:   []time.Time{localTimeFarIntoFuture},
+			error:    "Cannot convert timestamp",
+			nrows:    1,
+			rowType:  execResponseRowType{Scale: 3},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: &arrow.Int64Type{}}}, nil),
+			builder:  array.NewInt64Builder(pool),
+			append: func(b array.Builder, vs interface{}) {
+				for _, t := range vs.([]time.Time) {
+					b.(*array.Int64Builder).Append(t.UnixMilli())
+				}
+			},
+			compare: func(src interface{}, convertedRec arrow.Record) int { return 0 },
+		},
+		{
+			logical:  "timestamp_ltz",
+			physical: "int64 with originalTimestamp", // timestamp_ltz with scale 0..3 -> int64
+			values:   []time.Time{time.Now().Truncate(time.Millisecond), localTime.Truncate(time.Millisecond), localTimeFarIntoFuture.Truncate(time.Millisecond)},
+			orgTS:    true,
+			nrows:    3,
+			rowType:  execResponseRowType{Scale: 3},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: &arrow.Int64Type{}}}, nil),
+			builder:  array.NewInt64Builder(pool),
+			append: func(b array.Builder, vs interface{}) {
+				for _, t := range vs.([]time.Time) {
+					b.(*array.Int64Builder).Append(t.UnixMilli())
+				}
+			},
+			compare: func(src interface{}, convertedRec arrow.Record) int {
+				srcvs := src.([]time.Time)
+				for i := 0; i < convertedRec.Column(0).Len(); i++ {
+					ts := arrowSnowflakeTimestampToTime(convertedRec.Column(0), timestampLtzType, 3, i, localTime.Location())
+					if srcvs[i].Compare(*ts) != 0 {
+						return i
+					}
+				}
+				return -1
+			},
+		},
+		{
+			logical:  "timestamp_ltz",
+			physical: "struct with originalTimestamp", // timestamp_ltz with scale 4..9 -> int64 + int32
+			values:   []time.Time{time.Now(), localTime, localTimeFarIntoFuture},
+			orgTS:    true,
+			nrows:    3,
+			rowType:  execResponseRowType{Scale: 9},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: timestampLtzStruct}}, nil),
+			builder:  array.NewStructBuilder(pool, timestampLtzStruct),
+			append: func(b array.Builder, vs interface{}) {
+				sb := b.(*array.StructBuilder)
+				valids = []bool{true, true, true}
+				sb.AppendValues(valids)
+				for _, t := range vs.([]time.Time) {
+					sb.FieldBuilder(0).(*array.Int64Builder).Append(t.Unix())
+					sb.FieldBuilder(1).(*array.Int32Builder).Append(int32(t.Nanosecond()))
+				}
+			},
+			compare: func(src interface{}, convertedRec arrow.Record) int {
+				srcvs := src.([]time.Time)
+				for i := 0; i < convertedRec.Column(0).Len(); i++ {
+					ts := arrowSnowflakeTimestampToTime(convertedRec.Column(0), timestampLtzType, 9, i, localTime.Location())
+					if srcvs[i].Compare(*ts) != 0 {
+						return i
+					}
+				}
+				return -1
+			},
+		},
+		{
+			logical:  "timestamp_tz",
+			physical: "struct2", // timestamp_tz with scale 0..3 -> int64 + int32
+			values:   []time.Time{time.Now().Truncate(time.Millisecond), 
localTime.Truncate(time.Millisecond)},
+			nrows:    2,
+			rowType:  execResponseRowType{Scale: 3},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: timestampTzStruct2}}, nil),
+			builder:  array.NewStructBuilder(pool, timestampTzStruct2),
+			append: func(b array.Builder, vs interface{}) {
+				sb := b.(*array.StructBuilder)
+				valids = []bool{true, true}
+				sb.AppendValues(valids)
+				for _, t := range vs.([]time.Time) {
+					sb.FieldBuilder(0).(*array.Int64Builder).Append(t.UnixMilli())
+					sb.FieldBuilder(1).(*array.Int32Builder).Append(int32(0)) // timezone index
 				}
 			},
 			compare: func(src interface{}, convertedRec arrow.Record) int {
 				srcvs := src.([]time.Time)
 				for i, t := range convertedRec.Column(0).(*array.Timestamp).TimestampValues() {
-					if srcvs[i].Unix() != time.Unix(0, int64(t)).Unix() {
+					if srcvs[i].UTC() != t.ToTime(arrow.Nanosecond).UTC() {
+						return i
+					}
+				}
+				return -1
+			},
+		},
+		{
+			logical:  "timestamp_tz",
+			physical: "struct3", // timestamp_tz with scale 4..9 -> int64 + int32 + int32
+			values:   []time.Time{time.Now(), localTime},
+			nrows:    2,
+			rowType:  execResponseRowType{Scale: 9},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: timestampTzStruct3}}, nil),
+			builder:  array.NewStructBuilder(pool, timestampTzStruct3),
+			append: func(b array.Builder, vs interface{}) {
+				sb := b.(*array.StructBuilder)
+				valids = []bool{true, true}
+				sb.AppendValues(valids)
+				for _, t := range vs.([]time.Time) {
+					sb.FieldBuilder(0).(*array.Int64Builder).Append(t.Unix())
+					sb.FieldBuilder(1).(*array.Int32Builder).Append(int32(t.Nanosecond()))
+					sb.FieldBuilder(2).(*array.Int32Builder).Append(int32(0)) // timezone index
+				}
+			},
+			compare: func(src interface{}, convertedRec arrow.Record) int {
+				srcvs := src.([]time.Time)
+				for i, t := range convertedRec.Column(0).(*array.Timestamp).TimestampValues() {
+					if srcvs[i].UTC() != t.ToTime(arrow.Nanosecond).UTC() {
+						return i
+					}
+				}
+				return -1
+			},
+		},
+		{
+			logical:  "timestamp_tz",
+			physical: "struct2 with originalTimestamp", // timestamp_tz with scale 0..3 -> int64 + int32
+			values:   []time.Time{time.Now().Truncate(time.Millisecond), localTime.Truncate(time.Millisecond), localTimeFarIntoFuture.Truncate(time.Millisecond)},
+			orgTS:    true,
+			nrows:    3,
+			rowType:  execResponseRowType{Scale: 3},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: timestampTzStruct2}}, nil),
+			builder:  array.NewStructBuilder(pool, timestampTzStruct2),
+			append: func(b array.Builder, vs interface{}) {
+				sb := b.(*array.StructBuilder)
+				valids = []bool{true, true, true}
+				sb.AppendValues(valids)
+				for _, t := range vs.([]time.Time) {
+					sb.FieldBuilder(0).(*array.Int64Builder).Append(t.UnixMilli())
+					sb.FieldBuilder(1).(*array.Int32Builder).Append(int32(0)) // timezone index
+				}
+			},
+			compare: func(src interface{}, convertedRec arrow.Record) int {
+				srcvs := src.([]time.Time)
+				for i := 0; i < convertedRec.Column(0).Len(); i++ {
+					ts := arrowSnowflakeTimestampToTime(convertedRec.Column(0), timestampTzType, 3, i, nil)
+					if srcvs[i].Compare(*ts) != 0 {
+						return i
+					}
+				}
+				return -1
+			},
+		},
+		{
+			logical:  "timestamp_tz",
+			physical: "struct3 with originalTimestamp", // timestamp_tz with scale 4..9 -> int64 + int32 + int32
+			values:   []time.Time{time.Now(), localTime, localTimeFarIntoFuture},
+			orgTS:    true,
+			nrows:    3,
+			rowType:  execResponseRowType{Scale: 9},
+			sc:       arrow.NewSchema([]arrow.Field{{Type: timestampTzStruct3}}, nil),
+			builder:  array.NewStructBuilder(pool, timestampTzStruct3),
+			append: func(b array.Builder, vs interface{}) {
+				sb := b.(*array.StructBuilder)
+				valids = []bool{true, true, true}
+				
sb.AppendValues(valids)
+				for _, t := range vs.([]time.Time) {
+					sb.FieldBuilder(0).(*array.Int64Builder).Append(t.Unix())
+					sb.FieldBuilder(1).(*array.Int32Builder).Append(int32(t.Nanosecond()))
+					sb.FieldBuilder(2).(*array.Int32Builder).Append(int32(0)) // timezone index
+				}
+			},
+			compare: func(src interface{}, convertedRec arrow.Record) int {
+				srcvs := src.([]time.Time)
+				for i := 0; i < convertedRec.Column(0).Len(); i++ {
+					ts := arrowSnowflakeTimestampToTime(convertedRec.Column(0), timestampTzType, 9, i, nil)
+					if srcvs[i].Compare(*ts) != 0 {
 						return i
 					}
 				}
@@ -1218,9 +1511,16 @@ func TestArrowToRecord(t *testing.T) {
 			meta := tc.rowType
 			meta.Type = tc.logical

-			transformedRec, err := arrowToRecord(rawRec, pool, []execResponseRowType{meta}, localTime.Location())
+			transformedRec, err := arrowToRecord(rawRec, pool, []execResponseRowType{meta}, localTime.Location(), tc.orgTS)
 			if err != nil {
+				if tc.error != "" && strings.Contains(err.Error(), tc.error) {
+					return
+				}
 				t.Fatalf("error: %s", err)
+			} else {
+				if len(tc.error) > 0 {
+					t.Fatalf("expected error: %s", tc.error)
+				}
 			}
 			defer transformedRec.Release()
@@ -1299,30 +1599,135 @@ func TestSmallTimestampBinding(t *testing.T) {
 	})
 }

-func TestLargeTimestampBinding(t *testing.T) {
-	runSnowflakeConnTest(t, func(sct *SCTest) {
+func TestTimestampConversionWithoutArrowBatches(t *testing.T) {
+	timestamps := [3]string{
+		"2000-10-10 10:10:10.123456789", // neutral
+		"9999-12-12 23:59:59.123456789", // max
+		"0001-01-01 00:00:00.123456789"} // min
+	types := [3]string{"TIMESTAMP_NTZ", "TIMESTAMP_LTZ", "TIMESTAMP_TZ"}
+
+	runDBTest(t, func(sct *DBTest) {
 		ctx := context.Background()
-		timeValue, err := time.Parse("2006-01-02 15:04:05", "9000-10-10 10:10:10")
-		if err != nil {
-			t.Fatalf("failed to parse time: %v", err)
+
+		for _, tsStr := range timestamps {
+			ts, err := time.Parse("2006-01-02 15:04:05", tsStr)
+			if err != nil {
+				t.Fatalf("failed to parse time: %v", err)
+			}
+			for _, tp := range types {
+				for scale := 0; scale <= 9; scale++ {
+					t.Run(tp+"("+strconv.Itoa(scale)+")_"+tsStr, func(t *testing.T) {
+						sql := fmt.Sprintf("SELECT '%s'::%s(%v)", tsStr, tp, scale)
+						rows := sct.mustQueryContext(ctx, sql, nil)
+						defer rows.Close()
+
+						var act time.Time
+						if rows.Next() {
+							if err := rows.Scan(&act); err != nil {
+								t.Fatalf("failed to scan: %v", err)
+							}
+							exp := ts.Truncate(time.Duration(math.Pow10(9 - scale)))
+							if exp.Compare(act) != 0 {
+								t.Fatalf("unexpected result. 
expected: %v, got: %v", exp, act) + } + } else { + t.Fatalf("failed to run query: %v", sql) + } + }) + } + } } - parameters := []driver.NamedValue{ - {Ordinal: 1, Value: DataTypeTimestampNtz}, - {Ordinal: 2, Value: timeValue}, + }) +} + +func TestTimestampConversionWithArrowBatchesFailsForDistantDates(t *testing.T) { + timestamps := [2]string{ + "9999-12-12 23:59:59.123456789", // max + "0001-01-01 00:00:00.123456789"} // min + types := [3]string{"TIMESTAMP_NTZ", "TIMESTAMP_LTZ", "TIMESTAMP_TZ"} + + expectedError := "Cannot convert timestamp" + + runSnowflakeConnTest(t, func(sct *SCTest) { + ctx := WithArrowBatches(sct.sc.ctx) + + pool := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer pool.AssertSize(t, 0) + ctx = WithArrowAllocator(ctx, pool) + + for _, tsStr := range timestamps { + for _, tp := range types { + for scale := 0; scale <= 9; scale++ { + t.Run(tp+"("+strconv.Itoa(scale)+")_"+tsStr, func(t *testing.T) { + + sql := fmt.Sprintf("SELECT '%s'::%s(%v)", tsStr, tp, scale) + _, err := sct.sc.QueryContext(ctx, sql, []driver.NamedValue{}) + if err != nil { + if !strings.Contains(err.Error(), expectedError) { + t.Fatalf("improper error, expected: %v, got: %v", expectedError, err.Error()) + } + } else { + t.Fatalf("no error, expected: %v ", expectedError) + + } + }) + } + } } + }) +} - rows := sct.mustQueryContext(ctx, "SELECT ?", parameters) - defer rows.Close() +func TestTimestampConversionWithArrowBatchesAndWithOriginalTimestamp(t *testing.T) { + timestamps := [3]string{ + "2000-10-10 10:10:10.123456789", // neutral + "9999-12-12 23:59:59.123456789", // max + "0001-01-01 00:00:00.123456789"} // min + types := [3]string{"TIMESTAMP_NTZ", "TIMESTAMP_LTZ", "TIMESTAMP_TZ"} - scanValues := make([]driver.Value, 1) - for { - if err := rows.Next(scanValues); err == io.EOF { - break - } else if err != nil { - t.Fatalf("failed to run query: %v", err) + runSnowflakeConnTest(t, func(sct *SCTest) { + ctx := WithOriginalTimestamp(WithArrowBatches(sct.sc.ctx)) + pool := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer pool.AssertSize(t, 0) + ctx = WithArrowAllocator(ctx, pool) + + for _, tsStr := range timestamps { + ts, err := time.Parse("2006-01-02 15:04:05", tsStr) + if err != nil { + t.Fatalf("failed to parse time: %v", err) } - if scanValues[0] != timeValue { - t.Fatalf("unexpected result. expected: %v, got: %v", timeValue, scanValues[0]) + for _, tp := range types { + for scale := 0; scale <= 9; scale++ { + t.Run(tp+"("+strconv.Itoa(scale)+")_"+tsStr, func(t *testing.T) { + + sql := fmt.Sprintf("SELECT '%s'::%s(%v)", tsStr, tp, scale) + rows := sct.mustQueryContext(ctx, sql, []driver.NamedValue{}) + defer rows.Close() + + // getting result batches + batches, err := rows.(*snowflakeRows).GetArrowBatches() + if err != nil { + t.Error(err) + } + + numBatches := len(batches) + if numBatches != 1 { + t.Errorf("incorrect number of batches, expected: 1, got: %v", numBatches) + } + + rec, err := batches[0].Fetch() + if err != nil { + t.Error(err) + } + exp := ts.Truncate(time.Duration(math.Pow10(9 - scale))) + for _, r := range *rec { + act := ArrowSnowflakeTimestampToTime(r, batches[0], 0, 0) + if act == nil { + t.Fatalf("unexpected result. expected: %v, got: nil", exp) + } else if exp.Compare(*act) != 0 { + t.Fatalf("unexpected result. 
expected: %v, got: %v", exp, act) + } + r.Release() + } + }) + } } } }) diff --git a/errors.go b/errors.go index a4104f66c..64a77ac5f 100644 --- a/errors.go +++ b/errors.go @@ -216,6 +216,8 @@ const ( ErrInvalidOffsetStr = 268001 // ErrInvalidBinaryHexForm is an error code for the case where a binary data in hex form is invalid. ErrInvalidBinaryHexForm = 268002 + // ErrTooHighTimestampPrecision is an error code for the case where cannot convert Snowflake timestamp to arrow.Timestamp + ErrTooHighTimestampPrecision = 268003 /* OCSP */ diff --git a/util.go b/util.go index ade109364..96f2e9bb5 100644 --- a/util.go +++ b/util.go @@ -18,16 +18,17 @@ import ( type contextKey string const ( - multiStatementCount contextKey = "MULTI_STATEMENT_COUNT" - asyncMode contextKey = "ASYNC_MODE_QUERY" - queryIDChannel contextKey = "QUERY_ID_CHANNEL" - snowflakeRequestIDKey contextKey = "SNOWFLAKE_REQUEST_ID" - fetchResultByID contextKey = "SF_FETCH_RESULT_BY_ID" - fileStreamFile contextKey = "STREAMING_PUT_FILE" - fileTransferOptions contextKey = "FILE_TRANSFER_OPTIONS" - enableHigherPrecision contextKey = "ENABLE_HIGHER_PRECISION" - arrowBatches contextKey = "ARROW_BATCHES" - arrowAlloc contextKey = "ARROW_ALLOC" + multiStatementCount contextKey = "MULTI_STATEMENT_COUNT" + asyncMode contextKey = "ASYNC_MODE_QUERY" + queryIDChannel contextKey = "QUERY_ID_CHANNEL" + snowflakeRequestIDKey contextKey = "SNOWFLAKE_REQUEST_ID" + fetchResultByID contextKey = "SF_FETCH_RESULT_BY_ID" + fileStreamFile contextKey = "STREAMING_PUT_FILE" + fileTransferOptions contextKey = "FILE_TRANSFER_OPTIONS" + enableHigherPrecision contextKey = "ENABLE_HIGHER_PRECISION" + arrowBatches contextKey = "ARROW_BATCHES" + arrowAlloc contextKey = "ARROW_ALLOC" + enableOriginalTimestamp contextKey = "ENABLE_ORIGINAL_TIMESTAMP" ) const ( @@ -105,6 +106,13 @@ func WithArrowAllocator(ctx context.Context, pool memory.Allocator) context.Cont return context.WithValue(ctx, arrowAlloc, pool) } +// WithOriginalTimestamp in combination with WithArrowBatches returns a context +// that allows users to retrieve arrow.Record with original timestamp struct returned by Snowflake. +// It can be used in case arrow.Timestamp cannot fit original timestamp values. +func WithOriginalTimestamp(ctx context.Context) context.Context { + return context.WithValue(ctx, enableOriginalTimestamp, true) +} + // Get the request ID from the context if specified, otherwise generate one func getOrGenerateRequestIDFromContext(ctx context.Context) UUID { requestID, ok := ctx.Value(snowflakeRequestIDKey).(UUID)