Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/6584 content type for lakectl local #6644

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_checkout.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func localCheckout(cmd *cobra.Command, localPath string, specifiedRef string, co
}
}
}()
err = syncMgr.Sync(idx.LocalPath(), currentBase, c)
err = syncMgr.Sync(idx.LocalPath(), currentBase, c, "", false)
if err != nil {
DieErr(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var localCloneCmd = &cobra.Command{
}
sigCtx := localHandleSyncInterrupt(ctx, idx, string(cloneOperation))
s := local.NewSyncManager(sigCtx, client, syncFlags.parallelism, syncFlags.presign)
err = s.Sync(localPath, stableRemote, ch)
err = s.Sync(localPath, stableRemote, ch, "", false)
if err != nil {
DieErr(err)
}
Expand Down
12 changes: 9 additions & 3 deletions cmd/lakectl/cmd/local_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (
)

const (
localCommitAllowEmptyMessage = "allow-empty-message"
localCommitMessageFlagName = "message"
localCommitAllowEmptyMessage = "allow-empty-message"
localCommitMessageFlagName = "message"
localContentTypeFlagName = "content-type"
localIncludeContentTypeFlagName = "include-content-type"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
localIncludeContentTypeFlagName = "include-content-type"
localIncludeContentTypeFlagName = "detect-content-type"

)

func findConflicts(changes local.Changes) (conflicts []string) {
Expand All @@ -38,6 +40,8 @@ var localCommitCmd = &cobra.Command{
_, localPath := getLocalArgs(args, false, false)
syncFlags := getLocalSyncFlags(cmd, client)
message := Must(cmd.Flags().GetString(localCommitMessageFlagName))
contentType := Must(cmd.Flags().GetString(localContentTypeFlagName))
includeContentType := Must(cmd.Flags().GetBool(localIncludeContentTypeFlagName))
allowEmptyMessage := Must(cmd.Flags().GetBool(localCommitAllowEmptyMessage))
if message == "" && !allowEmptyMessage {
DieFmt("Commit message empty! To commit with empty message pass --%s flag", localCommitAllowEmptyMessage)
Expand Down Expand Up @@ -120,7 +124,7 @@ var localCommitCmd = &cobra.Command{
}()
sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(commitOperation))
s := local.NewSyncManager(sigCtx, client, syncFlags.parallelism, syncFlags.presign)
err = s.Sync(idx.LocalPath(), remote, c)
err = s.Sync(idx.LocalPath(), remote, c, contentType, includeContentType)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand. When would I want to set contentType to the same value on all the files in a Sync?

if err != nil {
DieErr(err)
}
Expand Down Expand Up @@ -187,6 +191,8 @@ func init() {
localCommitCmd.Flags().Bool(localCommitAllowEmptyMessage, false, "Allow commit with empty message")
localCommitCmd.MarkFlagsMutuallyExclusive(localCommitMessageFlagName, localCommitAllowEmptyMessage)
localCommitCmd.Flags().StringSlice(metaFlagName, []string{}, "key value pair in the form of key=value")
localCommitCmd.Flags().StringP(localContentTypeFlagName, "", "", "MIME type of contents")
localCommitCmd.Flags().Bool(localIncludeContentTypeFlagName, false, "Detects MIME type of contents")
withLocalSyncFlags(localCommitCmd)
localCmd.AddCommand(localCommitCmd)
}
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ var localPullCmd = &cobra.Command{
})
sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(pullOperation))
s := local.NewSyncManager(sigCtx, client, syncFlags.parallelism, syncFlags.presign)
err = s.Sync(idx.LocalPath(), newBase, c)
err = s.Sync(idx.LocalPath(), newBase, c, "", false)
if err != nil {
DieErr(err)
}
Expand Down
14 changes: 8 additions & 6 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -2527,12 +2527,14 @@ lakectl local commit [directory] [flags]
{:.no_toc}

```
--allow-empty-message Allow commit with empty message
-h, --help help for commit
-m, --message string Commit message
--meta strings key value pair in the form of key=value
-p, --parallelism int Max concurrent operations to perform (default 25)
--pre-sign Use pre-signed URLs when downloading/uploading data (recommended) (default true)
--allow-empty-message Allow commit with empty message
--content-type string MIME type of contents
-h, --help help for commit
--include-content-type Detects MIME type of contents
-m, --message string Commit message
--meta strings key value pair in the form of key=value
-p, --parallelism int Max concurrent operations to perform (default 25)
--pre-sign Use pre-signed URLs when downloading/uploading data (recommended) (default true)
```


Expand Down
18 changes: 11 additions & 7 deletions pkg/local/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"mime"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -70,15 +71,18 @@ func NewSyncManager(ctx context.Context, client *apigen.ClientWithResponses, max

// Sync - sync changes between remote and local directory given the Changes channel.
// For each change, will apply download, upload or delete according to the change type and change source
func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *Change) error {
func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *Change, contentType string, includeContentType bool) error {
s.progressBar.Start()
defer s.progressBar.Stop()

wg, ctx := errgroup.WithContext(s.ctx)
for i := 0; i < s.maxParallelism; i++ {
wg.Go(func() error {
for change := range changeSet {
if err := s.apply(ctx, rootPath, remote, change); err != nil {
if includeContentType {
contentType = mime.TypeByExtension(filepath.Ext(change.Path))
}
if err := s.apply(ctx, rootPath, remote, change, contentType); err != nil {
return err
}
}
Expand All @@ -92,7 +96,7 @@ func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *C
return err
}

func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.URI, change *Change) error {
func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.URI, change *Change, contentType string) error {
switch change.Type {
case ChangeTypeAdded, ChangeTypeModified:
switch change.Source {
Expand All @@ -103,7 +107,7 @@ func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.UR
}
case ChangeSourceLocal:
// we wrote something, upload it!
if err := s.upload(ctx, rootPath, remote, change); err != nil {
if err := s.upload(ctx, rootPath, remote, change, contentType); err != nil {
return fmt.Errorf("upload %s failed: %w", change.Path, err)
}
default:
Expand Down Expand Up @@ -235,7 +239,7 @@ func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri
return err
}

func (s *SyncManager) upload(ctx context.Context, rootPath string, remote *uri.URI, change *Change) error {
func (s *SyncManager) upload(ctx context.Context, rootPath string, remote *uri.URI, change *Change, contentType string) error {
source := filepath.Join(rootPath, change.Path)
if err := fileutil.VerifySafeFilename(source); err != nil {
return err
Expand Down Expand Up @@ -274,12 +278,12 @@ func (s *SyncManager) upload(ctx context.Context, rootPath string, remote *uri.U
}
if s.presign {
_, err = helpers.ClientUploadPreSign(
ctx, s.client, remote.Repository, remote.Ref, dest, metadata, "", reader)
ctx, s.client, remote.Repository, remote.Ref, dest, metadata, contentType, reader)
return err
}
// not pre-signed
_, err = helpers.ClientUpload(
ctx, s.client, remote.Repository, remote.Ref, dest, metadata, "", reader)
ctx, s.client, remote.Repository, remote.Ref, dest, metadata, contentType, reader)
return err
}

Expand Down
Loading