Skip to content

Commit

Permalink
node/control: add object revive control command
Browse files Browse the repository at this point in the history
Includes API definition extending, RPC implementation, tests of metabase func.
The command requests server's storage engine to revive object by address. It's
purge all removal marks from all metabases and returns revival statuses.

Signed-off-by: Andrey Butusov <[email protected]>
  • Loading branch information
End-rey committed Oct 17, 2024
1 parent cdda811 commit 66fdf01
Show file tree
Hide file tree
Showing 10 changed files with 1,306 additions and 258 deletions.
39 changes: 39 additions & 0 deletions pkg/local_object_storage/engine/revive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package engine

import (
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
)

// ReviveShardStatus contains the Status of the object's revival in the Shard, Shard ID and Error.
type ReviveShardStatus struct {
ID string
Status meta.ReviveStatus
Error error
}

// ReviveStatus represents the status of the object's revival in the StorageEngine.
type ReviveStatus struct {
Shards []ReviveShardStatus
}

// ReviveObject forcefully revives object by oid.Address in the StorageEngine.
// Iterate over all shards despite errors and purge all removal marks from all metabases.
func (e *StorageEngine) ReviveObject(address oid.Address) (res ReviveStatus, err error) {
e.iterateOverUnsortedShards(func(sh hashedShard) (stop bool) {
reviveStatus, err := sh.ReviveObject(address)
id := *sh.ID()
res.Shards = append(res.Shards, ReviveShardStatus{
ID: id.String(),
Status: reviveStatus,
Error: err,
})
if err != nil {
e.log.Debug("failed to revive object in shard", zap.String("shard", id.String()), zap.Error(err))
}

return false
})
return
}
155 changes: 155 additions & 0 deletions pkg/local_object_storage/metabase/revive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package meta

import (
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.etcd.io/bbolt"
)

// ErrObjectWasNotRemoved is returned when object neither in the graveyard nor was marked with GC mark.
var ErrObjectWasNotRemoved = logicerr.New("object neither in the graveyard nor was marked with GC mark")

// ErrReviveFromContainerGarbage is returned when the object is in the container that marked with GC mark.
var ErrReviveFromContainerGarbage = logicerr.New("revive from container marked with GC")

type reviveStatusType int

const (
// ReviveStatusGraveyard is the type of revival status of an object from a graveyard.
ReviveStatusGraveyard reviveStatusType = iota
// ReviveStatusGC is the type of revival status of an object from GC.
ReviveStatusGC
// ReviveStatusError is the type of status when an error occurs during revive.
ReviveStatusError
)

// ReviveStatus groups the resulting values of ReviveObject operation.
// Contains the type of revival status and message for details.
type ReviveStatus struct {
statusType reviveStatusType
message string
}

// Message returns message of status.
func (s *ReviveStatus) Message() string {
return s.message
}

// StatusType returns the type of revival status.
func (s *ReviveStatus) StatusType() reviveStatusType {
return s.statusType
}

func (s *ReviveStatus) setStatusGraveyard(tomb string) {
s.statusType = ReviveStatusGraveyard
s.message = fmt.Sprintf("successful revival from graveyard, tomb: %s", tomb)
}

func (s *ReviveStatus) setStatusGC() {
s.statusType = ReviveStatusGC
s.message = "successful revive object with GC mark"
}

func (s *ReviveStatus) setStatusError(err error) {
s.statusType = ReviveStatusError
s.message = fmt.Sprintf("don't revive, err: %v", err)
}

// ReviveObject revives object by oid.Address. Removes GCMark/Tombstone records in the corresponding buckets
// and restore metrics.
func (db *DB) ReviveObject(addr oid.Address) (res ReviveStatus, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()

if db.mode.ReadOnly() {
res.setStatusError(ErrReadOnlyMode)
return res, ErrReadOnlyMode
} else if db.mode.NoMetabase() {
res.setStatusError(ErrDegradedMode)
return res, ErrDegradedMode
}

currEpoch := db.epochState.CurrentEpoch()

err = db.boltDB.Update(func(tx *bbolt.Tx) error {
garbageObjectsBKT := tx.Bucket(garbageObjectsBucketName)
garbageContainersBKT := tx.Bucket(garbageContainersBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)

buf := make([]byte, addressKeySize)

targetKey := addressKey(addr, buf)

if graveyardBKT == nil {
// incorrect metabase state, does not make
// sense to check garbage bucket
return ErrObjectWasNotRemoved
}

val := graveyardBKT.Get(targetKey)
if val != nil {
// object in the graveyard
if err := graveyardBKT.Delete(targetKey); err != nil {
return err
}

if err := garbageObjectsBKT.Delete(targetKey); err != nil {
return err
}

var tombAddress oid.Address
if err := decodeAddressFromKey(&tombAddress, val[:addressKeySize]); err != nil {
return err
}
res.setStatusGraveyard(tombAddress.EncodeToString())
} else {
if garbageObjectsBKT == nil {
// incorrect node state
return ErrObjectWasNotRemoved
}

val = garbageContainersBKT.Get(targetKey[:cidSize])
if val != nil {
return ErrReviveFromContainerGarbage
}

val = garbageObjectsBKT.Get(targetKey)
if val != nil {
// object marked with GC mark
if err := garbageObjectsBKT.Delete(targetKey); err != nil {
return err
}
res.setStatusGC()
} else {
// neither in the graveyard
// nor was marked with GC mark
return ErrObjectWasNotRemoved
}
}

if obj, err := db.get(tx, addr, buf, false, true, currEpoch); err == nil {
// if object is stored, and it is regular object then update bucket
// with container size estimations
if obj.Type() == object.TypeRegular {
if err := changeContainerSize(tx, addr.Container(), obj.PayloadSize(), true); err != nil {
return err
}
}

// also need to restore logical counter
if err := db.updateCounter(tx, logical, 1, true); err != nil {
return err
}
}

return nil
})
if err != nil {
res.setStatusError(err)
}

return
}
130 changes: 130 additions & 0 deletions pkg/local_object_storage/metabase/revive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package meta_test

import (
"testing"

"github.com/nspcc-dev/neofs-node/pkg/core/object"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)

func TestDB_ReviveObject(t *testing.T) {
db := newDB(t)

t.Run("from graveyard", func(t *testing.T) {
raw := generateObject(t)
addAttribute(raw, "foo", "bar")

tombstoneID := oidtest.Address()

err := putBig(db, raw)
require.NoError(t, err)

exists, err := metaExists(db, object.AddressOf(raw))
require.NoError(t, err)
require.True(t, exists)

// inhume object with tombstone
err = metaInhume(db, object.AddressOf(raw), tombstoneID)
require.NoError(t, err)

_, err = metaExists(db, object.AddressOf(raw))
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))

_, err = metaGet(db, object.AddressOf(raw), false)
require.ErrorAs(t, err, new(apistatus.ObjectAlreadyRemoved))

// revive object
res, err := db.ReviveObject(object.AddressOf(raw))
require.NoError(t, err)
require.Equal(t, meta.ReviveStatusGraveyard, res.StatusType())

exists, err = metaExists(db, object.AddressOf(raw))
require.NoError(t, err)
require.True(t, exists)
})

t.Run("from GC", func(t *testing.T) {
raw := generateObject(t)
addAttribute(raw, "foo", "bar")

err := putBig(db, raw)
require.NoError(t, err)

exists, err := metaExists(db, object.AddressOf(raw))
require.NoError(t, err)
require.True(t, exists)

// inhume with GC mark
var gcPrm meta.InhumePrm
gcPrm.SetGCMark()
gcPrm.SetAddresses(object.AddressOf(raw))

_, err = db.Inhume(gcPrm)
require.NoError(t, err)

_, err = metaExists(db, object.AddressOf(raw))
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))

_, err = metaGet(db, object.AddressOf(raw), false)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))

// revive object
res, err := db.ReviveObject(object.AddressOf(raw))
require.NoError(t, err)
require.Equal(t, meta.ReviveStatusGC, res.StatusType())

exists, err = metaExists(db, object.AddressOf(raw))
require.NoError(t, err)
require.True(t, exists)

obj, err := metaGet(db, object.AddressOf(raw), false)
require.NoError(t, err)
require.NotNil(t, obj)
})

t.Run("revive locked", func(t *testing.T) {
locked := oidtest.Address()

err := db.Lock(locked.Container(), oidtest.ID(), []oid.ID{locked.Object()})
require.NoError(t, err)

var prm meta.InhumePrm
prm.SetAddresses(locked)

_, err = db.Inhume(prm)

require.ErrorIs(t, err, new(apistatus.ObjectLocked))

res, err := db.ReviveObject(locked)
require.ErrorIs(t, err, meta.ErrObjectWasNotRemoved)
require.Equal(t, meta.ReviveStatusError, res.StatusType())
})

t.Run("revive object that not stored in db", func(t *testing.T) {
addr := oidtest.Address()

res, err := db.ReviveObject(addr)
require.ErrorIs(t, err, meta.ErrObjectWasNotRemoved)
require.Equal(t, meta.ReviveStatusError, res.StatusType())
})

t.Run("revive object that not removed", func(t *testing.T) {
raw := generateObject(t)
addAttribute(raw, "foo", "bar")

err := putBig(db, raw)
require.NoError(t, err)

exists, err := metaExists(db, object.AddressOf(raw))
require.NoError(t, err)
require.True(t, exists)

res, err := db.ReviveObject(object.AddressOf(raw))
require.ErrorIs(t, err, meta.ErrObjectWasNotRemoved)
require.Equal(t, meta.ReviveStatusError, res.StatusType())
})
}
21 changes: 21 additions & 0 deletions pkg/local_object_storage/shard/revive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package shard

import (
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

// ReviveObject try to revive object in Shard, by remove records from graveyard and garbage.
//
// Returns meta.ReviveStatus of object and error.
func (s *Shard) ReviveObject(addr oid.Address) (meta.ReviveStatus, error) {
s.m.RLock()
defer s.m.RUnlock()

if s.GetMode().ReadOnly() {
return meta.ReviveStatus{}, ErrReadOnlyMode
} else if s.GetMode().NoMetabase() {
return meta.ReviveStatus{}, ErrDegradedMode
}
return s.metaBase.ReviveObject(addr)
}
18 changes: 18 additions & 0 deletions pkg/services/control/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,21 @@ func (w *objectStatusResponseWrapper) FromGRPCMessage(m grpc.Message) error {
w.ObjectStatusResponse = r
return nil
}

type reviveObjectResponseWrapper struct {
*ReviveObjectResponse
}

func (w *reviveObjectResponseWrapper) ToGRPCMessage() grpc.Message {
return w.ReviveObjectResponse
}

func (w *reviveObjectResponseWrapper) FromGRPCMessage(m grpc.Message) error {
r, ok := m.(*ReviveObjectResponse)
if !ok {
return message.NewUnexpectedMessageType(m, (*ReviveObjectResponse)(nil))
}

w.ReviveObjectResponse = r
return nil
}
Loading

0 comments on commit 66fdf01

Please sign in to comment.