diff --git a/ChangeLog.md b/ChangeLog.md index e55a12a40..5aba4f82e 100644 --- a/ChangeLog.md +++ b/ChangeLog.md @@ -1,7 +1,19 @@ # Change Log -## Version 10.5 +## Version 10.5.1 + +### New features + +- Allow more accurate values for job status in `jobs` commands, e.g. completed with failed or skipped transfers. + +### Bug fixes + +- Fixed issue with removing blobs with hdi_isfolder=true metadata when the list-of-files flag is used. +- Manually unfurl symbolic links to fix long file path issue on UNC locations. + + +## Version 10.5.0 ### New features diff --git a/cmd/jobsClean.go b/cmd/jobsClean.go index 9af7a1016..2ddcbf4d6 100644 --- a/cmd/jobsClean.go +++ b/cmd/jobsClean.go @@ -78,7 +78,8 @@ func init() { // NOTE: we have way more job status than we normally need, only show the most common ones jobsCleanCmd.PersistentFlags().StringVar(&commandLineInput.withStatus, "with-status", "All", - "only remove the jobs with this status, available values: Cancelled, Completed, Failed, InProgress, All") + "only remove the jobs with this status, available values: All, Cancelled, Failed, Completed,"+ + " CompletedWithErrors, CompletedWithSkipped, CompletedWithErrorsAndSkipped") } func handleCleanJobsCommand(givenStatus common.JobStatus) error { @@ -90,7 +91,7 @@ func handleCleanJobsCommand(givenStatus common.JobStatus) error { // we must query the jobs and find out which one to remove resp := common.ListJobsResponse{} - Rpc(common.ERpcCmd.ListJobs(), nil, &resp) + Rpc(common.ERpcCmd.ListJobs(), common.EJobStatus.All(), &resp) if resp.ErrorMessage != "" { return errors.New("failed to query the list of jobs") diff --git a/cmd/jobsList.go b/cmd/jobsList.go index 8d8515d08..c734ae4c7 100644 --- a/cmd/jobsList.go +++ b/cmd/jobsList.go @@ -36,6 +36,12 @@ type ListResponse struct { } func init() { + type JobsListReq struct { + withStatus string + } + + commandLineInput := JobsListReq{} + // lsCmd represents the listJob command lsCmd := &cobra.Command{ Use: "list", @@ -52,7 +58,13 @@ func init() { return nil }, Run: func(cmd *cobra.Command, args []string) { - err := HandleListJobsCommand() + withStatus := common.EJobStatus + err := withStatus.Parse(commandLineInput.withStatus) + if err != nil { + glcm.Error(fmt.Sprintf("Failed to parse --with-status due to error: %s.", err)) + } + + err = HandleListJobsCommand(withStatus) if err == nil { glcm.Exit(nil, common.EExitCode.Success()) } else { @@ -62,13 +74,17 @@ } jobsCmd.AddCommand(lsCmd) + + jobsCmd.PersistentFlags().StringVar(&commandLineInput.withStatus, "with-status", "All", + "List the jobs with the given status, available values: All, Cancelled, Failed, InProgress, Completed,"+ + " CompletedWithErrors, CompletedWithSkipped, CompletedWithErrorsAndSkipped") } // HandleListJobsCommand sends the ListJobs request to transfer engine // Print the Jobs in the history of Azcopy -func HandleListJobsCommand() error { +func HandleListJobsCommand(jobStatus common.JobStatus) error { resp := common.ListJobsResponse{} - Rpc(common.ERpcCmd.ListJobs(), nil, &resp) + Rpc(common.ERpcCmd.ListJobs(), jobStatus, &resp) return PrintExistingJobIds(resp) } diff --git a/cmd/rpc.go b/cmd/rpc.go index ad8007bbf..e0895d34c 100644 --- a/cmd/rpc.go +++ b/cmd/rpc.go @@ -47,7 +47,7 @@ func inprocSend(rpcCmd common.RpcCmd, requestData interface{}, responseData inte *(responseData.(*common.LifecycleMgr)) = ste.GetJobLCMWrapper(*requestData.(*common.JobID)) case common.ERpcCmd.ListJobs(): - *(responseData.(*common.ListJobsResponse)) = ste.ListJobs() +
*(responseData.(*common.ListJobsResponse)) = ste.ListJobs(requestData.(common.JobStatus)) case common.ERpcCmd.ListJobSummary(): *(responseData.(*common.ListJobSummaryResponse)) = ste.GetJobSummary(*requestData.(*common.JobID)) diff --git a/cmd/syncIndexer.go b/cmd/syncIndexer.go index d948b6468..f9863eb02 100644 --- a/cmd/syncIndexer.go +++ b/cmd/syncIndexer.go @@ -51,6 +51,7 @@ func (i *objectIndexer) store(storedObject storedObject) (err error) { func (i *objectIndexer) traverse(processor objectProcessor, filters []objectFilter) (err error) { for _, value := range i.indexMap { err = processIfPassedFilters(filters, value, processor) + _, err = getProcessingError(err) if err != nil { return } diff --git a/cmd/zc_enumerator.go b/cmd/zc_enumerator.go index 6ea01d1e3..2aaf24a61 100644 --- a/cmd/zc_enumerator.go +++ b/cmd/zc_enumerator.go @@ -329,7 +329,7 @@ func initResourceTraverser(resource common.ResourceString, location common.Locat } } - output = newListTraverser(resource, location, credential, ctx, recursive, toFollow, getProperties, listOfFilesChannel, incrementEnumerationCounter) + output = newListTraverser(resource, location, credential, ctx, recursive, toFollow, getProperties, listOfFilesChannel, includeDirectoryStubs, incrementEnumerationCounter) return output, nil } @@ -356,7 +356,7 @@ func initResourceTraverser(resource common.ResourceString, location common.Locat }() baseResource := resource.CloneWithValue(cleanLocalPath(basePath)) - output = newListTraverser(baseResource, location, nil, nil, recursive, toFollow, getProperties, globChan, incrementEnumerationCounter) + output = newListTraverser(baseResource, location, nil, nil, recursive, toFollow, getProperties, globChan, includeDirectoryStubs, incrementEnumerationCounter) } else { output = newLocalTraverser(resource.ValueLocal(), recursive, toFollow, incrementEnumerationCounter) } @@ -659,9 +659,24 @@ func passedFilters(filters []objectFilter, storedObject storedObject) bool { return true } +// This error should be treated as a flag, that we didn't fail processing, but instead, we just didn't process it. +// Currently, this is only really used for symlink processing, but it _is_ an error, so it must be handled in all new traversers. +// Basically, anywhere processIfPassedFilters is called, additionally call getProcessingError. 
+var ignoredError = errors.New("FileIgnored") + +func getProcessingError(errin error) (ignored bool, err error) { + if errin == ignoredError { + return true, nil + } + + return false, errin +} + func processIfPassedFilters(filters []objectFilter, storedObject storedObject, processor objectProcessor) (err error) { if passedFilters(filters, storedObject) { err = processor(storedObject) + } else { + err = ignoredError } return diff --git a/cmd/zc_traverser_benchmark.go b/cmd/zc_traverser_benchmark.go index 43b0418fc..54fe680db 100644 --- a/cmd/zc_traverser_benchmark.go +++ b/cmd/zc_traverser_benchmark.go @@ -91,6 +91,7 @@ func (t *benchmarkTraverser) traverse(preprocessor objectMorpher, processor obje noBlobProps, noMetdata, ""), processor) + _, err = getProcessingError(err) if err != nil { return err } diff --git a/cmd/zc_traverser_blob.go b/cmd/zc_traverser_blob.go index cb3fce5e7..64ae5f4c6 100644 --- a/cmd/zc_traverser_blob.go +++ b/cmd/zc_traverser_blob.go @@ -136,6 +136,7 @@ func (t *blobTraverser) traverse(preprocessor objectMorpher, processor objectPro } err := processIfPassedFilters(filters, storedObject, processor) + _, err = getProcessingError(err) // short-circuit if we don't have anything else to scan if isBlob || err != nil { @@ -219,12 +220,15 @@ func (t *blobTraverser) traverse(preprocessor objectMorpher, processor objectPro return workerError } + + if t.incrementEnumerationCounter != nil { t.incrementEnumerationCounter(common.EEntityType.File()) } object := item.(storedObject) processErr := processIfPassedFilters(filters, object, processor) + _, processErr = getProcessingError(processErr) if processErr != nil { cancelWorkers() return processErr diff --git a/cmd/zc_traverser_blobfs.go b/cmd/zc_traverser_blobfs.go index 163adf267..16019b835 100644 --- a/cmd/zc_traverser_blobfs.go +++ b/cmd/zc_traverser_blobfs.go @@ -109,7 +109,9 @@ func (t *blobFSTraverser) traverse(preprocessor objectMorpher, processor objectP t.incrementEnumerationCounter(common.EEntityType.File()) } - return processIfPassedFilters(filters, storedObject, processor) + err := processIfPassedFilters(filters, storedObject, processor) + _, err = getProcessingError(err) + return err } // else, its not just one file @@ -139,6 +141,7 @@ func (t *blobFSTraverser) traverse(preprocessor objectMorpher, processor objectP t.incrementEnumerationCounter(common.EEntityType.Folder()) } err = processIfPassedFilters(filters, storedObject, processor) + _, err = getProcessingError(err) if err != nil { return err } @@ -194,6 +197,7 @@ func (t *blobFSTraverser) traverse(preprocessor objectMorpher, processor objectP } err := processIfPassedFilters(filters, storedObject, processor) + _, err = getProcessingError(err) if err != nil { return err } diff --git a/cmd/zc_traverser_file.go b/cmd/zc_traverser_file.go index c09701a06..3eebe7f5f 100644 --- a/cmd/zc_traverser_file.go +++ b/cmd/zc_traverser_file.go @@ -87,7 +87,9 @@ func (t *fileTraverser) traverse(preprocessor objectMorpher, processor objectPro t.incrementEnumerationCounter(common.EEntityType.File()) } - return processIfPassedFilters(filters, storedObject, processor) + err := processIfPassedFilters(filters, storedObject, processor) + _, err = getProcessingError(err) + return err } } @@ -145,7 +147,9 @@ func (t *fileTraverser) traverse(preprocessor objectMorpher, processor objectPro if t.incrementEnumerationCounter != nil { t.incrementEnumerationCounter(s.entityType) } - return processIfPassedFilters(filters, s, processor) + err := processIfPassedFilters(filters, s, processor) + _,
err = getProcessingError(err) + return err } // get the directory URL so that we can list the files diff --git a/cmd/zc_traverser_list.go b/cmd/zc_traverser_list.go index 5965745e0..d3db56c3a 100644 --- a/cmd/zc_traverser_list.go +++ b/cmd/zc_traverser_list.go @@ -88,7 +88,7 @@ func (l *listTraverser) traverse(preprocessor objectMorpher, processor objectPro } func newListTraverser(parent common.ResourceString, parentType common.Location, credential *common.CredentialInfo, ctx *context.Context, - recursive, followSymlinks, getProperties bool, listChan chan string, incrementEnumerationCounter enumerationCounterFunc) resourceTraverser { + recursive, followSymlinks, getProperties bool, listChan chan string, includeDirectoryStubs bool, incrementEnumerationCounter enumerationCounterFunc) resourceTraverser { var traverserGenerator childTraverserGenerator traverserGenerator = func(relativeChildPath string) (resourceTraverser, error) { @@ -104,7 +104,7 @@ func newListTraverser(parent common.ResourceString, parentType common.Location, } // Construct a traverser that goes through the child - traverser, err := initResourceTraverser(source, parentType, ctx, credential, &followSymlinks, nil, recursive, getProperties, false, incrementEnumerationCounter) + traverser, err := initResourceTraverser(source, parentType, ctx, credential, &followSymlinks, nil, recursive, getProperties, includeDirectoryStubs, incrementEnumerationCounter) if err != nil { return nil, err } diff --git a/cmd/zc_traverser_local.go b/cmd/zc_traverser_local.go index c3fa35cb9..417728749 100644 --- a/cmd/zc_traverser_local.go +++ b/cmd/zc_traverser_local.go @@ -21,6 +21,7 @@ package cmd import ( + "errors" "fmt" "github.com/Azure/azure-storage-azcopy/common" "github.com/Azure/azure-storage-azcopy/common/parallel" @@ -68,6 +69,50 @@ func (t *localTraverser) getInfoIfSingleFile() (os.FileInfo, bool, error) { return fileInfo, true, nil } +func UnfurlSymlinks(symlinkPath string) (result string, err error) { + unfurlingPlan := []string{symlinkPath} + + for len(unfurlingPlan) > 0 { + item := unfurlingPlan[0] + + fi, err := os.Lstat(item) + + if err != nil { + return item, err + } + + if fi.Mode()&os.ModeSymlink != 0 { + result, err := os.Readlink(item) + + if err != nil { + return result, err + } + + // Previously, we'd try to detect if the read link was a relative path by appending and stat'ing the item + // However, it seems to be a fairly unlikely and hard to reproduce scenario upon investigation (Couldn't manage to reproduce the scenario) + // So it was dropped. However, on the off chance, we'll still do it if syntactically it makes sense. + if len(result) == 0 || result[0] == '.' { // A relative path being "" or "." likely (and in the latter case, on our officially supported OSes, always) means that it's just the same folder. 
+ result = filepath.Dir(item) + } else if !os.IsPathSeparator(result[0]) { // We can assume that a relative path won't start with a separator + possiblyResult := filepath.Join(filepath.Dir(item), result) + if _, err := os.Lstat(possiblyResult); err == nil { + result = possiblyResult + } + } + + result = common.ToExtendedPath(result) + + unfurlingPlan = append(unfurlingPlan, result) + } else { + return item, nil + } + + unfurlingPlan = unfurlingPlan[1:] + } + + return "", errors.New("failed to unfurl symlink: exited loop early") +} + type seenPathsRecorder interface { Record(path string) HasSeen(path string) bool @@ -153,7 +198,7 @@ func WalkWithSymlinks(fullPath string, walkFunc filepath.WalkFunc, followSymlink if !followSymlinks { return nil // skip it } - result, err := filepath.EvalSymlinks(filePath) + result, err := UnfurlSymlinks(filePath) if err != nil { WarnStdoutAndJobLog(fmt.Sprintf("Failed to resolve symlink %s: %s", filePath, err)) @@ -180,14 +225,20 @@ func WalkWithSymlinks(fullPath string, walkFunc filepath.WalkFunc, followSymlink if rStat.IsDir() { if !seenPaths.HasSeen(result) { - seenPaths.Record(result) - seenPaths.Record(slPath) // Note we've seen the symlink as well. We shouldn't ever have issues if we _don't_ do this because we'll just catch it by symlink result - walkQueue = append(walkQueue, walkItem{ - fullPath: result, - relativeBase: computedRelativePath, - }) + err := walkFunc(common.GenerateFullPath(fullPath, computedRelativePath), symlinkTargetFileInfo{rStat, fileInfo.Name()}, fileError) + // Since this doesn't directly manipulate the error, and only checks for a specific error, it's OK to use in a generic function. + skipped, err := getProcessingError(err) + + if !skipped { // Don't go any deeper (or record it) if we skipped it. + seenPaths.Record(result) + seenPaths.Record(slPath) // Note we've seen the symlink as well. We shouldn't ever have issues if we _don't_ do this because we'll just catch it by symlink result + walkQueue = append(walkQueue, walkItem{ + fullPath: result, + relativeBase: computedRelativePath, + }) + } // enumerate the FOLDER now (since its presence in seenDirs will prevent its properties getting enumerated later) - return walkFunc(common.GenerateFullPath(fullPath, computedRelativePath), symlinkTargetFileInfo{rStat, fileInfo.Name()}, fileError) + return err } else { WarnStdoutAndJobLog(fmt.Sprintf("Ignored already linked directory pointed at %s (link at %s)", result, common.GenerateFullPath(fullPath, computedRelativePath))) } @@ -222,8 +273,16 @@ func WalkWithSymlinks(fullPath string, walkFunc filepath.WalkFunc, followSymlink } if !seenPaths.HasSeen(result) { - seenPaths.Record(result) - return walkFunc(common.GenerateFullPath(fullPath, computedRelativePath), fileInfo, fileError) + err := walkFunc(common.GenerateFullPath(fullPath, computedRelativePath), fileInfo, fileError) + // Since this doesn't directly manipulate the error, and only checks for a specific error, it's OK to use in a generic function. + skipped, err := getProcessingError(err) + + // If the file was skipped, don't record it. 
+ if !skipped { + seenPaths.Record(result) + } + + return err } else { if fileInfo.IsDir() { // We can't output a warning here (and versions 10.3.x never did) @@ -253,7 +312,7 @@ func (t *localTraverser) traverse(preprocessor objectMorpher, processor objectPr t.incrementEnumerationCounter(common.EEntityType.File()) } - return processIfPassedFilters(filters, + err := processIfPassedFilters(filters, newStoredObject( preprocessor, singleFileInfo.Name(), @@ -268,6 +327,8 @@ func (t *localTraverser) traverse(preprocessor objectMorpher, processor objectPr ), processor, ) + _, err = getProcessingError(err) + return err } else { if t.recursive { processFile := func(filePath string, fileInfo os.FileInfo, fileError error) error { @@ -293,6 +354,7 @@ func (t *localTraverser) traverse(preprocessor objectMorpher, processor objectPr t.incrementEnumerationCounter(entityType) } + // This is an exception to the rule. We don't strip the error here, because WalkWithSymlinks catches it. return processIfPassedFilters(filters, newStoredObject( preprocessor, @@ -332,7 +394,7 @@ func (t *localTraverser) traverse(preprocessor objectMorpher, processor objectPr // Because this only goes one layer deep, we can just append the filename to fullPath and resolve with it. symlinkPath := common.GenerateFullPath(t.fullPath, singleFile.Name()) // Evaluate the symlink - result, err := filepath.EvalSymlinks(symlinkPath) + result, err := UnfurlSymlinks(symlinkPath) if err != nil { return err @@ -377,7 +439,7 @@ func (t *localTraverser) traverse(preprocessor objectMorpher, processor objectPr "", // Local has no such thing as containers ), processor) - + _, err = getProcessingError(err) if err != nil { return err } diff --git a/cmd/zc_traverser_s3.go b/cmd/zc_traverser_s3.go index 47fb68276..d0098222a 100644 --- a/cmd/zc_traverser_s3.go +++ b/cmd/zc_traverser_s3.go @@ -93,7 +93,7 @@ func (t *s3Traverser) traverse(preprocessor objectMorpher, processor objectProce filters, storedObject, processor) - + _, err = getProcessingError(err) if err != nil { return err } @@ -158,7 +158,7 @@ func (t *s3Traverser) traverse(preprocessor objectMorpher, processor objectProce err = processIfPassedFilters(filters, storedObject, processor) - + _, err = getProcessingError(err) if err != nil { return } diff --git a/cmd/zt_generic_filter_test.go b/cmd/zt_generic_filter_test.go index 83617e587..8f5ae6f38 100644 --- a/cmd/zt_generic_filter_test.go +++ b/cmd/zt_generic_filter_test.go @@ -73,7 +73,7 @@ func (s *genericFilterSuite) TestExcludeFilter(c *chk.C) { for _, file := range filesToNotPass { dummyProcessor := &dummyProcessor{} err := processIfPassedFilters(excludeFilterList, storedObject{name: file}, dummyProcessor.process) - c.Assert(err, chk.IsNil) + c.Assert(err, chk.Equals, ignoredError) c.Assert(len(dummyProcessor.record), chk.Equals, 0) } } diff --git a/cmd/zt_remove_blob_test.go b/cmd/zt_remove_blob_test.go index 5e1875729..67c8471f4 100644 --- a/cmd/zt_remove_blob_test.go +++ b/cmd/zt_remove_blob_test.go @@ -401,7 +401,8 @@ func (s *cmdIntegrationSuite) TestRemoveBlobsWithDirectoryStubs(c *chk.C) { c.Assert(len(mockedRPC.transfers), chk.Equals, len(blobAndDirStubsList)) // validate that the right transfers were sent - expectedTransfers := scenarioHelper{}.shaveOffPrefix(blobAndDirStubsList, vdirName) + expectedTransfers := scenarioHelper{}.shaveOffPrefix(blobAndDirStubsList, strings.TrimSuffix(vdirName, "/")) + expectedTransfers = scenarioHelper{}.shaveOffPrefix(expectedTransfers, "/") validateRemoveTransfersAreScheduled(c, true, 
expectedTransfers, mockedRPC) }) @@ -420,3 +421,53 @@ func (s *cmdIntegrationSuite) TestRemoveBlobsWithDirectoryStubs(c *chk.C) { } }) } + +func (s *cmdIntegrationSuite) TestRemoveBlobsWithDirectoryStubsWithListOfFiles(c *chk.C) { + bsu := getBSU() + vdirName := "vdir1/" + + // set up the container with numerous blobs + containerURL, containerName := createNewContainer(c, bsu) + defer deleteContainer(c, containerURL) + blobAndDirStubsList := scenarioHelper{}.generateCommonRemoteScenarioForWASB(c, containerURL, vdirName) + c.Assert(containerURL, chk.NotNil) + c.Assert(len(blobAndDirStubsList), chk.Not(chk.Equals), 0) + + // set up another empty dir + vdirName2 := "emptydir" + createNewDirectoryStub(c, containerURL, vdirName2) + blobAndDirStubsList = append(blobAndDirStubsList, vdirName2) + + // set up interceptor + mockedRPC := interceptor{} + Rpc = mockedRPC.intercept + mockedRPC.init() + + // construct the raw input to simulate user input + rawContainerURLWithSAS := scenarioHelper{}.getRawContainerURLWithSAS(c, containerName) + raw := getDefaultRemoveRawInput(rawContainerURLWithSAS.String()) + raw.recursive = true + + // make the input for list-of-files + listOfFiles := []string{vdirName, vdirName2} + raw.listOfFilesToCopy = scenarioHelper{}.generateListOfFiles(c, listOfFiles) + + runCopyAndVerify(c, raw, func(err error) { + c.Assert(err, chk.IsNil) + + // validate that the right number of transfers were scheduled + c.Assert(len(mockedRPC.transfers), chk.Equals, len(blobAndDirStubsList)) + + // validate that the right transfers were sent + validateRemoveTransfersAreScheduled(c, true, blobAndDirStubsList, mockedRPC) + }) + + // turn off recursive, this time an error should be thrown + raw.recursive = false + mockedRPC.reset() + + runCopyAndVerify(c, raw, func(err error) { + c.Assert(err, chk.NotNil) + c.Assert(len(mockedRPC.transfers), chk.Equals, 0) + }) +} diff --git a/cmd/zt_scenario_helpers_for_test.go b/cmd/zt_scenario_helpers_for_test.go index 9fd2dd4c0..5144e9840 100644 --- a/cmd/zt_scenario_helpers_for_test.go +++ b/cmd/zt_scenario_helpers_for_test.go @@ -178,8 +178,9 @@ func (scenarioHelper) generateCommonRemoteScenarioForWASB(c *chk.C, containerURL } if prefix != "" { - createNewDirectoryStub(c, containerURL, strings.TrimSuffix(prefix, "/")) - blobList = append(blobList, "") + rootDir := strings.TrimSuffix(prefix, "/") + createNewDirectoryStub(c, containerURL, rootDir) + blobList = append(blobList, rootDir) } createNewDirectoryStub(c, containerURL, prefix+"sub1") diff --git a/common/fe-ste-models.go b/common/fe-ste-models.go index 630a4edd3..b0e363d4a 100644 --- a/common/fe-ste-models.go +++ b/common/fe-ste-models.go @@ -619,12 +619,13 @@ func (TransferStatus) NotStarted() TransferStatus { return TransferStatus(0) } // Outdated: // Transfer started & at least 1 chunk has successfully been transfered. // Used to resume a transfer that started to avoid transferring all chunks thereby improving performance +// Update(Jul 2020): This represents the state of transfer as soon as the file is scheduled. func (TransferStatus) Started() TransferStatus { return TransferStatus(1) } // Transfer successfully completed func (TransferStatus) Success() TransferStatus { return TransferStatus(2) } -// Transfer failed due to some error. This status does represent the state when transfer is cancelled +// Transfer failed due to some error. func (TransferStatus) Failed() TransferStatus { return TransferStatus(-1) } // Transfer failed due to failure while Setting blob tier. 
@@ -636,6 +637,8 @@ func (TransferStatus) SkippedBlobHasSnapshots() TransferStatus { return Transfer func (TransferStatus) TierAvailabilityCheckFailure() TransferStatus { return TransferStatus(-5) } +func (TransferStatus) Cancelled() TransferStatus { return TransferStatus(-6) } + func (ts TransferStatus) ShouldTransfer() bool { return ts == ETransferStatus.NotStarted() || ts == ETransferStatus.Started() } diff --git a/common/version.go b/common/version.go index 0c7c46fc7..0930e0dc8 100644 --- a/common/version.go +++ b/common/version.go @@ -1,6 +1,6 @@ package common -const AzcopyVersion = "10.5.0" +const AzcopyVersion = "10.5.1" const UserAgent = "AzCopy/" + AzcopyVersion const S3ImportUserAgent = "S3Import " + UserAgent const BenchmarkUserAgent = "Benchmark " + UserAgent diff --git a/ste/JobsAdmin.go b/ste/JobsAdmin.go index 7753fb598..f4a9dd5d4 100644 --- a/ste/JobsAdmin.go +++ b/ste/JobsAdmin.go @@ -455,6 +455,7 @@ func (ja *jobsAdmin) transferProcessor(workerID int) { if jptm.ShouldLog(pipeline.LogInfo) { jptm.Log(pipeline.LogInfo, fmt.Sprintf(" is not picked up worked %d because transfer was cancelled", workerID)) } + jptm.SetStatus(common.ETransferStatus.Cancelled()) jptm.ReportTransferDone() } else { // TODO fix preceding space diff --git a/ste/init.go b/ste/init.go index 8db4c1ddd..5a6ac1715 100644 --- a/ste/init.go +++ b/ste/init.go @@ -86,7 +86,7 @@ func MainSTE(concurrency ConcurrencySettings, targetRateInMegaBitsPerSec float64 func(writer http.ResponseWriter, request *http.Request) { //var payload common.ListRequest //deserialize(request, &payload) - serialize(ListJobs( /*payload*/ ), writer) + serialize(ListJobs(common.EJobStatus.All()), writer) }) http.HandleFunc(common.ERpcCmd.ListJobSummary().Pattern(), func(writer http.ResponseWriter, request *http.Request) { @@ -333,6 +333,9 @@ func ResumeJobOrder(req common.ResumeJobRequest) common.CancelPauseResumeRespons // Resume all the failed / In Progress Transfers. case common.EJobStatus.InProgress(), common.EJobStatus.Completed(), + common.EJobStatus.CompletedWithErrors(), + common.EJobStatus.CompletedWithSkipped(), + common.EJobStatus.CompletedWithErrorsAndSkipped(), common.EJobStatus.Cancelled(), common.EJobStatus.Paused(): //go func() { @@ -521,16 +524,12 @@ func GetJobSummary(jobID common.JobID) common.ListJobSummaryResponse { } // Job is completed if Job order is complete AND ALL transfers are completed/failed // FIX: active or inactive state, then job order is said to be completed if final part of job has been ordered. 
- if (js.CompleteJobOrdered) && (part0PlanStatus == common.EJobStatus.Completed()) { + if (js.CompleteJobOrdered) && (part0PlanStatus.IsJobDone()) { js.JobStatus = part0PlanStatus } - if js.JobStatus == common.EJobStatus.Completed() { - js.JobStatus = js.JobStatus.EnhanceJobStatusInfo(js.TransfersSkipped > 0, js.TransfersFailed > 0, - js.TransfersCompleted > 0) - + if js.JobStatus.IsJobDone() { js.PerformanceAdvice = jm.TryGetPerformanceAdvice(js.TotalBytesExpected, js.TotalTransfers-js.TransfersSkipped, part0.Plan().FromTo) - } return js @@ -607,7 +606,7 @@ func GetJobLCMWrapper(jobID common.JobID) common.LifecycleMgr { } // ListJobs returns the jobId of all the jobs existing in the current instance of azcopy -func ListJobs() common.ListJobsResponse { +func ListJobs(givenStatus common.JobStatus) common.ListJobsResponse { // Resurrect all the Jobs from the existing JobPart Plan files JobsAdmin.ResurrectJobParts() // building the ListJobsResponse for sending response back to front-end @@ -625,9 +624,11 @@ func ListJobs() common.ListJobsResponse { if !found { continue } - listJobResponse.JobIDDetails = append(listJobResponse.JobIDDetails, - common.JobIDDetails{JobId: jobId, CommandString: jpm.Plan().CommandString(), - StartTime: jpm.Plan().StartTime, JobStatus: jpm.Plan().JobStatus()}) + if givenStatus == common.EJobStatus.All() || givenStatus == jpm.Plan().JobStatus() { + listJobResponse.JobIDDetails = append(listJobResponse.JobIDDetails, + common.JobIDDetails{JobId: jobId, CommandString: jpm.Plan().CommandString(), + StartTime: jpm.Plan().StartTime, JobStatus: jpm.Plan().JobStatus()}) + } // Close the job part managers and the log. jm.(*jobMgr).jobPartMgrs.Iterate(false, func(k common.PartNumber, v IJobPartMgr) { diff --git a/ste/mgr-JobMgr.go b/ste/mgr-JobMgr.go index 1b1e0429c..45ff9df23 100644 --- a/ste/mgr-JobMgr.go +++ b/ste/mgr-JobMgr.go @@ -62,7 +62,7 @@ type IJobMgr interface { ConfirmAllTransfersScheduled() ResetAllTransfersScheduled() PipelineLogInfo() pipeline.LogOptions - ReportJobPartDone() uint32 + ReportJobPartDone(jobPartProgressInfo) Context() context.Context Cancel() // TODO: added for debugging purpose. remove later @@ -88,6 +88,7 @@ type IJobMgr interface { func newJobMgr(concurrency ConcurrencySettings, appLogger common.ILogger, jobID common.JobID, appCtx context.Context, cpuMon common.CPUMonitor, level common.LogLevel, commandString string, logFileFolder string) IJobMgr { // atomicAllTransfersScheduled is set to 1 since this api is also called when new job part is ordered. 
enableChunkLogOutput := level.ToPipelineLogLevel() == pipeline.LogDebug + jobPartProgressCh := make(chan jobPartProgressInfo) jm := jobMgr{jobID: jobID, jobPartMgrs: newJobPartToJobPartMgr(), include: map[string]int{}, exclude: map[string]int{}, httpClient: NewAzcopyHTTPClient(concurrency.MaxIdleConnections), logger: common.NewJobLogger(jobID, level, appLogger, logFileFolder), @@ -97,9 +98,11 @@ func newJobMgr(concurrency ConcurrencySettings, appLogger common.ILogger, jobID pipelineNetworkStats: newPipelineNetworkStats(JobsAdmin.(*jobsAdmin).concurrencyTuner), // let the stats coordinate with the concurrency tuner exclusiveDestinationMapHolder: &atomic.Value{}, initMu: &sync.Mutex{}, + jobPartProgress: jobPartProgressCh, /*Other fields remain zero-value until this job is scheduled */} jm.reset(appCtx, commandString) jm.logJobsAdminMessages() + go jm.reportJobPartDoneHandler() return &jm } @@ -215,6 +218,8 @@ type jobMgr struct { initMu *sync.Mutex initState *jobMgrInitState + + jobPartProgress chan jobPartProgressInfo } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -467,30 +472,46 @@ func (jm *jobMgr) ResetAllTransfersScheduled() { } // ReportJobPartDone is called to report that a job part completed or failed -func (jm *jobMgr) ReportJobPartDone() uint32 { +func (jm *jobMgr) ReportJobPartDone(progressInfo jobPartProgressInfo) { + jm.jobPartProgress <- progressInfo +} + +func (jm *jobMgr) reportJobPartDoneHandler() { + var haveFinalPart bool + var jobProgressInfo jobPartProgressInfo shouldLog := jm.ShouldLog(pipeline.LogInfo) - jobPart0Mgr, ok := jm.jobPartMgrs.Get(0) - if !ok { - jm.Panic(fmt.Errorf("Failed to find Job %v, Part #0", jm.jobID)) - } - part0Plan := jobPart0Mgr.Plan() - jobStatus := part0Plan.JobStatus() // status of part 0 is status of job as a whole - - partsDone := atomic.AddUint32(&jm.partsDone, 1) - // If the last part is still awaited or other parts all still not complete, - // JobPart 0 status is not changed (unless we are cancelling) - allKnownPartsDone := partsDone == jm.jobPartMgrs.Count() - haveFinalPart := atomic.LoadInt32(&jm.atomicFinalPartOrderedIndicator) == 1 - isCancelling := jobStatus == common.EJobStatus.Cancelling() - shouldComplete := allKnownPartsDone && (haveFinalPart || isCancelling) - if !shouldComplete { + for { + partProgressInfo := <-jm.jobPartProgress + jobPart0Mgr, ok := jm.jobPartMgrs.Get(0) + if !ok { + jm.Panic(fmt.Errorf("Failed to find Job %v, Part #0", jm.jobID)) + } + part0Plan := jobPart0Mgr.Plan() + jobStatus := part0Plan.JobStatus() // status of part 0 is status of job as a whole + partsDone := atomic.AddUint32(&jm.partsDone, 1) + jobProgressInfo.transfersCompleted += partProgressInfo.transfersCompleted + jobProgressInfo.transfersSkipped += partProgressInfo.transfersSkipped + jobProgressInfo.transfersFailed += partProgressInfo.transfersFailed + + // If the last part is still awaited or other parts all still not complete, + // JobPart 0 status is not changed (unless we are cancelling) + haveFinalPart = atomic.LoadInt32(&jm.atomicFinalPartOrderedIndicator) == 1 + allKnownPartsDone := partsDone == jm.jobPartMgrs.Count() + isCancelling := jobStatus == common.EJobStatus.Cancelling() + shouldComplete := allKnownPartsDone && (haveFinalPart || isCancelling) + if shouldComplete { + break + } //Else log and wait for next part to complete + if shouldLog { jm.Log(pipeline.LogInfo, fmt.Sprintf("is part of Job which %d total number of parts done ", partsDone)) } - return 
partsDone } + jobPart0Mgr, _ := jm.jobPartMgrs.Get(0) + part0Plan := jobPart0Mgr.Plan() // status of part 0 is status of job as a whole. + partDescription := "all parts of entire Job" if !haveFinalPart { partDescription = "known parts of incomplete Job" @@ -499,19 +520,19 @@ jm.Log(pipeline.LogInfo, fmt.Sprintf("%s %s successfully completed, cancelled or paused", partDescription, jm.jobID.String())) } - switch jobStatus { + switch part0Plan.JobStatus() { case common.EJobStatus.Cancelling(): part0Plan.SetJobStatus(common.EJobStatus.Cancelled()) if shouldLog { jm.Log(pipeline.LogInfo, fmt.Sprintf("%s %v successfully cancelled", partDescription, jm.jobID)) } case common.EJobStatus.InProgress(): - part0Plan.SetJobStatus((common.EJobStatus).Completed()) + part0Plan.SetJobStatus((common.EJobStatus).EnhanceJobStatusInfo(jobProgressInfo.transfersSkipped > 0, + jobProgressInfo.transfersFailed > 0, + jobProgressInfo.transfersCompleted > 0)) } jm.chunkStatusLogger.FlushLog() // TODO: remove once we sort out what will be calling CloseLog (currently nothing) - - return partsDone } func (jm *jobMgr) getInMemoryTransitJobState() InMemoryTransitJobState { diff --git a/ste/mgr-JobPartMgr.go b/ste/mgr-JobPartMgr.go index bae4dd2b5..d6fd61814 100644 --- a/ste/mgr-JobPartMgr.go +++ b/ste/mgr-JobPartMgr.go @@ -26,7 +26,7 @@ type IJobPartMgr interface { Plan() *JobPartPlanHeader ScheduleTransfers(jobCtx context.Context) StartJobXfer(jptm IJobPartTransferMgr) - ReportTransferDone() uint32 + ReportTransferDone(status common.TransferStatus) uint32 GetOverwriteOption() common.OverwriteOption GetForceIfReadOnly() bool AutoDecompress() bool @@ -215,6 +215,13 @@ func NewFilePipeline(c azfile.Credential, o azfile.PipelineOptions, r azfile.Ret //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Holds the status of transfers in this job part +type jobPartProgressInfo struct { + transfersCompleted int + transfersSkipped int + transfersFailed int +} + // jobPartMgr represents the runtime information for a Job's Part type jobPartMgr struct { // These fields represent the part's existence @@ -272,7 +279,10 @@ type jobPartMgr struct { // numberOfTransfersDone_doNotUse represents the number of transfer of JobPartOrder // which are either completed or failed // numberOfTransfersDone_doNotUse determines the final cancellation of JobPartOrder - atomicTransfersDone uint32 + atomicTransfersDone uint32 + atomicTransfersCompleted uint32 + atomicTransfersFailed uint32 + atomicTransfersSkipped uint32 } func (jpm *jobPartMgr) getOverwritePrompter() *overwritePrompter { @@ -303,6 +313,10 @@ func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context) { // get the list of include / exclude transfers includeTransfer, excludeTransfer := jpm.jobMgr.IncludeExclude() + if len(includeTransfer) > 0 || len(excludeTransfer) > 0 { + panic("List of transfers is obsolete.") + } + // *** Open the job part: process any job part plan-setting used by all transfers *** dstData := plan.DstBlobData @@ -342,38 +356,10 @@ func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context) { jppt := plan.Transfer(t) ts := jppt.TransferStatus() if ts == common.ETransferStatus.Success() { - jpm.ReportTransferDone() // Don't schedule an already-completed/failed transfer + jpm.ReportTransferDone(ts) // Don't schedule an already-completed/failed transfer continue } - // If the list of transfer to be included is passed - // then check current transfer
exists in the list of included transfer - // If it doesn't exists, skip the transfer - if len(includeTransfer) > 0 { - // Get the source string from the part plan header - src, _, _ := plan.TransferSrcDstStrings(t) - // If source doesn't exists, skip the transfer - _, ok := includeTransfer[src] - if !ok { - jpm.ReportTransferDone() // Don't schedule transfer which is not mentioned to be included - continue - } - } - // If the list of transfer to be excluded is passed - // then check the current transfer in the list of excluded transfer - // If it exists, then skip the transfer - if len(excludeTransfer) > 0 { - // Get the source string from the part plan header - src, _, _ := plan.TransferSrcDstStrings(t) - // If the source exists in the list of excluded transfer - // skip the transfer - _, ok := excludeTransfer[src] - if ok { - jpm.ReportTransferDone() // Don't schedule transfer which is mentioned to be excluded - continue - } - } - // If the transfer was failed, then while rescheduling the transfer marking it Started. if ts == common.ETransferStatus.Failed() { jppt.SetTransferStatus(common.ETransferStatus.Started(), true) @@ -663,15 +649,38 @@ func (jpm *jobPartMgr) deleteSnapshotsOption() common.DeleteSnapshotsOption { return jpm.Plan().DeleteSnapshotsOption } +func (jpm *jobPartMgr) updateJobPartProgress(status common.TransferStatus) { + switch status { + case common.ETransferStatus.Success(): + atomic.AddUint32(&jpm.atomicTransfersCompleted, 1) + case common.ETransferStatus.Failed(), common.ETransferStatus.BlobTierFailure(): + atomic.AddUint32(&jpm.atomicTransfersFailed, 1) + case common.ETransferStatus.SkippedEntityAlreadyExists(), common.ETransferStatus.SkippedBlobHasSnapshots(): + atomic.AddUint32(&jpm.atomicTransfersSkipped, 1) + case common.ETransferStatus.Cancelled(): + default: + panic("Unexpected status") + } +} + // Call Done when a transfer has completed its epilog; this method returns the number of transfers completed so far -func (jpm *jobPartMgr) ReportTransferDone() (transfersDone uint32) { +func (jpm *jobPartMgr) ReportTransferDone(status common.TransferStatus) (transfersDone uint32) { transfersDone = atomic.AddUint32(&jpm.atomicTransfersDone, 1) + jpm.updateJobPartProgress(status) + + //Add a safety count-check + + if jpm.ShouldLog(pipeline.LogInfo) { plan := jpm.Plan() jpm.Log(pipeline.LogInfo, fmt.Sprintf("JobID=%v, Part#=%d, TransfersDone=%d of %d", plan.JobID, plan.PartNum, transfersDone, plan.NumTransfers)) } if transfersDone == jpm.planMMF.Plan().NumTransfers { - jpm.jobMgr.ReportJobPartDone() + jppi := jobPartProgressInfo{ + transfersCompleted: int(atomic.LoadUint32(&jpm.atomicTransfersCompleted)), + transfersSkipped: int(atomic.LoadUint32(&jpm.atomicTransfersSkipped)), + transfersFailed: int(atomic.LoadUint32(&jpm.atomicTransfersFailed)), + } + jpm.jobMgr.ReportJobPartDone(jppi) } return transfersDone } diff --git a/ste/mgr-JobPartTransferMgr.go b/ste/mgr-JobPartTransferMgr.go index 683ba29ff..10077a83f 100644 --- a/ste/mgr-JobPartTransferMgr.go +++ b/ste/mgr-JobPartTransferMgr.go @@ -547,6 +547,7 @@ func (jptm *jobPartTransferMgr) hasStartedWork() bool { // the raw status values do not reflect possible cancellation. // Do not call directly. Use IsDeadBeforeStart or IsDeadInflight // instead because they usually require different handling +// Practically, a jptm is dead as soon as the context is released.
func (jptm *jobPartTransferMgr) isDead() bool { return jptm.TransferStatusIgnoringCancellation() < 0 || jptm.WasCanceled() } @@ -818,7 +819,7 @@ func (jptm *jobPartTransferMgr) ReportTransferDone() uint32 { panic("cannot report the same transfer done twice") } - return jptm.jobPartMgr.ReportTransferDone() + return jptm.jobPartMgr.ReportTransferDone(jptm.jobPartPlanTransfer.TransferStatus()) } func (jptm *jobPartTransferMgr) SourceProviderPipeline() pipeline.Pipeline { diff --git a/ste/xfer-anyToRemote-file.go b/ste/xfer-anyToRemote-file.go index 4ec578d52..f962a85f5 100644 --- a/ste/xfer-anyToRemote-file.go +++ b/ste/xfer-anyToRemote-file.go @@ -34,7 +34,6 @@ import ( "github.com/Azure/azure-pipeline-go/pipeline" "github.com/Azure/azure-storage-blob-go/azblob" - "github.com/Azure/azure-storage-azcopy/common" ) @@ -42,7 +41,7 @@ import ( // That's alright, but it's good to know on the off chance. // This sync.Once is present to ensure we output information about a S2S access tier preservation failure to stdout once var s2sAccessTierFailureLogStdout sync.Once -var checkLengthFailureOnReadOnlyDst sync .Once +var checkLengthFailureOnReadOnlyDst sync.Once // This sync.Once and string pair ensures that we only get a user's destination account kind once when handling set-tier // Premium block blob doesn't support tiering, and page blobs only support P1-80. @@ -197,6 +196,8 @@ func anyToRemote_file(jptm IJobPartTransferMgr, info TransferInfo, p pipeline.Pi // step 1. perform initial checks if jptm.WasCanceled() { + /* This is the earliest we detect jptm has been cancelled before scheduling chunks */ + jptm.SetStatus(common.ETransferStatus.Cancelled()) jptm.ReportTransferDone() return } @@ -491,6 +492,11 @@ func epilogueWithCleanupSendToRemote(jptm IJobPartTransferMgr, s sender, sip ISo jptm.LogChunkStatus(pseudoId, common.EWaitReason.Epilogue()) defer jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone()) // normal setting to done doesn't apply to these pseudo ids + if jptm.WasCanceled() { + // This is where we detect that the transfer has been cancelled. Further statements do not act on + // dead jptm. We set the status here. + jptm.SetStatus(common.ETransferStatus.Cancelled()) + } if jptm.IsLive() { if _, isS2SCopier := s.(s2sCopier); sip.IsLocal() || (isS2SCopier && info.S2SSourceChangeValidation) { // Check the source to see if it was changed during transfer. If it was, mark the transfer as failed. diff --git a/ste/xfer-anyToRemote-folder.go b/ste/xfer-anyToRemote-folder.go index 2a08c9001..55d0a7eb4 100644 --- a/ste/xfer-anyToRemote-folder.go +++ b/ste/xfer-anyToRemote-folder.go @@ -30,6 +30,8 @@ func anyToRemote_folder(jptm IJobPartTransferMgr, info TransferInfo, p pipeline. // step 1. perform initial checks if jptm.WasCanceled() { + /* This is the earliest we detect that jptm has been cancelled before we reach destination */ + jptm.SetStatus(common.ETransferStatus.Cancelled()) jptm.ReportTransferDone() return } diff --git a/ste/xfer-remoteToLocal-file.go b/ste/xfer-remoteToLocal-file.go index 693a3bf70..8d9a8878c 100644 --- a/ste/xfer-remoteToLocal-file.go +++ b/ste/xfer-remoteToLocal-file.go @@ -59,6 +59,8 @@ func remoteToLocal_file(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer pac // If the transfer was cancelled, then report transfer as done // TODO Question: the above comment had this following text too: "and increasing the bytestransferred by the size of the source." what does it mean?
if jptm.WasCanceled() { + /* This is the earliest we detect that jptm has been cancelled before we schedule chunks */ + jptm.SetStatus(common.ETransferStatus.Cancelled()) jptm.ReportTransferDone() return } @@ -300,6 +302,12 @@ func epilogueWithCleanupDownload(jptm IJobPartTransferMgr, dl downloader, active jptm.LogChunkStatus(pseudoId, common.EWaitReason.Epilogue()) defer jptm.LogChunkStatus(pseudoId, common.EWaitReason.ChunkDone()) // normal setting to done doesn't apply to these pseudo ids + if jptm.WasCanceled() { + // This is where we first realize that the transfer is cancelled. Further statements are no-ops and + // do not set any transfer status because all of them verify that jptm is live. + jptm.SetStatus(common.ETransferStatus.Cancelled()) + } + haveNonEmptyFile := activeDstFile != nil if haveNonEmptyFile { diff --git a/ste/xfer-remoteToLocal-folder.go b/ste/xfer-remoteToLocal-folder.go index fc887aa8e..bd9802d93 100644 --- a/ste/xfer-remoteToLocal-folder.go +++ b/ste/xfer-remoteToLocal-folder.go @@ -33,6 +33,8 @@ func remoteToLocal_folder(jptm IJobPartTransferMgr, p pipeline.Pipeline, pacer p // Perform initial checks // If the transfer was cancelled, then report transfer as done if jptm.WasCanceled() { + /* This is the earliest we detect that jptm was cancelled, before we go to destination */ + jptm.SetStatus(common.ETransferStatus.Cancelled()) jptm.ReportTransferDone() return }
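A minimal sketch of the convention described in the cmd/zc_enumerator.go comment above: every caller of processIfPassedFilters is expected to pass its error through getProcessingError, so the ignoredError sentinel is treated as a skip rather than a failure. The exampleTraverser type and its items field below are invented for illustration only; processIfPassedFilters, getProcessingError, and the traverse signature come from the diff itself.

// exampleTraverser is a hypothetical traverser, shown only to illustrate the
// error-stripping convention; it is not part of this change.
type exampleTraverser struct {
	items []storedObject // assumed to be pre-enumerated objects
}

func (t *exampleTraverser) traverse(preprocessor objectMorpher, processor objectProcessor, filters []objectFilter) error {
	for _, so := range t.items {
		err := processIfPassedFilters(filters, so, processor)
		// Strip the ignoredError sentinel: an object that was filtered out or skipped is not a failure.
		_, err = getProcessingError(err)
		if err != nil {
			return err
		}
	}
	return nil
}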