Skip to content

Commit

Permalink
*: Remove tree
Browse files Browse the repository at this point in the history
WIP

Signed-off-by: Evgenii Baidakov <[email protected]>
  • Loading branch information
smallhive committed Dec 27, 2024
1 parent 396ab4a commit 2a654a1
Show file tree
Hide file tree
Showing 7 changed files with 699 additions and 142 deletions.
5 changes: 3 additions & 2 deletions api/handler/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func TestDeleteMarkers(t *testing.T) {
require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length")
require.Len(t, versions.Version, 0, "versions must be empty")

require.Len(t, listOIDsFromMockedNeoFS(t, tc, bktName), 0, "shouldn't be any object in neofs")
require.Len(t, listOIDsFromMockedNeoFS(t, tc, bktName), 3, "should be all delete marker object in neofs")
}

func TestDeleteObjectFromListCache(t *testing.T) {
Expand Down Expand Up @@ -237,7 +237,8 @@ func TestDeleteObjectCheckMarkerReturn(t *testing.T) {
require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID)

deleteMarkerVersion2, isDeleteMarker2 := deleteObject(t, tc, bktName, objName, deleteMarkerVersion)
require.True(t, isDeleteMarker2)
// deleting object with non-empty version - remove object from storage (even it is a delete marker). No additional markers.
require.False(t, isDeleteMarker2)
versions = listVersions(t, tc, bktName)
require.Len(t, versions.DeleteMarker, 0)
require.Equal(t, deleteMarkerVersion, deleteMarkerVersion2)
Expand Down
1 change: 0 additions & 1 deletion api/handler/object_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ func TestS3BucketListV2DelimiterPrefix(t *testing.T) {
continuationToken := validateListV2(t, tc, bktName, prefix, delim, "", 1, true, false, []string{"asdf"}, empty)
continuationToken = validateListV2(t, tc, bktName, prefix, delim, continuationToken, 1, true, false, empty, []string{"boo/"})
validateListV2(t, tc, bktName, prefix, delim, continuationToken, 1, false, true, empty, []string{"cquux/"})

continuationToken = validateListV2(t, tc, bktName, prefix, delim, "", 2, true, false, []string{"asdf"}, []string{"boo/"})
validateListV2(t, tc, bktName, prefix, delim, continuationToken, 2, false, true, empty, []string{"cquux/"})

Expand Down
275 changes: 224 additions & 51 deletions api/layer/layer.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package layer

import (
"bytes"
"context"
"crypto/rand"
"errors"
"fmt"
"io"
Expand All @@ -22,6 +22,7 @@ import (
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/user"
Expand Down Expand Up @@ -73,9 +74,10 @@ type (

// HeadObjectParams stores object head request parameters.
HeadObjectParams struct {
BktInfo *data.BucketInfo
Object string
VersionID string
BktInfo *data.BucketInfo
Object string
VersionID string
IsBucketVersioningEnabled bool
}

// ObjectVersion stores object version info.
Expand Down Expand Up @@ -506,10 +508,17 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.O
func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) {
var objInfo *data.ExtendedObjectInfo
var err error
var settings *data.BucketSettings

if len(p.VersionID) == 0 {
objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
} else {
settings, err = n.GetBucketSettings(ctx, p.BktInfo)
if err != nil {
return nil, fmt.Errorf("get bucket settings: %w", err)
}

p.IsBucketVersioningEnabled = settings.VersioningEnabled()
objInfo, err = n.headVersion(ctx, p.BktInfo, p)
}
if err != nil {
Expand Down Expand Up @@ -556,76 +565,193 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Exte
})
}

func getRandomOID() (oid.ID, error) {
b := [32]byte{}
if _, err := rand.Read(b[:]); err != nil {
return oid.ID{}, err
}

var objID oid.ID
objID.SetSHA256(b)
return objID, nil
}

func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject {
if len(obj.VersionID) != 0 || settings.Unversioned() {
var nodeVersion *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
return dismissNotFoundError(obj)
}

if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
if strings.Contains(obj.Error.Error(), "2050 message = object is locked") {
if settings.VersioningEnabled() {
if len(obj.VersionID) > 0 {
var deleteOID oid.ID
if err := deleteOID.DecodeString(obj.VersionID); err != nil {
obj.Error = fmt.Errorf("decode version: %w", err)
return obj
}

n.log.Info("remove old version", zap.Error(obj.Error))
if obj.Error = n.objectDelete(ctx, bkt, deleteOID); obj.Error != nil {
return obj
}
} else {
var markerOID oid.ID
markerOID, obj.Error = n.putDeleteMarker(ctx, bkt, obj.Name)
obj.DeleteMarkVersion = markerOID.EncodeToString()
}

obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID)
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)
return obj
}

var newVersion *data.NodeVersion

if settings.VersioningSuspended() {
obj.VersionID = data.UnversionedObjectVersionID

var nodeVersion *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
return dismissNotFoundError(obj)
// versions, err := n.getLatestObjectsVersions(ctx, bkt, bkt.Owner, obj.Name, true)
versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, obj.Name, true)
if err != nil {
obj.Error = fmt.Errorf("search versions: %w", err)
return obj
}

if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
// if len(versions) == 0 {
// obj.Error = s3errors.GetAPIError(s3errors.ErrNoSuchKey)
// return dismissNotFoundError(obj)
// }

if len(versions) > 1 {
obj.Error = errors.New("more than one object version found")
return obj
}

if obj.Error = n.objectDelete(ctx, bkt, versions[0].GetID()); obj.Error != nil {
return obj
}

var markerOID oid.ID
markerOID, obj.Error = n.putDeleteMarker(ctx, bkt, obj.Name)
obj.DeleteMarkVersion = markerOID.EncodeToString()

return obj
}

randOID, err := getRandomOID()
versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, obj.Name, false)
if err != nil {
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
if errors.Is(err, ErrNodeNotFound) {
obj.Error = nil
} else {
obj.Error = fmt.Errorf("search versions: %w", err)
}

return obj
}

obj.DeleteMarkVersion = randOID.EncodeToString()
// if len(versions) == 0 {
// obj.Error = s3errors.GetAPIError(s3errors.ErrNoSuchKey)
// return dismissNotFoundError(obj)
// }

newVersion = &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: randOID,
FilePath: obj.Name,
},
DeleteMarker: &data.DeleteMarkerInfo{
Created: TimeNow(ctx),
Owner: n.Owner(ctx),
},
IsUnversioned: settings.VersioningSuspended(),
}
if obj.VersionID == "" {
for _, ver := range versions {
if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil {
return obj
}
}
} else {
for _, ver := range versions {
if ver.GetID().EncodeToString() == obj.VersionID {
if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil {
return obj
}

if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil {
return obj
return obj
}
}
}

/*
if len(obj.VersionID) > 0 {
for _, ver := range versions {
if ver.GetID().EncodeToString() == obj.VersionID {
if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil {
return obj
}
return obj
}
}
return dismissNotFoundError(obj)
}
if settings.Unversioned() {
for _, ver := range versions {
for _, attr := range ver.Attributes() {
if attr.Key() == attrS3VersioningState && attr.Value() == "false" {
if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil {
return obj
}
continue
}
}
}
}
if settings.VersioningEnabled() {
deleteMarkOID, err := n.putDeleteMarker(ctx, bkt, obj.Name)
if err != nil {
obj.Error = err
return obj
}
obj.DeleteMarkVersion = deleteMarkOID.EncodeToString()
}
*/

// data.UnversionedObjectVersionID
/*
if len(obj.VersionID) != 0 || settings.Unversioned() {
var nodeVersion *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
return dismissNotFoundError(obj)
}
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
if strings.Contains(obj.Error.Error(), "2050 message = object is locked") {
return obj
}
n.log.Info("remove old version", zap.Error(obj.Error))
}
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID)
n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID)
return obj
}
var newVersion *data.NodeVersion
if settings.VersioningSuspended() {
obj.VersionID = data.UnversionedObjectVersionID
var nodeVersion *data.NodeVersion
if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil {
return dismissNotFoundError(obj)
}
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
return obj
}
}
randOID, err := getRandomOID()
if err != nil {
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
return obj
}
obj.DeleteMarkVersion = randOID.EncodeToString()
newVersion = &data.NodeVersion{
BaseNodeVersion: data.BaseNodeVersion{
OID: randOID,
FilePath: obj.Name,
},
DeleteMarker: &data.DeleteMarkerInfo{
Created: TimeNow(ctx),
Owner: n.Owner(ctx),
},
IsUnversioned: settings.VersioningSuspended(),
}
if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil {
return obj
}
*/

n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name)

return obj
Expand Down Expand Up @@ -699,14 +825,61 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error)
}

func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
nodeVersions, err := n.bucketNodeVersions(ctx, p.BktInfo, "")
objects, err := n.searchAllVersionsInNeoFS(ctx, p.BktInfo, p.BktInfo.Owner, "", false)
if err != nil {
return err
if !errors.Is(err, ErrNodeNotFound) {
return err
}
}
if len(nodeVersions) != 0 {

if len(objects) != 0 {
return s3errors.GetAPIError(s3errors.ErrBucketNotEmpty)
}

n.cache.DeleteBucket(p.BktInfo.Name)
return n.neoFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken)
}

func (n *layer) putDeleteMarker(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (oid.ID, error) {
var (
ts = strconv.FormatInt(time.Now().Unix(), 10)
params = PutObjectParams{
BktInfo: bktInfo,
Object: objectName,
Reader: bytes.NewReader(nil),
Header: map[string]string{
attrS3DeleteMarker: ts,
object.AttributeTimestamp: ts,
},
}
)

// object.AttributeTimestamp, strconv.FormatInt(creationTime.Unix(), 10)

id, err := n.PutObject(ctx, &params)
if err != nil {
return oid.ID{}, fmt.Errorf("save delete marker object: %w", err)
}

return id.ObjectInfo.ID, nil
}

func isDeleteMarkerObject(head object.Object) bool {
for _, attr := range head.Attributes() {
if attr.Key() == attrS3DeleteMarker {
return true
}
}

return false
}

func getS3VersioningState(head object.Object) string {
for _, attr := range head.Attributes() {
if attr.Key() == attrS3VersioningState {
return attr.Value()
}
}

return ""
}
Loading

0 comments on commit 2a654a1

Please sign in to comment.