Merge pull request #3725 from nspcc-dev/neofs-sdk-update
*: update `neofs-sdk-go`
AnnaShaleva authored Dec 13, 2024
2 parents f82088b + 9e3f75e commit 388ee25
Showing 7 changed files with 150 additions and 140 deletions.
47 changes: 21 additions & 26 deletions cli/util/upload_bin.go
@@ -3,19 +3,17 @@ package util
import (
"context"
"fmt"
- "slices"
"strconv"
"sync"
"time"

"github.com/nspcc-dev/neo-go/cli/cmdargs"
"github.com/nspcc-dev/neo-go/cli/options"
"github.com/nspcc-dev/neo-go/pkg/core/block"
+ "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/io"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/services/helpers/neofs"
- "github.com/nspcc-dev/neo-go/pkg/util"
- "github.com/nspcc-dev/neo-go/pkg/wallet"
"github.com/nspcc-dev/neofs-sdk-go/client"
"github.com/nspcc-dev/neofs-sdk-go/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
@@ -107,12 +105,12 @@ func uploadBin(ctx *cli.Context) error {
return cli.Exit(fmt.Sprintf("failed to get current block height from RPC: %v", err), 1)
}
fmt.Fprintln(ctx.App.Writer, "Chain block height:", currentBlockHeight)
- i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc, signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries, debug)
+ i, buf, err := searchIndexFile(ctx, pWrapper, containerID, acc.PrivateKey(), signer, indexFileSize, attr, indexAttrKey, maxParallelSearches, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to find objects: %w", err), 1)
}

- err = uploadBlocksAndIndexFiles(ctx, pWrapper, rpc, signer, containerID, acc, attr, indexAttrKey, buf, i, indexFileSize, uint(currentBlockHeight), numWorkers, maxRetries, debug)
+ err = uploadBlocksAndIndexFiles(ctx, pWrapper, rpc, signer, containerID, user.NewFromScriptHash(acc.ScriptHash()), attr, indexAttrKey, buf, i, indexFileSize, uint(currentBlockHeight), numWorkers, maxRetries, debug)
if err != nil {
return cli.Exit(fmt.Errorf("failed to upload objects: %w", err), 1)
}
@@ -140,7 +138,7 @@ func retry(action func() error, maxRetries uint, debug bool) error {
}

// uploadBlocksAndIndexFiles uploads the blocks and index files to the container using the pool.
- func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, acc *wallet.Account, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, numWorkers, maxRetries uint, debug bool) error {
+ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.Client, signer user.Signer, containerID cid.ID, ownerID user.ID, attr, indexAttributeKey string, buf []byte, currentIndexFileID, indexFileSize, currentBlockHeight uint, numWorkers, maxRetries uint, debug bool) error {
if currentIndexFileID*indexFileSize >= currentBlockHeight {
fmt.Fprintf(ctx.App.Writer, "No new blocks to upload. Need to upload starting from %d, current height %d\n", currentIndexFileID*indexFileSize, currentBlockHeight)
return nil
@@ -152,15 +150,14 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
errCh = make(chan error)
doneCh = make(chan struct{})
wg sync.WaitGroup
- emptyOID = make([]byte, neofs.OIDSize)
)
fmt.Fprintf(ctx.App.Writer, "Processing batch from %d to %d\n", indexFileStart, indexFileEnd-1)
wg.Add(int(numWorkers))
for i := range numWorkers {
go func(i uint) {
defer wg.Done()
for blockIndex := indexFileStart + i; blockIndex < indexFileEnd; blockIndex += numWorkers {
- if slices.Compare(buf[blockIndex%indexFileSize*neofs.OIDSize:blockIndex%indexFileSize*neofs.OIDSize+neofs.OIDSize], emptyOID) != 0 {
+ if !oid.ID(buf[blockIndex%indexFileSize*oid.Size : blockIndex%indexFileSize*oid.Size+oid.Size]).IsZero() {
if debug {
fmt.Fprintf(ctx.App.Writer, "Block %d is already uploaded\n", blockIndex)
}
@@ -207,7 +204,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
)
errRetr := retry(func() error {
var errUpload error
- resOid, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, objBytes, attrs)
+ resOid, errUpload = uploadObj(ctx.Context, p, signer, ownerID, containerID, objBytes, attrs)
if errUpload != nil {
return errUpload
}
@@ -223,7 +220,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
}
return
}
- resOid.Encode(buf[blockIndex%indexFileSize*neofs.OIDSize:])
+ copy(buf[blockIndex%indexFileSize*oid.Size:], resOid[:])
}
}(i)
}
@@ -241,9 +238,9 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
fmt.Fprintf(ctx.App.Writer, "Successfully processed batch of blocks: from %d to %d\n", indexFileStart, indexFileEnd-1)

// Additional check for empty OIDs in the buffer.
- for k := uint(0); k < (indexFileEnd-indexFileStart)*neofs.OIDSize; k += neofs.OIDSize {
- if slices.Compare(buf[k:k+neofs.OIDSize], emptyOID) == 0 {
- return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/neofs.OIDSize, indexFileStart/indexFileSize*indexFileSize+k/neofs.OIDSize)
+ for k := uint(0); k < (indexFileEnd-indexFileStart)*oid.Size; k += oid.Size {
+ if oid.ID(buf[k : k+oid.Size]).IsZero() {
+ return fmt.Errorf("empty OID found in index file %d at position %d (block index %d)", indexFileStart/indexFileSize, k/oid.Size, indexFileStart/indexFileSize*indexFileSize+k/oid.Size)
}
}
if indexFileEnd-indexFileStart == indexFileSize {
@@ -254,7 +251,7 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
}
err := retry(func() error {
var errUpload error
- _, errUpload = uploadObj(ctx.Context, p, signer, acc.PrivateKey().GetScriptHash(), containerID, buf, attrs)
+ _, errUpload = uploadObj(ctx.Context, p, signer, ownerID, containerID, buf, attrs)
return errUpload
}, maxRetries, debug)
if err != nil {
@@ -268,10 +265,10 @@ func uploadBlocksAndIndexFiles(ctx *cli.Context, p poolWrapper, rpc *rpcclient.C
}

// searchIndexFile returns the ID and buffer for the next index file to be uploaded.
- func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) {
+ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, signer user.Signer, indexFileSize uint, blockAttributeKey, attributeKey string, maxParallelSearches, maxRetries uint, debug bool) (uint, []byte, error) {
var (
// buf is used to store OIDs of the uploaded blocks.
- buf = make([]byte, indexFileSize*neofs.OIDSize)
+ buf = make([]byte, indexFileSize*oid.Size)
doneCh = make(chan struct{})
errCh = make(chan error)

Expand All @@ -283,7 +280,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
// Search for existing index files.
filters.AddFilter("IndexSize", fmt.Sprintf("%d", indexFileSize), object.MatchStringEqual)
for i := 0; ; i++ {
- indexIDs := searchObjects(ctx.Context, p, containerID, account, attributeKey, uint(i), uint(i+1), 1, maxRetries, debug, errCh, filters)
+ indexIDs := searchObjects(ctx.Context, p, containerID, privKeys, attributeKey, uint(i), uint(i+1), 1, maxRetries, debug, errCh, filters)
count := 0
for range indexIDs {
count++
@@ -338,14 +335,14 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
}
pos := uint(blockIndex) % indexFileSize
if _, ok := processedIndices.LoadOrStore(pos, blockIndex); !ok {
- id.Encode(buf[pos*neofs.OIDSize:])
+ copy(buf[pos*oid.Size:], id[:])
}
}
}()
}

// Search for blocks within the index file range.
- objIDs := searchObjects(ctx.Context, p, containerID, account, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, debug, errCh)
+ objIDs := searchObjects(ctx.Context, p, containerID, privKeys, blockAttributeKey, existingIndexCount*indexFileSize, (existingIndexCount+1)*indexFileSize, maxParallelSearches, maxRetries, debug, errCh)
for id := range objIDs {
oidCh <- id
}
@@ -364,7 +361,7 @@ func searchIndexFile(ctx *cli.Context, p poolWrapper, containerID cid.ID, accoun
// searchObjects searches in parallel for objects with attribute GE startIndex and LT
// endIndex. It returns a buffered channel of resulting object IDs and closes it once
// OID search is finished. Errors are sent to errCh in a non-blocking way.
- func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, account *wallet.Account, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
+ func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, privKeys *keys.PrivateKey, blockAttributeKey string, startIndex, endIndex, maxParallelSearches, maxRetries uint, debug bool, errCh chan error, additionalFilters ...object.SearchFilters) chan oid.ID {
var res = make(chan oid.ID, 2*neofs.DefaultSearchBatchSize)
go func() {
var wg sync.WaitGroup
@@ -402,7 +399,7 @@ func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, accou
var objIDs []oid.ID
err := retry(func() error {
var errBlockSearch error
- objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, account.PrivateKey(), containerID.String(), prm)
+ objIDs, errBlockSearch = neofs.ObjectSearch(ctx, p, privKeys, containerID.String(), prm)
return errBlockSearch
}, maxRetries, debug)
if err != nil {
@@ -426,18 +423,16 @@ func searchObjects(ctx context.Context, p poolWrapper, containerID cid.ID, accou
}

// uploadObj uploads object to the container using provided settings.
- func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, owner util.Uint160, containerID cid.ID, objData []byte, attrs []object.Attribute) (oid.ID, error) {
+ func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, ownerID user.ID, containerID cid.ID, objData []byte, attrs []object.Attribute) (oid.ID, error) {
var (
- ownerID user.ID
hdr object.Object
prmObjectPutInit client.PrmObjectPutInit
resOID = oid.ID{}
)

- ownerID.SetScriptHash(owner)
hdr.SetPayload(objData)
hdr.SetContainerID(containerID)
- hdr.SetOwnerID(&ownerID)
+ hdr.SetOwner(ownerID)
hdr.SetAttributes(attrs...)

writer, err := p.ObjectPutInit(ctx, hdr, signer, prmObjectPutInit)
@@ -455,7 +450,7 @@ func uploadObj(ctx context.Context, p poolWrapper, signer user.Signer, owner uti
}
res := writer.GetResult()
resOID = res.StoredObjectID()
- if resOID.Equals(oid.ID{}) {
+ if resOID.IsZero() {
return resOID, fmt.Errorf("object ID is empty")
}
return resOID, nil
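For reference, a minimal sketch (not part of the commit) of the updated neofs-sdk-go calls that this file now relies on: the object owner is built with user.NewFromScriptHash and attached via SetOwner, and OID slots in the index buffer are handled through oid.Size and oid.ID.IsZero instead of slices.Compare against a zero-filled buffer. The function and variable names below (ownerAndOIDs, acc, hdr, buf) are illustrative only.

package main

import (
	"fmt"

	"github.com/nspcc-dev/neo-go/pkg/wallet"
	"github.com/nspcc-dev/neofs-sdk-go/object"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
	"github.com/nspcc-dev/neofs-sdk-go/user"
)

// ownerAndOIDs mirrors the patterns used in upload_bin.go after the update:
// it sets the object owner from the account's script hash and scans an index
// buffer for empty OID slots.
func ownerAndOIDs(acc *wallet.Account, hdr *object.Object, buf []byte) {
	ownerID := user.NewFromScriptHash(acc.ScriptHash()) // was: ownerID.SetScriptHash(...)
	hdr.SetOwner(ownerID)                               // was: hdr.SetOwnerID(&ownerID)

	// oid.Size replaces the local neofs.OIDSize constant; IsZero replaces
	// slices.Compare against an emptyOID buffer.
	for k := 0; k+oid.Size <= len(buf); k += oid.Size {
		if oid.ID(buf[k : k+oid.Size]).IsZero() {
			fmt.Println("empty OID slot at", k/oid.Size)
		}
	}
}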
15 changes: 7 additions & 8 deletions go.mod
@@ -16,7 +16,7 @@ require (
github.com/nspcc-dev/dbft v0.3.1
github.com/nspcc-dev/go-ordered-json v0.0.0-20240830112754-291b000d1f3b
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20241212130705-ea0a6114d2d6
- github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.12
+ github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.12.0.20241205083504-335d9fe90f24
github.com/nspcc-dev/rfc6979 v0.2.3
github.com/pierrec/lz4 v2.6.1+incompatible
github.com/prometheus/client_golang v1.20.2
@@ -30,12 +30,12 @@ require (
golang.org/x/term v0.27.0
golang.org/x/text v0.21.0
golang.org/x/tools v0.24.0
- google.golang.org/grpc v1.62.0
+ google.golang.org/grpc v1.65.0
gopkg.in/yaml.v3 v3.0.1
)

require (
- github.com/antlr/antlr4/runtime/Go/antlr/v4 v4.0.0-20221202181307-76fa05c21b12 // indirect
+ github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.14.2 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
@@ -45,7 +45,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/frankban/quicktest v1.14.5 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
- github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect
github.com/ingonyama-zk/icicle v1.1.0 // indirect
@@ -55,9 +54,9 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mmcloughlin/addchain v0.4.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
- github.com/nspcc-dev/hrw/v2 v2.0.1 // indirect
- github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240305074711-35bc78d84dc4 // indirect
- github.com/nspcc-dev/tzhash v1.7.2 // indirect
+ github.com/nspcc-dev/hrw/v2 v2.0.2 // indirect
+ github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea // indirect
+ github.com/nspcc-dev/tzhash v1.8.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
@@ -73,7 +72,7 @@ require (
golang.org/x/net v0.28.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
- google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect
+ google.golang.org/genproto/googleapis/rpc v0.0.0-20240528184218-531527333157 // indirect
google.golang.org/protobuf v1.34.2 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)