Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev'
Browse files Browse the repository at this point in the history
  • Loading branch information
RobiNino committed Jul 31, 2024
2 parents b96b0ad + dbcb87b commit 11e9718
Show file tree
Hide file tree
Showing 12 changed files with 102 additions and 104 deletions.
16 changes: 0 additions & 16 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,6 @@ jobs:
- name: ${{ matrix.suite }} tests
run: go test -v github.com/jfrog/jfrog-client-go/tests --timeout 0 --test.${{ matrix.suite }} --rt.url=${{ secrets.PLATFORM_URL }}/artifactory --ds.url=${{ secrets.PLATFORM_URL }}/distribution --xr.url=${{ secrets.PLATFORM_URL }}/xray --xsc.url=${{ secrets.PLATFORM_URL }}/xsc --access.url=${{ secrets.PLATFORM_URL }}/access --rt.user=${{ secrets.PLATFORM_USER }} --rt.password=${{ secrets.PLATFORM_PASSWORD }} --access.token=${{ secrets.PLATFORM_ADMIN_TOKEN }} --ci.runId=${{ runner.os }}-${{ matrix.suite }}

JFrog-Client-Go-Pipelines-Tests:
needs: Pretest
name: pipelines ubuntu-latest
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
ref: ${{ github.event.pull_request.head.sha }}

- name: Setup Go with cache
uses: jfrog/.github/actions/install-go-with-cache@main

- name: pipelines tests
run: go test -v github.com/jfrog/jfrog-client-go/tests --timeout 0 --test.pipelines --rt.url=${{ secrets.PLATFORM_URL }}/artifactory --pipe.url=${{ secrets.PLATFORM_URL }}/pipelines --rt.user=${{ secrets.PLATFORM_USER }} --rt.password=${{ secrets.PLATFORM_PASSWORD }} --pipe.accessToken=${{ secrets.PLATFORM_ADMIN_TOKEN }} --pipe.vcsToken=${{ secrets.CLI_PIPE_VCS_TOKEN }} --pipe.vcsRepo=${{ secrets.CLI_PIPE_VCS_REPO }} --pipe.vcsBranch=${{ secrets.CLI_PIPE_VCS_BRANCH }} --ci.runId=${{ runner.os }}_pipe

JFrog-Client-Go-Repositories-Tests:
needs: Pretest
name: repositories ubuntu-latest
Expand Down
35 changes: 20 additions & 15 deletions artifactory/services/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package services

import (
"errors"
"fmt"
"github.com/jfrog/build-info-go/entities"
biutils "github.com/jfrog/build-info-go/utils"
ioutils "github.com/jfrog/gofrog/io"
Expand Down Expand Up @@ -359,7 +360,7 @@ func addCreateDirsTasks(directoriesDataKeys []string, alreadyCreatedDirs map[str
sort.Sort(sort.Reverse(sort.StringSlice(directoriesDataKeys)))
for index, v := range directoriesDataKeys {
// In order to avoid duplication we need to check the path wasn't already created by the previous action.
if v != "." && // For some files the returned path can be the root path, ".", in that case we doing need to create any directory.
if v != "." && // For some files the returned path can be the root path, ".", in that case we don't need to create any directory.
(index == 0 || !utils.IsSubPath(directoriesDataKeys, index, "/")) { // directoriesDataKeys store all the path which might needed to be created, that's include duplicated paths.
// By sorting the directoriesDataKeys we can assure that the longest path was created and therefore no need to create all it's sub paths.

Expand Down Expand Up @@ -522,7 +523,7 @@ func createLocalSymlink(localPath, localFileName, symlinkArtifact string, symlin
if errorutils.CheckError(err) != nil {
return err
}
log.Debug(logMsgPrefix + "Creating symlink file.")
log.Debug(fmt.Sprintf("%sCreated symlink: %q -> %q", logMsgPrefix, localSymlinkPath, symlinkArtifact))
return nil
}

Expand Down Expand Up @@ -553,7 +554,6 @@ func (ds *DownloadService) createFileHandlerFunc(downloadParams DownloadParams,
if err != nil {
return err
}
log.Info(logMsgPrefix+"Downloading", downloadData.Dependency.GetItemRelativePath())
if ds.DryRun {
successCounters[threadId]++
return nil
Expand All @@ -563,17 +563,19 @@ func (ds *DownloadService) createFileHandlerFunc(downloadParams DownloadParams,
return err
}
localPath, localFileName := fileutils.GetLocalPathAndFile(downloadData.Dependency.Name, downloadData.Dependency.Path, target, downloadData.Flat, placeholdersUsed)
localFullPath := filepath.Join(localPath, localFileName)
if downloadData.Dependency.Type == string(utils.Folder) {
return createDir(localPath, localFileName, logMsgPrefix)
return createDir(localFullPath, logMsgPrefix)
}
if err = removeIfSymlink(filepath.Join(localPath, localFileName)); err != nil {
if err = removeIfSymlink(localFullPath); err != nil {
return err
}
if downloadParams.IsSymlink() {
if isSymlink, err := ds.createSymlinkIfNeeded(ds.GetArtifactoryDetails().GetUrl(), localPath, localFileName, logMsgPrefix, downloadData, successCounters, threadId, downloadParams); isSymlink {
return err
}
}
log.Info(fmt.Sprintf("%sDownloading %q to %q", logMsgPrefix, downloadData.Dependency.GetItemRelativePath(), localFullPath))
if err = ds.downloadFileIfNeeded(downloadPath, localPath, localFileName, logMsgPrefix, downloadData, downloadParams); err != nil {
log.Error(logMsgPrefix, "Received an error: "+err.Error())
return err
Expand All @@ -586,12 +588,13 @@ func (ds *DownloadService) createFileHandlerFunc(downloadParams DownloadParams,
}

func (ds *DownloadService) downloadFileIfNeeded(downloadPath, localPath, localFileName, logMsgPrefix string, downloadData DownloadData, downloadParams DownloadParams) error {
isEqual, err := fileutils.IsEqualToLocalFile(filepath.Join(localPath, localFileName), downloadData.Dependency.Actual_Md5, downloadData.Dependency.Actual_Sha1)
localFilePath := filepath.Join(localPath, localFileName)
isEqual, err := fileutils.IsEqualToLocalFile(localFilePath, downloadData.Dependency.Actual_Md5, downloadData.Dependency.Actual_Sha1)
if err != nil {
return err
}
if isEqual {
log.Debug(logMsgPrefix + "File already exists locally.")
log.Debug(logMsgPrefix+"File already exists locally:", localFilePath)
if ds.Progress != nil {
ds.Progress.IncrementGeneralProgress()
}
Expand All @@ -604,8 +607,7 @@ func (ds *DownloadService) downloadFileIfNeeded(downloadPath, localPath, localFi
return ds.downloadFile(downloadFileDetails, logMsgPrefix, downloadParams)
}

func createDir(localPath, localFileName, logMsgPrefix string) error {
folderPath := filepath.Join(localPath, localFileName)
func createDir(folderPath, logMsgPrefix string) error {
if err := fileutils.CreateDirIfNotExist(folderPath); err != nil {
return err
}
Expand Down Expand Up @@ -642,13 +644,16 @@ type DownloadParams struct {
Flat bool
Explode bool
BypassArchiveInspection bool
MinSplitSize int64
SplitCount int
PublicGpgKey string
SkipChecksum bool
// Optional fields to avoid AQL request
// Min split size in Kilobytes
MinSplitSize int64
SplitCount int
PublicGpgKey string
SkipChecksum bool

// Optional fields (Sha256,Size) to avoid AQL request:
Sha256 string
Size *int64
// Size in bytes
Size *int64
}

func (ds *DownloadParams) IsFlat() bool {
Expand Down
31 changes: 19 additions & 12 deletions artifactory/services/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ const (
defaultUploadMinSplit = utils.SizeMiB * 200
// The default maximum number of parts that can be concurrently uploaded per file during a multipart upload
defaultUploadSplitCount = 5
// Minimal file size to show progress bar
minFileSizeForProgressInKb = 250 * utils.SizeKib
)

type UploadService struct {
Expand Down Expand Up @@ -92,7 +94,7 @@ func (us *UploadService) UploadFiles(uploadParams ...UploadParams) (summary *uti
errorsQueue := clientutils.NewErrorsQueue(1)
if us.saveSummary {
us.resultsManager, err = newResultManager()
if err != nil {
if err != nil || us.resultsManager == nil {
return nil, err
}
defer func() {
Expand Down Expand Up @@ -528,13 +530,13 @@ func (us *UploadService) uploadFileFromReader(getReaderFunc func() (io.Reader, e
var resp *http.Response
var body []byte
var checksumDeployed = false
var e error
var err error
httpClientsDetails := us.ArtDetails.CreateHttpClientDetails()
if !us.DryRun {
if us.shouldTryChecksumDeploy(details.Size, uploadParams) {
resp, body, e = us.doChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client)
if e != nil {
return false, e
resp, body, err = us.doChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client)
if err != nil {
return false, err
}
checksumDeployed = isSuccessfulUploadStatusCode(resp.StatusCode)
}
Expand Down Expand Up @@ -568,9 +570,9 @@ func (us *UploadService) uploadFileFromReader(getReaderFunc func() (io.Reader, e
},
}

e = retryExecutor.Execute()
if e != nil {
return false, e
err = retryExecutor.Execute()
if err != nil {
return false, err
}
}
}
Expand Down Expand Up @@ -622,11 +624,13 @@ func (us *UploadService) doUpload(artifact UploadData, targetUrlWithProps, logMs
return
}
if shouldTryMultipart {
if err = us.MultipartUpload.UploadFileConcurrently(artifact.Artifact.LocalPath, artifact.Artifact.TargetPath,
var checksumToken string
if checksumToken, err = us.MultipartUpload.UploadFileConcurrently(artifact.Artifact.LocalPath, artifact.Artifact.TargetPath,
fileInfo.Size(), details.Checksum.Sha1, us.Progress, uploadParams.SplitCount, uploadParams.ChunkSize); err != nil {
return
}
// Once the file is uploaded to the storage, we finalize the multipart upload by performing a checksum deployment to save the file in Artifactory.
utils.AddChecksumTokenHeader(httpClientsDetails.Headers, checksumToken)
resp, body, err = us.doChecksumDeploy(details, targetUrlWithProps, httpClientsDetails, us.client)
return
}
Expand All @@ -647,7 +651,8 @@ func (us *UploadService) doUploadFromReader(fileReader io.Reader, targetUrlWithP
if us.Progress != nil {
progressReader := us.Progress.NewProgressReader(details.Size, "Uploading", targetUrlWithProps)
reader = progressReader.ActionWithProgress(fileReader)
defer us.Progress.RemoveProgress(progressReader.GetId())
progressId := progressReader.GetId()
defer us.Progress.RemoveProgress(progressId)
} else {
reader = fileReader
}
Expand Down Expand Up @@ -969,10 +974,12 @@ func (us *UploadService) addFileToZip(artifact *clientutils.Artifact, progressPr
err = errors.Join(err, errorutils.CheckError(file.Close()))
}
}()
if us.Progress != nil {
// Show progress bar only for files larger than 250Kb to avoid polluting the terminal with endless progress bars.
if us.Progress != nil && info.Size() > minFileSizeForProgressInKb {
progressReader := us.Progress.NewProgressReader(info.Size(), progressPrefix, localPath)
reader = progressReader.ActionWithProgress(file)
defer us.Progress.RemoveProgress(progressReader.GetId())
progressId := progressReader.GetId()
defer us.Progress.RemoveProgress(progressId)
} else {
reader = file
}
Expand Down
6 changes: 6 additions & 0 deletions artifactory/services/utils/artifactoryutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ func AddChecksumHeaders(headers map[string]string, fileDetails *fileutils.FileDe
}
}

// AddChecksumTokenHeader adds the "X-Checksum-Deploy-Token" header to the given headers map.
// This header enables Artifactory to accept the Checksum Deployment of a file uploaded via multipart upload:
// after the parts are assembled in storage, the upload is finalized by a checksum deploy carrying this token.
// Delegates to AddHeader, which takes the map by pointer — presumably so it can allocate the map when nil; TODO confirm against AddHeader's implementation.
func AddChecksumTokenHeader(headers map[string]string, checksumToken string) {
AddHeader("X-Checksum-Deploy-Token", checksumToken, &headers)
}

func AddAuthHeaders(headers map[string]string, artifactoryDetails auth.ServiceDetails) {
if headers == nil {
headers = make(map[string]string)
Expand Down
38 changes: 19 additions & 19 deletions artifactory/services/utils/multipartupload.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ const (
aborted completionStatus = "ABORTED"

// API constants
uploadsApi = "/api/v1/uploads/"
routeToHeader = "X-JFrog-Route-To"
artifactoryNodeId = "X-Artifactory-Node-Id"
uploadsApi = "/api/v1/uploads/"

// Sizes and limits constants
MaxMultipartUploadFileSize = SizeTiB * 5
Expand Down Expand Up @@ -123,7 +121,7 @@ type getConfigResponse struct {
}

func (mu *MultipartUpload) UploadFileConcurrently(localPath, targetPath string, fileSize int64, sha1 string,
progress ioutils.ProgressMgr, splitCount int, chunkSize int64) (err error) {
progress ioutils.ProgressMgr, splitCount int, chunkSize int64) (checksumToken string, err error) {
repoAndPath := strings.SplitN(targetPath, "/", 2)
repoKey := repoAndPath[0]
repoPath := repoAndPath[1]
Expand All @@ -144,7 +142,8 @@ func (mu *MultipartUpload) UploadFileConcurrently(localPath, targetPath string,
var progressReader ioutils.Progress
if progress != nil {
progressReader = progress.NewProgressReader(fileSize, "Multipart upload", targetPath)
defer progress.RemoveProgress(progressReader.GetId())
progressId := progressReader.GetId()
defer progress.RemoveProgress(progressId)
}

defer func() {
Expand Down Expand Up @@ -303,19 +302,18 @@ type urlPartResponse struct {
Url string `json:"url,omitempty"`
}

func (mu *MultipartUpload) completeAndPollForStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1 string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) (err error) {
nodeId, err := mu.completeMultipartUpload(logMsgPrefix, sha1, multipartUploadClient)
func (mu *MultipartUpload) completeAndPollForStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1 string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) (checksumToken string, err error) {
err = mu.completeMultipartUpload(logMsgPrefix, sha1, multipartUploadClient)
if err != nil {
return
}

err = mu.pollCompletionStatus(logMsgPrefix, completionAttemptsLeft, sha1, nodeId, multipartUploadClient, progressReader)
checksumToken, err = mu.pollCompletionStatus(logMsgPrefix, completionAttemptsLeft, sha1, multipartUploadClient, progressReader)
return
}

func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1, nodeId string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) error {
func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionAttemptsLeft uint, sha1 string, multipartUploadClient *httputils.HttpClientDetails, progressReader ioutils.Progress) (checksumToken string, err error) {
multipartUploadClientWithNodeId := multipartUploadClient.Clone()
multipartUploadClientWithNodeId.Headers = map[string]string{routeToHeader: nodeId}

lastMergeLog := time.Now()
pollingExecutor := &utils.RetryExecutor{
Expand All @@ -340,7 +338,7 @@ func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionA
if completionAttemptsLeft == 0 {
return false, errorutils.CheckErrorf("multipart upload failed after %d attempts", mu.client.GetHttpClient().GetRetries())
}
err = mu.completeAndPollForStatus(logMsgPrefix, completionAttemptsLeft-1, sha1, multipartUploadClient, progressReader)
checksumToken, err = mu.completeAndPollForStatus(logMsgPrefix, completionAttemptsLeft-1, sha1, multipartUploadClient, progressReader)
}

// Log status
Expand All @@ -353,28 +351,29 @@ func (mu *MultipartUpload) pollCompletionStatus(logMsgPrefix string, completionA
lastMergeLog = time.Now()
}
}
checksumToken = status.ChecksumToken
return
},
}
return pollingExecutor.Execute()
return checksumToken, pollingExecutor.Execute()
}

func (mu *MultipartUpload) completeMultipartUpload(logMsgPrefix, sha1 string, multipartUploadClient *httputils.HttpClientDetails) (string, error) {
func (mu *MultipartUpload) completeMultipartUpload(logMsgPrefix, sha1 string, multipartUploadClient *httputils.HttpClientDetails) error {
url := fmt.Sprintf("%s%scomplete?sha1=%s", mu.artifactoryUrl, uploadsApi, sha1)
resp, body, err := mu.client.GetHttpClient().SendPost(url, []byte{}, *multipartUploadClient, logMsgPrefix)
if err != nil {
return "", err
return err
}
log.Debug("Artifactory response:", string(body), resp.Status)
return resp.Header.Get(artifactoryNodeId), errorutils.CheckResponseStatusWithBody(resp, body, http.StatusAccepted)
return errorutils.CheckResponseStatusWithBody(resp, body, http.StatusAccepted)
}

func (mu *MultipartUpload) status(logMsgPrefix string, multipartUploadClientWithNodeId *httputils.HttpClientDetails) (status statusResponse, err error) {
url := fmt.Sprintf("%s%sstatus", mu.artifactoryUrl, uploadsApi)
resp, body, err := mu.client.GetHttpClient().SendPost(url, []byte{}, *multipartUploadClientWithNodeId, logMsgPrefix)
// If the Artifactory node returns a "Service unavailable" error (status 503), attempt to retry the upload completion process on a different node.
if resp != nil && resp.StatusCode == http.StatusServiceUnavailable {
unavailableNodeErr := fmt.Sprintf(logMsgPrefix + fmt.Sprintf("The Artifactory node ID '%s' is unavailable.", multipartUploadClientWithNodeId.Headers[routeToHeader]))
unavailableNodeErr := fmt.Sprintf(logMsgPrefix + "Artifactory is unavailable.")
return statusResponse{Status: retryableError, Error: unavailableNodeErr}, nil
}
if err != nil {
Expand All @@ -389,9 +388,10 @@ func (mu *MultipartUpload) status(logMsgPrefix string, multipartUploadClientWith
}

type statusResponse struct {
Status completionStatus `json:"status,omitempty"`
Error string `json:"error,omitempty"`
Progress *int `json:"progress,omitempty"`
Status completionStatus `json:"status,omitempty"`
Error string `json:"error,omitempty"`
Progress *int `json:"progress,omitempty"`
ChecksumToken string `json:"checksumToken,omitempty"`
}

func (mu *MultipartUpload) abort(logMsgPrefix string, multipartUploadClient *httputils.HttpClientDetails) (err error) {
Expand Down
Loading

0 comments on commit 11e9718

Please sign in to comment.