Skip to content

Commit

Permalink
object: fix thread-safety in list of ceph (#4216)
Browse files Browse the repository at this point in the history
  • Loading branch information
davies authored Nov 30, 2023
1 parent 984bf0d commit e89ecd3
Showing 1 changed file with 39 additions and 36 deletions.
75 changes: 39 additions & 36 deletions pkg/object/ceph.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,46 +208,49 @@ func (c *ceph) Head(key string) (Object, error) {
}

func (c *ceph) ListAll(prefix, marker string, followLink bool) (<-chan Object, error) {
var objs = make(chan Object, 1000)
err := c.do(func(ctx *rados.IOContext) error {
iter, err := ctx.Iter()
if err != nil {
close(objs)
return err
}
defer iter.Close()
ctx, err := c.newContext()
if err != nil {
return nil, err
}
iter, err := ctx.Iter()
if err != nil {
ctx.Destroy()
return nil, err
}
defer iter.Close()

// FIXME: this will be really slow for many objects
keys := make([]string, 0, 1000)
for iter.Next() {
key := iter.Value()
if key <= marker || !strings.HasPrefix(key, prefix) {
continue
}
keys = append(keys, key)
// FIXME: this will be really slow for many objects
keys := make([]string, 0, 1000)
for iter.Next() {
key := iter.Value()
if key <= marker || !strings.HasPrefix(key, prefix) {
continue
}
// the keys are not ordered, sort them first
sort.Strings(keys)
// TODO: parallel
go func() {
defer close(objs)
for _, key := range keys {
st, err := ctx.Stat(key)
if err != nil {
if errors.Is(err, rados.ErrNotFound) {
logger.Warnf("Skip non-existent key: %s", key)
continue
}
objs <- nil
logger.Errorf("Stat key %s: %s", key, err)
return
keys = append(keys, key)
}
// the keys are not ordered, sort them first
sort.Strings(keys)

var objs = make(chan Object, 1000)
// TODO: parallel
go func() {
defer close(objs)
defer ctx.Destroy()
for _, key := range keys {
st, err := ctx.Stat(key)
if err != nil {
if errors.Is(err, rados.ErrNotFound) {
logger.Debugf("Skip non-existent key: %s", key)
continue
}
objs <- &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""}
objs <- nil
logger.Errorf("Stat key %s: %s", key, err)
return
}
}()
return nil
})
return objs, err
objs <- &obj{key, int64(st.Size), st.ModTime, strings.HasSuffix(key, "/"), ""}
}
}()
return objs, nil
}

func newCeph(endpoint, cluster, user, token string) (ObjectStorage, error) {
Expand Down

0 comments on commit e89ecd3

Please sign in to comment.