diff --git a/cmd/lakectl/cmd/local.go b/cmd/lakectl/cmd/local.go index be02d5683ff..c96372faa3c 100644 --- a/cmd/lakectl/cmd/local.go +++ b/cmd/lakectl/cmd/local.go @@ -99,7 +99,7 @@ func getLocalSyncFlags(cmd *cobra.Command, client *apigen.ClientWithResponses) s } // getLocalArgs parses arguments to extract a remote URI and deduces the local path. -// If local path isn't provided and considerGitRoot is true, it uses the git repository root. +// If the local path isn't provided and considerGitRoot is true, it uses the git repository root. func getLocalArgs(args []string, requireRemote bool, considerGitRoot bool) (remote *uri.URI, localPath string) { idx := 0 if requireRemote { diff --git a/cmd/lakectl/cmd/local_clone.go b/cmd/lakectl/cmd/local_clone.go index 0b18c224f86..09c615eb79b 100644 --- a/cmd/lakectl/cmd/local_clone.go +++ b/cmd/lakectl/cmd/local_clone.go @@ -46,9 +46,9 @@ var localCloneCmd = &cobra.Command{ } stableRemote := remote.WithRef(head) // Dynamically construct changes - c := make(chan *local.Change, filesChanSize) + ch := make(chan *local.Change, filesChanSize) go func() { - defer close(c) + defer close(ch) remotePath := remote.GetPath() var after string for { @@ -70,7 +70,7 @@ var localCloneCmd = &cobra.Command{ if relPath == "" || strings.HasSuffix(relPath, uri.PathSeparator) { continue } - c <- &local.Change{ + ch <- &local.Change{ Source: local.ChangeSourceRemote, Path: relPath, Type: local.ChangeTypeAdded, @@ -88,7 +88,7 @@ var localCloneCmd = &cobra.Command{ } sigCtx := localHandleSyncInterrupt(ctx, idx, string(cloneOperation)) s := local.NewSyncManager(sigCtx, client, syncFlags.parallelism, syncFlags.presign) - err = s.Sync(localPath, stableRemote, c) + err = s.Sync(localPath, stableRemote, ch) if err != nil { DieErr(err) } diff --git a/pkg/fileutil/io.go b/pkg/fileutil/io.go index 923f5012fcf..a25851e4b97 100644 --- a/pkg/fileutil/io.go +++ b/pkg/fileutil/io.go @@ -153,7 +153,7 @@ func VerifyAbsPath(absPath, basePath string) error { } func VerifyRelPath(relPath, basePath string) error { - abs := basePath + string(os.PathSeparator) + relPath + abs := filepath.Join(basePath, relPath) return VerifyAbsPath(abs, basePath) } diff --git a/pkg/local/sync.go b/pkg/local/sync.go index 35865804bce..c0afe49c7d7 100644 --- a/pkg/local/sync.go +++ b/pkg/local/sync.go @@ -75,17 +75,16 @@ func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *C defer s.progressBar.Stop() wg, ctx := errgroup.WithContext(s.ctx) - wg.SetLimit(s.maxParallelism) - for change := range changeSet { - c := change - if err := ctx.Err(); err != nil { - return err - } + for i := 0; i < s.maxParallelism; i++ { wg.Go(func() error { - return s.apply(ctx, rootPath, remote, c) + for change := range changeSet { + if err := s.apply(ctx, rootPath, remote, change); err != nil { + return err + } + } + return nil }) } - if err := wg.Wait(); err != nil { return err } @@ -93,48 +92,41 @@ func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *C return err } -func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.URI, change *Change) (err error) { +func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.URI, change *Change) error { switch change.Type { case ChangeTypeAdded, ChangeTypeModified: switch change.Source { case ChangeSourceRemote: - // remote changed something, download it! - err = s.download(ctx, rootPath, remote, change) - if err != nil { - err = fmt.Errorf("download %s failed: %w", change.Path, err) + // remotely changed something, download it! + if err := s.download(ctx, rootPath, remote, change); err != nil { + return fmt.Errorf("download %s failed: %w", change.Path, err) } - return err case ChangeSourceLocal: // we wrote something, upload it! - err = s.upload(ctx, rootPath, remote, change) - if err != nil { - err = fmt.Errorf("upload %s failed: %w", change.Path, err) + if err := s.upload(ctx, rootPath, remote, change); err != nil { + return fmt.Errorf("upload %s failed: %w", change.Path, err) } - return err default: panic("invalid change source") } case ChangeTypeRemoved: if change.Source == ChangeSourceRemote { // remote deleted something, delete it locally! - err = s.deleteLocal(rootPath, change) - if err != nil { - err = fmt.Errorf("delete local %s failed: %w", change.Path, err) + if err := s.deleteLocal(rootPath, change); err != nil { + return fmt.Errorf("delete local %s failed: %w", change.Path, err) } - return err } else { // we deleted something, delete it on remote! - err = s.deleteRemote(ctx, remote, change) - if err != nil { - err = fmt.Errorf("delete remote %s failed: %w", change.Path, err) + if err := s.deleteRemote(ctx, remote, change); err != nil { + return fmt.Errorf("delete remote %s failed: %w", change.Path, err) } - return err } case ChangeTypeConflict: return ErrConflict default: panic("invalid change type") } + return nil } func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri.URI, change *Change) error { @@ -175,8 +167,8 @@ func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri sizeBytes := swag.Int64Value(statResp.JSON200.SizeBytes) f, err := os.Create(destination) if err != nil { - // sometimes we get a file that is actually a directory marker. - // spark loves writing those. If we already have the directory we can skip it. + // Sometimes we get a file that is actually a directory marker (Spark loves writing those). + // If we already have the directory, we can skip it. if errors.Is(err, syscall.EISDIR) && sizeBytes == 0 { return nil // no further action required! } @@ -187,7 +179,7 @@ func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri }() if sizeBytes == 0 { // if size is empty just create file - spinner := s.progressBar.AddSpinner(fmt.Sprintf("download %s", change.Path)) + spinner := s.progressBar.AddSpinner("download " + change.Path) atomic.AddUint64(&s.tasks.Downloaded, 1) defer spinner.Done() } else { // Download file @@ -239,7 +231,7 @@ func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri } // set mtime to the server returned one - err = os.Chtimes(destination, time.Now(), lastModified) // Explicit to catch in defer func + err = os.Chtimes(destination, time.Now(), lastModified) // Explicit to catch in deferred func return err } @@ -292,7 +284,7 @@ func (s *SyncManager) upload(ctx context.Context, rootPath string, remote *uri.U } func (s *SyncManager) deleteLocal(rootPath string, change *Change) (err error) { - b := s.progressBar.AddSpinner(fmt.Sprintf("delete local: %s", change.Path)) + b := s.progressBar.AddSpinner("delete local: " + change.Path) defer func() { defer func() { if err != nil { @@ -312,7 +304,7 @@ func (s *SyncManager) deleteLocal(rootPath string, change *Change) (err error) { } func (s *SyncManager) deleteRemote(ctx context.Context, remote *uri.URI, change *Change) (err error) { - b := s.progressBar.AddSpinner(fmt.Sprintf("delete remote path: %s", change.Path)) + b := s.progressBar.AddSpinner("delete remote path: " + change.Path) defer func() { if err != nil { b.Error()