From 2baeb79064f43becf3f42df62169eb67ebaf8e32 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 12 Nov 2024 00:14:32 +0300 Subject: [PATCH 1/5] engine: drop internal blockExec struct It's not used in any way as a struct. Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/control.go | 19 +++++++++---------- pkg/local_object_storage/engine/engine.go | 7 ++----- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index 7768720b50..522693299e 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -103,11 +103,11 @@ func (e *StorageEngine) close(releasePools bool) error { // // Can be called concurrently with setBlockExecErr. func (e *StorageEngine) execIfNotBlocked(op func() error) error { - e.blockExec.mtx.RLock() - defer e.blockExec.mtx.RUnlock() + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - if e.blockExec.err != nil { - return e.blockExec.err + if e.blockErr != nil { + return e.blockErr } return op() @@ -120,17 +120,16 @@ func (e *StorageEngine) execIfNotBlocked(op func() error) error { // // Can be called concurrently with exec. In this case it waits for all executions to complete. func (e *StorageEngine) setBlockExecErr(err error) error { - e.blockExec.mtx.Lock() - defer e.blockExec.mtx.Unlock() + e.blockMtx.Lock() + defer e.blockMtx.Unlock() - prevErr := e.blockExec.err + prevErr := e.blockErr - wasClosed := errors.Is(prevErr, errClosed) - if wasClosed { + if errors.Is(prevErr, errClosed) { return errClosed } - e.blockExec.err = err + e.blockErr = err if err == nil { if prevErr != nil { // block -> ok diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index c18c53ced1..8a43432142 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -30,11 +30,8 @@ type StorageEngine struct { setModeCh chan setModeRequest wg sync.WaitGroup - blockExec struct { - mtx sync.RWMutex - - err error - } + blockMtx sync.RWMutex + blockErr error } type shardWrapper struct { From 255c9e511c0b6315b816a25d74352851be9a9aad Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 12 Nov 2024 12:14:32 +0300 Subject: [PATCH 2/5] engine: drop e.execIfNotBlocked() That's where KISS and DRY get into a little conflict. 
My take is that KISS wins here: * it's five lines of code * yeah it's repetitive, but e.execIfNotBlocked() is repetitive as well with three lines of code minimum * it inherently requires lambdas * variables need to be defined and properly managed, some code looks a bit strange where we return (smth, error) and smth gets its value in a lambda, but error is the result of e.execIfNotBlocked() * in most cases it also leads to the need of additional "internal" functions * overall we get _less_ code after this patch * nothing hidden, clear execution flow Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/container.go | 65 +++++++++----------- pkg/local_object_storage/engine/control.go | 14 ----- pkg/local_object_storage/engine/delete.go | 10 ++- pkg/local_object_storage/engine/get.go | 64 +++++++++++-------- pkg/local_object_storage/engine/head.go | 17 ++--- pkg/local_object_storage/engine/inhume.go | 52 ++++++++-------- pkg/local_object_storage/engine/lock.go | 11 ++-- pkg/local_object_storage/engine/put.go | 11 ++-- pkg/local_object_storage/engine/range.go | 16 ++--- pkg/local_object_storage/engine/select.go | 34 +++------- 10 files changed, 134 insertions(+), 160 deletions(-) diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index d933d18fcf..8fde7eec26 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -16,23 +16,17 @@ import ( // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) { - var ( - err error - size uint64 - ) if e.metrics != nil { defer elapsed(e.metrics.AddEstimateContainerSizeDuration)() } - err = e.execIfNotBlocked(func() error { - size, err = e.containerSize(cnr) - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return size, err -} + if e.blockErr != nil { + return 0, e.blockErr + } -func (e *StorageEngine) containerSize(cnr cid.ID) (uint64, error) { var size uint64 e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { @@ -58,23 +52,17 @@ func (e *StorageEngine) containerSize(cnr cid.ID) (uint64, error) { // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) ListContainers() ([]cid.ID, error) { - var ( - res []cid.ID - err error - ) if e.metrics != nil { defer elapsed(e.metrics.AddListContainersDuration)() } - err = e.execIfNotBlocked(func() error { - res, err = e.listContainers() - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return res, err -} + if e.blockErr != nil { + return nil, e.blockErr + } -func (e *StorageEngine) listContainers() ([]cid.ID, error) { uniqueIDs := make(map[cid.ID]struct{}) e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { @@ -103,27 +91,32 @@ func (e *StorageEngine) listContainers() ([]cid.ID, error) { // DeleteContainer deletes container's objects that engine stores. 
func (e *StorageEngine) DeleteContainer(ctx context.Context, cID cid.ID) error { - return e.execIfNotBlocked(func() error { - var wg errgroup.Group + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - e.iterateOverUnsortedShards(func(hs hashedShard) bool { - wg.Go(func() error { - err := hs.Shard.DeleteContainer(ctx, cID) - if err != nil { - err = fmt.Errorf("container cleanup in %s shard: %w", hs.ID(), err) - e.log.Warn("container cleanup", zap.Error(err)) + if e.blockErr != nil { + return e.blockErr + } - return err - } + var wg errgroup.Group - return nil - }) + e.iterateOverUnsortedShards(func(hs hashedShard) bool { + wg.Go(func() error { + err := hs.Shard.DeleteContainer(ctx, cID) + if err != nil { + err = fmt.Errorf("container cleanup in %s shard: %w", hs.ID(), err) + e.log.Warn("container cleanup", zap.Error(err)) - return false + return err + } + + return nil }) - return wg.Wait() + return false }) + + return wg.Wait() } func (e *StorageEngine) deleteNotFoundContainers() error { diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index 522693299e..051a62ebf0 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -99,20 +99,6 @@ func (e *StorageEngine) close(releasePools bool) error { return nil } -// executes op if execution is not blocked, otherwise returns blocking error. -// -// Can be called concurrently with setBlockExecErr. -func (e *StorageEngine) execIfNotBlocked(op func() error) error { - e.blockMtx.RLock() - defer e.blockMtx.RUnlock() - - if e.blockErr != nil { - return e.blockErr - } - - return op() -} - // sets the flag of blocking execution of all data operations according to err: // - err != nil, then blocks the execution. If exec wasn't blocked, calls close method // (if err == errClosed => additionally releases pools and does not allow to resume executions). diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 185b9a2dcb..ceecb3a8a6 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -15,9 +15,13 @@ func (e *StorageEngine) Delete(addr oid.Address) error { defer elapsed(e.metrics.AddDeleteDuration)() } - return e.execIfNotBlocked(func() error { - return e.deleteObj(addr, true) - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return e.blockErr + } + return e.deleteObj(addr, true) } func (e *StorageEngine) deleteObj(addr oid.Address, force bool) error { diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index d7ac1941c9..832823a028 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -22,29 +22,33 @@ import ( // // Returns an error if executions are blocked (see BlockExecution). 
func (e *StorageEngine) Get(addr oid.Address) (*objectSDK.Object, error) { + if e.metrics != nil { + defer elapsed(e.metrics.AddGetDuration)() + } + + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return nil, e.blockErr + } + var ( err error obj *objectSDK.Object sp shard.GetPrm ) - - if e.metrics != nil { - defer elapsed(e.metrics.AddGetDuration)() - } - sp.SetAddress(addr) - err = e.execIfNotBlocked(func() error { - return e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (bool, error) { - sp.SetIgnoreMeta(ignoreMetadata) - sr, err := s.Get(sp) - if err != nil { - return sr.HasMeta(), err - } - obj = sr.Object() - return sr.HasMeta(), nil - }) - }) + err = e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (bool, error) { + sp.SetIgnoreMeta(ignoreMetadata) + sr, err := s.Get(sp) + if err != nil { + return sr.HasMeta(), err + } + obj = sr.Object() + return sr.HasMeta(), nil + }) return obj, err } @@ -149,16 +153,24 @@ func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ign // a canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object // is missing. func (e *StorageEngine) GetBytes(addr oid.Address) ([]byte, error) { - var b []byte - err := e.execIfNotBlocked(func() error { - return e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error) { - if ignoreMetadata { - b, err = s.GetBytes(addr) - } else { - b, hasMetadata, err = s.GetBytesWithMetadataLookup(addr) - } - return - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return nil, e.blockErr + } + + var ( + b []byte + err error + ) + err = e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error) { + if ignoreMetadata { + b, err = s.GetBytes(addr) + } else { + b, hasMetadata, err = s.GetBytesWithMetadataLookup(addr) + } + return }) return b, err } diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 3fbd090900..f28630a7eb 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -22,24 +22,17 @@ import ( // // Returns an error if executions are blocked (see BlockExecution). 
func (e *StorageEngine) Head(addr oid.Address, raw bool) (*objectSDK.Object, error) { - var ( - obj *objectSDK.Object - err error - ) - if e.metrics != nil { defer elapsed(e.metrics.AddHeadDuration)() } - err = e.execIfNotBlocked(func() error { - obj, err = e.head(addr, raw) - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return obj, err -} + if e.blockErr != nil { + return nil, e.blockErr + } -func (e *StorageEngine) head(addr oid.Address, raw bool) (*objectSDK.Object, error) { var ( head *objectSDK.Object siErr *objectSDK.SplitInfoError diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 45df2e010c..d3d9743504 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -27,9 +27,14 @@ func (e *StorageEngine) Inhume(tombstone oid.Address, tombExpiration uint64, add if e.metrics != nil { defer elapsed(e.metrics.AddInhumeDuration)() } - return e.execIfNotBlocked(func() error { - return e.inhume(addrs, false, &tombstone, tombExpiration) - }) + + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return e.blockErr + } + return e.inhume(addrs, false, &tombstone, tombExpiration) } func (e *StorageEngine) inhume(addrs []oid.Address, force bool, tombstone *oid.Address, tombExpiration uint64) error { @@ -76,20 +81,24 @@ func (e *StorageEngine) inhume(addrs []oid.Address, force bool, tombstone *oid.A // every object that belongs to a provided container will be marked // as a removed one. func (e *StorageEngine) InhumeContainer(cID cid.ID) error { - return e.execIfNotBlocked(func() error { - e.iterateOverUnsortedShards(func(sh hashedShard) bool { - err := sh.InhumeContainer(cID) - if err != nil { - e.log.Warn("inhuming container", - zap.Stringer("shard", sh.ID()), - zap.Error(err)) - } + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return false - }) + if e.blockErr != nil { + return e.blockErr + } + e.iterateOverUnsortedShards(func(sh hashedShard) bool { + err := sh.InhumeContainer(cID) + if err != nil { + e.log.Warn("inhuming container", + zap.Stringer("shard", sh.ID()), + zap.Error(err)) + } - return nil + return false }) + + return nil } // Returns ok if object was inhumed during this invocation or before. @@ -244,18 +253,13 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, // IsLocked checks whether an object is locked according to StorageEngine's state. func (e *StorageEngine) IsLocked(addr oid.Address) (bool, error) { - var res bool - var err error + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - err = e.execIfNotBlocked(func() error { - res, err = e.isLocked(addr) - return err - }) - - return res, err -} + if e.blockErr != nil { + return false, e.blockErr + } -func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) { var locked bool var err error var outErr error diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index 9a64407192..0fa436660c 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -20,12 +20,13 @@ var errLockFailed = errors.New("lock operation failed") // // Locked list should be unique. Panics if it is empty. 
func (e *StorageEngine) Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error { - return e.execIfNotBlocked(func() error { - return e.lock(idCnr, locker, locked) - }) -} + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return e.blockErr + } -func (e *StorageEngine) lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error { for i := range locked { switch e.lockSingle(idCnr, locker, locked[i], true) { case 1: diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index da2a3922cd..427970047f 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -30,12 +30,13 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er defer elapsed(e.metrics.AddPutDuration)() } - return e.execIfNotBlocked(func() error { - return e.put(obj, objBin, hdrLen) - }) -} + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return e.blockErr + } -func (e *StorageEngine) put(obj *objectSDK.Object, objBin []byte, hdrLen int) error { addr := object.AddressOf(obj) // In #1146 this check was parallelized, however, it became diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index abd4b479b5..37d4b789ea 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -36,22 +36,16 @@ func (r RngRes) Object() *objectSDK.Object { // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) GetRange(addr oid.Address, offset uint64, length uint64) ([]byte, error) { - var ( - err error - res []byte - ) if e.metrics != nil { defer elapsed(e.metrics.AddRangeDuration)() } - err = e.execIfNotBlocked(func() error { - res, err = e.getRange(addr, offset, length) - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return res, err -} + if e.blockErr != nil { + return nil, e.blockErr + } -func (e *StorageEngine) getRange(addr oid.Address, offset uint64, length uint64) ([]byte, error) { var ( out []byte siErr *objectSDK.SplitInfoError diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index ef124459c3..652d1bee55 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -16,24 +16,17 @@ import ( // // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) Select(cnr cid.ID, filters object.SearchFilters) ([]oid.Address, error) { - var ( - err error - res []oid.Address - ) - if e.metrics != nil { defer elapsed(e.metrics.AddSearchDuration)() } - err = e.execIfNotBlocked(func() error { - res, err = e._select(cnr, filters) - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return res, err -} + if e.blockErr != nil { + return nil, e.blockErr + } -func (e *StorageEngine) _select(cnr cid.ID, filters object.SearchFilters) ([]oid.Address, error) { addrList := make([]oid.Address, 0) uniqueMap := make(map[string]struct{}) @@ -72,24 +65,17 @@ func (e *StorageEngine) _select(cnr cid.ID, filters object.SearchFilters) ([]oid // // Returns an error if executions are blocked (see BlockExecution). 
func (e *StorageEngine) List(limit uint64) ([]oid.Address, error) { - var ( - err error - res []oid.Address - ) - if e.metrics != nil { defer elapsed(e.metrics.AddListObjectsDuration)() } - err = e.execIfNotBlocked(func() error { - res, err = e.list(limit) - return err - }) + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() - return res, err -} + if e.blockErr != nil { + return nil, e.blockErr + } -func (e *StorageEngine) list(limit uint64) ([]oid.Address, error) { addrList := make([]oid.Address, 0, limit) uniqueMap := make(map[string]struct{}) ln := uint64(0) From 9e6f9bb87cc8101ef0c71de2e00492474177ed34 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 12 Nov 2024 13:02:18 +0300 Subject: [PATCH 3/5] engine: drop hashedShard type It's a simple alias since 6ad2624552d572fa7ba2bdfb5520d4790aed56e5 and the only difference is an additional Hash() method that might as well be a part of shardWrapper method set. This also simplifies sortShardsByWeight, we don't need to hold the lock while sorting. Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/container.go | 6 ++--- pkg/local_object_storage/engine/engine.go | 2 +- pkg/local_object_storage/engine/evacuate.go | 8 +++---- pkg/local_object_storage/engine/exists.go | 2 +- pkg/local_object_storage/engine/get.go | 6 ++--- pkg/local_object_storage/engine/head.go | 2 +- pkg/local_object_storage/engine/inhume.go | 14 ++++++------ pkg/local_object_storage/engine/lock.go | 2 +- pkg/local_object_storage/engine/put.go | 4 ++-- pkg/local_object_storage/engine/range.go | 6 ++--- pkg/local_object_storage/engine/revive.go | 2 +- pkg/local_object_storage/engine/select.go | 4 ++-- pkg/local_object_storage/engine/shards.go | 24 +++++++------------- pkg/local_object_storage/engine/status.go | 2 +- pkg/local_object_storage/engine/tree.go | 2 +- 15 files changed, 39 insertions(+), 47 deletions(-) diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 8fde7eec26..0cb3a567a2 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -29,7 +29,7 @@ func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) { var size uint64 - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { var csPrm shard.ContainerSizePrm csPrm.SetContainerID(cnr) @@ -65,7 +65,7 @@ func (e *StorageEngine) ListContainers() ([]cid.ID, error) { uniqueIDs := make(map[cid.ID]struct{}) - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { res, err := sh.Shard.ListContainers(shard.ListContainersPrm{}) if err != nil { e.reportShardError(sh, "can't get list of containers", err) @@ -100,7 +100,7 @@ func (e *StorageEngine) DeleteContainer(ctx context.Context, cID cid.ID) error { var wg errgroup.Group - e.iterateOverUnsortedShards(func(hs hashedShard) bool { + e.iterateOverUnsortedShards(func(hs shardWrapper) bool { wg.Go(func() error { err := hs.Shard.DeleteContainer(ctx, cID) if err != nil { diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 8a43432142..7e5f47d207 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -134,7 +134,7 @@ func (e *StorageEngine) reportShardErrorBackground(id string, msg string, err er // reportShardError checks that the amount of errors doesn't exceed the configured threshold. 
// If it does, shard is set to read-only mode. func (e *StorageEngine) reportShardError( - sh hashedShard, + sh shardWrapper, msg string, err error, fields ...zap.Field) { diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index a1629316e9..b42edad30f 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -16,7 +16,7 @@ import ( const defaultEvacuateBatchSize = 100 type pooledShard struct { - hashedShard + shardWrapper pool util.WorkerPool } @@ -63,8 +63,8 @@ func (e *StorageEngine) Evacuate(shardIDs []*shard.ID, ignoreErrors bool, faultH shards := make([]pooledShard, 0, len(e.shards)) for id := range e.shards { shards = append(shards, pooledShard{ - hashedShard: hashedShard(e.shards[id]), - pool: e.shardPools[id], + shardWrapper: e.shards[id], + pool: e.shardPools[id], }) } e.mtx.RUnlock() @@ -125,7 +125,7 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object(), nil, 0) + putDone, exists := e.putToShard(shards[j].shardWrapper, j, shards[j].pool, addr, getRes.Object(), nil, 0) if putDone || exists { if putDone { e.log.Debug("object is moved to another shard", diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index 16b5d8fb96..46372876ef 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -15,7 +15,7 @@ func (e *StorageEngine) exists(addr oid.Address) (bool, error) { alreadyRemoved := false exists := false - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { res, err := sh.Exists(shPrm) if err != nil { if shard.IsErrRemoved(err) { diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 832823a028..147876060a 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -62,14 +62,14 @@ func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ign outSI *objectSDK.SplitInfo outError error = errNotFound - shardWithMeta hashedShard + shardWithMeta shardWrapper metaError error ) var hasDegraded bool var objectExpired bool - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { noMeta := sh.GetMode().NoMetabase() hasDegraded = hasDegraded || noMeta @@ -127,7 +127,7 @@ func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ign // If the object is not found but is present in metabase, // try to fetch it from blobstor directly. If it is found in any // blobstor, increase the error counter for the shard which contains the meta. - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { if sh.GetMode().NoMetabase() { // Already visited. 
return false diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index f28630a7eb..691c767e02 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -47,7 +47,7 @@ func (e *StorageEngine) Head(addr oid.Address, raw bool) (*objectSDK.Object, err shPrm.SetAddress(addr) shPrm.SetRaw(raw) - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { res, err := sh.Head(shPrm) if err != nil { switch { diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index d3d9743504..9bd3d6b9dc 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -87,7 +87,7 @@ func (e *StorageEngine) InhumeContainer(cID cid.ID) error { if e.blockErr != nil { return e.blockErr } - e.iterateOverUnsortedShards(func(sh hashedShard) bool { + e.iterateOverUnsortedShards(func(sh shardWrapper) bool { err := sh.InhumeContainer(cID) if err != nil { e.log.Warn("inhuming container", @@ -113,7 +113,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, var children []oid.Address // see if the object is root - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { existPrm.SetAddress(addr) existPrm.IgnoreExpiration() @@ -206,7 +206,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, _, err := sh.Inhume(prm) if err != nil { if !errors.Is(err, logicerr.Error) { - e.reportShardError(hashedShard(sh), "could not inhume object in shard", err) + e.reportShardError(sh, "could not inhume object in shard", err) } return false, err @@ -217,7 +217,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, // has not found the object on any shard, so mark it as inhumed on the most probable one - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { defer func() { // if object is root we continue since information about it // can be presented in other shards @@ -264,7 +264,7 @@ func (e *StorageEngine) IsLocked(addr oid.Address) (bool, error) { var err error var outErr error - e.iterateOverUnsortedShards(func(h hashedShard) (stop bool) { + e.iterateOverUnsortedShards(func(h shardWrapper) (stop bool) { locked, err = h.Shard.IsLocked(addr) if err != nil { e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr)) @@ -290,14 +290,14 @@ func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) { } func (e *StorageEngine) processExpiredLocks(lockers []oid.Address) { - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { sh.HandleExpiredLocks(lockers) return false }) } func (e *StorageEngine) processDeletedLocks(lockers []oid.Address) { - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { sh.HandleDeletedLocks(lockers) return false }) diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index 0fa436660c..7882ef0d81 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -57,7 +57,7 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked 
oid.ID, checkExi addrLocked.SetContainer(idCnr) addrLocked.SetObject(locked) - e.iterateOverSortedShards(addrLocked, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addrLocked, func(_ int, sh shardWrapper) (stop bool) { defer func() { // if object is root we continue since information about it // can be presented in other shards diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 427970047f..792a3738f9 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -48,7 +48,7 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er finished := false - e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(ind int, sh shardWrapper) (stop bool) { e.mtx.RLock() pool, ok := e.shardPools[sh.ID().String()] e.mtx.RUnlock() @@ -72,7 +72,7 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er // putToShard puts object to sh. // First return value is true iff put has been successfully done. // Second return value is true iff object already exists. -func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) { +func (e *StorageEngine) putToShard(sh shardWrapper, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) { var putSuccess, alreadyExists bool exitCh := make(chan struct{}) diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index 37d4b789ea..18fea79c7a 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -55,7 +55,7 @@ func (e *StorageEngine) GetRange(addr oid.Address, offset uint64, length uint64) outSI *objectSDK.SplitInfo outError error = errNotFound - shardWithMeta hashedShard + shardWithMeta shardWrapper metaError error ) @@ -65,7 +65,7 @@ func (e *StorageEngine) GetRange(addr oid.Address, offset uint64, length uint64) shPrm.SetAddress(addr) shPrm.SetRange(offset, length) - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { noMeta := sh.GetMode().NoMetabase() hasDegraded = hasDegraded || noMeta shPrm.SetIgnoreMeta(noMeta) @@ -121,7 +121,7 @@ func (e *StorageEngine) GetRange(addr oid.Address, offset uint64, length uint64) // blobstor, increase the error counter for the shard which contains the meta. shPrm.SetIgnoreMeta(true) - e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { if sh.GetMode().NoMetabase() { // Already processed it without a metabase. return false diff --git a/pkg/local_object_storage/engine/revive.go b/pkg/local_object_storage/engine/revive.go index c1e41917bf..ed34cc7a2d 100644 --- a/pkg/local_object_storage/engine/revive.go +++ b/pkg/local_object_storage/engine/revive.go @@ -20,7 +20,7 @@ type ReviveStatus struct { // ReviveObject forcefully revives object by oid.Address in the StorageEngine. // Iterate over all shards despite errors and purge all removal marks from all metabases. 
func (e *StorageEngine) ReviveObject(address oid.Address) (res ReviveStatus, err error) { - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { reviveStatus, err := sh.ReviveObject(address) id := *sh.ID() res.Shards = append(res.Shards, ReviveShardStatus{ diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index 652d1bee55..8f37c6c05e 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -36,7 +36,7 @@ func (e *StorageEngine) Select(cnr cid.ID, filters object.SearchFilters) ([]oid. shPrm.SetContainerID(cnr) shPrm.SetFilters(filters) - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { res, err := sh.Select(shPrm) if err != nil { if errors.Is(err, objectcore.ErrInvalidSearchQuery) { @@ -81,7 +81,7 @@ func (e *StorageEngine) List(limit uint64) ([]oid.Address, error) { ln := uint64(0) // consider iterating over shuffled shards - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { + e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { res, err := sh.List() // consider limit result of shard iterator if err != nil { e.reportShardError(sh, "could not select objects from shard", err) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 42254c4bf9..02990a2040 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -16,8 +16,6 @@ import ( var errShardNotFound = logicerr.New("shard not found") -type hashedShard shardWrapper - type metricsWithID struct { id string mw MetricRegister @@ -192,34 +190,28 @@ func generateShardID() (*shard.ID, error) { return shard.NewIDFromBytes(bin), nil } -func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() string }) []hashedShard { - e.mtx.RLock() - defer e.mtx.RUnlock() - - shards := make([]hashedShard, 0, len(e.shards)) - for _, sh := range e.shards { - shards = append(shards, hashedShard(sh)) - } +func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() string }) []shardWrapper { + shards := e.unsortedShards() hrw.Sort(shards, hrw.WrapBytes([]byte(objAddr.EncodeToString()))) return shards } -func (e *StorageEngine) unsortedShards() []hashedShard { +func (e *StorageEngine) unsortedShards() []shardWrapper { e.mtx.RLock() defer e.mtx.RUnlock() - shards := make([]hashedShard, 0, len(e.shards)) + shards := make([]shardWrapper, 0, len(e.shards)) for _, sh := range e.shards { - shards = append(shards, hashedShard(sh)) + shards = append(shards, sh) } return shards } -func (e *StorageEngine) iterateOverSortedShards(addr oid.Address, handler func(int, hashedShard) (stop bool)) { +func (e *StorageEngine) iterateOverSortedShards(addr oid.Address, handler func(int, shardWrapper) (stop bool)) { for i, sh := range e.sortShardsByWeight(addr) { if handler(i, sh) { break @@ -227,7 +219,7 @@ func (e *StorageEngine) iterateOverSortedShards(addr oid.Address, handler func(i } } -func (e *StorageEngine) iterateOverUnsortedShards(handler func(hashedShard) (stop bool)) { +func (e *StorageEngine) iterateOverUnsortedShards(handler func(shardWrapper) (stop bool)) { for _, sh := range e.unsortedShards() { if handler(sh) { break @@ -273,7 +265,7 @@ func (e *StorageEngine) HandleNewEpoch(epoch uint64) { } } -func (s hashedShard) Hash() uint64 { +func (s shardWrapper) Hash() 
uint64 { return hrw.Hash( []byte(s.Shard.ID().String()), ) diff --git a/pkg/local_object_storage/engine/status.go b/pkg/local_object_storage/engine/status.go index e513955f04..26515ab97a 100644 --- a/pkg/local_object_storage/engine/status.go +++ b/pkg/local_object_storage/engine/status.go @@ -21,7 +21,7 @@ func (e *StorageEngine) ObjectStatus(address oid.Address) (ObjectStatus, error) var res ObjectStatus var err error - e.iterateOverSortedShards(address, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(address, func(_ int, sh shardWrapper) (stop bool) { var shardStatus shard.ObjectStatus shardStatus, err = sh.ObjectStatus(address) id := *sh.ID() diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index 39cdc1ab7d..01df50ae2b 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -213,7 +213,7 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) { return err == nil, err } -func (e *StorageEngine) getTreeShard(cid cidSDK.ID, treeID string) (int, []hashedShard, error) { +func (e *StorageEngine) getTreeShard(cid cidSDK.ID, treeID string) (int, []shardWrapper, error) { lst := e.sortShardsByWeight(cid) for i, sh := range lst { exists, err := sh.TreeExists(cid, treeID) From 209f9ce3d60d4b3f3864cecb617e1b3ef5b1576d Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 12 Nov 2024 16:41:53 +0300 Subject: [PATCH 4/5] engine: drop iterateOverUnsortedShards and iterateOverSortedShards These _seriously_ complicate the code for zero reason. We have to: * create/pass some lambdas * mess with input/output variables * mess with iterator flow instead of shortcutting to exit * have fun with defers and named return variables Which leads to code bloat and increased cognitive load for developers. For what purpose? I have no idea, dropping them makes code much easier. 
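To make the difference concrete, here is a minimal standalone sketch of the two styles (toy types only: engine, shardWrapper and contains() below are stand-ins, not the real storage engine API):

package main

import "fmt"

// Toy stand-ins for the engine types; the real shardWrapper/StorageEngine
// API is richer, this only illustrates the control flow being changed.
type shardWrapper struct{ id string }

func (s shardWrapper) contains(addr string) bool { return s.id == addr }

type engine struct{ shards []shardWrapper }

func (e *engine) unsortedShards() []shardWrapper { return e.shards }

// Callback style (what this patch drops): the result can only escape
// through a captured variable, and "stop" has to be threaded back to
// the iterator instead of simply returning.
func (e *engine) existsViaIterator(addr string) bool {
	var found bool
	iterate := func(handler func(shardWrapper) (stop bool)) {
		for _, sh := range e.unsortedShards() {
			if handler(sh) {
				break
			}
		}
	}
	iterate(func(sh shardWrapper) (stop bool) {
		if sh.contains(addr) {
			found = true
			return true
		}
		return false
	})
	return found
}

// Loop style (what this patch gives): a plain range with an early return.
func (e *engine) existsViaLoop(addr string) bool {
	for _, sh := range e.unsortedShards() {
		if sh.contains(addr) {
			return true
		}
	}
	return false
}

func main() {
	e := &engine{shards: []shardWrapper{{"a"}, {"b"}, {"c"}}}
	fmt.Println(e.existsViaIterator("b"), e.existsViaLoop("b")) // true true
}

The loops in the diffs below follow the second shape.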
Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/container.go | 26 ++--- pkg/local_object_storage/engine/exists.go | 28 ++--- pkg/local_object_storage/engine/get.go | 90 ++++++-------- pkg/local_object_storage/engine/head.go | 53 ++++----- pkg/local_object_storage/engine/inhume.go | 110 +++++++----------- .../engine/inhume_test.go | 8 +- pkg/local_object_storage/engine/lock.go | 54 +++++---- pkg/local_object_storage/engine/put.go | 19 ++- pkg/local_object_storage/engine/range.go | 105 +++++++---------- pkg/local_object_storage/engine/revive.go | 12 +- pkg/local_object_storage/engine/select.go | 41 +++---- pkg/local_object_storage/engine/shards.go | 19 +-- pkg/local_object_storage/engine/status.go | 22 ++-- pkg/local_object_storage/engine/tree.go | 12 +- 14 files changed, 241 insertions(+), 358 deletions(-) diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 0cb3a567a2..7fe6a63557 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -29,7 +29,7 @@ func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) { var size uint64 - e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { + for _, sh := range e.unsortedShards() { var csPrm shard.ContainerSizePrm csPrm.SetContainerID(cnr) @@ -37,13 +37,11 @@ func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) { if err != nil { e.reportShardError(sh, "can't get container size", err, zap.Stringer("container_id", cnr)) - return false + continue } size += csRes.Size() - - return false - }) + } return size, nil } @@ -65,11 +63,11 @@ func (e *StorageEngine) ListContainers() ([]cid.ID, error) { uniqueIDs := make(map[cid.ID]struct{}) - e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { + for _, sh := range e.unsortedShards() { res, err := sh.Shard.ListContainers(shard.ListContainersPrm{}) if err != nil { e.reportShardError(sh, "can't get list of containers", err) - return false + continue } for _, cnr := range res.Containers() { @@ -77,9 +75,7 @@ func (e *StorageEngine) ListContainers() ([]cid.ID, error) { uniqueIDs[cnr] = struct{}{} } } - - return false - }) + } result := make([]cid.ID, 0, len(uniqueIDs)) for cnr := range uniqueIDs { @@ -100,11 +96,11 @@ func (e *StorageEngine) DeleteContainer(ctx context.Context, cID cid.ID) error { var wg errgroup.Group - e.iterateOverUnsortedShards(func(hs shardWrapper) bool { + for _, sh := range e.unsortedShards() { wg.Go(func() error { - err := hs.Shard.DeleteContainer(ctx, cID) + err := sh.Shard.DeleteContainer(ctx, cID) if err != nil { - err = fmt.Errorf("container cleanup in %s shard: %w", hs.ID(), err) + err = fmt.Errorf("container cleanup in %s shard: %w", sh.ID(), err) e.log.Warn("container cleanup", zap.Error(err)) return err @@ -112,9 +108,7 @@ func (e *StorageEngine) DeleteContainer(ctx context.Context, cID cid.ID) error { return nil }) - - return false - }) + } return wg.Wait() } diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index 46372876ef..34004b8107 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -12,45 +12,33 @@ import ( func (e *StorageEngine) exists(addr oid.Address) (bool, error) { var shPrm shard.ExistsPrm shPrm.SetAddress(addr) - alreadyRemoved := false - exists := false - e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { + for _, sh := range e.sortedShards(addr) { res, err := sh.Exists(shPrm) 
if err != nil { if shard.IsErrRemoved(err) { - alreadyRemoved = true - - return true + return false, apistatus.ObjectAlreadyRemoved{} } var siErr *objectSDK.SplitInfoError if errors.As(err, &siErr) { - return true + return false, nil } if shard.IsErrObjectExpired(err) { - return true + return false, nil } if !shard.IsErrNotFound(err) { e.reportShardError(sh, "could not check existence of object in shard", err) } - return false + continue } - if !exists { - exists = res.Exists() + if res.Exists() { + return true, nil } - - return false - }) - - if alreadyRemoved { - var errRemoved apistatus.ObjectAlreadyRemoved - - return false, errRemoved } - return exists, nil + return false, nil } diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 147876060a..822f029f96 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -54,99 +54,81 @@ func (e *StorageEngine) Get(addr oid.Address) (*objectSDK.Object, error) { func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error)) error { var ( - ok bool - siErr *objectSDK.SplitInfoError - - errNotFound apistatus.ObjectNotFound - - outSI *objectSDK.SplitInfo - outError error = errNotFound - + hasDegraded bool shardWithMeta shardWrapper + splitInfo *objectSDK.SplitInfo metaError error ) - var hasDegraded bool - var objectExpired bool - - e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { + for _, sh := range e.sortedShards(addr) { noMeta := sh.GetMode().NoMetabase() hasDegraded = hasDegraded || noMeta hasMetadata, err := shardFunc(sh.Shard, noMeta) if err != nil { + var siErr *objectSDK.SplitInfoError + if hasMetadata { shardWithMeta = sh metaError = err } switch { case shard.IsErrNotFound(err): - return false // ignore, go to next shard + continue // ignore, go to next shard case errors.As(err, &siErr): - if outSI == nil { - outSI = objectSDK.NewSplitInfo() + if splitInfo == nil { + splitInfo = objectSDK.NewSplitInfo() } - util.MergeSplitInfo(siErr.SplitInfo(), outSI) + util.MergeSplitInfo(siErr.SplitInfo(), splitInfo) // stop iterating over shards if SplitInfo structure is complete - return !outSI.GetLink().IsZero() && !outSI.GetLastPart().IsZero() + if !splitInfo.GetLink().IsZero() && !splitInfo.GetLastPart().IsZero() { + return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) + } + continue case shard.IsErrRemoved(err): - outError = err - - return true // stop, return it back + return err // stop, return it back case shard.IsErrObjectExpired(err): // object is found but should not // be returned - objectExpired = true - return true + return apistatus.ObjectNotFound{} default: e.reportShardError(sh, "could not get object from shard", err) - return false + continue } } - ok = true - - return true - }) + return nil // shardFunc is successful and it has the result + } - if outSI != nil { - return logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) + if splitInfo != nil { + return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) } - if objectExpired { - return errNotFound + if !hasDegraded && shardWithMeta.Shard == nil { + return apistatus.ObjectNotFound{} } - if !ok { - if !hasDegraded && shardWithMeta.Shard == nil || !shard.IsErrNotFound(outError) { - return outError + // If the object is not found but is present in metabase, + // try to fetch it from blobstor directly. If it is found in any + // blobstor, increase the error counter for the shard which contains the meta. 
+ for _, sh := range e.sortedShards(addr) { + if sh.GetMode().NoMetabase() { + // Already visited. + continue } - // If the object is not found but is present in metabase, - // try to fetch it from blobstor directly. If it is found in any - // blobstor, increase the error counter for the shard which contains the meta. - e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { - if sh.GetMode().NoMetabase() { - // Already visited. - return false + _, err := shardFunc(sh.Shard, true) + if err == nil { + if shardWithMeta.Shard != nil { + e.reportShardError(shardWithMeta, "meta info was present, but object is missing", + metaError, zap.Stringer("address", addr)) } - - _, err := shardFunc(sh.Shard, true) - ok = err == nil - return ok - }) - if !ok { - return outError - } - if shardWithMeta.Shard != nil { - e.reportShardError(shardWithMeta, "meta info was present, but object is missing", - metaError, zap.Stringer("address", addr)) + return nil } } - - return nil + return apistatus.ObjectNotFound{} } // GetBytes reads object from the StorageEngine by address into memory buffer in diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 691c767e02..ae080fff09 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -34,64 +34,51 @@ func (e *StorageEngine) Head(addr oid.Address, raw bool) (*objectSDK.Object, err } var ( - head *objectSDK.Object - siErr *objectSDK.SplitInfoError - - errNotFound apistatus.ObjectNotFound - - outSI *objectSDK.SplitInfo - outError error = errNotFound + shPrm shard.HeadPrm + splitInfo *objectSDK.SplitInfo ) - var shPrm shard.HeadPrm shPrm.SetAddress(addr) shPrm.SetRaw(raw) - e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { + for _, sh := range e.sortedShards(addr) { res, err := sh.Head(shPrm) if err != nil { + var siErr *objectSDK.SplitInfoError + switch { case shard.IsErrNotFound(err): - return false // ignore, go to next shard + continue // ignore, go to next shard case errors.As(err, &siErr): - if outSI == nil { - outSI = objectSDK.NewSplitInfo() + if splitInfo == nil { + splitInfo = objectSDK.NewSplitInfo() } - util.MergeSplitInfo(siErr.SplitInfo(), outSI) + util.MergeSplitInfo(siErr.SplitInfo(), splitInfo) // stop iterating over shards if SplitInfo structure is complete - return !outSI.GetLink().IsZero() && !outSI.GetLastPart().IsZero() + if !splitInfo.GetLink().IsZero() && !splitInfo.GetLastPart().IsZero() { + return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) + } + continue case shard.IsErrRemoved(err): - outError = err - - return true // stop, return it back + return nil, err // stop, return it back case shard.IsErrObjectExpired(err): - var notFoundErr apistatus.ObjectNotFound - // object is found but should not // be returned - outError = notFoundErr - - return true + return nil, apistatus.ObjectNotFound{} default: e.reportShardError(sh, "could not head object from shard", err) - return false + continue } } - head = res.Object() - - return true - }) - - if outSI != nil { - return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) + return res.Object(), nil } - if head == nil { - return nil, outError + if splitInfo != nil { + return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) } - return head, nil + return nil, apistatus.ObjectNotFound{} } diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 9bd3d6b9dc..e758e49b2a 100644 --- 
a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -87,52 +87,46 @@ func (e *StorageEngine) InhumeContainer(cID cid.ID) error { if e.blockErr != nil { return e.blockErr } - e.iterateOverUnsortedShards(func(sh shardWrapper) bool { + for _, sh := range e.unsortedShards() { err := sh.InhumeContainer(cID) if err != nil { e.log.Warn("inhuming container", zap.Stringer("shard", sh.ID()), zap.Error(err)) } - - return false - }) + } return nil } // Returns ok if object was inhumed during this invocation or before. func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, error) { - var errLocked apistatus.ObjectLocked var existPrm shard.ExistsPrm - var retErr error - var ok bool var shardWithObject string var root bool var children []oid.Address // see if the object is root - e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { + for _, sh := range e.unsortedShards() { existPrm.SetAddress(addr) existPrm.IgnoreExpiration() res, err := sh.Exists(existPrm) if err != nil { if shard.IsErrNotFound(err) { - return false + continue } if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) { // inhumed once - no need to be inhumed again - ok = true - return true + return true, nil } var siErr *objectSDK.SplitInfoError if !errors.As(err, &siErr) { e.reportShardError(sh, "could not check for presence in shard", err, zap.Stringer("addr", addr)) - return + continue } root = true @@ -143,7 +137,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, linkID := siErr.SplitInfo().GetLink() if linkID.IsZero() { // keep searching for the link object - return false + continue } var linkAddr oid.Address @@ -156,9 +150,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, // nothing can be done here, so just returning ok // to continue handling other addresses - ok = true - - return true + return true, nil } // v2 split @@ -170,9 +162,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, // nothing can be done here, so just returning ok // to continue handling other addresses - ok = true - - return true + return true, nil } children = measuredObjsToAddresses(addr.Container(), link.Objects()) @@ -183,19 +173,13 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, children = append(children, linkAddr) - return true + break } if res.Exists() { shardWithObject = sh.ID().String() - return true + break } - - return false - }) - - if ok { - return true, nil } prm.SetTargets(append(children, addr)...) @@ -215,38 +199,39 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, return true, nil } - // has not found the object on any shard, so mark it as inhumed on the most probable one - - e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { - defer func() { - // if object is root we continue since information about it - // can be presented in other shards - if root { - stop = false - } - }() + var ( + ok bool + retErr error + ) + // has not found the object on any shard, so mark it as inhumed on the most probable one + for _, sh := range e.sortedShards(addr) { _, err := sh.Inhume(prm) if err != nil { + var errLocked apistatus.ObjectLocked + switch { case errors.As(err, &errLocked): - retErr = apistatus.ObjectLocked{} - return true + return false, apistatus.ObjectLocked{} // Always a final error if returned. 
case errors.Is(err, shard.ErrLockObjectRemoval): - retErr = meta.ErrLockObjectRemoval - return true + return false, meta.ErrLockObjectRemoval // Always a final error if returned. case errors.Is(err, shard.ErrReadOnlyMode) || errors.Is(err, shard.ErrDegradedMode): - retErr = err - return true + if root { + retErr = err + continue + } + return false, err } e.reportShardError(sh, "could not inhume object in shard", err) - return false + continue } ok = true - return true - }) + if !root { + break + } + } return ok, retErr } @@ -260,26 +245,19 @@ func (e *StorageEngine) IsLocked(addr oid.Address) (bool, error) { return false, e.blockErr } - var locked bool - var err error - var outErr error - - e.iterateOverUnsortedShards(func(h shardWrapper) (stop bool) { - locked, err = h.Shard.IsLocked(addr) + for _, sh := range e.unsortedShards() { + locked, err := sh.Shard.IsLocked(addr) if err != nil { - e.reportShardError(h, "can't check object's lockers", err, zap.Stringer("addr", addr)) - outErr = err - return false + e.reportShardError(sh, "can't check object's lockers", err, zap.Stringer("addr", addr)) + return false, err } - return locked - }) - - if locked { - return locked, nil + if locked { + return true, nil + } } - return locked, outErr + return false, nil } func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) { @@ -290,17 +268,15 @@ func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) { } func (e *StorageEngine) processExpiredLocks(lockers []oid.Address) { - e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { + for _, sh := range e.unsortedShards() { sh.HandleExpiredLocks(lockers) - return false - }) + } } func (e *StorageEngine) processDeletedLocks(lockers []oid.Address) { - e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { + for _, sh := range e.unsortedShards() { sh.HandleDeletedLocks(lockers) - return false - }) + } } func measuredObjsToAddresses(cID cid.ID, mm []objectSDK.MeasuredObject) []oid.Address { diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index 343ec44ddc..77e6d2088d 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -116,13 +116,11 @@ func TestStorageEngine_Inhume(t *testing.T) { var wrongShardID string - e.iterateOverSortedShards(addr, func(i int, h hashedShard) (stop bool) { + for i, sh := range e.sortedShards(addr) { if i != 0 { - wrongShardID = h.ID().String() + wrongShardID = sh.ID().String() } - - return false - }) + } wrongShard := e.getShard(wrongShardID) diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index 7882ef0d81..503c61d67d 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -48,24 +48,17 @@ func (e *StorageEngine) Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error // - 0: fail // - 1: locking irregular object // - 2: ok -func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExists bool) (status uint8) { +func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExists bool) uint8 { // code is pretty similar to inhumeAddr, maybe unify? 
- root := false - var errIrregular apistatus.LockNonRegularObject - - var addrLocked oid.Address + var ( + addrLocked oid.Address + root bool + status uint8 + ) addrLocked.SetContainer(idCnr) addrLocked.SetObject(locked) - e.iterateOverSortedShards(addrLocked, func(_ int, sh shardWrapper) (stop bool) { - defer func() { - // if object is root we continue since information about it - // can be presented in other shards - if checkExists && root { - stop = false - } - }() - + for _, sh := range e.sortedShards(addrLocked) { if checkExists { var existsPrm shard.ExistsPrm existsPrm.SetAddress(addrLocked) @@ -77,35 +70,46 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi if shard.IsErrObjectExpired(err) { // object is already expired => // do not lock it - return true + return 0 } e.reportShardError(sh, "could not check locked object for presence in shard", err) - return + if !root { + return 0 + } + continue } root = true } else if !exRes.Exists() { - return + if !root { + return 0 + } + continue } } err := sh.Lock(idCnr, locker, []oid.ID{locked}) if err != nil { + var errIrregular apistatus.LockNonRegularObject + e.reportShardError(sh, "could not lock object in shard", err) if errors.As(err, &errIrregular) { status = 1 - return true + } else { + continue } - - return false + } else { + status = 2 } - status = 2 - - return true - }) + // if object is root we continue since information about it + // can be presented in other shards + if !root { + break + } + } - return + return status } diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 792a3738f9..87bc077d64 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -46,27 +46,22 @@ func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) er return err } - finished := false - - e.iterateOverSortedShards(addr, func(ind int, sh shardWrapper) (stop bool) { + for i, sh := range e.sortedShards(addr) { e.mtx.RLock() pool, ok := e.shardPools[sh.ID().String()] e.mtx.RUnlock() if !ok { // Shard was concurrently removed, skip. - return false + continue } - putDone, exists := e.putToShard(sh, ind, pool, addr, obj, objBin, hdrLen) - finished = putDone || exists - return finished - }) - - if !finished { - err = errPutShard + putDone, exists := e.putToShard(sh, i, pool, addr, obj, objBin, hdrLen) + if putDone || exists { + return nil + } } - return err + return errPutShard } // putToShard puts object to sh. 
diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index 18fea79c7a..0b98853d20 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -47,105 +47,90 @@ func (e *StorageEngine) GetRange(addr oid.Address, offset uint64, length uint64) } var ( - out []byte - siErr *objectSDK.SplitInfoError - - errNotFound apistatus.ObjectNotFound - - outSI *objectSDK.SplitInfo - outError error = errNotFound - + hasDegraded bool + splitInfo *objectSDK.SplitInfo shardWithMeta shardWrapper + shPrm shard.RngPrm metaError error ) - var hasDegraded bool - - var shPrm shard.RngPrm shPrm.SetAddress(addr) shPrm.SetRange(offset, length) - e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { + for _, sh := range e.sortedShards(addr) { noMeta := sh.GetMode().NoMetabase() hasDegraded = hasDegraded || noMeta shPrm.SetIgnoreMeta(noMeta) res, err := sh.GetRange(shPrm) if err != nil { + var siErr *objectSDK.SplitInfoError + if res.HasMeta() { shardWithMeta = sh metaError = err } switch { case shard.IsErrNotFound(err): - return false // ignore, go to next shard + continue // ignore, go to next shard case errors.As(err, &siErr): - if outSI == nil { - outSI = objectSDK.NewSplitInfo() + if splitInfo == nil { + splitInfo = objectSDK.NewSplitInfo() } - util.MergeSplitInfo(siErr.SplitInfo(), outSI) + util.MergeSplitInfo(siErr.SplitInfo(), splitInfo) // stop iterating over shards if SplitInfo structure is complete - return !outSI.GetLink().IsZero() && !outSI.GetLastPart().IsZero() + if !splitInfo.GetLink().IsZero() && !splitInfo.GetLastPart().IsZero() { + return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) + } + continue case shard.IsErrRemoved(err), shard.IsErrOutOfRange(err): - outError = err - - return true // stop, return it back + return nil, err // stop, return it back default: e.reportShardError(sh, "could not get object from shard", err) - return false + continue } } - out = res.Object().Payload() - - return true - }) - - if outSI != nil { - return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) + return res.Object().Payload(), nil } - if out == nil { - // If any shard is in a degraded mode, we should assume that metabase could store - // info about some object. - if shardWithMeta.Shard == nil && !hasDegraded || !shard.IsErrNotFound(outError) { - return nil, outError - } + if splitInfo != nil { + return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) + } - // If the object is not found but is present in metabase, - // try to fetch it from blobstor directly. If it is found in any - // blobstor, increase the error counter for the shard which contains the meta. - shPrm.SetIgnoreMeta(true) + // If any shard is in a degraded mode, we should assume that metabase could store + // info about some object. + if shardWithMeta.Shard == nil && !hasDegraded { + return nil, apistatus.ObjectNotFound{} + } - e.iterateOverSortedShards(addr, func(_ int, sh shardWrapper) (stop bool) { - if sh.GetMode().NoMetabase() { - // Already processed it without a metabase. - return false - } + // If the object is not found but is present in metabase, + // try to fetch it from blobstor directly. If it is found in any + // blobstor, increase the error counter for the shard which contains the meta. 
+ shPrm.SetIgnoreMeta(true) - res, err := sh.GetRange(shPrm) - if shard.IsErrOutOfRange(err) { - var errOutOfRange apistatus.ObjectOutOfRange + for _, sh := range e.sortedShards(addr) { + if sh.GetMode().NoMetabase() { + // Already processed it without a metabase. + continue + } - outError = errOutOfRange - return true - } - out = res.Object().Payload() - return err == nil - }) - if out == nil { - return nil, outError + res, err := sh.GetRange(shPrm) + if shard.IsErrOutOfRange(err) { + return nil, apistatus.ObjectOutOfRange{} } - if shardWithMeta.Shard != nil { - e.reportShardError(shardWithMeta, "meta info was present, but object is missing", - metaError, - zap.Stringer("address", addr)) + if err == nil { + if shardWithMeta.Shard != nil { + e.reportShardError(shardWithMeta, "meta info was present, but object is missing", + metaError, + zap.Stringer("address", addr)) + } + return res.Object().Payload(), nil } } - - return out, nil + return nil, apistatus.ObjectNotFound{} } diff --git a/pkg/local_object_storage/engine/revive.go b/pkg/local_object_storage/engine/revive.go index ed34cc7a2d..832f0cec4e 100644 --- a/pkg/local_object_storage/engine/revive.go +++ b/pkg/local_object_storage/engine/revive.go @@ -19,8 +19,10 @@ type ReviveStatus struct { // ReviveObject forcefully revives object by oid.Address in the StorageEngine. // Iterate over all shards despite errors and purge all removal marks from all metabases. -func (e *StorageEngine) ReviveObject(address oid.Address) (res ReviveStatus, err error) { - e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { +func (e *StorageEngine) ReviveObject(address oid.Address) (ReviveStatus, error) { + var res ReviveStatus + + for _, sh := range e.unsortedShards() { reviveStatus, err := sh.ReviveObject(address) id := *sh.ID() res.Shards = append(res.Shards, ReviveShardStatus{ @@ -34,8 +36,6 @@ func (e *StorageEngine) ReviveObject(address oid.Address) (res ReviveStatus, err zap.Error(err), ) } - - return false - }) - return + } + return res, nil } diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index 8f37c6c05e..3d00c99dfc 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -30,21 +30,18 @@ func (e *StorageEngine) Select(cnr cid.ID, filters object.SearchFilters) ([]oid. addrList := make([]oid.Address, 0) uniqueMap := make(map[string]struct{}) - var outError error - var shPrm shard.SelectPrm shPrm.SetContainerID(cnr) shPrm.SetFilters(filters) - e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { + for _, sh := range e.unsortedShards() { res, err := sh.Select(shPrm) if err != nil { if errors.Is(err, objectcore.ErrInvalidSearchQuery) { - outError = err - return true + return addrList, err } e.reportShardError(sh, "could not select objects from shard", err) - return false + continue } for _, addr := range res.AddressList() { // save only unique values @@ -53,11 +50,9 @@ func (e *StorageEngine) Select(cnr cid.ID, filters object.SearchFilters) ([]oid. addrList = append(addrList, addr) } } + } - return false - }) - - return addrList, outError + return addrList, nil } // List returns `limit` available physically storage object addresses in engine. 
@@ -81,26 +76,24 @@ func (e *StorageEngine) List(limit uint64) ([]oid.Address, error) { ln := uint64(0) // consider iterating over shuffled shards - e.iterateOverUnsortedShards(func(sh shardWrapper) (stop bool) { + for _, sh := range e.unsortedShards() { res, err := sh.List() // consider limit result of shard iterator if err != nil { e.reportShardError(sh, "could not select objects from shard", err) - } else { - for _, addr := range res.AddressList() { // save only unique values - if _, ok := uniqueMap[addr.EncodeToString()]; !ok { - uniqueMap[addr.EncodeToString()] = struct{}{} - addrList = append(addrList, addr) - - ln++ - if limit > 0 && ln >= limit { - return true - } + continue + } + for _, addr := range res.AddressList() { // save only unique values + if _, ok := uniqueMap[addr.EncodeToString()]; !ok { + uniqueMap[addr.EncodeToString()] = struct{}{} + addrList = append(addrList, addr) + + ln++ + if limit > 0 && ln >= limit { + return addrList, nil } } } - - return false - }) + } return addrList, nil } diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index 02990a2040..4380b750fe 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -9,7 +9,6 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -190,7 +189,7 @@ func generateShardID() (*shard.ID, error) { return shard.NewIDFromBytes(bin), nil } -func (e *StorageEngine) sortShardsByWeight(objAddr interface{ EncodeToString() string }) []shardWrapper { +func (e *StorageEngine) sortedShards(objAddr interface{ EncodeToString() string }) []shardWrapper { shards := e.unsortedShards() hrw.Sort(shards, hrw.WrapBytes([]byte(objAddr.EncodeToString()))) @@ -211,22 +210,6 @@ func (e *StorageEngine) unsortedShards() []shardWrapper { return shards } -func (e *StorageEngine) iterateOverSortedShards(addr oid.Address, handler func(int, shardWrapper) (stop bool)) { - for i, sh := range e.sortShardsByWeight(addr) { - if handler(i, sh) { - break - } - } -} - -func (e *StorageEngine) iterateOverUnsortedShards(handler func(shardWrapper) (stop bool)) { - for _, sh := range e.unsortedShards() { - if handler(sh) { - break - } - } -} - func (e *StorageEngine) getShard(id string) shardWrapper { e.mtx.RLock() defer e.mtx.RUnlock() diff --git a/pkg/local_object_storage/engine/status.go b/pkg/local_object_storage/engine/status.go index 26515ab97a..b6b3c99b32 100644 --- a/pkg/local_object_storage/engine/status.go +++ b/pkg/local_object_storage/engine/status.go @@ -19,19 +19,17 @@ type ObjectStatus struct { // ObjectStatus returns the status of the object in the StorageEngine. It contains status of the object in all shards. 
func (e *StorageEngine) ObjectStatus(address oid.Address) (ObjectStatus, error) { var res ObjectStatus - var err error - e.iterateOverSortedShards(address, func(_ int, sh shardWrapper) (stop bool) { - var shardStatus shard.ObjectStatus - shardStatus, err = sh.ObjectStatus(address) + for _, sh := range e.sortedShards(address) { + shardStatus, err := sh.ObjectStatus(address) id := *sh.ID() - if err == nil { - res.Shards = append(res.Shards, ObjectShardStatus{ - ID: id.String(), - Shard: shardStatus, - }) + if err != nil { + return res, err } - return err != nil - }) - return res, err + res.Shards = append(res.Shards, ObjectShardStatus{ + ID: id.String(), + Shard: shardStatus, + }) + } + return res, nil } diff --git a/pkg/local_object_storage/engine/tree.go b/pkg/local_object_storage/engine/tree.go index 01df50ae2b..32192564d2 100644 --- a/pkg/local_object_storage/engine/tree.go +++ b/pkg/local_object_storage/engine/tree.go @@ -73,7 +73,7 @@ func (e *StorageEngine) TreeApply(d pilorama.CIDDescriptor, treeID string, m *pi func (e *StorageEngine) TreeGetByPath(cid cidSDK.ID, treeID string, attr string, path []string, latest bool) ([]pilorama.Node, error) { var err error var nodes []pilorama.Node - for _, sh := range e.sortShardsByWeight(cid) { + for _, sh := range e.sortedShards(cid) { nodes, err = sh.TreeGetByPath(cid, treeID, attr, path, latest) if err != nil { if errors.Is(err, shard.ErrPiloramaDisabled) { @@ -96,7 +96,7 @@ func (e *StorageEngine) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID piloram var err error var m pilorama.Meta var p uint64 - for _, sh := range e.sortShardsByWeight(cid) { + for _, sh := range e.sortedShards(cid) { m, p, err = sh.TreeGetMeta(cid, treeID, nodeID) if err != nil { if errors.Is(err, shard.ErrPiloramaDisabled) { @@ -118,7 +118,7 @@ func (e *StorageEngine) TreeGetMeta(cid cidSDK.ID, treeID string, nodeID piloram func (e *StorageEngine) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID pilorama.Node) ([]uint64, error) { var err error var nodes []uint64 - for _, sh := range e.sortShardsByWeight(cid) { + for _, sh := range e.sortedShards(cid) { nodes, err = sh.TreeGetChildren(cid, treeID, nodeID) if err != nil { if errors.Is(err, shard.ErrPiloramaDisabled) { @@ -140,7 +140,7 @@ func (e *StorageEngine) TreeGetChildren(cid cidSDK.ID, treeID string, nodeID pil func (e *StorageEngine) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64) (pilorama.Move, error) { var err error var lm pilorama.Move - for _, sh := range e.sortShardsByWeight(cid) { + for _, sh := range e.sortedShards(cid) { lm, err = sh.TreeGetOpLog(cid, treeID, height) if err != nil { if errors.Is(err, shard.ErrPiloramaDisabled) { @@ -161,7 +161,7 @@ func (e *StorageEngine) TreeGetOpLog(cid cidSDK.ID, treeID string, height uint64 // TreeDrop implements the pilorama.Forest interface. 
func (e *StorageEngine) TreeDrop(cid cidSDK.ID, treeID string) error { var err error - for _, sh := range e.sortShardsByWeight(cid) { + for _, sh := range e.sortedShards(cid) { err = sh.TreeDrop(cid, treeID) if err != nil { if errors.Is(err, shard.ErrPiloramaDisabled) { @@ -214,7 +214,7 @@ func (e *StorageEngine) TreeExists(cid cidSDK.ID, treeID string) (bool, error) { } func (e *StorageEngine) getTreeShard(cid cidSDK.ID, treeID string) (int, []shardWrapper, error) { - lst := e.sortShardsByWeight(cid) + lst := e.sortedShards(cid) for i, sh := range lst { exists, err := sh.TreeExists(cid, treeID) if err != nil { From 99d43202b48590336ab702b0a1639664fe4b860f Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Tue, 12 Nov 2024 17:13:18 +0300 Subject: [PATCH 5/5] engine: unify GetRange implementation with Get A lot of the same code, the only difference is apistatus.ObjectOutOfRange handling which doesn't hurt for Get and is very much relevant for GetRange. Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/get.go | 7 +- pkg/local_object_storage/engine/range.go | 94 +++--------------------- 2 files changed, 15 insertions(+), 86 deletions(-) diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 822f029f96..ed4ba2249d 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -87,7 +87,9 @@ func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ign return logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) } continue - case shard.IsErrRemoved(err): + case + shard.IsErrRemoved(err), + shard.IsErrOutOfRange(err): return err // stop, return it back case shard.IsErrObjectExpired(err): // object is found but should not @@ -120,6 +122,9 @@ func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ign } _, err := shardFunc(sh.Shard, true) + if shard.IsErrOutOfRange(err) { + return apistatus.ObjectOutOfRange{} + } if err == nil { if shardWithMeta.Shard != nil { e.reportShardError(shardWithMeta, "meta info was present, but object is missing", diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index 0b98853d20..55e0a2f3ab 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -1,15 +1,9 @@ package engine import ( - "errors" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" - apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "go.uber.org/zap" ) // RngRes groups the resulting values of GetRange operation. 
@@ -47,90 +41,20 @@ func (e *StorageEngine) GetRange(addr oid.Address, offset uint64, length uint64) } var ( - hasDegraded bool - splitInfo *objectSDK.SplitInfo - shardWithMeta shardWrapper - shPrm shard.RngPrm - metaError error + err error + data []byte + shPrm shard.RngPrm ) - shPrm.SetAddress(addr) shPrm.SetRange(offset, length) - for _, sh := range e.sortedShards(addr) { - noMeta := sh.GetMode().NoMetabase() - hasDegraded = hasDegraded || noMeta - shPrm.SetIgnoreMeta(noMeta) - - res, err := sh.GetRange(shPrm) - if err != nil { - var siErr *objectSDK.SplitInfoError - - if res.HasMeta() { - shardWithMeta = sh - metaError = err - } - switch { - case shard.IsErrNotFound(err): - continue // ignore, go to next shard - case errors.As(err, &siErr): - if splitInfo == nil { - splitInfo = objectSDK.NewSplitInfo() - } - - util.MergeSplitInfo(siErr.SplitInfo(), splitInfo) - - // stop iterating over shards if SplitInfo structure is complete - if !splitInfo.GetLink().IsZero() && !splitInfo.GetLastPart().IsZero() { - return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) - } - continue - case - shard.IsErrRemoved(err), - shard.IsErrOutOfRange(err): - return nil, err // stop, return it back - default: - e.reportShardError(sh, "could not get object from shard", err) - continue - } - } - - return res.Object().Payload(), nil - } - - if splitInfo != nil { - return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(splitInfo)) - } - - // If any shard is in a degraded mode, we should assume that metabase could store - // info about some object. - if shardWithMeta.Shard == nil && !hasDegraded { - return nil, apistatus.ObjectNotFound{} - } - - // If the object is not found but is present in metabase, - // try to fetch it from blobstor directly. If it is found in any - // blobstor, increase the error counter for the shard which contains the meta. - shPrm.SetIgnoreMeta(true) - - for _, sh := range e.sortedShards(addr) { - if sh.GetMode().NoMetabase() { - // Already processed it without a metabase. - continue - } - + err = e.get(addr, func(sh *shard.Shard, ignoreMetadata bool) (bool, error) { + shPrm.SetIgnoreMeta(ignoreMetadata) res, err := sh.GetRange(shPrm) - if shard.IsErrOutOfRange(err) { - return nil, apistatus.ObjectOutOfRange{} - } if err == nil { - if shardWithMeta.Shard != nil { - e.reportShardError(shardWithMeta, "meta info was present, but object is missing", - metaError, - zap.Stringer("address", addr)) - } - return res.Object().Payload(), nil + data = res.Object().Payload() } - } - return nil, apistatus.ObjectNotFound{} + return res.HasMeta(), err + }) + return data, err }
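To illustrate the unification described in the PATCH 5/5 message, here is a minimal standalone sketch, under toy assumptions, of how GetRange can reuse a shared per-shard retrieval skeleton by passing a closure. toyShard, toyGet, getRange and errNotFound are illustrative stand-ins, not the engine's API; in the real code the closure's boolean result is res.HasMeta(), which drives the degraded-mode fallback omitted here.

// Standalone sketch, not part of the patch: one retrieval skeleton serving a
// range read by delegating the per-shard operation to a closure, mirroring
// e.get(addr, shardFunc) in the patched GetRange.
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("object not found")

type toyShard struct {
	id      string
	payload []byte // nil means the shard does not hold the object
}

// toyGet walks the shards and delegates the actual read to shardFunc; the
// real e.get also handles split info and the metabase fallback, skipped here.
func toyGet(shards []toyShard, shardFunc func(sh toyShard, ignoreMetadata bool) (bool, error)) error {
	for _, sh := range shards {
		_, err := shardFunc(sh, false)
		if err == nil {
			return nil
		}
		if errors.Is(err, errNotFound) {
			continue // try the next shard
		}
		return err // hard error: stop and return it back
	}
	return errNotFound
}

// getRange builds its result inside the closure, the same way the patched
// GetRange captures `data` from sh.GetRange.
func getRange(shards []toyShard, offset, length uint64) ([]byte, error) {
	var data []byte
	err := toyGet(shards, func(sh toyShard, _ bool) (bool, error) {
		if sh.payload == nil || offset+length > uint64(len(sh.payload)) {
			return false, errNotFound
		}
		data = sh.payload[offset : offset+length]
		return true, nil
	})
	return data, err
}

func main() {
	shards := []toyShard{{id: "a"}, {id: "b", payload: []byte("hello, range")}}
	data, err := getRange(shards, 7, 5)
	fmt.Println(string(data), err) // "range" <nil>
}

Capturing the payload inside the closure mirrors the patched GetRange; the out-of-range case now lives in the shared get() path, as the get.go hunk of this patch shows.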