Skip to content

Commit

Permalink
Merge branch 'SNOW-878073-retry-strategy' of github.com:snowflakedb/g…
Browse files Browse the repository at this point in the history
…osnowflake into SNOW-878073-retry-strategy
  • Loading branch information
sfc-gh-dheyman committed Oct 11, 2023
2 parents 4a09136 + da1a79e commit e0219c6
Show file tree
Hide file tree
Showing 23 changed files with 847 additions and 280 deletions.
2 changes: 1 addition & 1 deletion arrow_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (arc *arrowResultChunk) decodeArrowBatch(scd *snowflakeChunkDownloader) (*[
for arc.reader.Next() {
rawRecord := arc.reader.Record()

record, err := arrowToRecord(rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc)
record, err := arrowToRecord(scd.ctx, rawRecord, arc.allocator, scd.RowSet.RowType, arc.loc)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions authokta.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func postAuthSAML(
fullURL := sr.getFullURL(authenticatorRequestPath, params)

logger.Infof("fullURL: %v", fullURL)
resp, err := sr.FuncPost(ctx, sr, fullURL, headers, body, timeout, defaultTimeProvider)
resp, err := sr.FuncPost(ctx, sr, fullURL, headers, body, timeout, defaultTimeProvider, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func postAuthOKTA(
if err != nil {
return nil, err
}
resp, err := sr.FuncPost(ctx, sr, targetURL, headers, body, timeout, defaultTimeProvider)
resp, err := sr.FuncPost(ctx, sr, targetURL, headers, body, timeout, defaultTimeProvider, nil)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions chunk_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func getChunk(
if err != nil {
return nil, err
}
return newRetryHTTP(ctx, sc.rest.Client, http.NewRequest, u, headers, timeout, sc.currentTimeProvider).execute()
return newRetryHTTP(ctx, sc.rest.Client, http.NewRequest, u, headers, timeout, sc.currentTimeProvider, sc.cfg).execute()
}

func (scd *snowflakeChunkDownloader) startArrowBatches() error {
Expand All @@ -279,6 +279,7 @@ func (scd *snowflakeChunkDownloader) startArrowBatches() error {
idx: 0,
scd: scd,
funcDownloadHelper: scd.FuncDownloadHelper,
loc: loc,
}
// decode first chunk if possible
if firstArrowChunk.allocator != nil {
Expand All @@ -293,6 +294,7 @@ func (scd *snowflakeChunkDownloader) startArrowBatches() error {
idx: i,
scd: scd,
funcDownloadHelper: scd.FuncDownloadHelper,
loc: loc,
}
}
return nil
Expand Down Expand Up @@ -636,7 +638,7 @@ func (f *httpStreamChunkFetcher) fetch(URL string, rows chan<- []*string) error
if err != nil {
return err
}
res, err := newRetryHTTP(context.Background(), f.client, http.NewRequest, fullURL, f.headers, 0, defaultTimeProvider).execute()
res, err := newRetryHTTP(context.Background(), f.client, http.NewRequest, fullURL, f.headers, 0, defaultTimeProvider, nil).execute()
if err != nil {
return err
}
Expand Down Expand Up @@ -708,6 +710,7 @@ type ArrowBatch struct {
scd *snowflakeChunkDownloader
funcDownloadHelper func(context.Context, *snowflakeChunkDownloader, int) error
ctx context.Context
loc *time.Location
}

// WithContext sets the context which will be used for this ArrowBatch.
Expand Down
2 changes: 1 addition & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ func (cli *httpClient) Post(
body []byte,
timeout time.Duration,
currentTimeProvider currentTimeProvider) (*http.Response, error) {
return cli.sr.FuncPost(ctx, cli.sr, url, headers, body, timeout, currentTimeProvider)
return cli.sr.FuncPost(ctx, cli.sr, url, headers, body, timeout, currentTimeProvider, nil)
}
19 changes: 14 additions & 5 deletions cmd/arrow/batches/arrow_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/apache/arrow/go/v12/arrow/memory"
"log"
"sync"
"time"

sf "github.com/snowflakedb/gosnowflake"
)
Expand All @@ -20,10 +21,11 @@ type sampleRecord struct {
workerID int
number int32
string string
ts *time.Time
}

func (s sampleRecord) String() string {
return fmt.Sprintf("batchID: %v, workerID: %v, number: %v, string: %v", s.batchID, s.workerID, s.number, s.string)
return fmt.Sprintf("batchID: %v, workerID: %v, number: %v, string: %v, ts: %v", s.batchID, s.workerID, s.number, s.string, s.ts)
}

func main() {
Expand All @@ -48,8 +50,14 @@ func main() {
log.Fatalf("failed to create DSN from Config: %v, err: %v", cfg, err)
}

ctx := sf.WithArrowAllocator(sf.WithArrowBatches(context.Background()), memory.DefaultAllocator)
query := "SELECT SEQ4(), 'example ' || (SEQ4() * 2) FROM TABLE(GENERATOR(ROWCOUNT=>30000))"
ctx :=
sf.WithOriginalTimestamp(
sf.WithArrowAllocator(
sf.WithArrowBatches(context.Background()), memory.DefaultAllocator))

query := "SELECT SEQ4(), 'example ' || (SEQ4() * 2), " +
" TO_TIMESTAMP_NTZ('9999-01-01 13:13:13.' || LPAD(SEQ4(),9,'0')) ltz " +
" FROM TABLE(GENERATOR(ROWCOUNT=>30000))"

db, err := sql.Open("snowflake", dsn)
if err != nil {
Expand Down Expand Up @@ -88,7 +96,7 @@ func main() {
}
sampleRecordsPerBatch[batchID] = make([]sampleRecord, batches[batchID].GetRowCount())
totalRowID := 0
convertFromColumnsToRows(records, sampleRecordsPerBatch, batchID, workerId, totalRowID)
convertFromColumnsToRows(records, sampleRecordsPerBatch, batchID, workerId, totalRowID, batches[batchID])
}
}(&waitGroup, batchIds, workerID)
}
Expand All @@ -110,14 +118,15 @@ func main() {
}

func convertFromColumnsToRows(records *[]arrow.Record, sampleRecordsPerBatch [][]sampleRecord, batchID int,
workerID int, totalRowID int) {
workerID int, totalRowID int, batch *sf.ArrowBatch) {
for _, record := range *records {
for rowID, intColumn := range record.Column(0).(*array.Int32).Int32Values() {
sampleRecord := sampleRecord{
batchID: batchID,
workerID: workerID,
number: intColumn,
string: record.Column(1).(*array.String).Value(rowID),
ts: batch.ArrowSnowflakeTimestampToTime(record, 2, rowID),
}
sampleRecordsPerBatch[batchID][totalRowID] = sampleRecord
totalRowID++
Expand Down
2 changes: 1 addition & 1 deletion connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func (sc *snowflakeConn) queryContextInternal(
rows.addDownloader(populateChunkDownloader(ctx, sc, data.Data))
}

rows.ChunkDownloader.start()
err = rows.ChunkDownloader.start()
return rows, err
}

Expand Down
Loading

0 comments on commit e0219c6

Please sign in to comment.