SNOW-911238: Add WithOriginalTimestamp context switch
sfc-gh-dstempniak committed Sep 25, 2023
1 parent 082e9f8 commit b89711e
Showing 8 changed files with 622 additions and 219 deletions.
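For orientation, this is roughly how the new switch is meant to be consumed, assembled from the updated example in cmd/arrow/batches/arrow_batches.go below and the driver's existing Arrow-batch retrieval pattern. The DSN is a placeholder and the one-column query is illustrative, not part of the commit:

package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"log"

	"github.com/apache/arrow/go/v12/arrow/memory"
	sf "github.com/snowflakedb/gosnowflake"
)

func main() {
	// Keep timestamps in their original Snowflake representation inside the
	// Arrow records; convert cells later with sf.ArrowSnowflakeTimestampToTime.
	ctx := sf.WithOriginalTimestamp(
		sf.WithArrowAllocator(
			sf.WithArrowBatches(context.Background()), memory.DefaultAllocator))

	db, err := sql.Open("snowflake", "user:pass@account/db/schema") // placeholder DSN
	if err != nil {
		log.Fatalf("failed to open connection: %v", err)
	}
	defer db.Close()

	conn, err := db.Conn(ctx)
	if err != nil {
		log.Fatalf("failed to acquire connection: %v", err)
	}
	defer conn.Close()

	// Arrow batches are fetched through the driver-level rows, not database/sql.
	var rows driver.Rows
	if err = conn.Raw(func(x interface{}) error {
		var qErr error
		rows, qErr = x.(driver.QueryerContext).QueryContext(ctx, "SELECT CURRENT_TIMESTAMP()", nil)
		return qErr
	}); err != nil {
		log.Fatalf("query failed: %v", err)
	}

	batches, err := rows.(sf.SnowflakeRows).GetArrowBatches()
	if err != nil {
		log.Fatalf("failed to get Arrow batches: %v", err)
	}
	log.Printf("received %v Arrow batches", len(batches))
}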
4 changes: 2 additions & 2 deletions arrow_chunk.go
@@ -50,14 +50,14 @@ func (arc *arrowResultChunk) decodeArrowChunk(rowType []execResponseRowType, hig
 	return chunkRows, arc.reader.Err()
 }
 
-func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader) (*[]arrow.Record, error) {
+func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader, originalTimestamp bool) (*[]arrow.Record, error) {
 	var records []arrow.Record
 	defer arc.reader.Release()
 
 	for arc.reader.Next() {
 		rawRecord := arc.reader.Record()
 
-		record, err := arrowToRecord(rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc)
+		record, err := arrowToRecord(rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc, originalTimestamp)
 		if err != nil {
 			return nil, err
 		}
9 changes: 7 additions & 2 deletions chunk_downloader.go
@@ -279,10 +279,12 @@ func (scd *snowflakeChunkDownloader) startArrowBatches() error {
 		idx:                0,
 		scd:                scd,
 		funcDownloadHelper: scd.FuncDownloadHelper,
+		loc:                loc,
 	}
 	// decode first chunk if possible
 	if firstArrowChunk.allocator != nil {
-		scd.FirstBatch.rec, err = firstArrowChunk.decodeArrowBatch(scd)
+		originalTimestamp := originalTimestampEnabled(scd.ctx)
+		scd.FirstBatch.rec, err = firstArrowChunk.decodeArrowBatch(scd, originalTimestamp)
 		if err != nil {
 			return err
 		}
@@ -293,6 +295,7 @@ func (scd *snowflakeChunkDownloader) startArrowBatches() error {
 			idx:                i,
 			scd:                scd,
 			funcDownloadHelper: scd.FuncDownloadHelper,
+			loc:                loc,
 		}
 	}
 	return nil
@@ -440,7 +443,8 @@ func decodeChunk(scd *snowflakeChunkDownloader, idx int, bufStream *bufio.Reader
 		scd.pool,
 	}
 	if usesArrowBatches(scd.ctx) {
-		if scd.ArrowBatches[idx].rec, err = arc.decodeArrowBatch(scd); err != nil {
+		originalTimestamp := originalTimestampEnabled(scd.ctx)
+		if scd.ArrowBatches[idx].rec, err = arc.decodeArrowBatch(scd, originalTimestamp); err != nil {
 			return err
 		}
 		// updating metadata
@@ -708,6 +712,7 @@ type ArrowBatch struct {
 	scd                *snowflakeChunkDownloader
 	funcDownloadHelper func(context.Context, *snowflakeChunkDownloader, int) error
 	ctx                context.Context
+	loc                *time.Location
 }
 
 // WithContext sets the context which will be used for this ArrowBatch.
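A note on the originalTimestampEnabled(scd.ctx) calls above: the helper is defined in a file not loaded on this page, but it presumably reads a boolean flag that WithOriginalTimestamp stores on the context, mirroring the driver's existing context switches such as WithArrowBatches. A minimal sketch of that pattern — the key constant and function bodies here are assumptions, not the commit's exact code:

package gosnowflake

import "context"

type contextKey string

// Assumed key name; the actual constant in the driver may differ.
const enableOriginalTimestamp contextKey = "ENABLE_ORIGINAL_TIMESTAMP"

// WithOriginalTimestamp returns a context that tells the driver to keep
// timestamps in their original Snowflake representation in Arrow batches.
func WithOriginalTimestamp(ctx context.Context) context.Context {
	return context.WithValue(ctx, enableOriginalTimestamp, true)
}

// originalTimestampEnabled reports whether the flag was set on the context.
func originalTimestampEnabled(ctx context.Context) bool {
	flag, ok := ctx.Value(enableOriginalTimestamp).(bool)
	return ok && flag
}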
25 changes: 18 additions & 7 deletions cmd/arrow/batches/arrow_batches.go
@@ -9,21 +9,22 @@ import (
 	"github.com/apache/arrow/go/v12/arrow"
 	"github.com/apache/arrow/go/v12/arrow/array"
 	"github.com/apache/arrow/go/v12/arrow/memory"
-	sf "github.com/snowflakedb/gosnowflake"
 	"log"
 	"sync"
 
+	sf "github.com/snowflakedb/gosnowflake"
+	"time"
 )
 
 type sampleRecord struct {
 	batchID  int
 	workerID int
 	number   int32
 	string   string
+	ts       *time.Time
 }
 
 func (s sampleRecord) String() string {
-	return fmt.Sprintf("batchID: %v, workerID: %v, number: %v, string: %v", s.batchID, s.workerID, s.number, s.string)
+	return fmt.Sprintf("batchID: %v, workerID: %v, number: %v, string: %v, ts: %v", s.batchID, s.workerID, s.number, s.string, s.ts)
 }
 
 func main() {
@@ -38,6 +39,9 @@ func main() {
 		{Name: "Host", EnvName: "SNOWFLAKE_TEST_HOST", FailOnMissing: false},
 		{Name: "Port", EnvName: "SNOWFLAKE_TEST_PORT", FailOnMissing: false},
 		{Name: "Protocol", EnvName: "SNOWFLAKE_TEST_PROTOCOL", FailOnMissing: false},
+		{Name: "Database", EnvName: "SNOWFLAKE_TEST_DATABASE", FailOnMissing: true},
+		{Name: "Schema", EnvName: "SNOWFLAKE_TEST_SCHEMA", FailOnMissing: true},
+		{Name: "Role", EnvName: "SNOWFLAKE_TEST_ROLE", FailOnMissing: false},
 	})
 	if err != nil {
 		log.Fatalf("failed to create Config, err: %v", err)
@@ -48,8 +52,14 @@ func main() {
 		log.Fatalf("failed to create DSN from Config: %v, err: %v", cfg, err)
 	}
 
-	ctx := sf.WithArrowAllocator(sf.WithArrowBatches(context.Background()), memory.DefaultAllocator)
-	query := "SELECT SEQ4(), 'example ' || (SEQ4() * 2) FROM TABLE(GENERATOR(ROWCOUNT=>30000))"
+	ctx :=
+		sf.WithOriginalTimestamp(
+			sf.WithArrowAllocator(
+				sf.WithArrowBatches(context.Background()), memory.DefaultAllocator))
+
+	query := "SELECT SEQ4(), 'example ' || (SEQ4() * 2), " +
+		" TO_TIMESTAMP_NTZ('9999-01-01 13:13:13.' || LPAD(SEQ4(),9,'0')) ltz " +
+		" FROM TABLE(GENERATOR(ROWCOUNT=>30000))"
 
 	db, err := sql.Open("snowflake", dsn)
 	if err != nil {
@@ -88,7 +98,7 @@ func main() {
 			}
 			sampleRecordsPerBatch[batchID] = make([]sampleRecord, batches[batchID].GetRowCount())
 			totalRowID := 0
-			convertFromColumnsToRows(records, sampleRecordsPerBatch, batchID, workerId, totalRowID)
+			convertFromColumnsToRows(records, sampleRecordsPerBatch, batchID, workerId, totalRowID, batches[batchID])
 		}
 	}(&waitGroup, batchIds, workerID)
 }
@@ -110,14 +120,15 @@ func main() {
 }
 
 func convertFromColumnsToRows(records *[]arrow.Record, sampleRecordsPerBatch [][]sampleRecord, batchID int,
-	workerID int, totalRowID int) {
+	workerID int, totalRowID int, batch *sf.ArrowBatch) {
 	for _, record := range *records {
 		for rowID, intColumn := range record.Column(0).(*array.Int32).Int32Values() {
 			sampleRecord := sampleRecord{
 				batchID:  batchID,
 				workerID: workerID,
 				number:   intColumn,
 				string:   record.Column(1).(*array.String).Value(rowID),
+				ts:       sf.ArrowSnowflakeTimestampToTime(record, batch, 2, rowID),
 			}
 			sampleRecordsPerBatch[batchID][totalRowID] = sampleRecord
 			totalRowID++
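Worth noting in the example above: ArrowSnowflakeTimestampToTime converts one cell of a raw Snowflake timestamp column to a *time.Time, which is why sampleRecord.ts is a pointer. A defensive caller might guard against a nil result before formatting. A small helper one might drop into the example program under that assumption (printTimestamp is hypothetical, not part of the commit; imports shown for completeness):

package main

import (
	"fmt"
	"time"

	"github.com/apache/arrow/go/v12/arrow"
	sf "github.com/snowflakedb/gosnowflake"
)

// printTimestamp formats one timestamp cell from column 2 of the record,
// guarding against a nil result when the value cannot be converted.
func printTimestamp(record arrow.Record, batch *sf.ArrowBatch, rowID int) {
	if ts := sf.ArrowSnowflakeTimestampToTime(record, batch, 2, rowID); ts != nil {
		fmt.Printf("row %d: %s\n", rowID, ts.Format(time.RFC3339Nano))
	} else {
		fmt.Printf("row %d: <nil timestamp>\n", rowID)
	}
}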
2 changes: 1 addition & 1 deletion connection.go
@@ -409,7 +409,7 @@ func (sc *snowflakeConn) queryContextInternal(
 		rows.addDownloader(populateChunkDownloader(ctx, sc, data.Data))
 	}
 
-	rows.ChunkDownloader.start()
+	err = rows.ChunkDownloader.start()
 	return rows, err
 }
 