From 28e43946ddc4305e691d9a2a02757ad61d46ee37 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 11 Nov 2024 18:03:24 +0300 Subject: [PATCH 01/16] engine: drop useless Prm/Res from container APIs These APIs seem to be totally unused. Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/container.go | 89 ++++++-------------- 1 file changed, 24 insertions(+), 65 deletions(-) diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 472ecfac94..3ad469c261 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -12,114 +12,75 @@ import ( "golang.org/x/sync/errgroup" ) -// ContainerSizePrm groups parameters of ContainerSize operation. -type ContainerSizePrm struct { - cnr cid.ID -} - -// ContainerSizeRes resulting values of ContainerSize operation. -type ContainerSizeRes struct { - size uint64 -} - -// ListContainersPrm groups parameters of ListContainers operation. -type ListContainersPrm struct{} - -// ListContainersRes groups the resulting values of ListContainers operation. -type ListContainersRes struct { - containers []cid.ID -} - -// SetContainerID sets the identifier of the container to estimate the size. -func (p *ContainerSizePrm) SetContainerID(cnr cid.ID) { - p.cnr = cnr -} - -// Size returns calculated estimation of the container size. -func (r ContainerSizeRes) Size() uint64 { - return r.size -} - -// Containers returns a list of identifiers of the containers in which local objects are stored. -func (r ListContainersRes) Containers() []cid.ID { - return r.containers -} - // ContainerSize returns the sum of estimation container sizes among all shards. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) ContainerSize(prm ContainerSizePrm) (res ContainerSizeRes, err error) { +func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) { + var ( + err error + size uint64 + ) err = e.execIfNotBlocked(func() error { - res, err = e.containerSize(prm) + size, err = e.containerSize(cnr) return err }) - return + return size, err } // ContainerSize calls ContainerSize method on engine to calculate sum of estimation container sizes among all shards. func ContainerSize(e *StorageEngine, id cid.ID) (uint64, error) { - var prm ContainerSizePrm - - prm.SetContainerID(id) - - res, err := e.ContainerSize(prm) - if err != nil { - return 0, err - } - - return res.Size(), nil + return e.ContainerSize(id) } -func (e *StorageEngine) containerSize(prm ContainerSizePrm) (res ContainerSizeRes, err error) { +func (e *StorageEngine) containerSize(cnr cid.ID) (uint64, error) { + var size uint64 + if e.metrics != nil { defer elapsed(e.metrics.AddEstimateContainerSizeDuration)() } e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { var csPrm shard.ContainerSizePrm - csPrm.SetContainerID(prm.cnr) + csPrm.SetContainerID(cnr) csRes, err := sh.Shard.ContainerSize(csPrm) if err != nil { e.reportShardError(sh, "can't get container size", err, - zap.Stringer("container_id", prm.cnr)) + zap.Stringer("container_id", cnr)) return false } - res.size += csRes.Size() + size += csRes.Size() return false }) - return + return size, nil } // ListContainers returns a unique container IDs presented in the engine objects. // // Returns an error if executions are blocked (see BlockExecution). 
-func (e *StorageEngine) ListContainers(_ ListContainersPrm) (res ListContainersRes, err error) { +func (e *StorageEngine) ListContainers() ([]cid.ID, error) { + var ( + res []cid.ID + err error + ) err = e.execIfNotBlocked(func() error { res, err = e.listContainers() return err }) - return + return res, err } // ListContainers calls ListContainers method on engine to get a unique container IDs presented in the engine objects. func ListContainers(e *StorageEngine) ([]cid.ID, error) { - var prm ListContainersPrm - - res, err := e.ListContainers(prm) - if err != nil { - return nil, err - } - - return res.Containers(), nil + return e.ListContainers() } -func (e *StorageEngine) listContainers() (ListContainersRes, error) { +func (e *StorageEngine) listContainers() ([]cid.ID, error) { if e.metrics != nil { defer elapsed(e.metrics.AddListContainersDuration)() } @@ -147,9 +108,7 @@ func (e *StorageEngine) listContainers() (ListContainersRes, error) { result = append(result, cnr) } - return ListContainersRes{ - containers: result, - }, nil + return result, nil } // DeleteContainer deletes container's objects that engine stores. From 4b6d39e407869ecd9236054c39bfdc1d28e6fb96 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 11 Nov 2024 18:03:55 +0300 Subject: [PATCH 02/16] engine: drop Prm/Res from delete APIs We _never_ used non-forced deletion, so only a forced version is provided now. But maybe we can just Inhume(). Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/delete.go | 60 +++++-------------- .../engine/delete_test.go | 6 +- pkg/local_object_storage/engine/lock_test.go | 6 +- pkg/services/control/server/gc.go | 7 +-- 4 files changed, 17 insertions(+), 62 deletions(-) diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 37c18f89d5..1615177f08 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -7,73 +7,41 @@ import ( "go.uber.org/zap" ) -// DeletePrm groups the parameters of Delete operation. -type DeletePrm struct { - addr oid.Address - - forceRemoval bool -} - -// DeleteRes groups the resulting values of Delete operation. -type DeleteRes struct{} - -// WithAddress is a Delete option to set the addresses of the objects to delete. -// -// Option is required. -func (p *DeletePrm) WithAddress(addr oid.Address) { - p.addr = addr -} - -// WithForceRemoval is a Delete option to remove an object despite any -// restrictions imposed on deleting that object. Expected to be used -// only in control service. -func (p *DeletePrm) WithForceRemoval() { - p.forceRemoval = true -} - // Delete marks the objects to be removed. // // Returns an error if executions are blocked (see BlockExecution). // -// Returns apistatus.ObjectLocked if at least one object is locked. -// In this case no object from the list is marked to be deleted. -// -// NOTE: Marks any object to be deleted (despite any prohibitions -// on operations with that object) if WithForceRemoval option has -// been provided. -func (e *StorageEngine) Delete(prm DeletePrm) (res DeleteRes, err error) { - err = e.execIfNotBlocked(func() error { - res, err = e.delete(prm) - return err +// NOTE: This is a forced removal, marks any object to be deleted (despite +// any prohibitions on operations with that object). 
+func (e *StorageEngine) Delete(addr oid.Address) error { + return e.execIfNotBlocked(func() error { + return e.deleteObj(addr, true) }) - - return } -func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) { +func (e *StorageEngine) deleteObj(addr oid.Address, force bool) error { if e.metrics != nil { defer elapsed(e.metrics.AddDeleteDuration)() } - if !prm.forceRemoval { - locked, err := e.isLocked(prm.addr) + if !force { + locked, err := e.isLocked(addr) if err != nil { e.log.Warn("deleting an object without full locking check", zap.Error(err), - zap.Stringer("addr", prm.addr)) + zap.Stringer("addr", addr)) } else if locked { - var lockedErr apistatus.ObjectLocked - return DeleteRes{}, lockedErr + return apistatus.ObjectLocked{} } } var inhumePrm shard.InhumePrm - inhumePrm.MarkAsGarbage(prm.addr) - if prm.forceRemoval { + inhumePrm.MarkAsGarbage(addr) + if force { inhumePrm.ForceRemoval() } - _, err := e.inhumeAddr(prm.addr, inhumePrm) + _, err := e.inhumeAddr(addr, inhumePrm) - return DeleteRes{}, err + return err } diff --git a/pkg/local_object_storage/engine/delete_test.go b/pkg/local_object_storage/engine/delete_test.go index 6207d0ac13..400ea81ac5 100644 --- a/pkg/local_object_storage/engine/delete_test.go +++ b/pkg/local_object_storage/engine/delete_test.go @@ -72,11 +72,7 @@ func TestDeleteBigObject(t *testing.T) { checkGetError(t, e, object.AddressOf(children[i]), nil) } - var deletePrm DeletePrm - deletePrm.WithForceRemoval() - deletePrm.WithAddress(addrParent) - - _, err := e.Delete(deletePrm) + err := e.Delete(addrParent) require.NoError(t, err) checkGetError(t, e, addrParent, &apistatus.ObjectNotFound{}) diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index baeb1039cb..c0846448f9 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -256,11 +256,7 @@ func TestLockForceRemoval(t *testing.T) { require.ErrorAs(t, err, new(apistatus.ObjectLocked)) // 4. - var deletePrm DeletePrm - deletePrm.WithAddress(objectcore.AddressOf(lock)) - deletePrm.WithForceRemoval() - - _, err = e.Delete(deletePrm) + err = e.Delete(objectcore.AddressOf(lock)) require.NoError(t, err) // 5. 
diff --git a/pkg/services/control/server/gc.go b/pkg/services/control/server/gc.go
index 6dc55ed893..576faa9162 100644
--- a/pkg/services/control/server/gc.go
+++ b/pkg/services/control/server/gc.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	"github.com/nspcc-dev/neofs-node/pkg/services/control"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
 	"google.golang.org/grpc/codes"
@@ -43,11 +42,7 @@ func (s *Server) DropObjects(_ context.Context, req *control.DropObjectsRequest)
 	var firstErr error
 	for i := range addrList {
-		var prm engine.DeletePrm
-		prm.WithForceRemoval()
-		prm.WithAddress(addrList[i])
-
-		_, err := s.storage.Delete(prm)
+		err := s.storage.Delete(addrList[i])
 		if err != nil && firstErr == nil {
 			firstErr = err
 		}

From 3701d2ddae097b6eaa2da9da52f5b43639c345c8 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Mon, 11 Nov 2024 18:43:48 +0300
Subject: [PATCH 03/16] engine: simplify Evacuate interface

Signed-off-by: Roman Khimov
---
 pkg/local_object_storage/engine/evacuate.go | 81 +++++++------------
 .../engine/evacuate_test.go                 | 56 +++++--------
 pkg/services/control/server/evacuate.go     | 10 +--
 3 files changed, 47 insertions(+), 100 deletions(-)

diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go
index aaade013fd..b65919c1be 100644
--- a/pkg/local_object_storage/engine/evacuate.go
+++ b/pkg/local_object_storage/engine/evacuate.go
@@ -13,39 +13,6 @@ import (
 	"go.uber.org/zap"
 )
 
-// EvacuateShardPrm represents parameters for the EvacuateShard operation.
-type EvacuateShardPrm struct {
-	shardID      []*shard.ID
-	handler      func(oid.Address, *objectSDK.Object) error
-	ignoreErrors bool
-}
-
-// EvacuateShardRes represents result of the EvacuateShard operation.
-type EvacuateShardRes struct {
-	count int
-}
-
-// WithShardIDList sets shard ID.
-func (p *EvacuateShardPrm) WithShardIDList(id []*shard.ID) {
-	p.shardID = id
-}
-
-// WithIgnoreErrors sets flag to ignore errors.
-func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) {
-	p.ignoreErrors = ignore
-}
-
-// WithFaultHandler sets handler to call for objects which cannot be saved on other shards.
-func (p *EvacuateShardPrm) WithFaultHandler(f func(oid.Address, *objectSDK.Object) error) {
-	p.handler = f
-}
-
-// Count returns amount of evacuated objects.
-// Objects for which handler returned no error are also assumed evacuated.
-func (p EvacuateShardRes) Count() int {
-	return p.count
-}
-
 const defaultEvacuateBatchSize = 100
 
 type pooledShard struct {
@@ -55,12 +22,18 @@ type pooledShard struct {
 
 var errMustHaveTwoShards = errors.New("must have at least 1 spare shard")
 
-// Evacuate moves data from one shard to the others.
-// The shard being moved must be in read-only mode.
-func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) {
-	sidList := make([]string, len(prm.shardID))
-	for i := range prm.shardID {
-		sidList[i] = prm.shardID[i].String()
+// Evacuate moves data from a set of given shards to other shards available to
+// this engine (so at least one shard must be available unless faultHandler is
+// given). Shards being moved must be in read-only mode. Will return an error
+// if unable to get an object unless ignoreErrors is set to true. If unable
+// to put an object into any of the other shards, invokes faultHandler
+// (if provided, fails otherwise), which can return its own error to abort
+// evacuation (or nil to continue).
Returns the number of evacuated objects +// (which can be non-zero even in case of error). +func (e *StorageEngine) Evacuate(shardIDs []*shard.ID, ignoreErrors bool, faultHandler func(oid.Address, *objectSDK.Object) error) (int, error) { + sidList := make([]string, len(shardIDs)) + for i := range shardIDs { + sidList[i] = shardIDs[i].String() } e.mtx.RLock() @@ -68,18 +41,18 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) sh, ok := e.shards[sidList[i]] if !ok { e.mtx.RUnlock() - return EvacuateShardRes{}, errShardNotFound + return 0, errShardNotFound } if !sh.GetMode().ReadOnly() { e.mtx.RUnlock() - return EvacuateShardRes{}, shard.ErrMustBeReadOnly + return 0, shard.ErrMustBeReadOnly } } - if len(e.shards)-len(sidList) < 1 && prm.handler == nil { + if len(e.shards)-len(sidList) < 1 && faultHandler == nil { e.mtx.RUnlock() - return EvacuateShardRes{}, errMustHaveTwoShards + return 0, errMustHaveTwoShards } e.log.Info("started shards evacuation", zap.Strings("shard_ids", sidList)) @@ -108,7 +81,7 @@ func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) var listPrm shard.ListWithCursorPrm listPrm.WithCount(defaultEvacuateBatchSize) - var res EvacuateShardRes + var count int mainLoop: for n := range sidList { @@ -125,7 +98,7 @@ mainLoop: if errors.Is(err, meta.ErrEndOfListing) || errors.Is(err, shard.ErrDegradedMode) { continue mainLoop } - return res, err + return count, err } // TODO (@fyrchik): #1731 parallelize the loop @@ -141,10 +114,10 @@ mainLoop: getRes, err := sh.Get(getPrm) if err != nil { - if prm.ignoreErrors { + if ignoreErrors { continue } - return res, err + return count, err } hrw.Sort(shards, addrHash) @@ -160,7 +133,7 @@ mainLoop: zap.Stringer("to", shards[j].ID()), zap.Stringer("addr", addr)) - res.count++ + count++ } continue loop } @@ -168,17 +141,17 @@ mainLoop: e.log.Debug("could not put to shard, trying another", zap.String("shard", shards[j].ID().String())) } - if prm.handler == nil { + if faultHandler == nil { // Do not check ignoreErrors flag here because // ignoring errors on put make this command kinda useless. 
- return res, fmt.Errorf("%w: %s", errPutShard, lst[i]) + return count, fmt.Errorf("%w: %s", errPutShard, lst[i]) } - err = prm.handler(addr, getRes.Object()) + err = faultHandler(addr, getRes.Object()) if err != nil { - return res, err + return count, err } - res.count++ + count++ } c = listRes.Cursor() @@ -187,5 +160,5 @@ mainLoop: e.log.Info("finished shards evacuation", zap.Strings("shard_ids", sidList)) - return res, nil + return count, nil } diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index f4c3b283a6..39098c1e22 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -89,20 +89,17 @@ func TestEvacuateShard(t *testing.T) { checkHasObjects(t) - var prm EvacuateShardPrm - prm.WithShardIDList(ids[2:3]) - t.Run("must be read-only", func(t *testing.T) { - res, err := e.Evacuate(prm) + count, err := e.Evacuate(ids[2:3], false, nil) require.ErrorIs(t, err, shard.ErrMustBeReadOnly) - require.Equal(t, 0, res.Count()) + require.Equal(t, 0, count) }) require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) - res, err := e.Evacuate(prm) + count, err := e.Evacuate(ids[2:3], false, nil) require.NoError(t, err) - require.Equal(t, objPerShard, res.count) + require.Equal(t, objPerShard, count) // We check that all objects are available both before and after shard removal. // First case is a real-world use-case. It ensures that an object can be put in presence @@ -111,9 +108,9 @@ func TestEvacuateShard(t *testing.T) { checkHasObjects(t) // Calling it again is OK, but all objects are already moved, so no new PUTs should be done. - res, err = e.Evacuate(prm) + count, err = e.Evacuate(ids[2:3], false, nil) require.NoError(t, err) - require.Equal(t, 0, res.count) + require.Equal(t, 0, count) checkHasObjects(t) @@ -153,18 +150,13 @@ func TestEvacuateNetwork(t *testing.T) { require.NoError(t, e.shards[evacuateShardID].SetMode(mode.ReadOnly)) - var prm EvacuateShardPrm - prm.shardID = ids[0:1] - - res, err := e.Evacuate(prm) + count, err := e.Evacuate(ids[0:1], false, nil) require.ErrorIs(t, err, errMustHaveTwoShards) - require.Equal(t, 0, res.Count()) + require.Equal(t, 0, count) - prm.handler = acceptOneOf(objects, 2) - - res, err = e.Evacuate(prm) + count, err = e.Evacuate(ids[0:1], false, acceptOneOf(objects, 2)) require.ErrorIs(t, err, errReplication) - require.Equal(t, 2, res.Count()) + require.Equal(t, 2, count) }) t.Run("multiple shards, evacuate one", func(t *testing.T) { e, ids, objects := newEngineEvacuate(t, 2, 3) @@ -172,20 +164,14 @@ func TestEvacuateNetwork(t *testing.T) { require.NoError(t, e.shards[ids[0].String()].SetMode(mode.ReadOnly)) require.NoError(t, e.shards[ids[1].String()].SetMode(mode.ReadOnly)) - var prm EvacuateShardPrm - prm.shardID = ids[1:2] - prm.handler = acceptOneOf(objects, 2) - - res, err := e.Evacuate(prm) + count, err := e.Evacuate(ids[1:2], false, acceptOneOf(objects, 2)) require.ErrorIs(t, err, errReplication) - require.Equal(t, 2, res.Count()) + require.Equal(t, 2, count) t.Run("no errors", func(t *testing.T) { - prm.handler = acceptOneOf(objects, 3) - - res, err := e.Evacuate(prm) + count, err := e.Evacuate(ids[1:2], false, acceptOneOf(objects, 3)) require.NoError(t, err) - require.Equal(t, 3, res.Count()) + require.Equal(t, 3, count) }) }) t.Run("multiple shards, evacuate many", func(t *testing.T) { @@ -204,20 +190,14 @@ func TestEvacuateNetwork(t *testing.T) { require.NoError(t, 
e.shards[ids[i].String()].SetMode(mode.ReadOnly))
 		}
 
-		var prm EvacuateShardPrm
-		prm.shardID = evacuateIDs
-		prm.handler = acceptOneOf(objects, totalCount-1)
-
-		res, err := e.Evacuate(prm)
+		count, err := e.Evacuate(evacuateIDs, false, acceptOneOf(objects, totalCount-1))
 		require.ErrorIs(t, err, errReplication)
-		require.Equal(t, totalCount-1, res.Count())
+		require.Equal(t, totalCount-1, count)
 
 		t.Run("no errors", func(t *testing.T) {
-			prm.handler = acceptOneOf(objects, totalCount)
-
-			res, err := e.Evacuate(prm)
+			count, err = e.Evacuate(evacuateIDs, false, acceptOneOf(objects, totalCount))
 			require.NoError(t, err)
-			require.Equal(t, totalCount, res.Count())
+			require.Equal(t, totalCount, count)
 		})
 	})
 }
diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go
index 56794a3d44..2e6d137b63 100644
--- a/pkg/services/control/server/evacuate.go
+++ b/pkg/services/control/server/evacuate.go
@@ -8,7 +8,6 @@ import (
 	"slices"
 
 	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
-	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	"github.com/nspcc-dev/neofs-node/pkg/services/control"
 	"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
 	"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
@@ -31,19 +30,14 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ
 		return nil, err
 	}
 
-	var prm engine.EvacuateShardPrm
-	prm.WithShardIDList(s.getShardIDList(req.GetBody().GetShard_ID()))
-	prm.WithIgnoreErrors(req.GetBody().GetIgnoreErrors())
-	prm.WithFaultHandler(s.replicate)
-
-	res, err := s.storage.Evacuate(prm)
+	count, err := s.storage.Evacuate(s.getShardIDList(req.GetBody().GetShard_ID()), req.GetBody().GetIgnoreErrors(), s.replicate)
 	if err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
 	resp := &control.EvacuateShardResponse{
 		Body: &control.EvacuateShardResponse_Body{
-			Count: uint32(res.Count()),
+			Count: uint32(count),
 		},
 	}

From ccbacc37f1035e36756a32f61e665e946ce0443e Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Mon, 11 Nov 2024 18:54:52 +0300
Subject: [PATCH 04/16] engine: simplify FlushWriteCache interface

Prm/Res are dropped, and the ignore option is gone too since there were
no users of it.

Signed-off-by: Roman Khimov
---
 pkg/local_object_storage/engine/writecache.go | 34 +++----------
 pkg/services/control/server/flush_cache.go    |  6 +---
 2 files changed, 6 insertions(+), 34 deletions(-)

diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go
index 84359cdae2..d7dd469b38 100644
--- a/pkg/local_object_storage/engine/writecache.go
+++ b/pkg/local_object_storage/engine/writecache.go
@@ -4,39 +4,15 @@ import (
 	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
 )
 
-// FlushWriteCachePrm groups the parameters of FlushWriteCache operation.
-type FlushWriteCachePrm struct {
-	shardID      *shard.ID
-	ignoreErrors bool
-}
-
-// SetShardID is an option to set shard ID.
-//
-// Option is required.
-func (p *FlushWriteCachePrm) SetShardID(id *shard.ID) {
-	p.shardID = id
-}
-
-// SetIgnoreErrors sets errors ignore flag..
-func (p *FlushWriteCachePrm) SetIgnoreErrors(ignore bool) {
-	p.ignoreErrors = ignore
-}
-
-// FlushWriteCacheRes groups the resulting values of FlushWriteCache operation.
-type FlushWriteCacheRes struct{}
-
-// FlushWriteCache flushes write-cache on a single shard.
-func (e *StorageEngine) FlushWriteCache(p FlushWriteCachePrm) (FlushWriteCacheRes, error) { +// FlushWriteCache flushes write-cache on a single shard with the given ID. +func (e *StorageEngine) FlushWriteCache(id *shard.ID) error { e.mtx.RLock() - sh, ok := e.shards[p.shardID.String()] + sh, ok := e.shards[id.String()] e.mtx.RUnlock() if !ok { - return FlushWriteCacheRes{}, errShardNotFound + return errShardNotFound } - var prm shard.FlushWriteCachePrm - prm.SetIgnoreErrors(p.ignoreErrors) - - return FlushWriteCacheRes{}, sh.FlushWriteCache(prm) + return sh.FlushWriteCache(shard.FlushWriteCachePrm{}) } diff --git a/pkg/services/control/server/flush_cache.go b/pkg/services/control/server/flush_cache.go index 0835b4b2d2..a6305deefc 100644 --- a/pkg/services/control/server/flush_cache.go +++ b/pkg/services/control/server/flush_cache.go @@ -3,7 +3,6 @@ package control import ( "context" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/services/control" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -22,10 +21,7 @@ func (s *Server) FlushCache(_ context.Context, req *control.FlushCacheRequest) ( } for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) { - var prm engine.FlushWriteCachePrm - prm.SetShardID(shardID) - - _, err = s.storage.FlushWriteCache(prm) + err = s.storage.FlushWriteCache(shardID) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } From b919df49b71a1573806a0e0009eb7cbab64868ee Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 11 Nov 2024 19:06:34 +0300 Subject: [PATCH 05/16] shard: simplify Get interface Pure refactoring, no functional changes. Signed-off-by: Roman Khimov --- .../engine/container_test.go | 8 +--- .../engine/delete_test.go | 5 +- pkg/local_object_storage/engine/error_test.go | 16 +++---- .../engine/evacuate_test.go | 5 +- pkg/local_object_storage/engine/get.go | 48 +++++-------------- pkg/local_object_storage/engine/inhume.go | 7 +-- pkg/local_object_storage/engine/put_test.go | 9 ++-- pkg/services/object/get/util.go | 10 +--- 8 files changed, 28 insertions(+), 80 deletions(-) diff --git a/pkg/local_object_storage/engine/container_test.go b/pkg/local_object_storage/engine/container_test.go index 2c3ae9dded..3a20e86144 100644 --- a/pkg/local_object_storage/engine/container_test.go +++ b/pkg/local_object_storage/engine/container_test.go @@ -64,12 +64,8 @@ func TestStorageEngine_ContainerCleanUp(t *testing.T) { require.NoError(t, e.Init()) require.Eventually(t, func() bool { - var prmGet GetPrm - prmGet.WithAddress(object.AddressOf(&o1)) - _, err1 := e.Get(prmGet) - - prmGet.WithAddress(object.AddressOf(&o2)) - _, err2 := e.Get(prmGet) + _, err1 := e.Get(object.AddressOf(&o1)) + _, err2 := e.Get(object.AddressOf(&o2)) return errors.Is(err1, new(apistatus.ObjectNotFound)) && errors.Is(err2, new(apistatus.ObjectNotFound)) }, time.Second, 100*time.Millisecond) diff --git a/pkg/local_object_storage/engine/delete_test.go b/pkg/local_object_storage/engine/delete_test.go index 400ea81ac5..892270ce52 100644 --- a/pkg/local_object_storage/engine/delete_test.go +++ b/pkg/local_object_storage/engine/delete_test.go @@ -83,10 +83,7 @@ func TestDeleteBigObject(t *testing.T) { } func checkGetError(t *testing.T, e *StorageEngine, addr oid.Address, expected any) { - var getPrm GetPrm - getPrm.WithAddress(addr) - - _, err := e.Get(getPrm) + _, err := e.Get(addr) if expected != nil { require.ErrorAs(t, err, expected) } else { diff --git 
a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index 7c9b549c4e..31af531751 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -75,7 +75,7 @@ func TestErrorReporting(t *testing.T) { e.mtx.RUnlock() require.NoError(t, err) - _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = e.Get(object.AddressOf(obj)) require.NoError(t, err) checkShardState(t, e, id[0], 0, mode.ReadWrite) @@ -84,7 +84,7 @@ func TestErrorReporting(t *testing.T) { corruptSubDir(t, filepath.Join(dir, "0")) for i := uint32(1); i < 3; i++ { - _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = e.Get(object.AddressOf(obj)) require.Error(t, err) checkShardState(t, e, id[0], i, mode.ReadWrite) checkShardState(t, e, id[1], 0, mode.ReadWrite) @@ -105,7 +105,7 @@ func TestErrorReporting(t *testing.T) { e.mtx.RUnlock() require.NoError(t, err) - _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = e.Get(object.AddressOf(obj)) require.NoError(t, err) checkShardState(t, e, id[0], 0, mode.ReadWrite) @@ -114,14 +114,14 @@ func TestErrorReporting(t *testing.T) { corruptSubDir(t, filepath.Join(dir, "0")) for i := uint32(1); i < errThreshold; i++ { - _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = e.Get(object.AddressOf(obj)) require.Error(t, err) checkShardState(t, e, id[0], i, mode.ReadWrite) checkShardState(t, e, id[1], 0, mode.ReadWrite) } for i := range uint32(2) { - _, err = e.Get(GetPrm{addr: object.AddressOf(obj)}) + _, err = e.Get(object.AddressOf(obj)) require.Error(t, err) checkShardState(t, e, id[0], errThreshold+i, mode.DegradedReadOnly) checkShardState(t, e, id[1], 0, mode.ReadWrite) @@ -159,7 +159,7 @@ func TestBlobstorFailback(t *testing.T) { for i := range objs { addr := object.AddressOf(objs[i]) - _, err = e.Get(GetPrm{addr: addr}) + _, err = e.Get(addr) require.NoError(t, err) _, err = e.GetRange(RngPrm{addr: addr}) require.NoError(t, err) @@ -179,9 +179,9 @@ func TestBlobstorFailback(t *testing.T) { for i := range objs { addr := object.AddressOf(objs[i]) - getRes, err := e.Get(GetPrm{addr: addr}) + getObj, err := e.Get(addr) require.NoError(t, err) - require.Equal(t, objs[i], getRes.Object()) + require.Equal(t, objs[i], getObj) rngRes, err := e.GetRange(RngPrm{addr: addr, off: 1, ln: 10}) require.NoError(t, err) diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 39098c1e22..13ebb7bfd5 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -79,10 +79,7 @@ func TestEvacuateShard(t *testing.T) { checkHasObjects := func(t *testing.T) { for i := range objects { - var prm GetPrm - prm.WithAddress(objectCore.AddressOf(objects[i])) - - _, err := e.Get(prm) + _, err := e.Get(objectCore.AddressOf(objects[i])) require.NoError(t, err) } } diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 43ed90eb10..de4500d7ae 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -12,28 +12,6 @@ import ( "go.uber.org/zap" ) -// GetPrm groups the parameters of Get operation. -type GetPrm struct { - addr oid.Address -} - -// GetRes groups the resulting values of Get operation. -type GetRes struct { - obj *objectSDK.Object -} - -// WithAddress is a Get option to set the address of the requested object. -// -// Option is required. 
-func (p *GetPrm) WithAddress(addr oid.Address) { - p.addr = addr -} - -// Object returns the requested object. -func (r GetRes) Object() *objectSDK.Object { - return r.obj -} - // Get reads an object from local storage. // // Returns any error encountered that @@ -43,22 +21,26 @@ func (r GetRes) Object() *objectSDK.Object { // Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) Get(prm GetPrm) (res GetRes, err error) { - var sp shard.GetPrm - sp.SetAddress(prm.addr) +func (e *StorageEngine) Get(addr oid.Address) (*objectSDK.Object, error) { + var ( + err error + obj *objectSDK.Object + sp shard.GetPrm + ) + sp.SetAddress(addr) err = e.execIfNotBlocked(func() error { - return e.get(prm.addr, func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error) { + return e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (bool, error) { sp.SetIgnoreMeta(ignoreMetadata) sr, err := s.Get(sp) if err != nil { return sr.HasMeta(), err } - res.obj = sr.Object() + obj = sr.Object() return sr.HasMeta(), nil }) }) - return + return obj, err } func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error)) error { @@ -164,15 +146,7 @@ func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ign // Get reads object from local storage by provided address. func Get(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { - var getPrm GetPrm - getPrm.WithAddress(addr) - - res, err := storage.Get(getPrm) - if err != nil { - return nil, err - } - - return res.Object(), nil + return storage.Get(addr) } // GetBytes reads object from the StorageEngine by address into memory buffer in diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index bd21f1def7..e6061721b5 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -185,10 +185,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, linkAddr.SetContainer(addr.Container()) linkAddr.SetObject(linkID) - var getPrm GetPrm - getPrm.WithAddress(linkAddr) - - res, err := e.Get(getPrm) + linkObj, err := e.Get(linkAddr) if err != nil { e.log.Error("inhuming root object but no link object is found", zap.Error(err)) @@ -199,8 +196,6 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm) (bool, return true } - linkObj := res.Object() - // v2 split if linkObj.Type() == objectSDK.TypeLink { var link objectSDK.Link diff --git a/pkg/local_object_storage/engine/put_test.go b/pkg/local_object_storage/engine/put_test.go index 4fead6c3df..e4e2086c95 100644 --- a/pkg/local_object_storage/engine/put_test.go +++ b/pkg/local_object_storage/engine/put_test.go @@ -36,11 +36,9 @@ func TestStorageEngine_PutBinary(t *testing.T) { _, err := e.Put(putPrm) require.NoError(t, err) - var getPrm GetPrm - getPrm.WithAddress(addr) - res, err := e.Get(getPrm) + gotObj, err := e.Get(addr) require.NoError(t, err) - require.Equal(t, &obj, res.Object()) + require.Equal(t, &obj, gotObj) b, err := e.GetBytes(addr) require.NoError(t, err) @@ -59,7 +57,6 @@ func TestStorageEngine_PutBinary(t *testing.T) { require.NoError(t, err) require.Equal(t, invalidObjBin, b) - getPrm.WithAddress(addr) - _, err = e.Get(getPrm) + _, err = e.Get(addr) require.Error(t, err) } diff --git a/pkg/services/object/get/util.go 
b/pkg/services/object/get/util.go index ee7c012e18..4437ecfc8d 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -220,15 +220,7 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { return r.Object(), nil } - var getPrm engine.GetPrm - getPrm.WithAddress(exec.address()) - - r, err := e.engine.Get(getPrm) - if err != nil { - return nil, err - } - - return r.Object(), nil + return e.engine.Get(exec.address()) } func (w *partWriter) WriteChunk(p []byte) error { From 2fb09064a7e72533df6adb31c758db95b930dba8 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 11 Nov 2024 21:12:27 +0300 Subject: [PATCH 06/16] engine: simplify GetRange interface Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/error_test.go | 8 +- pkg/local_object_storage/engine/range.go | 78 ++++++------------- pkg/services/object/get/util.go | 11 ++- 3 files changed, 34 insertions(+), 63 deletions(-) diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index 31af531751..371a7e5f56 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -161,7 +161,7 @@ func TestBlobstorFailback(t *testing.T) { addr := object.AddressOf(objs[i]) _, err = e.Get(addr) require.NoError(t, err) - _, err = e.GetRange(RngPrm{addr: addr}) + _, err = e.GetRange(addr, 0, 0) require.NoError(t, err) } @@ -183,11 +183,11 @@ func TestBlobstorFailback(t *testing.T) { require.NoError(t, err) require.Equal(t, objs[i], getObj) - rngRes, err := e.GetRange(RngPrm{addr: addr, off: 1, ln: 10}) + rngRes, err := e.GetRange(addr, 1, 10) require.NoError(t, err) - require.Equal(t, objs[i].Payload()[1:11], rngRes.Object().Payload()) + require.Equal(t, objs[i].Payload()[1:11], rngRes) - _, err = e.GetRange(RngPrm{addr: addr, off: errSmallSize + 10, ln: 1}) + _, err = e.GetRange(addr, errSmallSize+10, 1) require.ErrorAs(t, err, &apistatus.ObjectOutOfRange{}) } diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index 907f05450c..1050a56782 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -12,33 +12,11 @@ import ( "go.uber.org/zap" ) -// RngPrm groups the parameters of GetRange operation. -type RngPrm struct { - off, ln uint64 - - addr oid.Address -} - // RngRes groups the resulting values of GetRange operation. type RngRes struct { obj *objectSDK.Object } -// WithAddress is a GetRng option to set the address of the requested object. -// -// Option is required. -func (p *RngPrm) WithAddress(addr oid.Address) { - p.addr = addr -} - -// WithPayloadRange is a GetRange option to set range of requested payload data. -// -// Missing an option or calling with zero length is equivalent -// to getting the full payload range. -func (p *RngPrm) WithPayloadRange(rng *objectSDK.Range) { - p.off, p.ln = rng.GetOffset(), rng.GetLength() -} - // Object returns the requested object part. // // Instance payload contains the requested range of the original object. @@ -46,7 +24,8 @@ func (r RngRes) Object() *objectSDK.Object { return r.obj } -// GetRange reads part of an object from local storage. +// GetRange reads a part of an object from local storage. Zero length is +// interpreted as requiring full object length independent of the offset. // // Returns any error encountered that // did not allow to completely read the object part. 
@@ -56,22 +35,26 @@ func (r RngRes) Object() *objectSDK.Object { // Returns ErrRangeOutOfBounds if the requested object range is out of bounds. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) GetRange(prm RngPrm) (res RngRes, err error) { +func (e *StorageEngine) GetRange(addr oid.Address, offset uint64, length uint64) ([]byte, error) { + var ( + err error + res []byte + ) err = e.execIfNotBlocked(func() error { - res, err = e.getRange(prm) + res, err = e.getRange(addr, offset, length) return err }) - return + return res, err } -func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { +func (e *StorageEngine) getRange(addr oid.Address, offset uint64, length uint64) ([]byte, error) { if e.metrics != nil { defer elapsed(e.metrics.AddRangeDuration)() } var ( - obj *objectSDK.Object + out []byte siErr *objectSDK.SplitInfoError errNotFound apistatus.ObjectNotFound @@ -86,10 +69,10 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { var hasDegraded bool var shPrm shard.RngPrm - shPrm.SetAddress(prm.addr) - shPrm.SetRange(prm.off, prm.ln) + shPrm.SetAddress(addr) + shPrm.SetRange(offset, length) - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { noMeta := sh.GetMode().NoMetabase() hasDegraded = hasDegraded || noMeta shPrm.SetIgnoreMeta(noMeta) @@ -124,20 +107,20 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { } } - obj = res.Object() + out = res.Object().Payload() return true }) if outSI != nil { - return RngRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) + return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI)) } - if obj == nil { + if out == nil { // If any shard is in a degraded mode, we should assume that metabase could store // info about some object. if shardWithMeta.Shard == nil && !hasDegraded || !shard.IsErrNotFound(outError) { - return RngRes{}, outError + return nil, outError } // If the object is not found but is present in metabase, @@ -145,7 +128,7 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { // blobstor, increase the error counter for the shard which contains the meta. shPrm.SetIgnoreMeta(true) - e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) { + e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) { if sh.GetMode().NoMetabase() { // Already processed it without a metabase. return false @@ -158,34 +141,23 @@ func (e *StorageEngine) getRange(prm RngPrm) (RngRes, error) { outError = errOutOfRange return true } - obj = res.Object() + out = res.Object().Payload() return err == nil }) - if obj == nil { - return RngRes{}, outError + if out == nil { + return nil, outError } if shardWithMeta.Shard != nil { e.reportShardError(shardWithMeta, "meta info was present, but object is missing", metaError, - zap.Stringer("address", prm.addr)) + zap.Stringer("address", addr)) } } - return RngRes{ - obj: obj, - }, nil + return out, nil } // GetRange reads object payload range from local storage by provided address. 
func GetRange(storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) { - var rangePrm RngPrm - rangePrm.WithAddress(addr) - rangePrm.WithPayloadRange(rng) - - res, err := storage.GetRange(rangePrm) - if err != nil { - return nil, err - } - - return res.Object().Payload(), nil + return storage.GetRange(addr, rng.GetOffset(), rng.GetLength()) } diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 4437ecfc8d..6fbfd326d4 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -208,16 +208,15 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { } if rng := exec.ctxRange(); rng != nil { - var getRange engine.RngPrm - getRange.WithAddress(exec.address()) - getRange.WithPayloadRange(rng) - - r, err := e.engine.GetRange(getRange) + r, err := e.engine.GetRange(exec.address(), rng.GetOffset(), rng.GetLength()) if err != nil { return nil, err } - return r.Object(), nil + o := object.New() + o.SetPayload(r) + + return o, nil } return e.engine.Get(exec.address()) From ea5c6db1f19221ed76fff033377820848e49bbb1 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 11 Nov 2024 21:18:49 +0300 Subject: [PATCH 07/16] engine: simplify Head interface Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/head.go | 83 +++++--------------- pkg/local_object_storage/engine/head_test.go | 6 +- pkg/services/object/get/util.go | 8 +- 3 files changed, 22 insertions(+), 75 deletions(-) diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 0319b5854d..4d0f886dac 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -11,39 +11,8 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) -// HeadPrm groups the parameters of Head operation. -type HeadPrm struct { - addr oid.Address - raw bool -} - -// HeadRes groups the resulting values of Head operation. -type HeadRes struct { - head *objectSDK.Object -} - -// WithAddress is a Head option to set the address of the requested object. -// -// Option is required. -func (p *HeadPrm) WithAddress(addr oid.Address) { - p.addr = addr -} - -// WithRaw is a Head option to set raw flag value. If flag is unset, then Head -// returns the header of the virtual object, otherwise it returns SplitInfo of the virtual -// object. -func (p *HeadPrm) WithRaw(raw bool) { - p.raw = raw -} - -// Header returns the requested object header. -// -// Instance has empty payload. -func (r HeadRes) Header() *objectSDK.Object { - return r.head -} - -// Head reads object header from local storage. +// Head reads object header from local storage. If raw is true returns +// SplitInfo of the virtual object instead of the virtual object header. // // Returns any error encountered that // did not allow to completely read the object header. @@ -52,16 +21,21 @@ func (r HeadRes) Header() *objectSDK.Object { // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object was inhumed. // // Returns an error if executions are blocked (see BlockExecution). 
-func (e *StorageEngine) Head(prm HeadPrm) (res HeadRes, err error) {
+func (e *StorageEngine) Head(addr oid.Address, raw bool) (*objectSDK.Object, error) {
+	var (
+		obj *objectSDK.Object
+		err error
+	)
+
 	err = e.execIfNotBlocked(func() error {
-		res, err = e.head(prm)
+		obj, err = e.head(addr, raw)
 		return err
 	})
 
-	return
+	return obj, err
 }
 
-func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) {
+func (e *StorageEngine) head(addr oid.Address, raw bool) (*objectSDK.Object, error) {
 	if e.metrics != nil {
 		defer elapsed(e.metrics.AddHeadDuration)()
 	}
@@ -77,10 +51,10 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) {
 	)
 
 	var shPrm shard.HeadPrm
-	shPrm.SetAddress(prm.addr)
-	shPrm.SetRaw(prm.raw)
+	shPrm.SetAddress(addr)
+	shPrm.SetRaw(raw)
 
-	e.iterateOverSortedShards(prm.addr, func(_ int, sh hashedShard) (stop bool) {
+	e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
 		res, err := sh.Head(shPrm)
 		if err != nil {
 			switch {
@@ -119,42 +93,23 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) {
 	})
 
 	if outSI != nil {
-		return HeadRes{}, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
+		return nil, logicerr.Wrap(objectSDK.NewSplitInfoError(outSI))
 	}
 
 	if head == nil {
-		return HeadRes{}, outError
+		return nil, outError
 	}
 
-	return HeadRes{
-		head: head,
-	}, nil
+	return head, nil
 }
 
 // Head reads object header from local storage by provided address.
 func Head(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) {
-	var headPrm HeadPrm
-	headPrm.WithAddress(addr)
-
-	res, err := storage.Head(headPrm)
-	if err != nil {
-		return nil, err
-	}
-
-	return res.Header(), nil
+	return storage.Head(addr, false)
 }
 
 // HeadRaw reads object header from local storage by provided address and raw
 // flag.
 func HeadRaw(storage *StorageEngine, addr oid.Address, raw bool) (*objectSDK.Object, error) {
-	var headPrm HeadPrm
-	headPrm.WithAddress(addr)
-	headPrm.WithRaw(raw)
-
-	res, err := storage.Head(headPrm)
-	if err != nil {
-		return nil, err
-	}
-
-	return res.Header(), nil
+	return storage.Head(addr, raw)
 }
diff --git a/pkg/local_object_storage/engine/head_test.go b/pkg/local_object_storage/engine/head_test.go
index 50d162c552..6d133d2c34 100644
--- a/pkg/local_object_storage/engine/head_test.go
+++ b/pkg/local_object_storage/engine/head_test.go
@@ -61,11 +61,7 @@ func TestHeadRaw(t *testing.T) {
 		require.NoError(t, err)
 
 		// head with raw flag should return SplitInfoError
-		var headPrm HeadPrm
-		headPrm.WithAddress(parentAddr)
-		headPrm.WithRaw(true)
-
-		_, err = e.Head(headPrm)
+		_, err = e.Head(parentAddr, true)
 		require.Error(t, err)
 
 		var si *object.SplitInfoError
diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go
index 6fbfd326d4..34112cb471 100644
--- a/pkg/services/object/get/util.go
+++ b/pkg/services/object/get/util.go
@@ -195,16 +195,12 @@ func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Objec
 
 func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
 	if exec.headOnly() {
-		var headPrm engine.HeadPrm
-		headPrm.WithAddress(exec.address())
-		headPrm.WithRaw(exec.isRaw())
-
-		r, err := e.engine.Head(headPrm)
+		r, err := e.engine.Head(exec.address(), exec.isRaw())
 		if err != nil {
 			return nil, err
 		}
 
-		return r.Header(), nil
+		return r, nil
 	}
 
 	if rng := exec.ctxRange(); rng != nil {

From fa47327cede147a0ea516bd1fd7177fa5f759755 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Mon, 11 Nov 2024 21:30:53 +0300
Subject: [PATCH 08/16] node: force removal of redundant object copy

I believe it's more correct: an object can be locked, but if we're
holding a copy no one needs, it still should go away.

Signed-off-by: Roman Khimov
---
 cmd/neofs-node/object.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go
index 43ed23346e..a6f493e0ee 100644
--- a/cmd/neofs-node/object.go
+++ b/cmd/neofs-node/object.go
@@ -209,6 +209,7 @@ func initObjectService(c *cfg) {
 		policer.WithRedundantCopyCallback(func(addr oid.Address) {
 			var inhumePrm engine.InhumePrm
 			inhumePrm.MarkAsGarbage(addr)
+			inhumePrm.WithForceRemoval()
 
 			_, err := ls.Inhume(inhumePrm)
 			if err != nil {

From e012d78cb8cfbe9135d1044e154e614a791ccdce Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Mon, 11 Nov 2024 21:37:00 +0300
Subject: [PATCH 09/16] engine: deduplicate deleteObj/inhume code

Both can check for locked status, but doing that in two places doesn't
make much sense, so reuse the code.

Signed-off-by: Roman Khimov
---
 pkg/local_object_storage/engine/delete.go | 20 +++-----------------
 pkg/local_object_storage/engine/inhume.go |  3 +++
 2 files changed, 6 insertions(+), 17 deletions(-)

diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go
index 1615177f08..9be2908bd5 100644
--- a/pkg/local_object_storage/engine/delete.go
+++ b/pkg/local_object_storage/engine/delete.go
@@ -1,10 +1,7 @@
 package engine
 
 import (
-	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
-	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
 	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
-	"go.uber.org/zap"
 )
 
 // Delete marks the objects to be removed.
@@ -24,24 +21,13 @@ func (e *StorageEngine) deleteObj(addr oid.Address, force bool) error {
 		defer elapsed(e.metrics.AddDeleteDuration)()
 	}
 
-	if !force {
-		locked, err := e.isLocked(addr)
-		if err != nil {
-			e.log.Warn("deleting an object without full locking check",
-				zap.Error(err),
-				zap.Stringer("addr", addr))
-		} else if locked {
-			return apistatus.ObjectLocked{}
-		}
-	}
-
-	var inhumePrm shard.InhumePrm
+	var inhumePrm InhumePrm
 	inhumePrm.MarkAsGarbage(addr)
 	if force {
-		inhumePrm.ForceRemoval()
+		inhumePrm.WithForceRemoval()
 	}
 
-	_, err := e.inhumeAddr(addr, inhumePrm)
+	_, err := e.inhumeInt(inhumePrm)
 
 	return err
 }
diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go
index e6061721b5..4a31f88c1a 100644
--- a/pkg/local_object_storage/engine/inhume.go
+++ b/pkg/local_object_storage/engine/inhume.go
@@ -76,7 +76,10 @@ func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) {
 	if e.metrics != nil {
 		defer elapsed(e.metrics.AddInhumeDuration)()
 	}
+	return e.inhumeInt(prm)
+}
 
+func (e *StorageEngine) inhumeInt(prm InhumePrm) (InhumeRes, error) {
 	var shPrm shard.InhumePrm
 	if prm.forceRemoval {
 		shPrm.ForceRemoval()

From a6b318193bbdf28f65e1a891f48ef9e77c9e8c93 Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Mon, 11 Nov 2024 21:42:12 +0300
Subject: [PATCH 10/16] *: use engine.Delete instead of engine.Inhume where
 appropriate

Delete is a forced removal of a given address; use it this way.
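For illustration, this is the kind of call-site change the patch makes
(a sketch distilled from the hunks below; `ls` is the local storage
engine instance):

    // Before: forced garbage marking through the parameter group.
    var prm engine.InhumePrm
    prm.MarkAsGarbage(addr)
    prm.WithForceRemoval()
    _, err := ls.Inhume(prm)

    // After: Delete is forced by definition, so one call suffices.
    err := ls.Delete(addr)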
Signed-off-by: Roman Khimov
---
 cmd/neofs-node/object.go      | 6 +-----
 pkg/services/policer/check.go | 7 +------
 2 files changed, 2 insertions(+), 11 deletions(-)

diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go
index a6f493e0ee..16d0ff74b3 100644
--- a/cmd/neofs-node/object.go
+++ b/cmd/neofs-node/object.go
@@ -207,11 +207,7 @@ func initObjectService(c *cfg) {
 		policer.WithHeadTimeout(c.applicationConfiguration.policer.headTimeout),
 		policer.WithReplicator(c.replicator),
 		policer.WithRedundantCopyCallback(func(addr oid.Address) {
-			var inhumePrm engine.InhumePrm
-			inhumePrm.MarkAsGarbage(addr)
-			inhumePrm.WithForceRemoval()
-
-			_, err := ls.Inhume(inhumePrm)
+			err := ls.Delete(addr)
 			if err != nil {
 				c.log.Warn("could not inhume mark redundant copy as garbage",
 					zap.String("error", err.Error()),
diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go
index e2eddc1845..6f9cde2e0e 100644
--- a/pkg/services/policer/check.go
+++ b/pkg/services/policer/check.go
@@ -6,7 +6,6 @@ import (
 
 	"github.com/nspcc-dev/neofs-node/pkg/core/container"
 	objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
-	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
 	headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
 	"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
 	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
@@ -88,11 +87,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add
 				zap.String("error", err.Error()),
 			)
 			if container.IsErrNotFound(err) {
-				var prm engine.InhumePrm
-				prm.MarkAsGarbage(addrWithType.Address)
-				prm.WithForceRemoval()
-
-				_, err := p.jobQueue.localStorage.Inhume(prm)
+				err = p.jobQueue.localStorage.Delete(addrWithType.Address)
 				if err != nil {
 					p.log.Error("could not inhume object with missing container",
 						zap.Stringer("cid", idCnr),

From 8d166aaeb747ca519cf1f48f665ee996108d6f4b Mon Sep 17 00:00:00 2001
From: Roman Khimov
Date: Mon, 11 Nov 2024 21:56:55 +0300
Subject: [PATCH 11/16] engine: drop MarkAsGarbage from the InhumePrm

It's not used by any real code; Delete() is sufficient, and Inhume can
now be used for tombstone handling only.

Signed-off-by: Roman Khimov
---
 pkg/local_object_storage/engine/delete.go     |  8 +------
 pkg/local_object_storage/engine/inhume.go     | 23 +------------------
 .../engine/inhume_test.go                     | 12 +++-------
 pkg/local_object_storage/engine/lock_test.go  | 10 +++-----
 4 files changed, 8 insertions(+), 45 deletions(-)

diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go
index 9be2908bd5..3bae4af5d7 100644
--- a/pkg/local_object_storage/engine/delete.go
+++ b/pkg/local_object_storage/engine/delete.go
@@ -21,13 +21,7 @@ func (e *StorageEngine) deleteObj(addr oid.Address, force bool) error {
 		defer elapsed(e.metrics.AddDeleteDuration)()
 	}
 
-	var inhumePrm InhumePrm
-	inhumePrm.MarkAsGarbage(addr)
-	if force {
-		inhumePrm.WithForceRemoval()
-	}
-
-	_, err := e.inhumeInt(inhumePrm)
+	_, err := e.inhumeInt(InhumePrm{addrs: []oid.Address{addr}, forceRemoval: force})
 
 	return err
 }
diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go
index 4a31f88c1a..531b7d6a70 100644
--- a/pkg/local_object_storage/engine/inhume.go
+++ b/pkg/local_object_storage/engine/inhume.go
@@ -36,21 +36,6 @@ func (p *InhumePrm) WithTombstone(tombstone oid.Address, tombExpiration uint64,
 	p.tombExpiration = tombExpiration
 }
 
-// MarkAsGarbage marks an object to be physically removed from local storage.
-// -// Should not be called along with WithTombstone. -func (p *InhumePrm) MarkAsGarbage(addrs ...oid.Address) { - p.addrs = addrs - p.tombstone = nil -} - -// WithForceRemoval inhumes objects specified via MarkAsGarbage with GC mark -// without any object restrictions checks. -func (p *InhumePrm) WithForceRemoval() { - p.forceRemoval = true - p.tombstone = nil -} - var errInhumeFailure = errors.New("inhume operation failed") // Inhume calls metabase. Inhume method to mark an object as removed. It won't be @@ -59,9 +44,6 @@ var errInhumeFailure = errors.New("inhume operation failed") // Allows inhuming non-locked objects only. Returns apistatus.ObjectLocked // if at least one object is locked. // -// NOTE: Marks any object as removed (despite any prohibitions on operations -// with that object) if WithForceRemoval option has been provided. -// // Returns an error if executions are blocked (see BlockExecution). func (e *StorageEngine) Inhume(prm InhumePrm) (res InhumeRes, err error) { err = e.execIfNotBlocked(func() error { @@ -326,10 +308,7 @@ func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) { } func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) { - var prm InhumePrm - prm.MarkAsGarbage(addrs...) - - _, err := e.Inhume(prm) + _, err := e.inhumeInt(InhumePrm{addrs: addrs}) if err != nil { e.log.Warn("handling expired objects", zap.Error(err)) } diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index ce3d985243..314eae1397 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -144,10 +144,7 @@ func TestStorageEngine_Inhume(t *testing.T) { _, err = wrongShard.Get(getPrm) require.NoError(t, err) - var inhumePrm InhumePrm - inhumePrm.MarkAsGarbage(addr) - - _, err = e.Inhume(inhumePrm) + err = e.Delete(addr) require.NoError(t, err) // object was on the wrong (according to hash sorting) shard but is removed anyway @@ -161,14 +158,11 @@ func TestStorageEngine_Inhume(t *testing.T) { e := testNewEngineWithShardNum(t, 3) defer e.Close() - var inhumePrm InhumePrm - inhumePrm.MarkAsGarbage(addr) - - _, err := e.Inhume(inhumePrm) + err := e.Delete(addr) require.NoError(t, err) // object is marked as garbage but marking it again should not be a problem - _, err = e.Inhume(inhumePrm) + err = e.Delete(addr) require.NoError(t, err) }) } diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index c0846448f9..be4ae68934 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -244,12 +244,10 @@ func TestLockForceRemoval(t *testing.T) { require.NoError(t, err) // 3. - var inhumePrm InhumePrm - inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj)) - - _, err = e.Inhume(inhumePrm) + err = e.deleteObj(objectcore.AddressOf(obj), false) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) + var inhumePrm InhumePrm inhumePrm.WithTombstone(oidtest.Address(), 0, objectcore.AddressOf(obj)) _, err = e.Inhume(inhumePrm) @@ -260,8 +258,6 @@ func TestLockForceRemoval(t *testing.T) { require.NoError(t, err) // 5. 
- inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj)) - - _, err = e.Inhume(inhumePrm) + err = e.deleteObj(objectcore.AddressOf(obj), false) require.NoError(t, err) } From c73ab95cc7424fcbb67f059173d2565199fe14c3 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 11 Nov 2024 22:14:25 +0300 Subject: [PATCH 12/16] engine: simplify Inhume() interface It's specifically for tombstones now since other deletion cases work via Delete(). Signed-off-by: Roman Khimov --- cmd/neofs-node/object.go | 7 +- pkg/local_object_storage/engine/delete.go | 4 +- pkg/local_object_storage/engine/inhume.go | 71 ++++++------------- .../engine/inhume_test.go | 10 +-- pkg/local_object_storage/engine/lock_test.go | 27 ++----- 5 files changed, 33 insertions(+), 86 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 16d0ff74b3..41967fe045 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -513,18 +513,13 @@ func (e storageEngine) IsLocked(address oid.Address) (bool, error) { } func (e storageEngine) Delete(tombstone oid.Address, tombExpiration uint64, toDelete []oid.ID) error { - var prm engine.InhumePrm - addrs := make([]oid.Address, len(toDelete)) for i := range addrs { addrs[i].SetContainer(tombstone.Container()) addrs[i].SetObject(toDelete[i]) } - prm.WithTombstone(tombstone, tombExpiration, addrs...) - - _, err := e.engine.Inhume(prm) - return err + return e.engine.Inhume(tombstone, tombExpiration, addrs...) } func (e storageEngine) Lock(locker oid.Address, toLock []oid.ID) error { diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 3bae4af5d7..ed526e9421 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -21,7 +21,5 @@ func (e *StorageEngine) deleteObj(addr oid.Address, force bool) error { defer elapsed(e.metrics.AddDeleteDuration)() } - _, err := e.inhumeInt(InhumePrm{addrs: []oid.Address{addr}, forceRemoval: force}) - - return err + return e.inhumeInt([]oid.Address{addr}, force, nil, 0) } diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 531b7d6a70..b995250f34 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -13,89 +13,64 @@ import ( "go.uber.org/zap" ) -// InhumePrm encapsulates parameters for inhume operation. -type InhumePrm struct { - tombstone *oid.Address - tombExpiration uint64 - addrs []oid.Address - - forceRemoval bool -} - -// InhumeRes encapsulates results of inhume operation. -type InhumeRes struct{} - -// WithTombstone sets a list of objects that should be inhumed and tombstone address -// as the reason for inhume operation. -// -// addrs should not be empty. -// Should not be called along with MarkAsGarbage. -func (p *InhumePrm) WithTombstone(tombstone oid.Address, tombExpiration uint64, addrs ...oid.Address) { - p.addrs = addrs - p.tombstone = &tombstone - p.tombExpiration = tombExpiration -} - var errInhumeFailure = errors.New("inhume operation failed") -// Inhume calls metabase. Inhume method to mark an object as removed. It won't be -// removed physically from the shard until `Delete` operation. +// Inhume calls [metabase.Inhume] method to mark an object as removed following +// tombstone data. It won't be removed physically from the shard until GC cycle +// does it. // // Allows inhuming non-locked objects only. Returns apistatus.ObjectLocked // if at least one object is locked. 
// // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) Inhume(prm InhumePrm) (res InhumeRes, err error) { - err = e.execIfNotBlocked(func() error { - res, err = e.inhume(prm) - return err +func (e *StorageEngine) Inhume(tombstone oid.Address, tombExpiration uint64, addrs ...oid.Address) error { + return e.execIfNotBlocked(func() error { + return e.inhume(tombstone, tombExpiration, addrs) }) - - return } -func (e *StorageEngine) inhume(prm InhumePrm) (InhumeRes, error) { +func (e *StorageEngine) inhume(tombstone oid.Address, tombExpiration uint64, addrs []oid.Address) error { if e.metrics != nil { defer elapsed(e.metrics.AddInhumeDuration)() } - return e.inhumeInt(prm) + return e.inhumeInt(addrs, false, &tombstone, tombExpiration) } -func (e *StorageEngine) inhumeInt(prm InhumePrm) (InhumeRes, error) { +func (e *StorageEngine) inhumeInt(addrs []oid.Address, force bool, tombstone *oid.Address, tombExpiration uint64) error { var shPrm shard.InhumePrm - if prm.forceRemoval { + if force { shPrm.ForceRemoval() } - for i := range prm.addrs { - if !prm.forceRemoval { - locked, err := e.IsLocked(prm.addrs[i]) + for i := range addrs { + if !force { + locked, err := e.IsLocked(addrs[i]) if err != nil { e.log.Warn("removing an object without full locking check", zap.Error(err), - zap.Stringer("addr", prm.addrs[i])) + zap.Stringer("addr", addrs[i])) } else if locked { var lockedErr apistatus.ObjectLocked - return InhumeRes{}, lockedErr + return lockedErr } } - if prm.tombstone != nil { - shPrm.InhumeByTomb(*prm.tombstone, prm.tombExpiration, prm.addrs[i]) + if tombstone != nil { + shPrm.InhumeByTomb(*tombstone, tombExpiration, addrs[i]) } else { - shPrm.MarkAsGarbage(prm.addrs[i]) + shPrm.MarkAsGarbage(addrs[i]) } - ok, err := e.inhumeAddr(prm.addrs[i], shPrm) + ok, err := e.inhumeAddr(addrs[i], shPrm) if err != nil { - return InhumeRes{}, err + return err } if !ok { - return InhumeRes{}, errInhumeFailure + return errInhumeFailure } } - return InhumeRes{}, nil + return nil } // InhumeContainer marks every object in a container as removed. 
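To make the two paths above concrete, here is how callers are expected to reach them after this patch. This is an illustrative sketch only, assuming a ready *StorageEngine `e`, an object address `addr`, a tombstone address `tombAddr` and an expiration epoch `tombExp` (names invented here, plus the errors and apistatus imports):

    // Tombstone-driven removal: lock-checked, rejected with
    // apistatus.ObjectLocked if the object is protected.
    err := e.Inhume(tombAddr, tombExp, addr)
    if errors.As(err, new(apistatus.ObjectLocked)) {
        // refuse the request, the object is locked
    }

    // Forced removal (control service): goes through force=true and
    // skips the IsLocked() check entirely.
    err = e.Delete(addr)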
@@ -308,7 +283,7 @@ func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) { } func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) { - _, err := e.inhumeInt(InhumePrm{addrs: addrs}) + err := e.inhumeInt(addrs, false, nil, 0) if err != nil { e.log.Warn("handling expired objects", zap.Error(err)) } diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index 314eae1397..6d573e9859 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -46,10 +46,7 @@ func TestStorageEngine_Inhume(t *testing.T) { err := Put(e, parent) require.NoError(t, err) - var inhumePrm InhumePrm - inhumePrm.WithTombstone(tombstoneID, 0, object.AddressOf(parent)) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(tombstoneID, 0, object.AddressOf(parent)) require.NoError(t, err) addrs, err := Select(e, cnr, fs) @@ -74,10 +71,7 @@ func TestStorageEngine_Inhume(t *testing.T) { _, err = s2.Put(putLink) require.NoError(t, err) - var inhumePrm InhumePrm - inhumePrm.WithTombstone(tombstoneID, 0, object.AddressOf(parent)) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(tombstoneID, 0, object.AddressOf(parent)) require.NoError(t, err) t.Run("empty search should fail", func(t *testing.T) { diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index be4ae68934..2af11e1470 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -96,10 +96,7 @@ func TestLockUserScenario(t *testing.T) { require.NoError(t, err) // 3. - var inhumePrm InhumePrm - inhumePrm.WithTombstone(tombAddr, 0, objAddr) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(tombAddr, 0, objAddr) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) // 4. @@ -110,9 +107,7 @@ func TestLockUserScenario(t *testing.T) { err = Put(e, tombObj) require.NoError(t, err) - inhumePrm.WithTombstone(tombForLockAddr, 0, lockerAddr) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(tombForLockAddr, 0, lockerAddr) require.ErrorIs(t, err, meta.ErrLockObjectRemoval) // 5. @@ -121,9 +116,7 @@ func TestLockUserScenario(t *testing.T) { // delay for GC time.Sleep(time.Second) - inhumePrm.WithTombstone(tombAddr, 0, objAddr) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(tombAddr, 0, objAddr) require.NoError(t, err) } @@ -177,10 +170,7 @@ func TestLockExpiration(t *testing.T) { err = e.Lock(cnr, idLock, []oid.ID{id}) require.NoError(t, err) - var inhumePrm InhumePrm - inhumePrm.WithTombstone(oidtest.Address(), 0, objectcore.AddressOf(obj)) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(oidtest.Address(), 0, objectcore.AddressOf(obj)) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) // 3. @@ -191,9 +181,7 @@ func TestLockExpiration(t *testing.T) { time.Sleep(time.Second) // 4. - inhumePrm.WithTombstone(oidtest.Address(), 0, objectcore.AddressOf(obj)) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(oidtest.Address(), 0, objectcore.AddressOf(obj)) require.NoError(t, err) } @@ -247,10 +235,7 @@ func TestLockForceRemoval(t *testing.T) { err = e.deleteObj(objectcore.AddressOf(obj), false) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) - var inhumePrm InhumePrm - inhumePrm.WithTombstone(oidtest.Address(), 0, objectcore.AddressOf(obj)) - - _, err = e.Inhume(inhumePrm) + err = e.Inhume(oidtest.Address(), 0, objectcore.AddressOf(obj)) require.ErrorAs(t, err, new(apistatus.ObjectLocked)) // 4. 
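The resulting caller migration for this patch is mechanical, roughly (sketch, names as in the hunks above):

    // Before: parameter object with setters and an always-empty result.
    //   var prm engine.InhumePrm
    //   prm.WithTombstone(tombAddr, tombExp, objAddr)
    //   _, err := e.Inhume(prm)

    // After: one variadic call, tombstone-only by design; garbage
    // marking without a tombstone goes through Delete() instead.
    err := e.Inhume(tombAddr, tombExp, objAddr)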
From 2e2042b14d326ec88e18cbfe55bb1af54f6dc9b0 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 11 Nov 2024 22:23:44 +0300 Subject: [PATCH 13/16] engine: simplify List*/Select interfaces Signed-off-by: Roman Khimov --- cmd/neofs-lens/internal/storage/list.go | 11 +-- pkg/local_object_storage/engine/list.go | 58 +++------------ pkg/local_object_storage/engine/list_test.go | 19 ++--- pkg/local_object_storage/engine/select.go | 78 ++++++-------------- pkg/local_object_storage/engine/tree_test.go | 8 +- pkg/services/control/server/list_objects.go | 13 ++-- pkg/services/object/search/util.go | 8 +- pkg/services/policer/queue.go | 8 +- 8 files changed, 57 insertions(+), 146 deletions(-) diff --git a/cmd/neofs-lens/internal/storage/list.go b/cmd/neofs-lens/internal/storage/list.go index 86fb48494c..c8d19cf120 100644 --- a/cmd/neofs-lens/internal/storage/list.go +++ b/cmd/neofs-lens/internal/storage/list.go @@ -6,6 +6,7 @@ import ( "io" common "github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/spf13/cobra" ) @@ -32,10 +33,12 @@ func listFunc(cmd *cobra.Command, _ []string) error { } defer storage.Close() - var p engine.ListWithCursorPrm - p.WithCount(1024) + var ( + addrs []objectcore.AddressWithType + cursor *engine.Cursor + ) for { - r, err := storage.ListWithCursor(p) + addrs, cursor, err = storage.ListWithCursor(1024, cursor) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { return nil @@ -44,13 +47,11 @@ func listFunc(cmd *cobra.Command, _ []string) error { return fmt.Errorf("Storage iterator failure: %w", err) } } - var addrs = r.AddressList() for _, at := range addrs { _, err = io.WriteString(w, at.Address.String()+"\n") if err != nil { return fmt.Errorf("print failure: %w", err) } } - p.WithCursor(r.Cursor()) } } diff --git a/pkg/local_object_storage/engine/list.go b/pkg/local_object_storage/engine/list.go index c97a4d435e..6cec638772 100644 --- a/pkg/local_object_storage/engine/list.go +++ b/pkg/local_object_storage/engine/list.go @@ -12,55 +12,23 @@ import ( // cursor. Use nil cursor object to start listing again. var ErrEndOfListing = shard.ErrEndOfListing -// Cursor is a type for continuous object listing. +// Cursor is a type for continuous object listing. It's returned from +// [StorageEngine.ListWithCursor] and can be reused as a parameter for it for +// subsequent requests. type Cursor struct { shardID string shardCursor *shard.Cursor } -// ListWithCursorPrm contains parameters for ListWithCursor operation. -type ListWithCursorPrm struct { - count uint32 - cursor *Cursor -} - -// WithCount sets the maximum amount of addresses that ListWithCursor should return. -func (p *ListWithCursorPrm) WithCount(count uint32) { - p.count = count -} - -// WithCursor sets a cursor for ListWithCursor operation. For initial request -// ignore this param or use nil value. For consecutive requests, use value -// from ListWithCursorRes. -func (p *ListWithCursorPrm) WithCursor(cursor *Cursor) { - p.cursor = cursor -} - -// ListWithCursorRes contains values returned from ListWithCursor operation. -type ListWithCursorRes struct { - addrList []objectcore.AddressWithType - cursor *Cursor -} - -// AddressList returns addresses selected by ListWithCursor operation. -func (l ListWithCursorRes) AddressList() []objectcore.AddressWithType { - return l.addrList -} - -// Cursor returns cursor for consecutive listing requests. 
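The cursor contract is easy to miss in a diff, so here is a minimal pagination loop over the new signature as seen from an external caller (sketch; `handle` is a hypothetical callback and the batch size is arbitrary):

    var cursor *engine.Cursor
    for {
        addrs, next, err := e.ListWithCursor(1024, cursor)
        if err != nil {
            if errors.Is(err, engine.ErrEndOfListing) {
                break // nothing left to list
            }
            return err
        }
        for _, a := range addrs {
            handle(a.Address) // a is an objectcore.AddressWithType
        }
        cursor = next
    }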
-func (l ListWithCursorRes) Cursor() *Cursor { - return l.cursor -} - // ListWithCursor lists physical objects available in the engine starting // from the cursor. It includes regular, tombstone and storage group objects. // Does not include inhumed objects. Use cursor value from the response -// for consecutive requests. +// for consecutive requests (it's nil when iteration is over). // // Returns ErrEndOfListing if there are no more objects to return or count // parameter set to zero. -func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes, error) { - result := make([]objectcore.AddressWithType, 0, prm.count) +func (e *StorageEngine) ListWithCursor(count uint32, cursor *Cursor) ([]objectcore.AddressWithType, *Cursor, error) { + result := make([]objectcore.AddressWithType, 0, count) // 1. Get available shards and sort them. e.mtx.RLock() @@ -71,20 +39,19 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes e.mtx.RUnlock() if len(shardIDs) == 0 { - return ListWithCursorRes{}, ErrEndOfListing + return nil, nil, ErrEndOfListing } slices.Sort(shardIDs) // 2. Prepare cursor object. - cursor := prm.cursor if cursor == nil { cursor = &Cursor{shardID: shardIDs[0]} } // 3. Iterate over available shards. Skip unavailable shards. for i := range shardIDs { - if len(result) >= int(prm.count) { + if len(result) >= int(count) { break } @@ -99,7 +66,7 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes continue } - count := uint32(int(prm.count) - len(result)) + count := uint32(int(count) - len(result)) var shardPrm shard.ListWithCursorPrm shardPrm.WithCount(count) if shardIDs[i] == cursor.shardID { @@ -117,11 +84,8 @@ func (e *StorageEngine) ListWithCursor(prm ListWithCursorPrm) (ListWithCursorRes } if len(result) == 0 { - return ListWithCursorRes{}, ErrEndOfListing + return nil, nil, ErrEndOfListing } - return ListWithCursorRes{ - addrList: result, - cursor: cursor, - }, nil + return result, cursor, nil } diff --git a/pkg/local_object_storage/engine/list_test.go b/pkg/local_object_storage/engine/list_test.go index 9cb27dfdfe..04bae3c1da 100644 --- a/pkg/local_object_storage/engine/list_test.go +++ b/pkg/local_object_storage/engine/list_test.go @@ -41,27 +41,20 @@ func TestListWithCursor(t *testing.T) { expected = sortAddresses(expected) - var prm ListWithCursorPrm - prm.WithCount(1) - - res, err := e.ListWithCursor(prm) + addrs, cursor, err := e.ListWithCursor(1, nil) require.NoError(t, err) - require.NotEmpty(t, res.AddressList()) - got = append(got, res.AddressList()...) + require.NotEmpty(t, addrs) + got = append(got, addrs...) for range total - 1 { - prm.WithCursor(res.Cursor()) - - res, err = e.ListWithCursor(prm) + addrs, cursor, err = e.ListWithCursor(1, cursor) if errors.Is(err, ErrEndOfListing) { break } - got = append(got, res.AddressList()...) + got = append(got, addrs...) } - prm.WithCursor(res.Cursor()) - - _, err = e.ListWithCursor(prm) + _, _, err = e.ListWithCursor(1, cursor) require.ErrorIs(t, err, ErrEndOfListing) got = sortAddresses(got) diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index dc8b2b9ef4..c5865b2e0b 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -10,47 +10,25 @@ import ( oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) -// SelectPrm groups the parameters of Select operation. 
-type SelectPrm struct { - cnr cid.ID - filters object.SearchFilters -} - -// SelectRes groups the resulting values of Select operation. -type SelectRes struct { - addrList []oid.Address -} - -// WithContainerID is a Select option to set the container id to search in. -func (p *SelectPrm) WithContainerID(cnr cid.ID) { - p.cnr = cnr -} - -// WithFilters is a Select option to set the object filters. -func (p *SelectPrm) WithFilters(fs object.SearchFilters) { - p.filters = fs -} - -// AddressList returns list of addresses of the selected objects. -func (r SelectRes) AddressList() []oid.Address { - return r.addrList -} - // Select selects the objects from local storage that match select parameters. // // Returns any error encountered that did not allow to completely select the objects. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) Select(prm SelectPrm) (res SelectRes, err error) { +func (e *StorageEngine) Select(cnr cid.ID, filters object.SearchFilters) ([]oid.Address, error) { + var ( + err error + res []oid.Address + ) err = e.execIfNotBlocked(func() error { - res, err = e._select(prm) + res, err = e._select(cnr, filters) return err }) - return + return res, err } -func (e *StorageEngine) _select(prm SelectPrm) (SelectRes, error) { +func (e *StorageEngine) _select(cnr cid.ID, filters object.SearchFilters) ([]oid.Address, error) { if e.metrics != nil { defer elapsed(e.metrics.AddSearchDuration)() } @@ -61,8 +39,8 @@ func (e *StorageEngine) _select(prm SelectPrm) (SelectRes, error) { var outError error var shPrm shard.SelectPrm - shPrm.SetContainerID(prm.cnr) - shPrm.SetFilters(prm.filters) + shPrm.SetContainerID(cnr) + shPrm.SetFilters(filters) e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { res, err := sh.Select(shPrm) @@ -85,25 +63,27 @@ func (e *StorageEngine) _select(prm SelectPrm) (SelectRes, error) { return false }) - return SelectRes{ - addrList: addrList, - }, outError + return addrList, outError } // List returns `limit` available physically storage object addresses in engine. // If limit is zero, then returns all available object addresses. // // Returns an error if executions are blocked (see BlockExecution). -func (e *StorageEngine) List(limit uint64) (res SelectRes, err error) { +func (e *StorageEngine) List(limit uint64) ([]oid.Address, error) { + var ( + err error + res []oid.Address + ) err = e.execIfNotBlocked(func() error { res, err = e.list(limit) return err }) - return + return res, err } -func (e *StorageEngine) list(limit uint64) (SelectRes, error) { +func (e *StorageEngine) list(limit uint64) ([]oid.Address, error) { if e.metrics != nil { defer elapsed(e.metrics.AddListObjectsDuration)() } @@ -134,32 +114,16 @@ func (e *StorageEngine) list(limit uint64) (SelectRes, error) { return false }) - return SelectRes{ - addrList: addrList, - }, nil + return addrList, nil } // Select selects objects from local storage using provided filters. func Select(storage *StorageEngine, cnr cid.ID, fs object.SearchFilters) ([]oid.Address, error) { - var selectPrm SelectPrm - selectPrm.WithContainerID(cnr) - selectPrm.WithFilters(fs) - - res, err := storage.Select(selectPrm) - if err != nil { - return nil, err - } - - return res.AddressList(), nil + return storage.Select(cnr, fs) } // List returns `limit` available physically storage object addresses in // engine. If limit is zero, then returns all available object addresses. 
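As a usage reference for the simplified search entry point (sketch, assuming a container ID `cnr`; the attribute name and value are invented for illustration):

    var fs object.SearchFilters
    fs.AddFilter("FileName", "report.txt", object.MatchStringEqual)

    addrs, err := e.Select(cnr, fs)
    if err != nil {
        return err
    }
    // addrs is a plain []oid.Address now, no Res wrapper to unpack.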
func List(storage *StorageEngine, limit uint64) ([]oid.Address, error) { - res, err := storage.List(limit) - if err != nil { - return nil, err - } - - return res.AddressList(), nil + return storage.List(limit) } diff --git a/pkg/local_object_storage/engine/tree_test.go b/pkg/local_object_storage/engine/tree_test.go index 2a340d10ac..21e31f7e3a 100644 --- a/pkg/local_object_storage/engine/tree_test.go +++ b/pkg/local_object_storage/engine/tree_test.go @@ -42,19 +42,15 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) { } b.Run("search", func(b *testing.B) { - var prm SelectPrm - prm.WithContainerID(cid) - var fs object.SearchFilters fs.AddFilter(pilorama.AttributeFilename, strconv.Itoa(objCount/2), object.MatchStringEqual) - prm.WithFilters(fs) for range b.N { - res, err := e.Select(prm) + res, err := e.Select(cid, fs) if err != nil { b.Fatal(err) } - if count := len(res.addrList); count != 1 { + if count := len(res); count != 1 { b.Fatalf("expected 1 object, got %d", count) } } diff --git a/pkg/services/control/server/list_objects.go b/pkg/services/control/server/list_objects.go index ea70cb5b35..865a04c10c 100644 --- a/pkg/services/control/server/list_objects.go +++ b/pkg/services/control/server/list_objects.go @@ -3,6 +3,7 @@ package control import ( "errors" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" "github.com/nspcc-dev/neofs-node/pkg/services/control" "google.golang.org/grpc/codes" @@ -21,11 +22,14 @@ func (s *Server) ListObjects(req *control.ListObjectsRequest, stream control.Con return status.Error(codes.Internal, err.Error()) } - var prm engine.ListWithCursorPrm + var ( + cursor *engine.Cursor + addresses []objectcore.AddressWithType + ) // (Limit 4MB - 64KB for service bytes and future fields) / 89B address length = 46390 addresses can be sent - prm.WithCount(46390) + const count = 46390 for { - res, err := s.storage.ListWithCursor(prm) + addresses, cursor, err = s.storage.ListWithCursor(count, cursor) if err != nil { if errors.Is(err, engine.ErrEndOfListing) { return nil @@ -34,7 +38,6 @@ func (s *Server) ListObjects(req *control.ListObjectsRequest, stream control.Con return status.Error(codes.Internal, err.Error()) } - addresses := res.AddressList() objectsAddresses := make([][]byte, 0, len(addresses)) for _, objectId := range addresses { objectsAddresses = append(objectsAddresses, []byte(objectId.Address.EncodeToString())) @@ -54,7 +57,5 @@ func (s *Server) ListObjects(req *control.ListObjectsRequest, stream control.Con if err = stream.Send(resp); err != nil { return status.Error(codes.Internal, err.Error()) } - - prm.WithCursor(res.Cursor()) } } diff --git a/pkg/services/object/search/util.go b/pkg/services/object/search/util.go index b83526a846..907be2631c 100644 --- a/pkg/services/object/search/util.go +++ b/pkg/services/object/search/util.go @@ -109,16 +109,12 @@ func (c *clientWrapper) searchObjects(ctx context.Context, exec *execCtx, info c } func (e *storageEngineWrapper) search(exec *execCtx) ([]oid.ID, error) { - var selectPrm engine.SelectPrm - selectPrm.WithFilters(exec.searchFilters()) - selectPrm.WithContainerID(exec.containerID()) - - r, err := e.storage.Select(selectPrm) + r, err := e.storage.Select(exec.containerID(), exec.searchFilters()) if err != nil { return nil, err } - return idsFromAddresses(r.AddressList()), nil + return idsFromAddresses(r), nil } func idsFromAddresses(addrs []oid.Address) []oid.ID { diff --git a/pkg/services/policer/queue.go 
b/pkg/services/policer/queue.go index 4b2cc41706..2953e8c08f 100644 --- a/pkg/services/policer/queue.go +++ b/pkg/services/policer/queue.go @@ -12,14 +12,10 @@ type jobQueue struct { } func (q *jobQueue) Select(cursor *engine.Cursor, count uint32) ([]objectcore.AddressWithType, *engine.Cursor, error) { - var prm engine.ListWithCursorPrm - prm.WithCursor(cursor) - prm.WithCount(count) - - res, err := q.localStorage.ListWithCursor(prm) + res, cursor, err := q.localStorage.ListWithCursor(count, cursor) if err != nil { return nil, nil, fmt.Errorf("cannot list objects in engine: %w", err) } - return res.AddressList(), res.Cursor(), nil + return res, cursor, nil } From 88a37741183e4a2f8ed9e773914fceb498f28c1b Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 11 Nov 2024 22:50:30 +0300 Subject: [PATCH 14/16] engine: simplify Put interface Can be split into two methods in future, but works fine for now. Signed-off-by: Roman Khimov --- cmd/neofs-node/object.go | 8 +-- .../engine/container_test.go | 8 +-- pkg/local_object_storage/engine/evacuate.go | 2 +- .../engine/evacuate_test.go | 5 +- pkg/local_object_storage/engine/list_test.go | 5 +- pkg/local_object_storage/engine/put.go | 67 +++++-------------- pkg/local_object_storage/engine/put_test.go | 9 +-- 7 files changed, 24 insertions(+), 80 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 41967fe045..aa4aff5ce7 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -527,13 +527,7 @@ func (e storageEngine) Lock(locker oid.Address, toLock []oid.ID) error { } func (e storageEngine) Put(o *objectSDK.Object, objBin []byte, hdrLen int) error { - var putPrm engine.PutPrm - putPrm.WithObject(o) - if objBin != nil { - putPrm.SetObjectBinary(objBin, hdrLen) - } - _, err := e.engine.Put(putPrm) - return err + return e.engine.Put(o, objBin, hdrLen) } func cachedHeaderSource(getSvc *getsvc.Service, cacheSize int, l *zap.Logger) headerSource { diff --git a/pkg/local_object_storage/engine/container_test.go b/pkg/local_object_storage/engine/container_test.go index 3a20e86144..76c3dffe5e 100644 --- a/pkg/local_object_storage/engine/container_test.go +++ b/pkg/local_object_storage/engine/container_test.go @@ -51,14 +51,10 @@ func TestStorageEngine_ContainerCleanUp(t *testing.T) { o2 := objecttest.Object() o2.SetPayload(make([]byte, errSmallSize+1)) - var prmPut PutPrm - prmPut.WithObject(&o1) - - _, err := e.Put(prmPut) + err := e.Put(&o1, nil, 0) require.NoError(t, err) - prmPut.WithObject(&o2) - _, err = e.Put(prmPut) + err = e.Put(&o2, nil, 0) require.NoError(t, err) require.NoError(t, e.Init()) diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index b65919c1be..a1629316e9 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -125,7 +125,7 @@ mainLoop: if _, ok := shardMap[shards[j].ID().String()]; ok { continue } - putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, PutPrm{obj: getRes.Object()}) + putDone, exists := e.putToShard(shards[j].hashedShard, j, shards[j].pool, addr, getRes.Object(), nil, 0) if putDone || exists { if putDone { e.log.Debug("object is moved to another shard", diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 13ebb7bfd5..5763f7ee6b 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -55,10 +55,7 @@ func 
newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng for i := 0; ; i++ { objects = append(objects, generateObjectWithCID(cidtest.ID())) - var putPrm PutPrm - putPrm.WithObject(objects[i]) - - _, err := e.Put(putPrm) + err := e.Put(objects[i], nil, 0) require.NoError(t, err) res, err := e.shards[ids[len(ids)-1].String()].List() diff --git a/pkg/local_object_storage/engine/list_test.go b/pkg/local_object_storage/engine/list_test.go index 04bae3c1da..257faa91db 100644 --- a/pkg/local_object_storage/engine/list_test.go +++ b/pkg/local_object_storage/engine/list_test.go @@ -31,10 +31,7 @@ func TestListWithCursor(t *testing.T) { containerID := cidtest.ID() obj := generateObjectWithCID(containerID) - var prm PutPrm - prm.WithObject(obj) - - _, err := e.Put(prm) + err := e.Put(obj, nil, 0) require.NoError(t, err) expected = append(expected, object.AddressWithType{Type: objectSDK.TypeRegular, Address: object.AddressOf(obj)}) } diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index e85a3c9a7d..6161fe51e5 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -13,38 +13,11 @@ import ( "go.uber.org/zap" ) -// PutPrm groups the parameters of Put operation. -type PutPrm struct { - obj *objectSDK.Object - - binSet bool - objBin []byte - hdrLen int -} - -// PutRes groups the resulting values of Put operation. -type PutRes struct{} - var errPutShard = errors.New("could not put object to any shard") -// WithObject is a Put option to set object to save. -// -// Option is required. -func (p *PutPrm) WithObject(obj *objectSDK.Object) { - p.obj = obj -} - -// SetObjectBinary allows to provide the already encoded object in -// [StorageEngine] format. Object header must be a prefix with specified length. -// If provided, the encoding step is skipped. It's the caller's responsibility -// to ensure that the data matches the object structure being processed. -func (p *PutPrm) SetObjectBinary(objBin []byte, hdrLen int) { - p.binSet = true - p.objBin = objBin - p.hdrLen = hdrLen -} - -// Put saves the object to local storage. +// Put saves an object to local storage. objBin and hdrLen parameters are +// optional and used to optimize out object marshaling, when used both must +// be valid. // // Returns any error encountered that // did not allow to completely save the object. @@ -52,27 +25,24 @@ func (p *PutPrm) SetObjectBinary(objBin []byte, hdrLen int) { // Returns an error if executions are blocked (see BlockExecution). // // Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed. -func (e *StorageEngine) Put(prm PutPrm) (res PutRes, err error) { - err = e.execIfNotBlocked(func() error { - res, err = e.put(prm) - return err +func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) error { + return e.execIfNotBlocked(func() error { + return e.put(obj, objBin, hdrLen) }) - - return } -func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { +func (e *StorageEngine) put(obj *objectSDK.Object, objBin []byte, hdrLen int) error { if e.metrics != nil { defer elapsed(e.metrics.AddPutDuration)() } - addr := object.AddressOf(prm.obj) + addr := object.AddressOf(obj) // In #1146 this check was parallelized, however, it became // much slower on fast machines for 4 shards. 
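Worth spelling out for the new Put signature above: the pre-marshaled fast path is opt-in and both extra arguments travel together (sketch; how hdrLen is obtained depends on the encoder and is assumed known here):

    // Common case: no pre-marshaled data, the engine encodes the object.
    err := e.Put(obj, nil, 0)

    // Fast path: the caller already holds the canonical binary form.
    // objBin must match obj and hdrLen must be the length of its header
    // prefix; the engine trusts the caller here (put_test.go below even
    // stores a deliberately garbage blob to prove it).
    objBin := obj.Marshal()
    err = e.Put(obj, objBin, hdrLen)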
_, err := e.exists(addr) if err != nil { - return PutRes{}, err + return err } finished := false @@ -86,7 +56,7 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { return false } - putDone, exists := e.putToShard(sh, ind, pool, addr, prm) + putDone, exists := e.putToShard(sh, ind, pool, addr, obj, objBin, hdrLen) finished = putDone || exists return finished }) @@ -95,13 +65,13 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { err = errPutShard } - return PutRes{}, err + return err } // putToShard puts object to sh. // First return value is true iff put has been successfully done. // Second return value is true iff object already exists. -func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, prm PutPrm) (bool, bool) { +func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool, addr oid.Address, obj *objectSDK.Object, objBin []byte, hdrLen int) (bool, bool) { var putSuccess, alreadyExists bool exitCh := make(chan struct{}) @@ -151,9 +121,9 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool } var putPrm shard.PutPrm - putPrm.SetObject(prm.obj) - if prm.binSet { - putPrm.SetObjectBinary(prm.objBin, prm.hdrLen) + putPrm.SetObject(obj) + if objBin != nil { + putPrm.SetObjectBinary(objBin, hdrLen) } _, err = sh.Put(putPrm) @@ -183,10 +153,5 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool // Put writes provided object to local storage. func Put(storage *StorageEngine, obj *objectSDK.Object) error { - var putPrm PutPrm - putPrm.WithObject(obj) - - _, err := storage.Put(putPrm) - - return err + return storage.Put(obj, nil, 0) } diff --git a/pkg/local_object_storage/engine/put_test.go b/pkg/local_object_storage/engine/put_test.go index e4e2086c95..cbab9fbfa2 100644 --- a/pkg/local_object_storage/engine/put_test.go +++ b/pkg/local_object_storage/engine/put_test.go @@ -30,10 +30,7 @@ func TestStorageEngine_PutBinary(t *testing.T) { e, _, _ := newEngine(t, t.TempDir()) - var putPrm PutPrm - putPrm.WithObject(&obj) - putPrm.SetObjectBinary(objBin, hdrLen) - _, err := e.Put(putPrm) + err := e.Put(&obj, objBin, hdrLen) require.NoError(t, err) gotObj, err := e.Get(addr) @@ -47,10 +44,8 @@ func TestStorageEngine_PutBinary(t *testing.T) { // now place some garbage addr.SetObject(oidtest.ID()) obj.SetID(addr.Object()) // to avoid 'already exists' outcome - putPrm.WithObject(&obj) invalidObjBin := []byte("definitely not an object") - putPrm.SetObjectBinary(invalidObjBin, 5) - _, err = e.Put(putPrm) + err = e.Put(&obj, invalidObjBin, 5) require.NoError(t, err) b, err = e.GetBytes(addr) From 8d91f8211f349ab1e2196bc768ce3b51b62c07d4 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 11 Nov 2024 23:03:34 +0300 Subject: [PATCH 15/16] engine: move metrics collection into entry methods I believe these should be counted irrespective of blocked/unblocked state, doing this in internal functions is also error-prone since we can count for some unrelated calls as well. 
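Concretely, the duration metric now wraps the whole exported call, including the blocked-execution check; the resulting shape, matching the Put hunk below:

    func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) error {
        if e.metrics != nil {
            defer elapsed(e.metrics.AddPutDuration)() // counted even when execution is blocked
        }

        return e.execIfNotBlocked(func() error {
            return e.put(obj, objBin, hdrLen)
        })
    }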
Signed-off-by: Roman Khimov --- pkg/local_object_storage/engine/container.go | 16 ++++++++-------- pkg/local_object_storage/engine/delete.go | 10 +++++----- pkg/local_object_storage/engine/get.go | 9 +++++---- pkg/local_object_storage/engine/head.go | 8 ++++---- pkg/local_object_storage/engine/inhume.go | 14 +++++--------- pkg/local_object_storage/engine/put.go | 8 ++++---- pkg/local_object_storage/engine/range.go | 7 +++---- pkg/local_object_storage/engine/select.go | 18 ++++++++++-------- 8 files changed, 44 insertions(+), 46 deletions(-) diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 3ad469c261..b40cb399b1 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -20,6 +20,10 @@ func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) { err error size uint64 ) + if e.metrics != nil { + defer elapsed(e.metrics.AddEstimateContainerSizeDuration)() + } + err = e.execIfNotBlocked(func() error { size, err = e.containerSize(cnr) return err @@ -36,10 +40,6 @@ func ContainerSize(e *StorageEngine, id cid.ID) (uint64, error) { func (e *StorageEngine) containerSize(cnr cid.ID) (uint64, error) { var size uint64 - if e.metrics != nil { - defer elapsed(e.metrics.AddEstimateContainerSizeDuration)() - } - e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { var csPrm shard.ContainerSizePrm csPrm.SetContainerID(cnr) @@ -67,6 +67,10 @@ func (e *StorageEngine) ListContainers() ([]cid.ID, error) { res []cid.ID err error ) + if e.metrics != nil { + defer elapsed(e.metrics.AddListContainersDuration)() + } + err = e.execIfNotBlocked(func() error { res, err = e.listContainers() return err @@ -81,10 +85,6 @@ func ListContainers(e *StorageEngine) ([]cid.ID, error) { } func (e *StorageEngine) listContainers() ([]cid.ID, error) { - if e.metrics != nil { - defer elapsed(e.metrics.AddListContainersDuration)() - } - uniqueIDs := make(map[cid.ID]struct{}) e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) { diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index ed526e9421..185b9a2dcb 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -11,15 +11,15 @@ import ( // NOTE: This is a forced removal, marks any object to be deleted (despite // any prohibitions on operations with that object). 
func (e *StorageEngine) Delete(addr oid.Address) error { + if e.metrics != nil { + defer elapsed(e.metrics.AddDeleteDuration)() + } + return e.execIfNotBlocked(func() error { return e.deleteObj(addr, true) }) } func (e *StorageEngine) deleteObj(addr oid.Address, force bool) error { - if e.metrics != nil { - defer elapsed(e.metrics.AddDeleteDuration)() - } - - return e.inhumeInt([]oid.Address{addr}, force, nil, 0) + return e.inhume([]oid.Address{addr}, force, nil, 0) } diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index de4500d7ae..8b771c09ac 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -27,6 +27,11 @@ func (e *StorageEngine) Get(addr oid.Address) (*objectSDK.Object, error) { obj *objectSDK.Object sp shard.GetPrm ) + + if e.metrics != nil { + defer elapsed(e.metrics.AddGetDuration)() + } + sp.SetAddress(addr) err = e.execIfNotBlocked(func() error { return e.get(addr, func(s *shard.Shard, ignoreMetadata bool) (bool, error) { @@ -44,10 +49,6 @@ func (e *StorageEngine) Get(addr oid.Address) (*objectSDK.Object, error) { } func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ignoreMetadata bool) (hasMetadata bool, err error)) error { - if e.metrics != nil { - defer elapsed(e.metrics.AddGetDuration)() - } - var ( ok bool siErr *objectSDK.SplitInfoError diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 4d0f886dac..3cd1c735af 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -27,6 +27,10 @@ func (e *StorageEngine) Head(addr oid.Address, raw bool) (*objectSDK.Object, err err error ) + if e.metrics != nil { + defer elapsed(e.metrics.AddHeadDuration)() + } + err = e.execIfNotBlocked(func() error { obj, err = e.head(addr, raw) return err @@ -36,10 +40,6 @@ func (e *StorageEngine) Head(addr oid.Address, raw bool) (*objectSDK.Object, err } func (e *StorageEngine) head(addr oid.Address, raw bool) (*objectSDK.Object, error) { - if e.metrics != nil { - defer elapsed(e.metrics.AddHeadDuration)() - } - var ( head *objectSDK.Object siErr *objectSDK.SplitInfoError diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index b995250f34..45df2e010c 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -24,19 +24,15 @@ var errInhumeFailure = errors.New("inhume operation failed") // // Returns an error if executions are blocked (see BlockExecution). 
func (e *StorageEngine) Inhume(tombstone oid.Address, tombExpiration uint64, addrs ...oid.Address) error { - return e.execIfNotBlocked(func() error { - return e.inhume(tombstone, tombExpiration, addrs) - }) -} - -func (e *StorageEngine) inhume(tombstone oid.Address, tombExpiration uint64, addrs []oid.Address) error { if e.metrics != nil { defer elapsed(e.metrics.AddInhumeDuration)() } - return e.inhumeInt(addrs, false, &tombstone, tombExpiration) + return e.execIfNotBlocked(func() error { + return e.inhume(addrs, false, &tombstone, tombExpiration) + }) } -func (e *StorageEngine) inhumeInt(addrs []oid.Address, force bool, tombstone *oid.Address, tombExpiration uint64) error { +func (e *StorageEngine) inhume(addrs []oid.Address, force bool, tombstone *oid.Address, tombExpiration uint64) error { var shPrm shard.InhumePrm if force { shPrm.ForceRemoval() @@ -283,7 +279,7 @@ func (e *StorageEngine) isLocked(addr oid.Address) (bool, error) { } func (e *StorageEngine) processExpiredObjects(addrs []oid.Address) { - err := e.inhumeInt(addrs, false, nil, 0) + err := e.inhume(addrs, false, nil, 0) if err != nil { e.log.Warn("handling expired objects", zap.Error(err)) } diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 6161fe51e5..d9b1b27e84 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -26,16 +26,16 @@ var errPutShard = errors.New("could not put object to any shard") // // Returns an error of type apistatus.ObjectAlreadyRemoved if the object has been marked as removed. func (e *StorageEngine) Put(obj *objectSDK.Object, objBin []byte, hdrLen int) error { + if e.metrics != nil { + defer elapsed(e.metrics.AddPutDuration)() + } + return e.execIfNotBlocked(func() error { return e.put(obj, objBin, hdrLen) }) } func (e *StorageEngine) put(obj *objectSDK.Object, objBin []byte, hdrLen int) error { - if e.metrics != nil { - defer elapsed(e.metrics.AddPutDuration)() - } - addr := object.AddressOf(obj) // In #1146 this check was parallelized, however, it became diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index 1050a56782..ddf3eb7167 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -40,6 +40,9 @@ func (e *StorageEngine) GetRange(addr oid.Address, offset uint64, length uint64) err error res []byte ) + if e.metrics != nil { + defer elapsed(e.metrics.AddRangeDuration)() + } err = e.execIfNotBlocked(func() error { res, err = e.getRange(addr, offset, length) return err @@ -49,10 +52,6 @@ func (e *StorageEngine) GetRange(addr oid.Address, offset uint64, length uint64) } func (e *StorageEngine) getRange(addr oid.Address, offset uint64, length uint64) ([]byte, error) { - if e.metrics != nil { - defer elapsed(e.metrics.AddRangeDuration)() - } - var ( out []byte siErr *objectSDK.SplitInfoError diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index c5865b2e0b..2418a6df29 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -20,6 +20,11 @@ func (e *StorageEngine) Select(cnr cid.ID, filters object.SearchFilters) ([]oid. err error res []oid.Address ) + + if e.metrics != nil { + defer elapsed(e.metrics.AddSearchDuration)() + } + err = e.execIfNotBlocked(func() error { res, err = e._select(cnr, filters) return err @@ -29,10 +34,6 @@ func (e *StorageEngine) Select(cnr cid.ID, filters object.SearchFilters) ([]oid. 
} func (e *StorageEngine) _select(cnr cid.ID, filters object.SearchFilters) ([]oid.Address, error) { - if e.metrics != nil { - defer elapsed(e.metrics.AddSearchDuration)() - } - addrList := make([]oid.Address, 0) uniqueMap := make(map[string]struct{}) @@ -75,6 +76,11 @@ func (e *StorageEngine) List(limit uint64) ([]oid.Address, error) { err error res []oid.Address ) + + if e.metrics != nil { + defer elapsed(e.metrics.AddListObjectsDuration)() + } + err = e.execIfNotBlocked(func() error { res, err = e.list(limit) return err @@ -84,10 +90,6 @@ func (e *StorageEngine) List(limit uint64) ([]oid.Address, error) { } func (e *StorageEngine) list(limit uint64) ([]oid.Address, error) { - if e.metrics != nil { - defer elapsed(e.metrics.AddListObjectsDuration)() - } - addrList := make([]oid.Address, 0, limit) uniqueMap := make(map[string]struct{}) ln := uint64(0) From e482dbf2f846b312cffa70cb355d7116de415070 Mon Sep 17 00:00:00 2001 From: Roman Khimov Date: Mon, 11 Nov 2024 23:15:12 +0300 Subject: [PATCH 16/16] engine: drop package-level wrappers over StorageEngine The only reason they existed is because StorageEngine interfaces were unusable. Now they're not and these wrappers are trivial and do nothing in most of the cases. Signed-off-by: Roman Khimov --- cmd/neofs-lens/internal/storage/get.go | 3 +-- cmd/neofs-node/container.go | 4 ++-- pkg/local_object_storage/engine/container.go | 10 ---------- .../engine/control_test.go | 8 ++++---- pkg/local_object_storage/engine/delete_test.go | 4 ++-- pkg/local_object_storage/engine/engine_test.go | 2 +- pkg/local_object_storage/engine/gc_test.go | 18 +++++++++--------- pkg/local_object_storage/engine/get.go | 5 ----- pkg/local_object_storage/engine/get_test.go | 2 +- pkg/local_object_storage/engine/head.go | 11 ----------- pkg/local_object_storage/engine/inhume_test.go | 14 +++++++------- pkg/local_object_storage/engine/lock_test.go | 14 +++++++------- pkg/local_object_storage/engine/put.go | 5 ----- pkg/local_object_storage/engine/range.go | 5 ----- pkg/local_object_storage/engine/select.go | 11 ----------- pkg/local_object_storage/engine/tree_test.go | 2 +- pkg/services/object/acl/eacl/v2/localstore.go | 2 +- 17 files changed, 36 insertions(+), 84 deletions(-) diff --git a/cmd/neofs-lens/internal/storage/get.go b/cmd/neofs-lens/internal/storage/get.go index e8b96f4f61..5848544f35 100644 --- a/cmd/neofs-lens/internal/storage/get.go +++ b/cmd/neofs-lens/internal/storage/get.go @@ -4,7 +4,6 @@ import ( "fmt" common "github.com/nspcc-dev/neofs-node/cmd/neofs-lens/internal" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -38,7 +37,7 @@ func getFunc(cmd *cobra.Command, _ []string) error { } defer storage.Close() - obj, err := engine.Get(storage, addr) + obj, err := storage.Get(addr) if err != nil { return fmt.Errorf("could not fetch object: %w", err) } diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 8af3b200c1..9a6f0d35b8 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -431,13 +431,13 @@ type localStorageLoad struct { } func (d *localStorageLoad) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) error { - idList, err := engine.ListContainers(d.engine) + idList, err := d.engine.ListContainers() if err != nil { return fmt.Errorf("list containers on engine failure: %w", err) } for i := range idList { - sz, err := engine.ContainerSize(d.engine, idList[i]) + sz, err := 
d.engine.ContainerSize(idList[i]) if err != nil { d.log.Debug("failed to calculate container size in storage engine", zap.Stringer("cid", idList[i]), diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index b40cb399b1..d933d18fcf 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -32,11 +32,6 @@ func (e *StorageEngine) ContainerSize(cnr cid.ID) (uint64, error) { return size, err } -// ContainerSize calls ContainerSize method on engine to calculate sum of estimation container sizes among all shards. -func ContainerSize(e *StorageEngine, id cid.ID) (uint64, error) { - return e.ContainerSize(id) -} - func (e *StorageEngine) containerSize(cnr cid.ID) (uint64, error) { var size uint64 @@ -79,11 +74,6 @@ func (e *StorageEngine) ListContainers() ([]cid.ID, error) { return res, err } -// ListContainers calls ListContainers method on engine to get a unique container IDs presented in the engine objects. -func ListContainers(e *StorageEngine) ([]cid.ID, error) { - return e.ListContainers() -} - func (e *StorageEngine) listContainers() ([]cid.ID, error) { uniqueIDs := make(map[cid.ID]struct{}) diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 7dd8d8647c..3d42a0acc3 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -154,7 +154,7 @@ func TestExecBlocks(t *testing.T) { addr := object.AddressOf(obj) - require.NoError(t, Put(e, obj)) + require.NoError(t, e.Put(obj, nil, 0)) // block executions errBlock := errors.New("block exec err") @@ -162,20 +162,20 @@ func TestExecBlocks(t *testing.T) { require.NoError(t, e.BlockExecution(errBlock)) // try to exec some op - _, err := Head(e, addr) + _, err := e.Head(addr, false) require.ErrorIs(t, err, errBlock) // resume executions require.NoError(t, e.ResumeExecution()) - _, err = Head(e, addr) // can be any data-related op + _, err = e.Head(addr, false) // can be any data-related op require.NoError(t, err) // close require.NoError(t, e.Close()) // try exec after close - _, err = Head(e, addr) + _, err = e.Head(addr, false) require.Error(t, err) // try to resume diff --git a/pkg/local_object_storage/engine/delete_test.go b/pkg/local_object_storage/engine/delete_test.go index 892270ce52..5740c9d90b 100644 --- a/pkg/local_object_storage/engine/delete_test.go +++ b/pkg/local_object_storage/engine/delete_test.go @@ -56,9 +56,9 @@ func TestDeleteBigObject(t *testing.T) { defer e.Close() for i := range children { - require.NoError(t, Put(e, children[i])) + require.NoError(t, e.Put(children[i], nil, 0)) } - require.NoError(t, Put(e, link)) + require.NoError(t, e.Put(link, nil, 0)) var splitErr *objectSDK.SplitInfoError diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index f25d42ca5d..1c0dfe83bf 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -64,7 +64,7 @@ func benchmarkExists(b *testing.B, shardNum int) { addr := oidtest.Address() for range 100 { obj := generateObjectWithCID(cidtest.ID()) - err := Put(e, obj) + err := e.Put(obj, nil, 0) if err != nil { b.Fatal(err) } diff --git a/pkg/local_object_storage/engine/gc_test.go b/pkg/local_object_storage/engine/gc_test.go index 15c5ff283c..e9a97763e4 100644 --- a/pkg/local_object_storage/engine/gc_test.go +++ b/pkg/local_object_storage/engine/gc_test.go @@ 
-93,10 +93,10 @@ func TestChildrenExpiration(t *testing.T) { link.SetChildren(child1ID, child2ID, child3ID) link.SetSplitID(splitID) - require.NoError(t, Put(e, child1)) - require.NoError(t, Put(e, child2)) - require.NoError(t, Put(e, child3)) - require.NoError(t, Put(e, link)) + require.NoError(t, e.Put(child1, nil, 0)) + require.NoError(t, e.Put(child2, nil, 0)) + require.NoError(t, e.Put(child3, nil, 0)) + require.NoError(t, e.Put(link, nil, 0)) e.HandleNewEpoch(currEpoch + 1) @@ -144,10 +144,10 @@ func TestChildrenExpiration(t *testing.T) { linkObj.CalculateAndSetPayloadChecksum() require.NoError(t, linkObj.CalculateAndSetID()) - require.NoError(t, Put(e, child1)) - require.NoError(t, Put(e, child2)) - require.NoError(t, Put(e, child3)) - require.NoError(t, Put(e, &linkObj)) + require.NoError(t, e.Put(child1, nil, 0)) + require.NoError(t, e.Put(child2, nil, 0)) + require.NoError(t, e.Put(child3, nil, 0)) + require.NoError(t, e.Put(&linkObj, nil, 0)) e.HandleNewEpoch(currEpoch + 1) @@ -163,7 +163,7 @@ func checkObjectsAsyncRemoval(t *testing.T, e *StorageEngine, cnr cid.ID, objs . for _, obj := range objs { addr.SetObject(obj) - _, err := Get(e, addr) + _, err := e.Get(addr) if !errors.As(err, new(statusSDK.ObjectNotFound)) { return false } diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 8b771c09ac..d7ac1941c9 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -145,11 +145,6 @@ func (e *StorageEngine) get(addr oid.Address, shardFunc func(s *shard.Shard, ign return nil } -// Get reads object from local storage by provided address. -func Get(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { - return storage.Get(addr) -} - // GetBytes reads object from the StorageEngine by address into memory buffer in // a canonical NeoFS binary format. Returns [apistatus.ObjectNotFound] if object // is missing. diff --git a/pkg/local_object_storage/engine/get_test.go b/pkg/local_object_storage/engine/get_test.go index c5bf6aee2b..21a02da72c 100644 --- a/pkg/local_object_storage/engine/get_test.go +++ b/pkg/local_object_storage/engine/get_test.go @@ -15,7 +15,7 @@ func TestStorageEngine_GetBytes(t *testing.T) { objBin := obj.Marshal() - err := Put(e, obj) + err := e.Put(obj, nil, 0) require.NoError(t, err) b, err := e.GetBytes(addr) diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 3cd1c735af..3fbd090900 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -102,14 +102,3 @@ func (e *StorageEngine) head(addr oid.Address, raw bool) (*objectSDK.Object, err return head, nil } - -// Head reads object header from local storage by provided address. -func Head(storage *StorageEngine, addr oid.Address) (*objectSDK.Object, error) { - return storage.Head(addr, false) -} - -// HeadRaw reads object header from local storage by provided address and raw -// flag. 
-func HeadRaw(storage *StorageEngine, addr oid.Address, raw bool) (*objectSDK.Object, error) { - return storage.Head(addr, true) -} diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index 6d573e9859..343ec44ddc 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -43,13 +43,13 @@ func TestStorageEngine_Inhume(t *testing.T) { e := testNewEngineWithShardNum(t, 1) defer e.Close() - err := Put(e, parent) + err := e.Put(parent, nil, 0) require.NoError(t, err) err = e.Inhume(tombstoneID, 0, object.AddressOf(parent)) require.NoError(t, err) - addrs, err := Select(e, cnr, fs) + addrs, err := e.Select(cnr, fs) require.NoError(t, err) require.Empty(t, addrs) }) @@ -75,13 +75,13 @@ func TestStorageEngine_Inhume(t *testing.T) { require.NoError(t, err) t.Run("empty search should fail", func(t *testing.T) { - addrs, err := Select(e, cnr, objectSDK.SearchFilters{}) + addrs, err := e.Select(cnr, objectSDK.SearchFilters{}) require.NoError(t, err) require.Empty(t, addrs) }) t.Run("root search should fail", func(t *testing.T) { - addrs, err := Select(e, cnr, fs) + addrs, err := e.Select(cnr, fs) require.NoError(t, err) require.Empty(t, addrs) }) @@ -91,18 +91,18 @@ func TestStorageEngine_Inhume(t *testing.T) { addr.SetContainer(cnr) addr.SetObject(idChild) - _, err = Get(e, addr) + _, err = e.Get(addr) require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved)) linkID := link.GetID() addr.SetObject(linkID) - _, err = Get(e, addr) + _, err = e.Get(addr) require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved)) }) t.Run("parent get should claim deletion", func(t *testing.T) { - _, err = Get(e, object.AddressOf(parent)) + _, err = e.Get(object.AddressOf(parent)) require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved)) }) }) diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index 2af11e1470..7967ebc056 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -81,7 +81,7 @@ func TestLockUserScenario(t *testing.T) { id := obj.GetID() objAddr.SetObject(id) - err = Put(e, obj) + err = e.Put(obj, nil, 0) require.NoError(t, err) // 2. @@ -89,7 +89,7 @@ func TestLockUserScenario(t *testing.T) { locker.WriteMembers([]oid.ID{id}) lockerObj.WriteLock(locker) - err = Put(e, lockerObj) + err = e.Put(lockerObj, nil, 0) require.NoError(t, err) err = e.Lock(cnr, lockerID, []oid.ID{id}) @@ -104,7 +104,7 @@ func TestLockUserScenario(t *testing.T) { tombObj.SetID(tombForLockID) tombObj.SetAttributes(a) - err = Put(e, tombObj) + err = e.Put(tombObj, nil, 0) require.NoError(t, err) err = e.Inhume(tombForLockAddr, 0, lockerAddr) @@ -149,7 +149,7 @@ func TestLockExpiration(t *testing.T) { // 1. obj := generateObjectWithCID(cnr) - err = Put(e, obj) + err = e.Put(obj, nil, 0) require.NoError(t, err) // 2. @@ -161,7 +161,7 @@ func TestLockExpiration(t *testing.T) { lock.SetType(object.TypeLock) lock.SetAttributes(a) - err = Put(e, lock) + err = e.Put(lock, nil, 0) require.NoError(t, err) id := obj.GetID() @@ -215,14 +215,14 @@ func TestLockForceRemoval(t *testing.T) { // 1. obj := generateObjectWithCID(cnr) - err = Put(e, obj) + err = e.Put(obj, nil, 0) require.NoError(t, err) // 2. 
lock := generateObjectWithCID(cnr) lock.SetType(object.TypeLock) - err = Put(e, lock) + err = e.Put(lock, nil, 0) require.NoError(t, err) id := obj.GetID() diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index d9b1b27e84..da2a3922cd 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -150,8 +150,3 @@ func (e *StorageEngine) putToShard(sh hashedShard, ind int, pool util.WorkerPool return putSuccess, alreadyExists } - -// Put writes provided object to local storage. -func Put(storage *StorageEngine, obj *objectSDK.Object) error { - return storage.Put(obj, nil, 0) -} diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index ddf3eb7167..abd4b479b5 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -155,8 +155,3 @@ func (e *StorageEngine) getRange(addr oid.Address, offset uint64, length uint64) return out, nil } - -// GetRange reads object payload range from local storage by provided address. -func GetRange(storage *StorageEngine, addr oid.Address, rng *objectSDK.Range) ([]byte, error) { - return storage.GetRange(addr, rng.GetOffset(), rng.GetLength()) -} diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index 2418a6df29..ef124459c3 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -118,14 +118,3 @@ func (e *StorageEngine) list(limit uint64) ([]oid.Address, error) { return addrList, nil } - -// Select selects objects from local storage using provided filters. -func Select(storage *StorageEngine, cnr cid.ID, fs object.SearchFilters) ([]oid.Address, error) { - return storage.Select(cnr, fs) -} - -// List returns `limit` available physically storage object addresses in -// engine. If limit is zero, then returns all available object addresses. -func List(storage *StorageEngine, limit uint64) ([]oid.Address, error) { - return storage.List(limit) -} diff --git a/pkg/local_object_storage/engine/tree_test.go b/pkg/local_object_storage/engine/tree_test.go index 21e31f7e3a..5ed754e702 100644 --- a/pkg/local_object_storage/engine/tree_test.go +++ b/pkg/local_object_storage/engine/tree_test.go @@ -30,7 +30,7 @@ func benchmarkTreeVsSearch(b *testing.B, objCount int) { for i := range objCount { obj := generateObjectWithCID(cid) addAttribute(obj, pilorama.AttributeFilename, strconv.Itoa(i)) - err := Put(e, obj) + err := e.Put(obj, nil, 0) if err != nil { b.Fatal(err) } diff --git a/pkg/services/object/acl/eacl/v2/localstore.go b/pkg/services/object/acl/eacl/v2/localstore.go index 74192c588e..a3652367a5 100644 --- a/pkg/services/object/acl/eacl/v2/localstore.go +++ b/pkg/services/object/acl/eacl/v2/localstore.go @@ -17,5 +17,5 @@ func (s *localStorage) Head(addr oid.Address) (*objectSDK.Object, error) { return nil, io.ErrUnexpectedEOF } - return engine.Head(s.ls, addr) + return s.ls.Head(addr, false) }
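For downstream users of the engine package, the wrapper removals in this final patch translate one-to-one into method calls. A quick migration reference compiled from the hunks above (sketch):

    // engine.Get(e, addr)           -> e.Get(addr)
    // engine.Head(e, addr)          -> e.Head(addr, false)
    // engine.HeadRaw(e, addr, raw)  -> e.Head(addr, raw)
    // engine.Put(e, obj)            -> e.Put(obj, nil, 0)
    // engine.Select(e, cnr, fs)    -> e.Select(cnr, fs)
    // engine.List(e, limit)         -> e.List(limit)
    // engine.GetRange(e, addr, rng) -> e.GetRange(addr, rng.GetOffset(), rng.GetLength())
    // engine.ContainerSize(e, id)   -> e.ContainerSize(id)
    // engine.ListContainers(e)      -> e.ListContainers()

Note that the dropped HeadRaw wrapper ignored its raw argument and always passed true, so callers migrating to e.Head(addr, raw) with raw=false only now get the semantics its signature promised.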