Skip to content

Commit

Permalink
fix: added auto detect content type
Browse files Browse the repository at this point in the history
Signed-off-by: chaitu <[email protected]>
  • Loading branch information
chaitu authored and 0x-chaitu committed Jan 3, 2024
1 parent 88247ac commit 95e433e
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 14 deletions.
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/fs_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var fsDownloadCmd = &cobra.Command{
}

s := local.NewSyncManager(ctx, client, syncFlags)
err := s.Sync(dest, remote, ch)
err := s.Sync(dest, remote, ch, false)
if err != nil {
DieErr(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/fs_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var fsUploadCmd = &cobra.Command{
if err != nil {
DieErr(err)
}
err = s.Sync(fullPath, pathURI, c)
err = s.Sync(fullPath, pathURI, c, false)
if err != nil {
DieErr(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_checkout.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func localCheckout(cmd *cobra.Command, localPath string, specifiedRef string, co
}
}
}()
err = syncMgr.Sync(idx.LocalPath(), currentBase, c)
err = syncMgr.Sync(idx.LocalPath(), currentBase, c, false)
if err != nil {
DieErr(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var localCloneCmd = &cobra.Command{
}
sigCtx := localHandleSyncInterrupt(ctx, idx, string(cloneOperation))
s := local.NewSyncManager(sigCtx, client, syncFlags)
err = s.Sync(localPath, stableRemote, ch)
err = s.Sync(localPath, stableRemote, ch, false)
if err != nil {
DieErr(err)
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/lakectl/cmd/local_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ var localCommitCmd = &cobra.Command{
_, localPath := getSyncArgs(args, false, false)
syncFlags := getSyncFlags(cmd, client)
message, kvPairs := getCommitFlags(cmd)

detectContentTypeFlag := getDetectContentTypeFlag(cmd)
idx, err := local.ReadIndex(localPath)
if err != nil {
DieErr(err)
Expand Down Expand Up @@ -112,7 +112,7 @@ var localCommitCmd = &cobra.Command{
}()
sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(commitOperation))
s := local.NewSyncManager(sigCtx, client, syncFlags)
err = s.Sync(idx.LocalPath(), remote, c)
err = s.Sync(idx.LocalPath(), remote, c, detectContentTypeFlag)
if err != nil {
DieErr(err)
}
Expand Down Expand Up @@ -172,5 +172,6 @@ var localCommitCmd = &cobra.Command{
func init() {
withCommitFlags(localCommitCmd, false)
withSyncFlags(localCommitCmd)
withDetectContentFlag(localCommitCmd, false)
localCmd.AddCommand(localCommitCmd)
}
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ var localPullCmd = &cobra.Command{
})
sigCtx := localHandleSyncInterrupt(cmd.Context(), idx, string(pullOperation))
s := local.NewSyncManager(sigCtx, client, syncFlags)
err = s.Sync(idx.LocalPath(), newBase, c)
err = s.Sync(idx.LocalPath(), newBase, c, false)
if err != nil {
DieErr(err)
}
Expand Down
9 changes: 9 additions & 0 deletions cmd/lakectl/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ const (
allowEmptyMsgFlagName = "allow-empty-message"
fmtErrEmptyMsg = `commit with no message without specifying the "--allow-empty-message" flag`
metaFlagName = "meta"
detectContentType = "detect-content-type"
)

func withRecursiveFlag(cmd *cobra.Command, usage string) {
Expand Down Expand Up @@ -222,6 +223,10 @@ func withMetadataFlag(cmd *cobra.Command) {
cmd.Flags().StringSlice(metaFlagName, []string{}, "key value pair in the form of key=value")
}

func withDetectContentFlag(cmd *cobra.Command, detectContentFlag bool) {
cmd.Flags().Bool(detectContentType, detectContentFlag, "detect content type")
}

func withCommitFlags(cmd *cobra.Command, allowEmptyMessage bool) {
withMessageFlags(cmd, allowEmptyMessage)
withMetadataFlag(cmd)
Expand All @@ -242,6 +247,10 @@ func getCommitFlags(cmd *cobra.Command) (string, map[string]string) {
return message, kvPairs
}

func getDetectContentTypeFlag(cmd *cobra.Command) bool {
return Must(cmd.Flags().GetBool(detectContentType))
}

func getKV(cmd *cobra.Command, name string) (map[string]string, error) { //nolint:unparam
kvList, err := cmd.Flags().GetStringSlice(name)
if err != nil {
Expand Down
19 changes: 12 additions & 7 deletions pkg/local/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"mime"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -73,15 +74,15 @@ func NewSyncManager(ctx context.Context, client *apigen.ClientWithResponses, fla

// Sync - sync changes between remote and local directory given the Changes channel.
// For each change, will apply download, upload or delete according to the change type and change source
func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *Change) error {
func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *Change, detectContentType bool) error {
s.progressBar.Start()
defer s.progressBar.Stop()

wg, ctx := errgroup.WithContext(s.ctx)
for i := 0; i < s.flags.Parallelism; i++ {
wg.Go(func() error {
for change := range changeSet {
if err := s.apply(ctx, rootPath, remote, change); err != nil {
if err := s.apply(ctx, rootPath, remote, change, detectContentType); err != nil {
return err
}
}
Expand All @@ -95,7 +96,7 @@ func (s *SyncManager) Sync(rootPath string, remote *uri.URI, changeSet <-chan *C
return err
}

func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.URI, change *Change) error {
func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.URI, change *Change, detectContentType bool) error {
switch change.Type {
case ChangeTypeAdded, ChangeTypeModified:
switch change.Source {
Expand All @@ -106,7 +107,7 @@ func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.UR
}
case ChangeSourceLocal:
// we wrote something, upload it!
if err := s.upload(ctx, rootPath, remote, change.Path); err != nil {
if err := s.upload(ctx, rootPath, remote, change.Path, detectContentType); err != nil {
return fmt.Errorf("upload %s failed: %w", change.Path, err)
}
default:
Expand Down Expand Up @@ -237,7 +238,11 @@ func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri
return err
}

func (s *SyncManager) upload(ctx context.Context, rootPath string, remote *uri.URI, path string) error {
func (s *SyncManager) upload(ctx context.Context, rootPath string, remote *uri.URI, path string, detectContentType bool) error {
var contentType string
if detectContentType {
contentType = mime.TypeByExtension(filepath.Ext(path))
}
source := filepath.Join(rootPath, path)
if err := fileutil.VerifySafeFilename(source); err != nil {
return err
Expand Down Expand Up @@ -276,12 +281,12 @@ func (s *SyncManager) upload(ctx context.Context, rootPath string, remote *uri.U
}
if s.flags.Presign {
_, err = helpers.ClientUploadPreSign(
ctx, s.client, remote.Repository, remote.Ref, dest, metadata, "", reader)
ctx, s.client, remote.Repository, remote.Ref, dest, metadata, contentType, reader)
return err
}
// not pre-signed
_, err = helpers.ClientUpload(
ctx, s.client, remote.Repository, remote.Ref, dest, metadata, "", reader)
ctx, s.client, remote.Repository, remote.Ref, dest, metadata, contentType, reader)
return err
}

Expand Down

0 comments on commit 95e433e

Please sign in to comment.