Skip to content

Commit

Permalink
sync: fix temporary name when create a file (#4215)
Browse files Browse the repository at this point in the history
  • Loading branch information
davies authored Nov 30, 2023
1 parent ec24896 commit 984bf0d
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 36 deletions.
23 changes: 12 additions & 11 deletions cmd/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -127,7 +126,18 @@ func (j *juiceFS) Put(key string, in io.Reader) (err error) {
if object.PutInplace {
tmp = p
} else {
tmp = path.Join(path.Dir(p), "."+path.Base(p)+".tmp"+strconv.Itoa(rand.Int()))
name := path.Base(p)
if len(name) > 200 {
name = name[:200]
}
tmp = path.Join(path.Dir(p), fmt.Sprintf(".%s.tmp.%d", name, rand.Int()))
defer func() {
if err != nil {
if e := j.jfs.Delete(ctx, tmp); e != 0 {
logger.Warnf("Failed to delete %s: %s", tmp, e)
}
}
}()
}
f, eno := j.jfs.Create(ctx, tmp, 0666, j.umask)
if eno == syscall.ENOENT {
Expand All @@ -143,15 +153,6 @@ func (j *juiceFS) Put(key string, in io.Reader) (err error) {
if eno != 0 {
return toError(eno)
}
if !object.PutInplace {
defer func() {
if err != nil {
if e := j.jfs.Delete(ctx, tmp); e != 0 {
logger.Warnf("Failed to delete %s: %s", tmp, e)
}
}
}()
}
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
_, err = io.CopyBuffer(&jFile{f, 0}, in, *buf)
Expand Down
6 changes: 6 additions & 0 deletions cmd/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ func testFileSystem(t *testing.T, s object.ObjectStorage) {
t.Fatalf("testKeysEqual fail: %s", err)
}
}

// put a file with very long name
longName := strings.Repeat("a", 255)
if err := s.Put("dir/"+longName, bytes.NewReader([]byte{0})); err != nil {
t.Fatalf("PUT a file with long name `%s` failed: %q", longName, err)
}
}

func TestJFS(t *testing.T) {
Expand Down
20 changes: 11 additions & 9 deletions pkg/object/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (d *filestore) Get(key string, off, limit int64) (io.ReadCloser, error) {
return f, nil
}

func (d *filestore) Put(key string, in io.Reader) error {
func (d *filestore) Put(key string, in io.Reader) (err error) {
p := d.path(key)

if strings.HasSuffix(key, dirSuffix) || key == "" && strings.HasSuffix(d.root, dirSuffix) {
Expand All @@ -148,7 +148,16 @@ func (d *filestore) Put(key string, in io.Reader) error {
if PutInplace {
tmp = p
} else {
tmp = filepath.Join(filepath.Dir(p), "."+filepath.Base(p)+".tmp"+strconv.Itoa(rand.Int()))
name := filepath.Base(p)
if len(name) > 200 {
name = name[:200]
}
tmp = filepath.Join(filepath.Dir(p), "."+name+".tmp"+strconv.Itoa(rand.Int()))
defer func() {
if err != nil {
_ = os.Remove(tmp)
}
}()
}
f, err := os.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
if err != nil && os.IsNotExist(err) {
Expand All @@ -160,13 +169,6 @@ func (d *filestore) Put(key string, in io.Reader) error {
if err != nil {
return err
}
if !PutInplace {
defer func() {
if err != nil {
_ = os.Remove(tmp)
}
}()
}

if TryCFR {
_, err = io.Copy(f, in)
Expand Down
6 changes: 6 additions & 0 deletions pkg/object/filesystem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,10 @@ func testFileSystem(t *testing.T, s ObjectStorage) {
}
}
}

// put a file with very long name
longName := strings.Repeat("a", 255)
if err := s.Put("dir/"+longName, bytes.NewReader([]byte{0})); err != nil {
t.Fatalf("PUT a file with long name `%s` failed: %q", longName, err)
}
}
16 changes: 11 additions & 5 deletions pkg/object/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (h *hdfsclient) Get(key string, off, limit int64) (io.ReadCloser, error) {
return f, nil
}

func (h *hdfsclient) Put(key string, in io.Reader) error {
func (h *hdfsclient) Put(key string, in io.Reader) (err error) {
p := h.path(key)
if strings.HasSuffix(p, dirSuffix) {
return h.c.MkdirAll(p, 0777&^h.umask)
Expand All @@ -131,12 +131,18 @@ func (h *hdfsclient) Put(key string, in io.Reader) error {
if PutInplace {
tmp = p
} else {
tmp = path.Join(path.Dir(p), fmt.Sprintf(".%s.tmp.%d", path.Base(p), rand.Int()))
name := path.Base(p)
if len(name) > 200 {
name = name[:200]
}
tmp = path.Join(path.Dir(p), fmt.Sprintf(".%s.tmp.%d", name, rand.Int()))
defer func() {
if err != nil {
_ = h.c.Remove(tmp)
}
}()
}
f, err := h.c.CreateFile(tmp, h.dfsReplication, 128<<20, 0666&^h.umask)
if !PutInplace {
defer func() { _ = h.c.Remove(tmp) }()
}
if err != nil {
if pe, ok := err.(*os.PathError); ok && pe.Err == os.ErrNotExist {
_ = h.c.MkdirAll(path.Dir(p), 0777&^h.umask)
Expand Down
19 changes: 13 additions & 6 deletions pkg/object/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"io"
"io/fs"
"math/rand"
"os"
"os/user"
"path"
Expand Down Expand Up @@ -153,7 +154,7 @@ func (n *nfsStore) mkdirAll(p string, perm fs.FileMode) error {
return err
}

func (n *nfsStore) Put(key string, in io.Reader) error {
func (n *nfsStore) Put(key string, in io.Reader) (err error) {
p := n.path(key)
if strings.HasSuffix(p, dirSuffix) {
return n.mkdirAll(p, 0777)
Expand All @@ -162,9 +163,18 @@ func (n *nfsStore) Put(key string, in io.Reader) error {
if PutInplace {
tmp = p
} else {
tmp = path.Join(path.Dir(p), "."+path.Base(p)+".tmp")
name := path.Base(p)
if len(name) > 200 {
name = name[:200]
}
tmp = path.Join(path.Dir(p), fmt.Sprintf(".%s.tmp.%d", name, rand.Int()))
defer func() {
if err != nil {
_ = n.target.Remove(tmp)
}
}()
}
_, err := n.target.Create(tmp, 0777)
_, err = n.target.Create(tmp, 0777)
if os.IsNotExist(err) {
_ = n.mkdirAll(path.Dir(p), 0777)
_, err = n.target.Create(tmp, 0777)
Expand All @@ -181,9 +191,6 @@ func (n *nfsStore) Put(key string, in io.Reader) error {
return err
}

if !PutInplace {
defer func() { _ = n.target.Remove(tmp) }()
}
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
_, err = io.CopyBuffer(ff, in, *buf)
Expand Down
17 changes: 12 additions & 5 deletions pkg/object/sftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"bytes"
"fmt"
"io"
"math/rand"
"net"
"net/url"
"os"
Expand Down Expand Up @@ -203,7 +204,7 @@ func (f *sftpStore) Get(key string, off, limit int64) (io.ReadCloser, error) {
return ff, err
}

func (f *sftpStore) Put(key string, in io.Reader) error {
func (f *sftpStore) Put(key string, in io.Reader) (err error) {
c, err := f.getSftpConnection()
if err != nil {
return err
Expand All @@ -222,15 +223,21 @@ func (f *sftpStore) Put(key string, in io.Reader) error {
if PutInplace {
tmp = p
} else {
tmp = path.Join(path.Dir(p), "."+path.Base(p)+".tmp")
name := path.Base(p)
if len(name) > 200 {
name = name[:200]
}
tmp = path.Join(path.Dir(p), fmt.Sprintf(".%s.tmp.%d", name, rand.Int()))
defer func() {
if err != nil {
_ = c.sftpClient.Remove(tmp)
}
}()
}
ff, err := c.sftpClient.OpenFile(tmp, os.O_CREATE|os.O_WRONLY|os.O_TRUNC)
if err != nil {
return err
}
if !PutInplace {
defer func() { _ = c.sftpClient.Remove(tmp) }()
}
buf := bufPool.Get().(*[]byte)
defer bufPool.Put(buf)
_, err = io.CopyBuffer(ff, in, *buf)
Expand Down

0 comments on commit 984bf0d

Please sign in to comment.