diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index d933d18fcf..7fe6a63557 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -16,26 +16,20 @@ import ( // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) { - var ( - err error - size uint64 - ) if e.metrics != nil { defer elapsed(e.metrics.AddEstimateContainerSizeDuration)() } - err = e.execIfNotBlocked(func() error { - size, err = e.containerSize(cnr) - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return size, err -} + if e.blockErr != nil { + return 0, e.blockErr + } -func (e *StorageEngine) containerSize(cnr cid.ID) (uint64, error) { var size uint64 - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + for _, sh := range e.unsortedShards() { var csPrm shard.ContainerSizePrm csPrm.SetContainerID(cnr) @@ -43,13 +37,11 @@ func (e *StorageEngine) containerSize(cnr cid.ID) (uint64, error) { if err != nil { e.reportShardError(sh, "can't get container size", err, zap.Stringer("container_id", cnr)) - return false + continue } size += csRes.Size() - - return false - }) + } return size, nil } @@ -58,30 +50,24 @@ func (e *StorageEngine) containerSize(cnr cid.ID) (uint64, error) { // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) ListContainers() ([]cid.ID, error) { - var ( - res []cid.ID - err error - ) if e.metrics != nil { defer elapsed(e.metrics.AddListContainersDuration)() } - err = e.execIfNotBlocked(func() error { - res, err = e.listContainers() - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return res, err -} + if e.blockErr != nil { + return nil, e.blockErr + } -func (e *StorageEngine) listContainers() ([]cid.ID, error) { uniqueIDs := make(map[cid.ID]struct{}) - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + for _, sh := range e.unsortedShards() { res, err := sh.Shard.ListContainers(shard.ListContainersPrm{}) if err != nil { e.reportShardError(sh, "can't get list of containers", err) - return false + continue } for _, cnr := range res.Containers() { @@ -89,9 +75,7 @@ func (e *StorageEngine) listContainers() ([]cid.ID, error) { uniqueIDs[cnr] = struct{}{} } } - - return false - }) + } result := make([]cid.ID, 0, len(uniqueIDs)) for cnr := range uniqueIDs { @@ -103,27 +87,30 @@ func (e *StorageEngine) listContainers() ([]cid.ID, error) { // DeleteContainer deletes container's objects that engine stores. func (e *StorageEngine) DeleteContainer(ctx context.Context, cID cid.ID) error { - return e.execIfNotBlocked(func() error { - var wg errgroup.Group + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return e.blockErr + } - e.iterateOverUnsortedShards(func(hs hashedShard) bool { - wg.Go(func() error { - err := hs.Shard.DeleteContainer(ctx, cID) - if err != nil { - err = fmt.Errorf("container cleanup in %s shard: %w", hs.ID(), err) - e.log.Warn("container cleanup", zap.Error(err)) + var wg errgroup.Group - return err - } + for _, sh := range e.unsortedShards() { + wg.Go(func() error { + err := sh.Shard.DeleteContainer(ctx, cID) + if err != nil { + err = fmt.Errorf("container cleanup in %s shard: %w", sh.ID(), err) + e.log.Warn("container cleanup", zap.Error(err)) - return nil - }) + return err + } - return false + return nil }) + } - return wg.Wait() - }) + return wg.Wait() } func (e *StorageEngine) deleteNotFoundContainers() error { diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index 7768720b50..051a62ebf0 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -99,20 +99,6 @@ func (e *StorageEngine) close(releasePools bool) error { return nil } -// executes op if execution is not blocked, otherwise returns blocking error. -// -// Can be called concurrently with setBlockExecErr. -func (e *StorageEngine) execIfNotBlocked(op func() error) error { - e.blockExec.mtx.RLock() - defer e.blockExec.mtx.RUnlock() - - if e.blockExec.err != nil { - return e.blockExec.err - } - - return op() -} - // sets the flag of blocking execution of all data operations according to err: // - err != nil, then blocks the execution. If exec wasn't blocked, calls close method // (if err == errClosed => additionally releases pools and does not allow to resume executions). @@ -120,17 +106,16 @@ func (e *StorageEngine) execIfNotBlocked(op func() error) error { // // Can be called concurrently with exec. In this case it waits for all executions to complete. func (e *StorageEngine) setBlockExecErr(err error) error { - e.blockExec.mtx.Lock() - defer e.blockExec.mtx.Unlock() + e.blockMtx.Lock() + defer e.blockMtx.Unlock() - prevErr := e.blockExec.err + prevErr := e.blockErr - wasClosed := errors.Is(prevErr, errClosed) - if wasClosed { + if errors.Is(prevErr, errClosed) { return errClosed } - e.blockExec.err = err + e.blockErr = err if err == nil { if prevErr != nil { // block -> ok diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 185b9a2dcb..ceecb3a8a6 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -15,9 +15,13 @@ func (e *StorageEngine) Delete(addr oid.Address) error { defer elapsed(e.metrics.AddDeleteDuration)() } - return e.execIfNotBlocked(func() error { - return e.deleteObj(addr, true) - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return e.blockErr + } + return e.deleteObj(addr, true) } func (e *StorageEngine) deleteObj(addr oid.Address, force bool) error { diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index c18c53ced1..7e5f47d207 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -30,11 +30,8 @@ type StorageEngine struct { setModeCh chan setModeRequest wg sync.WaitGroup - blockExec struct { - mtx sync.RWMutex - - err error - } + blockMtx sync.RWMutex + blockErr error } type shardWrapper struct { @@ -137,7 +134,7 @@ func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err er // reportShardError checks that the amount of errors doesn't exceed the configured threshold. // If it does, shard is set to read-only mode. func (e *StorageEngine) reportShardError( - sh hashedShard, + sh shardWrapper, msg string, err error, fields ...zap.Field) { diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index a1629316e9..b42edad30f 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -16,7 +16,7 @@ import ( const defaultEvacuateBatchSize = 100 type pooledShard struct { - hashedShard + shardWrapper pool util.WorkerPool } @@ -63,8 +63,8 @@ func (e *StorageEngine) Evacuate(shardIDs []*shard.ID, ignoreErrors bool, faultH shards := make([]pooledShard, 0, len(e.shards)) for id := range e.shards { shards = append(shards, pooledShard{ - hashedShard: hashedShard(e.shards[id]), - pool: e.shardPools[id], + shardWrapper: e.shards[id], + pool: e.shardPools[id], }) } e.mtx.RUnlock() @@ -125,7 +125,7 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object(), nil, 0) + putDone, exists := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0) if putDone || exists { if putDone { e.log.Debug("object is moved to another shard", diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index 16b5d8fb96..34004b8107 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -12,45 +12,33 @@ import ( func (e *StorageEngine) exists(addr oid.Address) (bool, error) { var shPrm shard.ExistsPrm shPrm.SetAddress(addr) - alreadyRemoved := false - exists := false - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + for _, sh := range e.sortedShards(addr) { res, err := sh.Exists(shPrm) if err != nil { if shard.IsErrRemoved(err) { - alreadyRemoved = true - - return true + return false, apistatus.ObjectAlreadyRemoved{} } var siErr *objectSDK.SplitInfoError if errors.As(err, &siErr) { - return true + return false, nil } if shard.IsErrObjectExpired(err) { - return true + return false, nil } if !shard.IsErrNotFound(err) { e.reportShardError(sh, "could not check existence of object in shard", err) } - return false + continue } - if !exists { - exists = res.Exists() + if res.Exists() { + return true, nil } - - return false - }) - - if alreadyRemoved { - var errRemoved apistatus.ObjectAlreadyRemoved - - return false, errRemoved } - return exists, nil + return false, nil } diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index d7ac1941c9..ed4ba2249d 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -22,143 +22,142 @@ import ( // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) Get(addr oid.Address) (*objectSDK.Object, error) { + if e.metrics != nil { + defer elapsed(e.metrics.AddGetDuration)() + } + + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return nil, e.blockErr + } + var ( err error obj *objectSDK.Object sp shard.GetPrm ) - - if e.metrics != nil { - defer elapsed(e.metrics.AddGetDuration)() - } - sp.SetAddress(addr) - err = e.execIfNotBlocked(func() error { - return e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (bool, error) { - sp.SetIgnoreMeta(ignoreMetadata) - sr, err := s.Get(sp) - if err != nil { - return sr.HasMeta(), err - } - obj = sr.Object() - return sr.HasMeta(), nil - }) - }) + err = e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (bool, error) { + sp.SetIgnoreMeta(ignoreMetadata) + sr, err := s.Get(sp) + if err != nil { + return sr.HasMeta(), err + } + obj = sr.Object() + return sr.HasMeta(), nil + }) return obj, err } func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error)) error { var ( - ok bool - siErr *objectSDK.SplitInfoError - - errNotFound apistatus.ObjectNotFound - - outSI *objectSDK.SplitInfo - outError error = errNotFound - - shardWithMeta hashedShard + hasDegraded bool + shardWithMeta shardWrapper + splitInfo *objectSDK.SplitInfo metaError error ) - var hasDegraded bool - var objectExpired bool - - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + for _, sh := range e.sortedShards(addr) { noMeta := sh.GetMode().NoMetabase() hasDegraded = hasDegraded || noMeta hasMetadata, err := shardFunc(sh.Shard, noMeta) if err != nil { + var siErr *objectSDK.SplitInfoError + if hasMetadata { shardWithMeta = sh metaError = err } switch { case shard.IsErrNotFound(err): - return false // ignore, go to next shard + continue // ignore, go to next shard case errors.As(err, &siErr): - if outSI == nil { - outSI = objectSDK.NewSplitInfo() + if splitInfo == nil { + splitInfo = objectSDK.NewSplitInfo() } - util.MergeSplitInfo(siErr.SplitInfo(), outSI) + util.MergeSplitInfo(siErr.SplitInfo(), splitInfo) // stop iterating over shards if SplitInfo structure is complete - return !outSI.GetLink().IsZero() && !outSI.GetLastPart().IsZero() - case shard.IsErrRemoved(err): - outError = err - - return true // stop, return it back + if !splitInfo.GetLink().IsZero() && !splitInfo.GetLastPart().IsZero() { + return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) + } + continue + case + shard.IsErrRemoved(err), + shard.IsErrOutOfRange(err): + return err // stop, return it back case shard.IsErrObjectExpired(err): // object is found but should not // be returned - objectExpired = true - return true + return apistatus.ObjectNotFound{} default: e.reportShardError(sh, "could not get object from shard", err) - return false + continue } } - ok = true - - return true - }) + return nil // shardFunc is successful and it has the result + } - if outSI != nil { - return logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) + if splitInfo != nil { + return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) } - if objectExpired { - return errNotFound + if !hasDegraded && shardWithMeta.Shard == nil { + return apistatus.ObjectNotFound{} } - if !ok { - if !hasDegraded && shardWithMeta.Shard == nil || !shard.IsErrNotFound(outError) { - return outError + // If the object is not found but is present in metabase, + // try to fetch it from blobstor directly. If it is found in any + // blobstor, increase the error counter for the shard which contains the meta. + for _, sh := range e.sortedShards(addr) { + if sh.GetMode().NoMetabase() { + // Already visited. + continue } - // If the object is not found but is present in metabase, - // try to fetch it from blobstor directly. If it is found in any - // blobstor, increase the error counter for the shard which contains the meta. - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { - if sh.GetMode().NoMetabase() { - // Already visited. - return false - } - - _, err := shardFunc(sh.Shard, true) - ok = err == nil - return ok - }) - if !ok { - return outError + _, err := shardFunc(sh.Shard, true) + if shard.IsErrOutOfRange(err) { + return apistatus.ObjectOutOfRange{} } - if shardWithMeta.Shard != nil { - e.reportShardError(shardWithMeta, "meta info was present, but object is missing", - metaError, zap.Stringer("address", addr)) + if err == nil { + if shardWithMeta.Shard != nil { + e.reportShardError(shardWithMeta, "meta info was present, but object is missing", + metaError, zap.Stringer("address", addr)) + } + return nil } } - - return nil + return apistatus.ObjectNotFound{} } // GetBytes reads object from the StorageEngine by address into memory buffer in // a canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object // is missing. func (e *StorageEngine) GetBytes(addr oid.Address) ([]byte, error) { - var b []byte - err := e.execIfNotBlocked(func() error { - return e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error) { - if ignoreMetadata { - b, err = s.GetBytes(addr) - } else { - b, hasMetadata, err = s.GetBytesWithMetadataLookup(addr) - } - return - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return nil, e.blockErr + } + + var ( + b []byte + err error + ) + err = e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error) { + if ignoreMetadata { + b, err = s.GetBytes(addr) + } else { + b, hasMetadata, err = s.GetBytesWithMetadataLookup(addr) + } + return }) return b, err } diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 3fbd090900..ae080fff09 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -22,83 +22,63 @@ import ( // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) Head(addr oid.Address, raw bool) (*objectSDK.Object, error) { - var ( - obj *objectSDK.Object - err error - ) - if e.metrics != nil { defer elapsed(e.metrics.AddHeadDuration)() } - err = e.execIfNotBlocked(func() error { - obj, err = e.head(addr, raw) - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return obj, err -} + if e.blockErr != nil { + return nil, e.blockErr + } -func (e *StorageEngine) head(addr oid.Address, raw bool) (*objectSDK.Object, error) { var ( - head *objectSDK.Object - siErr *objectSDK.SplitInfoError - - errNotFound apistatus.ObjectNotFound - - outSI *objectSDK.SplitInfo - outError error = errNotFound + shPrm shard.HeadPrm + splitInfo *objectSDK.SplitInfo ) - var shPrm shard.HeadPrm shPrm.SetAddress(addr) shPrm.SetRaw(raw) - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + for _, sh := range e.sortedShards(addr) { res, err := sh.Head(shPrm) if err != nil { + var siErr *objectSDK.SplitInfoError + switch { case shard.IsErrNotFound(err): - return false // ignore, go to next shard + continue // ignore, go to next shard case errors.As(err, &siErr): - if outSI == nil { - outSI = objectSDK.NewSplitInfo() + if splitInfo == nil { + splitInfo = objectSDK.NewSplitInfo() } - util.MergeSplitInfo(siErr.SplitInfo(), outSI) + util.MergeSplitInfo(siErr.SplitInfo(), splitInfo) // stop iterating over shards if SplitInfo structure is complete - return !outSI.GetLink().IsZero() && !outSI.GetLastPart().IsZero() + if !splitInfo.GetLink().IsZero() && !splitInfo.GetLastPart().IsZero() { + return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) + } + continue case shard.IsErrRemoved(err): - outError = err - - return true // stop, return it back + return nil, err // stop, return it back case shard.IsErrObjectExpired(err): - var notFoundErr apistatus.ObjectNotFound - // object is found but should not // be returned - outError = notFoundErr - - return true + return nil, apistatus.ObjectNotFound{} default: e.reportShardError(sh, "could not head object from shard", err) - return false + continue } } - head = res.Object() - - return true - }) - - if outSI != nil { - return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) + return res.Object(), nil } - if head == nil { - return nil, outError + if splitInfo != nil { + return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) } - return head, nil + return nil, apistatus.ObjectNotFound{} } diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 45df2e010c..e758e49b2a 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -27,9 +27,14 @@ func (e *StorageEngine) Inhume(tombstone oid.Address, tombExpiration uint64, add if e.metrics != nil { defer elapsed(e.metrics.AddInhumeDuration)() } - return e.execIfNotBlocked(func() error { - return e.inhume(addrs, false, &tombstone, tombExpiration) - }) + + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return e.blockErr + } + return e.inhume(addrs, false, &tombstone, tombExpiration) } func (e *StorageEngine) inhume(addrs []oid.Address, force bool, tombstone *oid.Address, tombExpiration uint64) error { @@ -76,54 +81,52 @@ func (e *StorageEngine) inhume(addrs []oid.Address, force bool, tombstone *oid.A // every object that belongs to a provided container will be marked // as a removed one. func (e *StorageEngine) InhumeContainer(cID cid.ID) error { - return e.execIfNotBlocked(func() error { - e.iterateOverUnsortedShards(func(sh hashedShard) bool { - err := sh.InhumeContainer(cID) - if err != nil { - e.log.Warn("inhuming container", - zap.Stringer("shard", sh.ID()), - zap.Error(err)) - } + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return false - }) + if e.blockErr != nil { + return e.blockErr + } + for _, sh := range e.unsortedShards() { + err := sh.InhumeContainer(cID) + if err != nil { + e.log.Warn("inhuming container", + zap.Stringer("shard", sh.ID()), + zap.Error(err)) + } + } - return nil - }) + return nil } // Returns ok if object was inhumed during this invocation or before. func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, error) { - var errLocked apistatus.ObjectLocked var existPrm shard.ExistsPrm - var retErr error - var ok bool var shardWithObject string var root bool var children []oid.Address // see if the object is root - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + for _, sh := range e.unsortedShards() { existPrm.SetAddress(addr) existPrm.IgnoreExpiration() res, err := sh.Exists(existPrm) if err != nil { if shard.IsErrNotFound(err) { - return false + continue } if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) { // inhumed once - no need to be inhumed again - ok = true - return true + return true, nil } var siErr *objectSDK.SplitInfoError if !errors.As(err, &siErr) { e.reportShardError(sh, "could not check for presence in shard", err, zap.Stringer("addr", addr)) - return + continue } root = true @@ -134,7 +137,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, linkID := siErr.SplitInfo().GetLink() if linkID.IsZero() { // keep searching for the link object - return false + continue } var linkAddr oid.Address @@ -147,9 +150,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, // nothing can be done here, so just returning ok // to continue handling other addresses - ok = true - - return true + return true, nil } // v2 split @@ -161,9 +162,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, // nothing can be done here, so just returning ok // to continue handling other addresses - ok = true - - return true + return true, nil } children = measuredObjsToAddresses(addr.Container(), link.Objects()) @@ -174,19 +173,13 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, children = append(children, linkAddr) - return true + break } if res.Exists() { shardWithObject = sh.ID().String() - return true + break } - - return false - }) - - if ok { - return true, nil } prm.SetTargets(append(children, addr)...) @@ -197,7 +190,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, _, err := sh.Inhume(prm) if err != nil { if !errors.Is(err, logicerr.Error) { - e.reportShardError(hashedShard(sh), "could not inhume object in shard", err) + e.reportShardError(sh, "could not inhume object in shard", err) } return false, err @@ -206,76 +199,65 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, return true, nil } - // has not found the object on any shard, so mark it as inhumed on the most probable one - - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { - defer func() { - // if object is root we continue since information about it - // can be presented in other shards - if root { - stop = false - } - }() + var ( + ok bool + retErr error + ) + // has not found the object on any shard, so mark it as inhumed on the most probable one + for _, sh := range e.sortedShards(addr) { _, err := sh.Inhume(prm) if err != nil { + var errLocked apistatus.ObjectLocked + switch { case errors.As(err, &errLocked): - retErr = apistatus.ObjectLocked{} - return true + return false, apistatus.ObjectLocked{} // Always a final error if returned. case errors.Is(err, shard.ErrLockObjectRemoval): - retErr = meta.ErrLockObjectRemoval - return true + return false, meta.ErrLockObjectRemoval // Always a final error if returned. case errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, shard.ErrDegradedMode): - retErr = err - return true + if root { + retErr = err + continue + } + return false, err } e.reportShardError(sh, "could not inhume object in shard", err) - return false + continue } ok = true - return true - }) + if !root { + break + } + } return ok, retErr } // IsLocked checks whether an object is locked according to StorageEngine's state. func (e *StorageEngine) IsLocked(addr oid.Address) (bool, error) { - var res bool - var err error - - err = e.execIfNotBlocked(func() error { - res, err = e.isLocked(addr) - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return res, err -} - -func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) { - var locked bool - var err error - var outErr error + if e.blockErr != nil { + return false, e.blockErr + } - e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) { - locked, err = h.Shard.IsLocked(addr) + for _, sh := range e.unsortedShards() { + locked, err := sh.Shard.IsLocked(addr) if err != nil { - e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr)) - outErr = err - return false + e.reportShardError(sh, "can't check object's lockers", err, zap.Stringer("addr", addr)) + return false, err } - return locked - }) - - if locked { - return locked, nil + if locked { + return true, nil + } } - return locked, outErr + return false, nil } func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) { @@ -286,17 +268,15 @@ func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) { } func (e *StorageEngine) processExpiredLocks(lockers []oid.Address) { - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + for _, sh := range e.unsortedShards() { sh.HandleExpiredLocks(lockers) - return false - }) + } } func (e *StorageEngine) processDeletedLocks(lockers []oid.Address) { - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + for _, sh := range e.unsortedShards() { sh.HandleDeletedLocks(lockers) - return false - }) + } } func measuredObjsToAddresses(cID cid.ID, mm []objectSDK.MeasuredObject) []oid.Address { diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index 343ec44ddc..77e6d2088d 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -116,13 +116,11 @@ func TestStorageEngine_Inhume(t *testing.T) { var wrongShardID string - e.iterateOverSortedShards(addr, func(i int, h hashedShard) (stop bool) { + for i, sh := range e.sortedShards(addr) { if i != 0 { - wrongShardID = h.ID().String() + wrongShardID = sh.ID().String() } - - return false - }) + } wrongShard := e.getShard(wrongShardID) diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index 9a64407192..503c61d67d 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -20,12 +20,13 @@ var errLockFailed = errors.New("lock operation failed") // // Locked list should be unique. Panics if it is empty. func (e *StorageEngine) Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error { - return e.execIfNotBlocked(func() error { - return e.lock(idCnr, locker, locked) - }) -} + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return e.blockErr + } -func (e *StorageEngine) lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error { for i := range locked { switch e.lockSingle(idCnr, locker, locked[i], true) { case 1: @@ -47,24 +48,17 @@ func (e *StorageEngine) lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error // - 0: fail // - 1: locking irregular object // - 2: ok -func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) { +func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExists bool) uint8 { // code is pretty similar to inhumeAddr, maybe unify? - root := false - var errIrregular apistatus.LockNonRegularObject - - var addrLocked oid.Address + var ( + addrLocked oid.Address + root bool + status uint8 + ) addrLocked.SetContainer(idCnr) addrLocked.SetObject(locked) - e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) { - defer func() { - // if object is root we continue since information about it - // can be presented in other shards - if checkExists && root { - stop = false - } - }() - + for _, sh := range e.sortedShards(addrLocked) { if checkExists { var existsPrm shard.ExistsPrm existsPrm.SetAddress(addrLocked) @@ -76,35 +70,46 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi if shard.IsErrObjectExpired(err) { // object is already expired => // do not lock it - return true + return 0 } e.reportShardError(sh, "could not check locked object for presence in shard", err) - return + if !root { + return 0 + } + continue } root = true } else if !exRes.Exists() { - return + if !root { + return 0 + } + continue } } err := sh.Lock(idCnr, locker, []oid.ID{locked}) if err != nil { + var errIrregular apistatus.LockNonRegularObject + e.reportShardError(sh, "could not lock object in shard", err) if errors.As(err, &errIrregular) { status = 1 - return true + } else { + continue } - - return false + } else { + status = 2 } - status = 2 - - return true - }) + // if object is root we continue since information about it + // can be presented in other shards + if !root { + break + } + } - return + return status } diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index da2a3922cd..87bc077d64 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -30,12 +30,13 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er defer elapsed(e.metrics.AddPutDuration)() } - return e.execIfNotBlocked(func() error { - return e.put(obj, objBin, hdrLen) - }) -} + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return e.blockErr + } -func (e *StorageEngine) put(obj *objectSDK.Object, objBin []byte, hdrLen int) error { addr := object.AddressOf(obj) // In #1146 this check was parallelized, however, it became @@ -45,33 +46,28 @@ func (e *StorageEngine) put(obj *objectSDK.Object, objBin []byte, hdrLen int) er return err } - finished := false - - e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) { + for i, sh := range e.sortedShards(addr) { e.mtx.RLock() pool, ok := e.shardPools[sh.ID().String()] e.mtx.RUnlock() if !ok { // Shard was concurrently removed, skip. - return false + continue } - putDone, exists := e.putToShard(sh, ind, pool, addr, obj, objBin, hdrLen) - finished = putDone || exists - return finished - }) - - if !finished { - err = errPutShard + putDone, exists := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen) + if putDone || exists { + return nil + } } - return err + return errPutShard } // putToShard puts object to sh. // First return value is true iff put has been successfully done. // Second return value is true iff object already exists. -func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) { +func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) { var putSuccess, alreadyExists bool exitCh := make(chan struct{}) diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index abd4b479b5..55e0a2f3ab 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -1,15 +1,9 @@ package engine import ( - "errors" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "go.uber.org/zap" ) // RngRes groups the resulting values of GetRange operation. @@ -36,122 +30,31 @@ func (r RngRes) Object() *objectSDK.Object { // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) GetRange(addr oid.Address, offset uint64, length uint64) ([]byte, error) { - var ( - err error - res []byte - ) if e.metrics != nil { defer elapsed(e.metrics.AddRangeDuration)() } - err = e.execIfNotBlocked(func() error { - res, err = e.getRange(addr, offset, length) - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return res, err -} + if e.blockErr != nil { + return nil, e.blockErr + } -func (e *StorageEngine) getRange(addr oid.Address, offset uint64, length uint64) ([]byte, error) { var ( - out []byte - siErr *objectSDK.SplitInfoError - - errNotFound apistatus.ObjectNotFound - - outSI *objectSDK.SplitInfo - outError error = errNotFound - - shardWithMeta hashedShard - metaError error + err error + data []byte + shPrm shard.RngPrm ) - - var hasDegraded bool - - var shPrm shard.RngPrm shPrm.SetAddress(addr) shPrm.SetRange(offset, length) - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { - noMeta := sh.GetMode().NoMetabase() - hasDegraded = hasDegraded || noMeta - shPrm.SetIgnoreMeta(noMeta) - + err = e.get(addr, func(sh *shard.Shard, ignoreMetadata bool) (bool, error) { + shPrm.SetIgnoreMeta(ignoreMetadata) res, err := sh.GetRange(shPrm) - if err != nil { - if res.HasMeta() { - shardWithMeta = sh - metaError = err - } - switch { - case shard.IsErrNotFound(err): - return false // ignore, go to next shard - case errors.As(err, &siErr): - if outSI == nil { - outSI = objectSDK.NewSplitInfo() - } - - util.MergeSplitInfo(siErr.SplitInfo(), outSI) - - // stop iterating over shards if SplitInfo structure is complete - return !outSI.GetLink().IsZero() && !outSI.GetLastPart().IsZero() - case - shard.IsErrRemoved(err), - shard.IsErrOutOfRange(err): - outError = err - - return true // stop, return it back - default: - e.reportShardError(sh, "could not get object from shard", err) - return false - } + if err == nil { + data = res.Object().Payload() } - - out = res.Object().Payload() - - return true + return res.HasMeta(), err }) - - if outSI != nil { - return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) - } - - if out == nil { - // If any shard is in a degraded mode, we should assume that metabase could store - // info about some object. - if shardWithMeta.Shard == nil && !hasDegraded || !shard.IsErrNotFound(outError) { - return nil, outError - } - - // If the object is not found but is present in metabase, - // try to fetch it from blobstor directly. If it is found in any - // blobstor, increase the error counter for the shard which contains the meta. - shPrm.SetIgnoreMeta(true) - - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { - if sh.GetMode().NoMetabase() { - // Already processed it without a metabase. - return false - } - - res, err := sh.GetRange(shPrm) - if shard.IsErrOutOfRange(err) { - var errOutOfRange apistatus.ObjectOutOfRange - - outError = errOutOfRange - return true - } - out = res.Object().Payload() - return err == nil - }) - if out == nil { - return nil, outError - } - if shardWithMeta.Shard != nil { - e.reportShardError(shardWithMeta, "meta info was present, but object is missing", - metaError, - zap.Stringer("address", addr)) - } - } - - return out, nil + return data, err } diff --git a/pkg/local_object_storage/engine/revive.go b/pkg/local_object_storage/engine/revive.go index c1e41917bf..832f0cec4e 100644 --- a/pkg/local_object_storage/engine/revive.go +++ b/pkg/local_object_storage/engine/revive.go @@ -19,8 +19,10 @@ type ReviveStatus struct { // ReviveObject forcefully revives object by oid.Address in the StorageEngine. // Iterate over all shards despite errors and purge all removal marks from all metabases. -func (e *StorageEngine) ReviveObject(address oid.Address) (res ReviveStatus, err error) { - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { +func (e *StorageEngine) ReviveObject(address oid.Address) (ReviveStatus, error) { + var res ReviveStatus + + for _, sh := range e.unsortedShards() { reviveStatus, err := sh.ReviveObject(address) id := *sh.ID() res.Shards = append(res.Shards, ReviveShardStatus{ @@ -34,8 +36,6 @@ func (e *StorageEngine) ReviveObject(address oid.Address) (res ReviveStatus, err zap.Error(err), ) } - - return false - }) - return + } + return res, nil } diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index ef124459c3..3d00c99dfc 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -16,42 +16,32 @@ import ( // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) Select(cnr cid.ID, filters object.SearchFilters) ([]oid.Address, error) { - var ( - err error - res []oid.Address - ) - if e.metrics != nil { defer elapsed(e.metrics.AddSearchDuration)() } - err = e.execIfNotBlocked(func() error { - res, err = e._select(cnr, filters) - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return res, err -} + if e.blockErr != nil { + return nil, e.blockErr + } -func (e *StorageEngine) _select(cnr cid.ID, filters object.SearchFilters) ([]oid.Address, error) { addrList := make([]oid.Address, 0) uniqueMap := make(map[string]struct{}) - var outError error - var shPrm shard.SelectPrm shPrm.SetContainerID(cnr) shPrm.SetFilters(filters) - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + for _, sh := range e.unsortedShards() { res, err := sh.Select(shPrm) if err != nil { if errors.Is(err, objectcore.ErrInvalidSearchQuery) { - outError = err - return true + return addrList, err } e.reportShardError(sh, "could not select objects from shard", err) - return false + continue } for _, addr := range res.AddressList() { // save only unique values @@ -60,11 +50,9 @@ func (e *StorageEngine) _select(cnr cid.ID, filters object.SearchFilters) ([]oid addrList = append(addrList, addr) } } + } - return false - }) - - return addrList, outError + return addrList, nil } // List returns `limit` available physically storage object addresses in engine. @@ -72,49 +60,40 @@ func (e *StorageEngine) _select(cnr cid.ID, filters object.SearchFilters) ([]oid // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) List(limit uint64) ([]oid.Address, error) { - var ( - err error - res []oid.Address - ) - if e.metrics != nil { defer elapsed(e.metrics.AddListObjectsDuration)() } - err = e.execIfNotBlocked(func() error { - res, err = e.list(limit) - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return res, err -} + if e.blockErr != nil { + return nil, e.blockErr + } -func (e *StorageEngine) list(limit uint64) ([]oid.Address, error) { addrList := make([]oid.Address, 0, limit) uniqueMap := make(map[string]struct{}) ln := uint64(0) // consider iterating over shuffled shards - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + for _, sh := range e.unsortedShards() { res, err := sh.List() // consider limit result of shard iterator if err != nil { e.reportShardError(sh, "could not select objects from shard", err) - } else { - for _, addr := range res.AddressList() { // save only unique values - if _, ok := uniqueMap[addr.EncodeToString()]; !ok { - uniqueMap[addr.EncodeToString()] = struct{}{} - addrList = append(addrList, addr) - - ln++ - if limit > 0 && ln >= limit { - return true - } + continue + } + for _, addr := range res.AddressList() { // save only unique values + if _, ok := uniqueMap[addr.EncodeToString()]; !ok { + uniqueMap[addr.EncodeToString()] = struct{}{} + addrList = append(addrList, addr) + + ln++ + if limit > 0 && ln >= limit { + return addrList, nil } } } - - return false - }) + } return addrList, nil } diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 42254c4bf9..4380b750fe 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -9,15 +9,12 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) var errShardNotFound = logicerr.New("shard not found") -type hashedShard shardWrapper - type metricsWithID struct { id string mw MetricRegister @@ -192,49 +189,27 @@ func generateShardID() (*shard.ID, error) { return shard.NewIDFromBytes(bin), nil } -func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() string }) []hashedShard { - e.mtx.RLock() - defer e.mtx.RUnlock() - - shards := make([]hashedShard, 0, len(e.shards)) - for _, sh := range e.shards { - shards = append(shards, hashedShard(sh)) - } +func (e *StorageEngine) sortedShards(objAddr interface{ EncodeToString() string }) []shardWrapper { + shards := e.unsortedShards() hrw.Sort(shards, hrw.WrapBytes([]byte(objAddr.EncodeToString()))) return shards } -func (e *StorageEngine) unsortedShards() []hashedShard { +func (e *StorageEngine) unsortedShards() []shardWrapper { e.mtx.RLock() defer e.mtx.RUnlock() - shards := make([]hashedShard, 0, len(e.shards)) + shards := make([]shardWrapper, 0, len(e.shards)) for _, sh := range e.shards { - shards = append(shards, hashedShard(sh)) + shards = append(shards, sh) } return shards } -func (e *StorageEngine) iterateOverSortedShards(addr oid.Address, handler func(int, hashedShard) (stop bool)) { - for i, sh := range e.sortShardsByWeight(addr) { - if handler(i, sh) { - break - } - } -} - -func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (stop bool)) { - for _, sh := range e.unsortedShards() { - if handler(sh) { - break - } - } -} - func (e *StorageEngine) getShard(id string) shardWrapper { e.mtx.RLock() defer e.mtx.RUnlock() @@ -273,7 +248,7 @@ func (e *StorageEngine) HandleNewEpoch(epoch uint64) { } } -func (s hashedShard) Hash() uint64 { +func (s shardWrapper) Hash() uint64 { return hrw.Hash( []byte(s.Shard.ID().String()), ) diff --git a/pkg/local_object_storage/engine/status.go b/pkg/local_object_storage/engine/status.go index e513955f04..b6b3c99b32 100644 --- a/pkg/local_object_storage/engine/status.go +++ b/pkg/local_object_storage/engine/status.go @@ -19,19 +19,17 @@ type ObjectStatus struct { // ObjectStatus returns the status of the object in the StorageEngine. It contains status of the object in all shards. func (e *StorageEngine) ObjectStatus(address oid.Address) (ObjectStatus, error) { var res ObjectStatus - var err error - e.iterateOverSortedShards(address, func(_ int, sh hashedShard) (stop bool) { - var shardStatus shard.ObjectStatus - shardStatus, err = sh.ObjectStatus(address) + for _, sh := range e.sortedShards(address) { + shardStatus, err := sh.ObjectStatus(address) id := *sh.ID() - if err == nil { - res.Shards = append(res.Shards, ObjectShardStatus{ - ID: id.String(), - Shard: shardStatus, - }) + if err != nil { + return res, err } - return err != nil - }) - return res, err + res.Shards = append(res.Shards, ObjectShardStatus{ + ID: id.String(), + Shard: shardStatus, + }) + } + return res, nil } diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index 39cdc1ab7d..32192564d2 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -73,7 +73,7 @@ func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pi func (e *StorageEngine) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) { var err error var nodes []pilorama.Node - for _, sh := range e.sortShardsByWeight(cid) { + for _, sh := range e.sortedShards(cid) { nodes, err = sh.TreeGetByPath(cid, treeID, attr, path, latest) if err != nil { if errors.Is(err, shard.ErrPiloramaDisabled) { @@ -96,7 +96,7 @@ func (e *StorageEngine) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID piloram var err error var m pilorama.Meta var p uint64 - for _, sh := range e.sortShardsByWeight(cid) { + for _, sh := range e.sortedShards(cid) { m, p, err = sh.TreeGetMeta(cid, treeID, nodeID) if err != nil { if errors.Is(err, shard.ErrPiloramaDisabled) { @@ -118,7 +118,7 @@ func (e *StorageEngine) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID piloram func (e *StorageEngine) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID pilorama.Node) ([]uint64, error) { var err error var nodes []uint64 - for _, sh := range e.sortShardsByWeight(cid) { + for _, sh := range e.sortedShards(cid) { nodes, err = sh.TreeGetChildren(cid, treeID, nodeID) if err != nil { if errors.Is(err, shard.ErrPiloramaDisabled) { @@ -140,7 +140,7 @@ func (e *StorageEngine) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID pil func (e *StorageEngine) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (pilorama.Move, error) { var err error var lm pilorama.Move - for _, sh := range e.sortShardsByWeight(cid) { + for _, sh := range e.sortedShards(cid) { lm, err = sh.TreeGetOpLog(cid, treeID, height) if err != nil { if errors.Is(err, shard.ErrPiloramaDisabled) { @@ -161,7 +161,7 @@ func (e *StorageEngine) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64 // TreeDrop implements the pilorama.Forest interface. func (e *StorageEngine) TreeDrop(cid cidSDK.ID, treeID string) error { var err error - for _, sh := range e.sortShardsByWeight(cid) { + for _, sh := range e.sortedShards(cid) { err = sh.TreeDrop(cid, treeID) if err != nil { if errors.Is(err, shard.ErrPiloramaDisabled) { @@ -213,8 +213,8 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) { return err == nil, err } -func (e *StorageEngine) getTreeShard(cid cidSDK.ID, treeID string) (int, []hashedShard, error) { - lst := e.sortShardsByWeight(cid) +func (e *StorageEngine) getTreeShard(cid cidSDK.ID, treeID string) (int, []shardWrapper, error) { + lst := e.sortedShards(cid) for i, sh := range lst { exists, err := sh.TreeExists(cid, treeID) if err != nil {