Skip to content

Commit

Permalink
refactor download files (#875)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-ext-simba-lb authored Aug 4, 2023
1 parent 5e1158c commit 09e2e45
Showing 1 changed file with 7 additions and 51 deletions.
58 changes: 7 additions & 51 deletions file_transfer_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (sfa *snowflakeFileTransferAgent) execute() error {
if sfa.stageLocationType != local {
sizeThreshold := sfa.options.MultiPartThreshold
meta.options.MultiPartThreshold = sizeThreshold
if meta.srcFileSize > sizeThreshold {
if meta.srcFileSize > sizeThreshold && sfa.commandType == uploadCommand {
meta.parallel = sfa.parallel
largeFileMetas = append(largeFileMetas, meta)
} else {
Expand All @@ -194,7 +194,7 @@ func (sfa *snowflakeFileTransferAgent) execute() error {
return err
}
} else {
if err = sfa.download(largeFileMetas, smallFileMetas); err != nil {
if err = sfa.download(smallFileMetas); err != nil {
return err
}
}
Expand Down Expand Up @@ -697,31 +697,19 @@ func (sfa *snowflakeFileTransferAgent) upload(
}

func (sfa *snowflakeFileTransferAgent) download(
largeFileMetadata []*fileMetadata,
smallFileMetadata []*fileMetadata) error {
fileMetadata []*fileMetadata) error {
client, err := sfa.getStorageClient(sfa.stageLocationType).
createClient(sfa.stageInfo, sfa.useAccelerateEndpoint)
if err != nil {
return err
}
for _, meta := range smallFileMetadata {
meta.client = client
}
for _, meta := range largeFileMetadata {
for _, meta := range fileMetadata {
meta.client = client
}

if len(smallFileMetadata) > 0 {
logger.WithContext(sfa.sc.ctx).Infof("downloading %v small files", len(smallFileMetadata))
if err = sfa.downloadFilesParallel(smallFileMetadata); err != nil {
return err
}
}
if len(largeFileMetadata) > 0 {
logger.WithContext(sfa.sc.ctx).Infof("downloading %v large files", len(largeFileMetadata))
if err = sfa.downloadFilesSequential(largeFileMetadata); err != nil {
return err
}
logger.WithContext(sfa.sc.ctx).Infof("downloading %v files", len(fileMetadata))
if err = sfa.downloadFilesParallel(fileMetadata); err != nil {
return err
}
return nil
}
Expand Down Expand Up @@ -962,38 +950,6 @@ func (sfa *snowflakeFileTransferAgent) downloadFilesParallel(fileMetas []*fileMe
return err
}

func (sfa *snowflakeFileTransferAgent) downloadFilesSequential(fileMetas []*fileMetadata) error {
idx := 0
fileMetaLen := len(fileMetas)
for idx < fileMetaLen {
res, err := sfa.downloadOneFile(fileMetas[idx])
if err != nil {
return err
}

if res.resStatus == renewToken {
client, err := sfa.renewExpiredClient()
if err != nil {
return err
}
for i := idx; i < fileMetaLen; i++ {
fileMetas[i].client = client
}
continue
} else if res.resStatus == renewPresignedURL {
sfa.updateFileMetadataWithPresignedURL()
continue
}

sfa.results = append(sfa.results, res)
idx++
if injectWaitPut > 0 {
time.Sleep(injectWaitPut)
}
}
return nil
}

func (sfa *snowflakeFileTransferAgent) downloadOneFile(meta *fileMetadata) (*fileMetadata, error) {
tmpDir, err := os.MkdirTemp("", "")
if err != nil {
Expand Down

0 comments on commit 09e2e45

Please sign in to comment.