diff --git a/.github/scripts/hypo/command.py b/.github/scripts/hypo/command.py index 5bbdefd581ab..f370df6eff09 100644 --- a/.github/scripts/hypo/command.py +++ b/.github/scripts/hypo/command.py @@ -155,7 +155,7 @@ def fsck(self, entry, repair=False, recuisive=False, user='root'): @rule( entry = Entries.filter(lambda x: x != multiple()), parent = Folders.filter(lambda x: x != multiple()), - new_entry_name = st_entry_name, + new_entry_name = st_file_name, user = st_sudo_user, preserve = st.booleans() ) diff --git a/.github/scripts/hypo/command_op.py b/.github/scripts/hypo/command_op.py index 35525470d7c5..c972ff550c76 100644 --- a/.github/scripts/hypo/command_op.py +++ b/.github/scripts/hypo/command_op.py @@ -8,8 +8,6 @@ __import__('xattr') except ImportError: subprocess.check_call(["pip", "install", "xattr"]) -from common import is_jfs, get_acl, get_root, get_stat -from typing import Dict try: __import__('psutil') except ImportError: diff --git a/.github/scripts/hypo/command_test.py b/.github/scripts/hypo/command_test.py index 6cd171dba77e..eb6629d61b31 100644 --- a/.github/scripts/hypo/command_test.py +++ b/.github/scripts/hypo/command_test.py @@ -5,7 +5,7 @@ class TestCommand(unittest.TestCase): def test_dump(self): state = JuicefsCommandMachine() folders_0 = state.init_folders() - files_0 = state.create_file(content=b'', file_name='aazz', mode='w', parent=folders_0, umask=312, user='root') + files_0 = state.create_file(content='', file_name='aazz', mode='w', parent=folders_0, umask=312, user='root') value = ''.join([chr(i) for i in range(256)]) value = value.encode('latin-1') value = b'\x2580q\x2589' @@ -17,14 +17,14 @@ def test_dump(self): def skip_test_info(self): state = JuicefsCommandMachine() folders_0 = state.init_folders() - files_2 = state.create_file(content=b'0', file_name='mvvd', mode='a', parent=folders_0, umask=293, user='root') + files_2 = state.create_file(content='0', file_name='mvvd', mode='a', parent=folders_0, umask=293, user='root') state.info(entry=folders_0, raw=True, recuisive=True, user='user1') state.teardown() def test_clone(self): state = JuicefsCommandMachine() v1 = state.init_folders() - v2 = state.create_file(content=b'\x9bcR\xba', file_name='ygbl', mode='x', parent=v1, umask=466, user='root') + v2 = state.create_file(content='\x9bcR\xba', file_name='ygbl', mode='x', parent=v1, umask=466, user='root') state.chmod(entry=v1, mode=715, user='root') state.clone(entry=v2, new_entry_name='drqj', parent=v1, preserve=False, user='user1') state.teardown() diff --git a/.github/scripts/sync/sync_cluster.sh b/.github/scripts/sync/sync_cluster.sh index 264b657ae555..8a16f796a00f 100755 --- a/.github/scripts/sync/sync_cluster.sh +++ b/.github/scripts/sync/sync_cluster.sh @@ -61,7 +61,7 @@ test_sync_without_mount_point(){ sudo -u juicedata meta_url=$META_URL ./juicefs sync -v jfs://meta_url/data/ minio://minioadmin:minioadmin@172.20.0.1:9000/data1/ \ --manager-addr 172.20.0.1:8081 --worker juicedata@172.20.0.2,juicedata@172.20.0.3 \ - --list-threads 10 --list-depth 5 \ + --list-threads 10 --list-depth 5 --check-new \ 2>&1 | tee sync.log # diff data/ /jfs/data1/ check_sync_log $file_count @@ -86,6 +86,10 @@ test_sync_without_mount_point2(){ --list-threads 10 --list-depth 5\ 2>&1 | tee sync.log check_sync_log $file_count + sudo -u juicedata meta_url=$META_URL ./juicefs sync -v minio://minioadmin:minioadmin@172.20.0.1:9000/data/ jfs://meta_url/ \ + --manager-addr 172.20.0.1:8081 --worker juicedata@172.20.0.2,juicedata@172.20.0.3 \ + --list-threads 10 --list-depth 5 --check-all \ + 2>&1 | tee sync.log ./juicefs mount -d $META_URL /jfs diff data/ /jfs/data/ ./mc rm -r --force myminio/data diff --git a/.github/workflows/command2.yml b/.github/workflows/command2.yml index 434344ffd82f..4b05c8568bb9 100644 --- a/.github/workflows/command2.yml +++ b/.github/workflows/command2.yml @@ -120,8 +120,8 @@ jobs: grep ":" /var/log/juicefs.log && exit 1 || true - name: Setup upterm session - # if: failure() && (github.event.inputs.debug == 'true' || github.run_attempt != 1) - if: failure() + if: failure() && (github.event.inputs.debug == 'true' || github.run_attempt != 1) + # if: failure() timeout-minutes: 60 uses: lhotari/action-upterm@v1 diff --git a/cmd/object.go b/cmd/object.go index 6ed029210d71..5b1af9190459 100644 --- a/cmd/object.go +++ b/cmd/object.go @@ -117,6 +117,9 @@ var bufPool = sync.Pool{ } func (j *juiceFS) Put(key string, in io.Reader, getters ...object.AttrGetter) (err error) { + if vfs.IsSpecialName(key) { + return fmt.Errorf("skip special file %s for jfs: %w", key, utils.ErrSkipped) + } p := j.path(key) if strings.HasSuffix(p, "/") { eno := j.jfs.MkdirAll(ctx, p, 0777, j.umask) diff --git a/go.mod b/go.mod index b740273a441f..98b0f49c0b39 100644 --- a/go.mod +++ b/go.mod @@ -46,7 +46,6 @@ require ( github.com/minio/minio v0.0.0-20210206053228-97fe57bba92c github.com/minio/minio-go/v7 v7.0.11-0.20210302210017-6ae69c73ce78 github.com/ncw/swift/v2 v2.0.1 - github.com/olekukonko/tablewriter v0.0.1 github.com/pingcap/log v1.1.1-0.20221015072633-39906604fb81 github.com/pkg/errors v0.9.1 github.com/pkg/sftp v1.13.5 @@ -66,6 +65,7 @@ require ( github.com/urfave/cli/v2 v2.19.3 github.com/vbauerster/mpb/v7 v7.0.3 github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8 + github.com/vimeo/go-util v1.4.1 github.com/vmware/go-nfs-client v0.0.0-20190605212624-d43b92724c1b github.com/volcengine/ve-tos-golang-sdk/v2 v2.7.0 github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a diff --git a/go.sum b/go.sum index cf8b14ef3347..2e2d52248516 100644 --- a/go.sum +++ b/go.sum @@ -668,7 +668,6 @@ github.com/nrdcg/namesilo v0.2.1/go.mod h1:lwMvfQTyYq+BbjJd30ylEG4GPSS6PII0Tia4r github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= -github.com/olekukonko/tablewriter v0.0.1 h1:b3iUnf1v+ppJiOfNX4yxxqfWKMQPZR5yoh8urCTFX88= github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/oliverisaac/shellescape v0.0.0-20220131224704-1b6c6b87b668 h1:WUilXdVrxYH+fFkmstviAOj1o9CfoW5O/Sd0LWPIVUA= github.com/oliverisaac/shellescape v0.0.0-20220131224704-1b6c6b87b668/go.mod h1:EDgl+cvbmeOQUMTTH94gjXVtFHr8xDe5BiXhWn7Hf1E= @@ -903,6 +902,8 @@ github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVS github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8 h1:EVObHAr8DqpoJCVv6KYTle8FEImKhtkfcZetNqxDoJQ= github.com/viki-org/dnscache v0.0.0-20130720023526-c70c1f23c5d8/go.mod h1:dniwbG03GafCjFohMDmz6Zc6oCuiqgH6tGNyXTkHzXE= +github.com/vimeo/go-util v1.4.1 h1:UbNoaYH1eHv4LqBSH6zIItj+zKqbln0i01oY3iA/QPM= +github.com/vimeo/go-util v1.4.1/go.mod h1:r+yspV//C48HeMXV8nEvtUeNiIiGfVv3bbEHzOgudwE= github.com/volcengine/ve-tos-golang-sdk/v2 v2.7.0 h1:MnTrrKb7gvWoI1W5GxVnjjzdSPmms4++JiR3ioqqoRc= github.com/volcengine/ve-tos-golang-sdk/v2 v2.7.0/go.mod h1:IrjK84IJJTuOZOTMv/P18Ydjy/x+ow7fF7q11jAxXLM= github.com/vultr/govultr v0.1.4/go.mod h1:9H008Uxr/C4vFNGLqKx232C206GL0PBHzOP0809bGNA= diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 1d77fc1749d2..bf1b8a3f0b1b 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -908,14 +908,11 @@ func (m *redisMeta) Resolve(ctx Context, parent Ino, path string, inode *Ino, at } func (m *redisMeta) doGetAttr(ctx Context, inode Ino, attr *Attr) syscall.Errno { - return errno(m.rdb.Watch(ctx, func(tx *redis.Tx) error { - val, err := tx.Get(ctx, m.inodeKey(inode)).Bytes() - if err != nil { - return err - } - m.parseAttr(val, attr) - return nil - }, m.inodeKey(inode))) + a, err := m.rdb.Get(ctx, m.inodeKey(inode)).Bytes() + if err == nil { + m.parseAttr(a, attr) + } + return errno(err) } type timeoutError interface { @@ -4552,29 +4549,27 @@ func (m *redisMeta) doSetFacl(ctx Context, ino Ino, aclType uint8, rule *aclAPI. } func (m *redisMeta) doGetFacl(ctx Context, ino Ino, aclType uint8, aclId uint32, rule *aclAPI.Rule) syscall.Errno { - return errno(m.rdb.Watch(ctx, func(tx *redis.Tx) error { - if aclId == aclAPI.None { - val, err := tx.Get(ctx, m.inodeKey(ino)).Bytes() - if err != nil { - return err - } - attr := &Attr{} - m.parseAttr(val, attr) - m.of.Update(ino, attr) - - aclId = getAttrACLId(attr, aclType) - } - - a, err := m.getACL(ctx, tx, aclId) + if aclId == aclAPI.None { + val, err := m.rdb.Get(ctx, m.inodeKey(ino)).Bytes() if err != nil { - return err - } - if a == nil { - return ENOATTR + return errno(err) } - *rule = *a - return nil - }, m.inodeKey(ino))) + attr := &Attr{} + m.parseAttr(val, attr) + m.of.Update(ino, attr) + + aclId = getAttrACLId(attr, aclType) + } + + a, err := m.getACL(ctx, nil, aclId) + if err != nil { + return errno(err) + } + if a == nil { + return ENOATTR + } + *rule = *a + return 0 } func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule, error) { @@ -4585,15 +4580,13 @@ func (m *redisMeta) getACL(ctx Context, tx *redis.Tx, id uint32) (*aclAPI.Rule, return cRule, nil } - cmds, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { - pipe.HGet(ctx, m.aclKey(), strconv.FormatUint(uint64(id), 10)) - return nil - }) - if err != nil { - return nil, err + var val []byte + var err error + if tx != nil { + val, err = tx.HGet(ctx, m.aclKey(), strconv.FormatUint(uint64(id), 10)).Bytes() + } else { + val, err = m.rdb.HGet(ctx, m.aclKey(), strconv.FormatUint(uint64(id), 10)).Bytes() } - - val, err := cmds[0].(*redis.StringCmd).Bytes() if err != nil { return nil, err } diff --git a/pkg/sync/cluster.go b/pkg/sync/cluster.go index 85f4947a57a1..b5319ad87779 100644 --- a/pkg/sync/cluster.go +++ b/pkg/sync/cluster.go @@ -330,7 +330,16 @@ func launchWorker(address string, config *Config, wg *sync.WaitGroup) { func marshalObjects(objs []object.Object) ([]byte, error) { var arr []map[string]interface{} for _, o := range objs { - arr = append(arr, object.MarshalObject(o)) + obj := object.MarshalObject(o) + switch oo := o.(type) { + case *withSize: + obj["nsize"] = oo.nsize + obj["size"] = oo.Object.Size() + case *withFSize: + obj["fnsize"] = oo.nsize + obj["size"] = oo.File.Size() + } + arr = append(arr, obj) } return json.MarshalIndent(arr, "", " ") } @@ -343,7 +352,13 @@ func unmarshalObjects(d []byte) ([]object.Object, error) { } var objs []object.Object for _, m := range arr { - objs = append(objs, object.UnmarshalObject(m)) + obj := object.UnmarshalObject(m) + if nsize, ok := m["nsize"]; ok { + obj = &withSize{obj, int64(nsize.(float64))} + } else if fnsize, ok := m["fnsize"]; ok { + obj = &withFSize{obj.(object.File), int64(fnsize.(float64))} + } + objs = append(objs, obj) } return objs, nil } diff --git a/pkg/sync/cluster_test.go b/pkg/sync/cluster_test.go index 84e3c8834883..c95cd69cb181 100644 --- a/pkg/sync/cluster_test.go +++ b/pkg/sync/cluster_test.go @@ -16,6 +16,7 @@ package sync import ( + "os" "testing" "time" @@ -36,6 +37,9 @@ func (o *obj) Mtime() time.Time { return o.mtime } func (o *obj) IsDir() bool { return o.isDir } func (o *obj) IsSymlink() bool { return o.isSymlink } func (o *obj) StorageClass() string { return "" } +func (o *obj) Owner() string { return "" } +func (o *obj) Group() string { return "" } +func (o *obj) Mode() os.FileMode { return 0 } func TestCluster(t *testing.T) { // manager @@ -62,3 +66,28 @@ func TestCluster(t *testing.T) { t.Fatalf("should end") } } + +func TestMarshal(t *testing.T) { + var objs = []object.Object{ + &obj{key: "test"}, + &withSize{&obj{key: "test1", size: 100}, -4}, + &withFSize{&obj{key: "test2", size: 200}, -1}, + } + d, err := marshalObjects(objs) + if err != nil { + t.Fatal(err) + } + objs2, e := unmarshalObjects(d) + if e != nil { + t.Fatal(e) + } + if objs2[0].Key() != "test" { + t.Fatalf("expect test but got %s", objs2[0].Key()) + } + if objs2[1].Key() != "test1" || objs2[1].Size() != -4 || objs2[1].(*withSize).Object.Size() != 100 { + t.Fatalf("expect withSize but got %s", objs2[0].Key()) + } + if objs2[2].Key() != "test2" || objs2[2].Size() != -1 || objs2[2].(*withFSize).File.Size() != 200 { + t.Fatalf("expect withFSize but got %s", objs2[0].Key()) + } +} diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 65645a07fb61..b6f122580a36 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -20,6 +20,7 @@ import ( "bytes" "errors" "fmt" + "hash/crc32" "io" "os" "path" @@ -33,6 +34,7 @@ import ( "github.com/juicedata/juicefs/pkg/utils" "github.com/juju/ratelimit" "github.com/prometheus/client_golang/prometheus" + "github.com/vimeo/go-util/crc32combine" ) // The max number of key per listing request @@ -57,9 +59,28 @@ var ( concurrent chan int limiter *ratelimit.Bucket ) - +var crcTable = crc32.MakeTable(crc32.Castagnoli) var logger = utils.GetLogger("juicefs") +type chksumReader struct { + io.Reader + chksum uint32 + cal bool +} + +func (r *chksumReader) Read(p []byte) (n int, err error) { + n, err = r.Reader.Read(p) + if r.cal { + r.chksum = crc32.Update(r.chksum, crcTable, p[:n]) + } + return +} + +type chksumWithSz struct { + chksum uint32 + size int64 +} + // human readable bytes size func formatSize(bytes int64) string { units := [7]string{" ", "K", "M", "G", "T", "P", "E"} @@ -184,7 +205,7 @@ var bufPool = sync.Pool{ func try(n int, f func() error) (err error) { for i := 0; i < n; i++ { err = f() - if err == nil { + if err == nil || errors.Is(err, utils.ErrSkipped) { return } logger.Debugf("Try %d failed: %s", i+1, err) @@ -231,74 +252,128 @@ func copyPerms(dst object.ObjectStorage, obj object.Object, config *Config) { logger.Debugf("Copied permissions (%s:%s:%s) for %s in %s", fi.Owner(), fi.Group(), fi.Mode(), key, time.Since(start)) } -func doCheckSum(src, dst object.ObjectStorage, key string, obj object.Object, config *Config, equal *bool) error { - if obj.IsSymlink() && config.Links && (config.CheckAll || config.CheckNew) { - var srcLink, dstLink string - var err error - if s, ok := src.(object.SupportSymlink); ok { - if srcLink, err = s.Readlink(key); err != nil { - return err - } +func calPartChksum(objStor object.ObjectStorage, key string, abort chan struct{}, offset, length int64) (uint32, error) { + if limiter != nil { + limiter.Wait(length) + } + select { + case <-abort: + return 0, fmt.Errorf("aborted") + case concurrent <- 1: + defer func() { + <-concurrent + }() + } + in, err := objStor.Get(key, offset, length) + if err != nil { + return 0, fmt.Errorf("dest get: %s", err) + } + defer in.Close() + + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) + var chksum uint32 + for left := int(length); left > 0; left -= bufferSize { + bs := bufferSize + if left < bufferSize { + bs = left } - if s, ok := dst.(object.SupportSymlink); ok { - if dstLink, err = s.Readlink(key); err != nil { - return err + *buf = (*buf)[:bs] + if _, err = io.ReadFull(in, *buf); err != nil { + return 0, fmt.Errorf("dest read: %s", err) + } + chksum = crc32.Update(chksum, crcTable, *buf) + } + return chksum, nil +} + +func calObjChksum(objStor object.ObjectStorage, key string, abort chan struct{}, obj object.Object) (uint32, error) { + var err error + var chksum uint32 + if obj.Size() < maxBlock { + return calPartChksum(objStor, key, abort, 0, obj.Size()) + } + n := int((obj.Size()-1)/defaultPartSize) + 1 + errs := make(chan error, n) + chksums := make([]chksumWithSz, n) + for i := 0; i < n; i++ { + go func(num int) { + sz := int64(defaultPartSize) + if num == n-1 { + sz = obj.Size() - int64(num)*defaultPartSize } + chksum, err := calPartChksum(objStor, key, abort, int64(num)*defaultPartSize, sz) + chksums[num] = chksumWithSz{chksum, sz} + errs <- err + }(i) + } + for i := 0; i < n; i++ { + if err = <-errs; err != nil { + close(abort) + break } - *equal = srcLink == dstLink && srcLink != "" && dstLink != "" - return nil } - abort := make(chan struct{}) - checkPart := func(offset, length int64) error { - if limiter != nil { - limiter.Wait(length) + if err != nil { + return 0, err + } + chksum = chksums[0].chksum + for i := 1; i < n; i++ { + chksum = crc32combine.CRC32Combine(crc32.Castagnoli, chksum, chksums[i].chksum, chksums[i].size) + } + return chksum, nil +} + +func compObjPartBinary(src, dst object.ObjectStorage, key string, abort chan struct{}, offset, length int64) error { + if limiter != nil { + limiter.Wait(length) + } + select { + case <-abort: + return fmt.Errorf("aborted") + case concurrent <- 1: + defer func() { + <-concurrent + }() + } + in, err := src.Get(key, offset, length) + if err != nil { + return fmt.Errorf("src get: %s", err) + } + defer in.Close() + in2, err := dst.Get(key, offset, length) + if err != nil { + return fmt.Errorf("dest get: %s", err) + } + defer in2.Close() + + buf := bufPool.Get().(*[]byte) + defer bufPool.Put(buf) + buf2 := bufPool.Get().(*[]byte) + defer bufPool.Put(buf2) + for left := int(length); left > 0; left -= bufferSize { + bs := bufferSize + if left < bufferSize { + bs = left } - select { - case <-abort: - return fmt.Errorf("aborted") - case concurrent <- 1: - defer func() { - <-concurrent - }() + *buf = (*buf)[:bs] + *buf2 = (*buf2)[:bs] + if _, err = io.ReadFull(in, *buf); err != nil { + return fmt.Errorf("src read: %s", err) } - in, err := src.Get(key, offset, length) - if err != nil { - return fmt.Errorf("src get: %s", err) + if _, err = io.ReadFull(in2, *buf2); err != nil { + return fmt.Errorf("dest read: %s", err) } - defer in.Close() - in2, err := dst.Get(key, offset, length) - if err != nil { - return fmt.Errorf("dest get: %s", err) - } - defer in2.Close() - - buf := bufPool.Get().(*[]byte) - defer bufPool.Put(buf) - buf2 := bufPool.Get().(*[]byte) - defer bufPool.Put(buf2) - for left := int(length); left > 0; left -= bufferSize { - bs := bufferSize - if left < bufferSize { - bs = left - } - *buf = (*buf)[:bs] - *buf2 = (*buf2)[:bs] - if _, err = io.ReadFull(in, *buf); err != nil { - return fmt.Errorf("src read: %s", err) - } - if _, err = io.ReadFull(in2, *buf2); err != nil { - return fmt.Errorf("dest read: %s", err) - } - if !bytes.Equal(*buf, *buf2) { - return fmt.Errorf("bytes not equal") - } + if !bytes.Equal(*buf, *buf2) { + return fmt.Errorf("bytes not equal") } - return nil } + return nil +} +func compObjBinary(src, dst object.ObjectStorage, key string, abort chan struct{}, obj object.Object) (bool, error) { var err error if obj.Size() < maxBlock { - err = checkPart(0, obj.Size()) + err = compObjPartBinary(src, dst, key, abort, 0, obj.Size()) } else { n := int((obj.Size()-1)/defaultPartSize) + 1 errs := make(chan error, n) @@ -308,7 +383,7 @@ func doCheckSum(src, dst object.ObjectStorage, key string, obj object.Object, co if num == n-1 { sz = obj.Size() - int64(num)*defaultPartSize } - errs <- checkPart(int64(num)*defaultPartSize, sz) + errs <- compObjPartBinary(src, dst, key, abort, int64(num)*defaultPartSize, sz) }(i) } for i := 0; i < n; i++ { @@ -318,20 +393,54 @@ func doCheckSum(src, dst object.ObjectStorage, key string, obj object.Object, co } } } - + equal := false if err != nil && err.Error() == "bytes not equal" { - *equal = false err = nil } else { - *equal = err == nil + equal = err == nil + } + return equal, err +} + +func doCheckSum(src, dst object.ObjectStorage, key string, srcChksumPtr *uint32, obj object.Object, config *Config, equal *bool) error { + if obj.IsSymlink() && config.Links && (config.CheckAll || config.CheckNew) { + var srcLink, dstLink string + var err error + if s, ok := src.(object.SupportSymlink); ok { + if srcLink, err = s.Readlink(key); err != nil { + return err + } + } + if s, ok := dst.(object.SupportSymlink); ok { + if dstLink, err = s.Readlink(key); err != nil { + return err + } + } + *equal = srcLink == dstLink && srcLink != "" && dstLink != "" + return nil + } + abort := make(chan struct{}) + var err error + if srcChksumPtr != nil { + var srcChksum uint32 + var dstChksum uint32 + srcChksum = *srcChksumPtr + dstChksum, err = calObjChksum(dst, key, abort, obj) + if err == nil { + *equal = srcChksum == dstChksum + } else { + *equal = false + } + } else { + *equal, err = compObjBinary(src, dst, key, abort, obj) } return err } -func checkSum(src, dst object.ObjectStorage, key string, obj object.Object, config *Config) (bool, error) { +func checkSum(src, dst object.ObjectStorage, key string, srcChksum *uint32, obj object.Object, config *Config) (bool, error) { start := time.Now() var equal bool - err := try(3, func() error { return doCheckSum(src, dst, key, obj, config, &equal) }) + err := try(3, func() error { return doCheckSum(src, dst, key, srcChksum, obj, config, &equal) }) if err == nil { checked.Increment() checkedBytes.IncrInt64(obj.Size()) @@ -355,7 +464,7 @@ func inMap(obj object.ObjectStorage, m map[string]struct{}) bool { return ok } -func doCopySingle(src, dst object.ObjectStorage, key string, size int64) error { +func doCopySingle(src, dst object.ObjectStorage, key string, size int64, calChksum bool) (uint32, error) { if size > maxBlock && !inMap(dst, readInMem) && !inMap(src, fastStreamRead) { var err error var in io.Reader @@ -368,19 +477,21 @@ func doCopySingle(src, dst object.ObjectStorage, key string, size int64) error { // download the object into disk if f, err = os.CreateTemp("", "rep"); err != nil { logger.Warnf("create temp file: %s", err) - return doCopySingle0(src, dst, key, size) + return doCopySingle0(src, dst, key, size, calChksum) } _ = os.Remove(f.Name()) // will be deleted after Close() defer f.Close() buf := bufPool.Get().(*[]byte) defer bufPool.Put(buf) + // hide f.ReadFrom to avoid discarding buf if _, err = io.CopyBuffer(struct{ io.Writer }{f}, downer, *buf); err == nil { _, err = f.Seek(0, 0) in = f } } + r := &chksumReader{in, 0, calChksum} if err == nil { - err = dst.Put(key, in) + err = dst.Put(key, r) } if err != nil { if _, e := src.Head(key); os.IsNotExist(e) { @@ -389,12 +500,12 @@ func doCopySingle(src, dst object.ObjectStorage, key string, size int64) error { err = nil } } - return err + return r.chksum, err } - return doCopySingle0(src, dst, key, size) + return doCopySingle0(src, dst, key, size, calChksum) } -func doCopySingle0(src, dst object.ObjectStorage, key string, size int64) error { +func doCopySingle0(src, dst object.ObjectStorage, key string, size int64, calChksum bool) (uint32, error) { concurrent <- 1 defer func() { <-concurrent @@ -406,7 +517,7 @@ func doCopySingle0(src, dst object.ObjectStorage, key string, size int64) error // for check permissions r, err := src.Get(key, 0, -1) if err != nil { - return err + return 0, err } _ = r.Close() } @@ -419,11 +530,13 @@ func doCopySingle0(src, dst object.ObjectStorage, key string, size int64) error copied.IncrInt64(-1) err = nil } - return err + return 0, err } } + r := &chksumReader{in, 0, calChksum} defer in.Close() - return dst.Put(key, &withProgress{in}) + err = dst.Put(key, &withProgress{r}) + return r.chksum, err } type withProgress struct { @@ -439,7 +552,7 @@ func (w *withProgress) Read(b []byte) (int, error) { return n, err } -func doUploadPart(src, dst object.ObjectStorage, srckey string, off, size int64, key, uploadID string, num int) (*object.Part, error) { +func doUploadPart(src, dst object.ObjectStorage, srckey string, off, size int64, key, uploadID string, num int, calChksum bool) (*object.Part, uint32, error) { if limiter != nil { limiter.Wait(size) } @@ -449,26 +562,29 @@ func doUploadPart(src, dst object.ObjectStorage, srckey string, off, size int64, defer p.Release() data := p.Data var part *object.Part + var chksum uint32 err := try(3, func() error { in, err := src.Get(srckey, off, sz) if err != nil { return err } defer in.Close() - if _, err = io.ReadFull(in, data); err != nil { + r := &chksumReader{in, 0, calChksum} + if _, err = io.ReadFull(r, data); err != nil { return err } + chksum = r.chksum // PartNumber starts from 1 part, err = dst.UploadPart(key, uploadID, num+1, data) return err }) if err != nil { logger.Warnf("Failed to copy data of %s part %d: %s", key, num, err) - return nil, fmt.Errorf("part %d: %s", num, err) + return nil, 0, fmt.Errorf("part %d: %s", num, err) } logger.Debugf("Copied data of %s part %d in %s", key, num, time.Since(start)) copiedBytes.IncrInt64(sz) - return part, nil + return part, chksum, nil } func choosePartSize(upload *object.MultipartUpload, size int64) int64 { @@ -483,10 +599,10 @@ func choosePartSize(upload *object.MultipartUpload, size int64) int64 { return partSize } -func doCopyRange(src, dst object.ObjectStorage, key string, off, size int64, upload *object.MultipartUpload, num int, abort chan struct{}) (*object.Part, error) { +func doCopyRange(src, dst object.ObjectStorage, key string, off, size int64, upload *object.MultipartUpload, num int, abort chan struct{}, calChksum bool) (*object.Part, uint32, error) { select { case <-abort: - return nil, fmt.Errorf("aborted") + return nil, 0, fmt.Errorf("aborted") case concurrent <- 1: defer func() { <-concurrent @@ -495,7 +611,7 @@ func doCopyRange(src, dst object.ObjectStorage, key string, off, size int64, upl limits := dst.Limits() if size <= 32<<20 || !limits.IsSupportUploadPartCopy { - return doUploadPart(src, dst, key, off, size, key, upload.UploadID, num) + return doUploadPart(src, dst, key, off, size, key, upload.UploadID, num, calChksum) } tmpkey := fmt.Sprintf("%s.part%d", key, num) @@ -506,13 +622,15 @@ func doCopyRange(src, dst object.ObjectStorage, key string, off, size int64, upl return err }) if err != nil { - return nil, fmt.Errorf("range(%d,%d): %s", off, size, err) + return nil, 0, fmt.Errorf("range(%d,%d): %s", off, size, err) } partSize := choosePartSize(up, size) n := int((size-1)/partSize) + 1 logger.Debugf("Copying data of %s (range: %d,%d) as %d parts (size: %d): %s", key, off, size, n, partSize, up.UploadID) parts := make([]*object.Part, n) + var tmpChksum uint32 + first := true for i := 0; i < n; i++ { sz := partSize @@ -522,20 +640,29 @@ func doCopyRange(src, dst object.ObjectStorage, key string, off, size int64, upl select { case <-abort: dst.AbortUpload(tmpkey, up.UploadID) - return nil, fmt.Errorf("aborted") + return nil, 0, fmt.Errorf("aborted") default: } - parts[i], err = doUploadPart(src, dst, key, off+int64(i)*partSize, sz, tmpkey, up.UploadID, i) + var chksum uint32 + parts[i], chksum, err = doUploadPart(src, dst, key, off+int64(i)*partSize, sz, tmpkey, up.UploadID, i, calChksum) if err != nil { dst.AbortUpload(tmpkey, up.UploadID) - return nil, fmt.Errorf("range(%d,%d): %s", off, size, err) + return nil, 0, fmt.Errorf("range(%d,%d): %s", off, size, err) + } + if calChksum { + if first { + tmpChksum = chksum + first = false + } else { + tmpChksum = crc32combine.CRC32Combine(crc32.Castagnoli, tmpChksum, chksum, sz) + } } } err = try(3, func() error { return dst.CompleteUpload(tmpkey, up.UploadID, parts) }) if err != nil { dst.AbortUpload(tmpkey, up.UploadID) - return nil, fmt.Errorf("multipart: %s", err) + return nil, 0, fmt.Errorf("multipart: %s", err) } var part *object.Part err = try(3, func() error { @@ -543,13 +670,13 @@ func doCopyRange(src, dst object.ObjectStorage, key string, off, size int64, upl return err }) _ = dst.Delete(tmpkey) - return part, err + return part, tmpChksum, err } -func doCopyMultiple(src, dst object.ObjectStorage, key string, size int64, upload *object.MultipartUpload) error { +func doCopyMultiple(src, dst object.ObjectStorage, key string, size int64, upload *object.MultipartUpload, calChksum bool) (uint32, error) { limits := dst.Limits() if size > limits.MaxPartSize*int64(upload.MaxCount) { - return fmt.Errorf("object size %d is too large to copy", size) + return 0, fmt.Errorf("object size %d is too large to copy", size) } partSize := choosePartSize(upload, size) @@ -558,6 +685,7 @@ func doCopyMultiple(src, dst object.ObjectStorage, key string, size int64, uploa abort := make(chan struct{}) parts := make([]*object.Part, n) errs := make(chan error, n) + chksums := make([]chksumWithSz, n) var err error for i := 0; i < n; i++ { @@ -567,7 +695,9 @@ func doCopyMultiple(src, dst object.ObjectStorage, key string, size int64, uploa sz = size - int64(num)*partSize } var copyErr error - parts[num], copyErr = doCopyRange(src, dst, key, int64(num)*partSize, sz, upload, num, abort) + var chksum uint32 + parts[num], chksum, copyErr = doCopyRange(src, dst, key, int64(num)*partSize, sz, upload, num, abort, calChksum) + chksums[num] = chksumWithSz{chksum, sz} errs <- copyErr }(i) } @@ -583,28 +713,43 @@ func doCopyMultiple(src, dst object.ObjectStorage, key string, size int64, uploa } if err != nil { dst.AbortUpload(key, upload.UploadID) - return fmt.Errorf("multipart: %s", err) + return 0, fmt.Errorf("multipart: %s", err) } - return nil + var chksum uint32 + if calChksum { + chksum = chksums[0].chksum + for i := 1; i < n; i++ { + chksum = crc32combine.CRC32Combine(crc32.Castagnoli, chksum, chksums[i].chksum, chksums[i].size) + } + } + + return chksum, nil } -func copyData(src, dst object.ObjectStorage, key string, size int64) error { +func copyData(src, dst object.ObjectStorage, key string, size int64, calChksum bool) (uint32, error) { start := time.Now() var err error + var srcChksum uint32 if size < maxBlock { - err = try(3, func() error { return doCopySingle(src, dst, key, size) }) + err = try(3, func() (err error) { + srcChksum, err = doCopySingle(src, dst, key, size, calChksum) + return + }) } else { var upload *object.MultipartUpload if upload, err = dst.CreateMultipartUpload(key); err == nil { - err = doCopyMultiple(src, dst, key, size, upload) + srcChksum, err = doCopyMultiple(src, dst, key, size, upload, calChksum) } else if err == utils.ENOTSUP { - err = try(3, func() error { return doCopySingle(src, dst, key, size) }) + err = try(3, func() (err error) { + srcChksum, err = doCopySingle(src, dst, key, size, calChksum) + return + }) } else { // other error retry if err = try(2, func() error { upload, err = dst.CreateMultipartUpload(key) return err }); err == nil { - err = doCopyMultiple(src, dst, key, size, upload) + srcChksum, err = doCopyMultiple(src, dst, key, size, upload, calChksum) } } } @@ -613,7 +758,7 @@ func copyData(src, dst object.ObjectStorage, key string, size int64) error { } else { logger.Errorf("Failed to copy data of %s in %s: %s", key, time.Since(start), err) } - return err + return srcChksum, err } func worker(tasks <-chan object.Object, src, dst object.ObjectStorage, config *Config) { @@ -638,7 +783,7 @@ func worker(tasks <-chan object.Object, src, dst object.ObjectStorage, config *C break } obj = obj.(*withSize).Object - if equal, err := checkSum(src, dst, key, obj, config); err != nil { + if equal, err := checkSum(src, dst, key, nil, obj, config); err != nil { failed.Increment() break } else if equal { @@ -673,17 +818,18 @@ func worker(tasks <-chan object.Object, src, dst object.ObjectStorage, config *C break } var err error + var srcChksum uint32 if config.Links && obj.IsSymlink() { if err = copyLink(src, dst, key); err != nil { logger.Errorf("copy link failed: %s", err) } } else { - err = copyData(src, dst, key, obj.Size()) + srcChksum, err = copyData(src, dst, key, obj.Size(), config.CheckAll || config.CheckNew) } if err == nil && (config.CheckAll || config.CheckNew) { var equal bool - if equal, err = checkSum(src, dst, key, obj, config); err == nil && !equal { + if equal, err = checkSum(src, dst, key, &srcChksum, obj, config); err == nil && !equal { err = fmt.Errorf("checksums of copied object %s don't match", key) } } @@ -697,6 +843,8 @@ func worker(tasks <-chan object.Object, src, dst object.ObjectStorage, config *C copyPerms(dst, obj, config) } copied.Increment() + } else if errors.Is(err, utils.ErrSkipped) { + skipped.Increment() } else { failed.Increment() logger.Errorf("Failed to copy object %s: %s", key, err) diff --git a/pkg/utils/errors.go b/pkg/utils/errors.go index d93bf5b81231..a1da3c790277 100644 --- a/pkg/utils/errors.go +++ b/pkg/utils/errors.go @@ -23,4 +23,5 @@ import ( var ( ENOTSUP = errors.New("not supported") ErrFuncTimeout = errors.New("function timeout") + ErrSkipped = errors.New("skipped") )