Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix lakectl local verify bad path error on Windows #6602

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func getLocalSyncFlags(cmd *cobra.Command, client *apigen.ClientWithResponses) s
}

// getLocalArgs parses arguments to extract a remote URI and deduces the local path.
// If local path isn't provided and considerGitRoot is true, it uses the git repository root.
// If the local path isn't provided and considerGitRoot is true, it uses the git repository root.
func getLocalArgs(args []string, requireRemote bool, considerGitRoot bool) (remote *uri.URI, localPath string) {
idx := 0
if requireRemote {
Expand Down
8 changes: 4 additions & 4 deletions cmd/lakectl/cmd/local_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ var localCloneCmd = &cobra.Command{
}
stableRemote := remote.WithRef(head)
// Dynamically construct changes
c := make(chan *local.Change, filesChanSize)
ch := make(chan *local.Change, filesChanSize)
go func() {
defer close(c)
defer close(ch)
remotePath := remote.GetPath()
var after string
for {
Expand All @@ -70,7 +70,7 @@ var localCloneCmd = &cobra.Command{
if relPath == "" || strings.HasSuffix(relPath, uri.PathSeparator) {
continue
}
c <- &local.Change{
ch <- &local.Change{
Source: local.ChangeSourceRemote,
Path: relPath,
Type: local.ChangeTypeAdded,
Expand All @@ -88,7 +88,7 @@ var localCloneCmd = &cobra.Command{
}
sigCtx := localHandleSyncInterrupt(ctx, idx, string(cloneOperation))
s := local.NewSyncManager(sigCtx, client, syncFlags.parallelism, syncFlags.presign)
err = s.Sync(localPath, stableRemote, c)
err = s.Sync(localPath, stableRemote, ch)
if err != nil {
DieErr(err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/fileutil/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func VerifyAbsPath(absPath, basePath string) error {
}

func VerifyRelPath(relPath, basePath string) error {
abs := basePath + string(os.PathSeparator) + relPath
abs := filepath.Join(basePath, relPath)
return VerifyAbsPath(abs, basePath)
}

Expand Down
56 changes: 24 additions & 32 deletions pkg/local/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,66 +75,58 @@ func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *C
defer s.progressBar.Stop()

wg, ctx := errgroup.WithContext(s.ctx)
wg.SetLimit(s.maxParallelism)
for change := range changeSet {
c := change
if err := ctx.Err(); err != nil {
return err
}
for i := 0; i < s.maxParallelism; i++ {
wg.Go(func() error {
return s.apply(ctx, rootPath, remote, c)
for change := range changeSet {
if err := s.apply(ctx, rootPath, remote, change); err != nil {
return err
}
}
return nil
})
}

if err := wg.Wait(); err != nil {
return err
}
_, err := fileutil.PruneEmptyDirectories(rootPath)
return err
}

func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.URI, change *Change) (err error) {
func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.URI, change *Change) error {
switch change.Type {
case ChangeTypeAdded, ChangeTypeModified:
switch change.Source {
case ChangeSourceRemote:
// remote changed something, download it!
err = s.download(ctx, rootPath, remote, change)
if err != nil {
err = fmt.Errorf("download %s failed: %w", change.Path, err)
// remotely changed something, download it!
if err := s.download(ctx, rootPath, remote, change); err != nil {
return fmt.Errorf("download %s failed: %w", change.Path, err)
}
return err
case ChangeSourceLocal:
// we wrote something, upload it!
err = s.upload(ctx, rootPath, remote, change)
if err != nil {
err = fmt.Errorf("upload %s failed: %w", change.Path, err)
if err := s.upload(ctx, rootPath, remote, change); err != nil {
return fmt.Errorf("upload %s failed: %w", change.Path, err)
}
return err
default:
panic("invalid change source")
}
case ChangeTypeRemoved:
if change.Source == ChangeSourceRemote {
// remote deleted something, delete it locally!
err = s.deleteLocal(rootPath, change)
if err != nil {
err = fmt.Errorf("delete local %s failed: %w", change.Path, err)
if err := s.deleteLocal(rootPath, change); err != nil {
return fmt.Errorf("delete local %s failed: %w", change.Path, err)
}
return err
} else {
// we deleted something, delete it on remote!
err = s.deleteRemote(ctx, remote, change)
if err != nil {
err = fmt.Errorf("delete remote %s failed: %w", change.Path, err)
if err := s.deleteRemote(ctx, remote, change); err != nil {
return fmt.Errorf("delete remote %s failed: %w", change.Path, err)
}
return err
}
case ChangeTypeConflict:
return ErrConflict
default:
panic("invalid change type")
}
return nil
}

func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri.URI, change *Change) error {
Expand Down Expand Up @@ -175,8 +167,8 @@ func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri
sizeBytes := swag.Int64Value(statResp.JSON200.SizeBytes)
f, err := os.Create(destination)
if err != nil {
// sometimes we get a file that is actually a directory marker.
// spark loves writing those. If we already have the directory we can skip it.
// Sometimes we get a file that is actually a directory marker (Spark loves writing those).
// If we already have the directory, we can skip it.
if errors.Is(err, syscall.EISDIR) && sizeBytes == 0 {
return nil // no further action required!
}
Expand All @@ -187,7 +179,7 @@ func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri
}()

if sizeBytes == 0 { // if size is empty just create file
spinner := s.progressBar.AddSpinner(fmt.Sprintf("download %s", change.Path))
spinner := s.progressBar.AddSpinner("download " + change.Path)
atomic.AddUint64(&s.tasks.Downloaded, 1)
defer spinner.Done()
} else { // Download file
Expand Down Expand Up @@ -239,7 +231,7 @@ func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri
}

// set mtime to the server returned one
err = os.Chtimes(destination, time.Now(), lastModified) // Explicit to catch in defer func
err = os.Chtimes(destination, time.Now(), lastModified) // Explicit to catch in deferred func
return err
}

Expand Down Expand Up @@ -292,7 +284,7 @@ func (s *SyncManager) upload(ctx context.Context, rootPath string, remote *uri.U
}

func (s *SyncManager) deleteLocal(rootPath string, change *Change) (err error) {
b := s.progressBar.AddSpinner(fmt.Sprintf("delete local: %s", change.Path))
b := s.progressBar.AddSpinner("delete local: " + change.Path)
defer func() {
defer func() {
if err != nil {
Expand All @@ -312,7 +304,7 @@ func (s *SyncManager) deleteLocal(rootPath string, change *Change) (err error) {
}

func (s *SyncManager) deleteRemote(ctx context.Context, remote *uri.URI, change *Change) (err error) {
b := s.progressBar.AddSpinner(fmt.Sprintf("delete remote path: %s", change.Path))
b := s.progressBar.AddSpinner("delete remote path: " + change.Path)
defer func() {
if err != nil {
b.Error()
Expand Down