Skip to content

Commit

Permalink
SNOW-911238: Add WithOriginalTimestamp context switch (#918)
Browse files Browse the repository at this point in the history
SNOW-911238: Add WithOriginalTimestamp context switch
  • Loading branch information
sfc-gh-dstempniak authored Oct 10, 2023
1 parent bec4641 commit 3a29554
Show file tree
Hide file tree
Showing 10 changed files with 645 additions and 232 deletions.
2 changes: 1 addition & 1 deletion arrow_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader) (*[
for arc.reader.Next() {
rawRecord := arc.reader.Record()

record, err := arrowToRecord(rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc)
record, err := arrowToRecord(scd.ctx, rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc)
if err != nil {
return nil, err
}
Expand Down
3 changes: 3 additions & 0 deletions chunk_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ func (scd *snowflakeChunkDownloader) startArrowBatches() error {
idx: 0,
scd: scd,
funcDownloadHelper: scd.FuncDownloadHelper,
loc: loc,
}
// decode first chunk if possible
if firstArrowChunk.allocator != nil {
Expand All @@ -293,6 +294,7 @@ func (scd *snowflakeChunkDownloader) startArrowBatches() error {
idx: i,
scd: scd,
funcDownloadHelper: scd.FuncDownloadHelper,
loc: loc,
}
}
return nil
Expand Down Expand Up @@ -708,6 +710,7 @@ type ArrowBatch struct {
scd *snowflakeChunkDownloader
funcDownloadHelper func(context.Context, *snowflakeChunkDownloader, int) error
ctx context.Context
loc *time.Location
}

// WithContext sets the context which will be used for this ArrowBatch.
Expand Down
19 changes: 14 additions & 5 deletions cmd/arrow/batches/arrow_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/apache/arrow/go/v12/arrow/memory"
"log"
"sync"
"time"

sf "github.com/snowflakedb/gosnowflake"
)
Expand All @@ -20,10 +21,11 @@ type sampleRecord struct {
workerID int
number int32
string string
ts *time.Time
}

func (s sampleRecord) String() string {
return fmt.Sprintf("batchID: %v, workerID: %v, number: %v, string: %v", s.batchID, s.workerID, s.number, s.string)
return fmt.Sprintf("batchID: %v, workerID: %v, number: %v, string: %v, ts: %v", s.batchID, s.workerID, s.number, s.string, s.ts)
}

func main() {
Expand All @@ -48,8 +50,14 @@ func main() {
log.Fatalf("failed to create DSN from Config: %v, err: %v", cfg, err)
}

ctx := sf.WithArrowAllocator(sf.WithArrowBatches(context.Background()), memory.DefaultAllocator)
query := "SELECT SEQ4(), 'example ' || (SEQ4() * 2) FROM TABLE(GENERATOR(ROWCOUNT=>30000))"
ctx :=
sf.WithOriginalTimestamp(
sf.WithArrowAllocator(
sf.WithArrowBatches(context.Background()), memory.DefaultAllocator))

query := "SELECT SEQ4(), 'example ' || (SEQ4() * 2), " +
" TO_TIMESTAMP_NTZ('9999-01-01 13:13:13.' || LPAD(SEQ4(),9,'0')) ltz " +
" FROM TABLE(GENERATOR(ROWCOUNT=>30000))"

db, err := sql.Open("snowflake", dsn)
if err != nil {
Expand Down Expand Up @@ -88,7 +96,7 @@ func main() {
}
sampleRecordsPerBatch[batchID] = make([]sampleRecord, batches[batchID].GetRowCount())
totalRowID := 0
convertFromColumnsToRows(records, sampleRecordsPerBatch, batchID, workerId, totalRowID)
convertFromColumnsToRows(records, sampleRecordsPerBatch, batchID, workerId, totalRowID, batches[batchID])
}
}(&waitGroup, batchIds, workerID)
}
Expand All @@ -110,14 +118,15 @@ func main() {
}

func convertFromColumnsToRows(records *[]arrow.Record, sampleRecordsPerBatch [][]sampleRecord, batchID int,
workerID int, totalRowID int) {
workerID int, totalRowID int, batch *sf.ArrowBatch) {
for _, record := range *records {
for rowID, intColumn := range record.Column(0).(*array.Int32).Int32Values() {
sampleRecord := sampleRecord{
batchID: batchID,
workerID: workerID,
number: intColumn,
string: record.Column(1).(*array.String).Value(rowID),
ts: batch.ArrowSnowflakeTimestampToTime(record, 2, rowID),
}
sampleRecordsPerBatch[batchID][totalRowID] = sampleRecord
totalRowID++
Expand Down
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (sc *snowflakeConn) queryContextInternal(
rows.addDownloader(populateChunkDownloader(ctx, sc, data.Data))
}

rows.ChunkDownloader.start()
err = rows.ChunkDownloader.start()
return rows, err
}

Expand Down
Loading

0 comments on commit 3a29554

Please sign in to comment.