From e89ecd3a4d3526240f9095e5b60bbcdcc4d25091 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 30 Nov 2023 12:26:28 +0800 Subject: [PATCH] object: fix thread-safety in list of ceph (#4216) --- pkg/object/ceph.go | 75 ++++++++++++++++++++++++---------------------- 1 file changed, 39 insertions(+), 36 deletions(-) diff --git a/pkg/object/ceph.go b/pkg/object/ceph.go index 7ccf83d7dc59..91701d00ed3f 100644 --- a/pkg/object/ceph.go +++ b/pkg/object/ceph.go @@ -208,46 +208,49 @@ func (c *ceph) Head(key string) (Object, error) { } func (c *ceph) ListAll(prefix, marker string, followLink bool) (<-chan Object, error) { - var objs = make(chan Object, 1000) - err := c.do(func(ctx *rados.IOContext) error { - iter, err := ctx.Iter() - if err != nil { - close(objs) - return err - } - defer iter.Close() + ctx, err := c.newContext() + if err != nil { + return nil, err + } + iter, err := ctx.Iter() + if err != nil { + ctx.Destroy() + return nil, err + } + defer iter.Close() - // FIXME: this will be really slow for many objects - keys := make([]string, 0, 1000) - for iter.Next() { - key := iter.Value() - if key <= marker || !strings.HasPrefix(key, prefix) { - continue - } - keys = append(keys, key) + // FIXME: this will be really slow for many objects + keys := make([]string, 0, 1000) + for iter.Next() { + key := iter.Value() + if key <= marker || !strings.HasPrefix(key, prefix) { + continue } - // the keys are not ordered, sort them first - sort.Strings(keys) - // TODO: parallel - go func() { - defer close(objs) - for _, key := range keys { - st, err := ctx.Stat(key) - if err != nil { - if errors.Is(err, rados.ErrNotFound) { - logger.Warnf("Skip non-existent key: %s", key) - continue - } - objs <- nil - logger.Errorf("Stat key %s: %s", key, err) - return + keys = append(keys, key) + } + // the keys are not ordered, sort them first + sort.Strings(keys) + + var objs = make(chan Object, 1000) + // TODO: parallel + go func() { + defer close(objs) + defer ctx.Destroy() + for _, key := range keys { + st, err := ctx.Stat(key) + if err != nil { + if errors.Is(err, rados.ErrNotFound) { + logger.Debugf("Skip non-existent key: %s", key) + continue } - objs <- &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""} + objs <- nil + logger.Errorf("Stat key %s: %s", key, err) + return } - }() - return nil - }) - return objs, err + objs <- &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""} + } + }() + return objs, nil } func newCeph(endpoint, cluster, user, token string) (ObjectStorage, error) {