From 17754758f947c9cb598ef88b97028a21f38259f1 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 3 Dec 2024 08:04:17 +0300 Subject: [PATCH 01/13] go.mod: update contracts to the latest version Signed-off-by: Pavel Karpy --- go.mod | 12 ++++++------ go.sum | 24 ++++++++++++------------ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index 025dcefb42..5f8e6ec9ca 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/nspcc-dev/locode-db v0.6.0 github.com/nspcc-dev/neo-go v0.107.1 github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea - github.com/nspcc-dev/neofs-contract v0.20.0 + github.com/nspcc-dev/neofs-contract v0.20.1-0.20241220193924-4da43dfb5a65 github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.12.0.20240809202351-256513c1b29b github.com/nspcc-dev/tzhash v1.8.0 github.com/olekukonko/tablewriter v0.0.5 @@ -31,9 +31,9 @@ require ( go.etcd.io/bbolt v1.3.11 go.uber.org/zap v1.27.0 golang.org/x/net v0.28.0 - golang.org/x/sync v0.8.0 - golang.org/x/sys v0.24.0 - golang.org/x/term v0.23.0 + golang.org/x/sync v0.10.0 + golang.org/x/sys v0.28.0 + golang.org/x/term v0.27.0 google.golang.org/grpc v1.64.1 google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 @@ -92,9 +92,9 @@ require ( github.com/urfave/cli/v2 v2.27.4 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.26.0 // indirect + golang.org/x/crypto v0.31.0 // indirect golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect - golang.org/x/text v0.17.0 // indirect + golang.org/x/text v0.21.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 // indirect gopkg.in/ini.v1 v1.67.0 // indirect lukechampine.com/blake3 v1.2.1 // indirect diff --git a/go.sum b/go.sum index 05b08c27f9..5381814e2e 100644 --- a/go.sum +++ b/go.sum @@ -178,8 +178,8 @@ github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240729160116-d8e3e57f88f2 h1:tv github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240729160116-d8e3e57f88f2/go.mod h1:/vrbWSHc7YS1KSYhVOyyeucXW/e+1DkVBOgnBEXUCeY= github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea h1:mK0EMGLvunXcFyq7fBURS/CsN4MH+4nlYiqn6pTwWAU= github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240827150555-5ce597aa14ea/go.mod h1:YzhD4EZmC9Z/PNyd7ysC7WXgIgURc9uCG1UWDeV027Y= -github.com/nspcc-dev/neofs-contract v0.20.0 h1:ARE/3mSN+P9qi/10NBsf7QyPiYrvnxeEgYUN13vHRlo= -github.com/nspcc-dev/neofs-contract v0.20.0/go.mod h1:YxtKYE/5cMNiqwWcQWzeizbB9jizauLni+p8wXxfhsQ= +github.com/nspcc-dev/neofs-contract v0.20.1-0.20241220193924-4da43dfb5a65 h1:SruyrmzfmaIK+rx3EyLsG3hb9Ooh+cTkObucl0yTVxY= +github.com/nspcc-dev/neofs-contract v0.20.1-0.20241220193924-4da43dfb5a65/go.mod h1:fwM6QoYPnsIuUQ4/GOwgzfQ9qoDKknqYgf4XWOqEdJw= github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.12.0.20240809202351-256513c1b29b h1:/7jXQP5pf+M0kRFC1gg5GEdTPkvotpMHxjSXIbMZaGQ= github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.12.0.20240809202351-256513c1b29b/go.mod h1:ewV84r1NACvoBfbKQKzRLUun+Xn5+z9JVqsuCVgv9xI= github.com/nspcc-dev/rfc6979 v0.2.3 h1:QNVykGZ3XjFwM/88rGfV3oj4rKNBy+nYI6jM7q19hDI= @@ -293,8 +293,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= -golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 h1:kx6Ds3MlpiUHKj7syVnbp57++8WpuKPcR5yjLBjvLEA= golang.org/x/exp v0.0.0-20240823005443-9b4947da3948/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0= @@ -310,8 +310,8 @@ golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -327,16 +327,16 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= -golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= -golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= -golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= From 59cebb3fef8bbcc58507614644c72e5a017c9ba4 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 10 Dec 2024 22:52:37 +0300 Subject: [PATCH 02/13] morph: add method for container's members updates Follows https://github.com/nspcc-dev/neofs-contract/pull/438. Signed-off-by: Pavel Karpy --- pkg/morph/client/container/client.go | 3 ++ pkg/morph/client/container/nodes.go | 51 ++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 pkg/morph/client/container/nodes.go diff --git a/pkg/morph/client/container/client.go b/pkg/morph/client/container/client.go index c3aa46baee..e7feca62e2 100644 --- a/pkg/morph/client/container/client.go +++ b/pkg/morph/client/container/client.go @@ -39,6 +39,9 @@ const ( // putNamedMethod is method name for container put with an alias. It is exported to provide custom fee. putNamedMethod = "putNamed" + + addNextEpochNodes = "addNextEpochNodes" + commitContainerListUpdate = "commitContainerListUpdate" ) var ( diff --git a/pkg/morph/client/container/nodes.go b/pkg/morph/client/container/nodes.go new file mode 100644 index 0000000000..195264507d --- /dev/null +++ b/pkg/morph/client/container/nodes.go @@ -0,0 +1,51 @@ +package container + +import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/morph/client" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" +) + +// AddNextEpochNodes registers public keys as a container's placement vector +// with specified index. Registration must be finished with final +// [Client.CommitContainerListUpdate] call. Always sends a notary request with +// Alphabet multi-signature. +func (c *Client) AddNextEpochNodes(cid cid.ID, placementIndex int, nodesKeys [][]byte) error { + if len(nodesKeys) == 0 { + return errNilArgument + } + + prm := client.InvokePrm{} + prm.SetMethod(addNextEpochNodes) + prm.SetArgs(cid, placementIndex, nodesKeys) + prm.RequireAlphabetSignature() + + err := c.client.Invoke(prm) + if err != nil { + return fmt.Errorf("could not invoke method (%s): %w", addNextEpochNodes, err) + } + + return nil +} + +// CommitContainerListUpdate finishes container placement updates for the current +// epoch made by former [Client.AddNextEpochNodes] calls. Always sends a notary +// request with Alphabet multi-signature. +func (c *Client) CommitContainerListUpdate(cid cid.ID, replicas []uint32) error { + if len(replicas) == 0 { + return errNilArgument + } + + prm := client.InvokePrm{} + prm.SetMethod(commitContainerListUpdate) + prm.SetArgs(cid, replicas) + prm.RequireAlphabetSignature() + + err := c.client.Invoke(prm) + if err != nil { + return fmt.Errorf("could not invoke method (%s): %w", commitContainerListUpdate, err) + } + + return nil +} From 4e14e4340369a527977d37a2cd8f5c6e69a06f63 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 18 Dec 2024 20:41:29 +0300 Subject: [PATCH 03/13] morph/notary: Allow waiting for notary request inclusion Signed-off-by: Pavel Karpy --- pkg/innerring/processors/container/process_container.go | 4 ++-- pkg/innerring/processors/container/process_eacl.go | 2 +- pkg/innerring/processors/netmap/process_peers.go | 4 ++-- pkg/innerring/processors/reputation/process_put.go | 2 +- pkg/morph/client/notary.go | 5 ++++- 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/innerring/processors/container/process_container.go b/pkg/innerring/processors/container/process_container.go index 29cf4bf12b..7a1d5ef0fc 100644 --- a/pkg/innerring/processors/container/process_container.go +++ b/pkg/innerring/processors/container/process_container.go @@ -104,7 +104,7 @@ func (cp *Processor) approvePutContainer(ctx *putContainerContext) { prm.SetZone(ctx.d.Zone()) nr := e.NotaryRequest() - err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction) + err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, false) if err != nil { cp.log.Error("could not approve put container", @@ -175,7 +175,7 @@ func (cp *Processor) approveDeleteContainer(e *containerEvent.Delete) { prm.SetToken(e.SessionToken()) nr := e.NotaryRequest() - err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction) + err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, false) if err != nil { cp.log.Error("could not approve delete container", diff --git a/pkg/innerring/processors/container/process_eacl.go b/pkg/innerring/processors/container/process_eacl.go index db7d90dc98..6ef2ea578c 100644 --- a/pkg/innerring/processors/container/process_eacl.go +++ b/pkg/innerring/processors/container/process_eacl.go @@ -88,7 +88,7 @@ func (cp *Processor) approveSetEACL(e container.SetEACL) { prm.SetToken(e.SessionToken()) nr := e.NotaryRequest() - err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction) + err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, false) if err != nil { cp.log.Error("could not approve set EACL", diff --git a/pkg/innerring/processors/netmap/process_peers.go b/pkg/innerring/processors/netmap/process_peers.go index 00b64c16c0..38c72ab0bb 100644 --- a/pkg/innerring/processors/netmap/process_peers.go +++ b/pkg/innerring/processors/netmap/process_peers.go @@ -61,7 +61,7 @@ func (np *Processor) processAddPeer(ev netmapEvent.AddPeer) { np.log.Info("approving network map candidate", zap.String("key", keyString)) - err = np.netmapClient.Morph().NotarySignAndInvokeTX(tx) + err = np.netmapClient.Morph().NotarySignAndInvokeTX(tx, false) if err != nil { np.log.Error("can't sign and send notary request calling netmap.AddPeer", zap.Error(err)) } @@ -93,7 +93,7 @@ func (np *Processor) processUpdatePeer(ev netmapEvent.UpdatePeer) { } nr := ev.NotaryRequest() - err = np.netmapClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction) + err = np.netmapClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, false) if err != nil { np.log.Error("can't invoke netmap.UpdatePeer", zap.Error(err)) diff --git a/pkg/innerring/processors/reputation/process_put.go b/pkg/innerring/processors/reputation/process_put.go index 503147ad44..116e974c6d 100644 --- a/pkg/innerring/processors/reputation/process_put.go +++ b/pkg/innerring/processors/reputation/process_put.go @@ -77,7 +77,7 @@ func (rp *Processor) approvePutReputation(e *reputationEvent.Put) { nr = e.NotaryRequest() ) - err = rp.reputationWrp.Morph().NotarySignAndInvokeTX(nr.MainTransaction) + err = rp.reputationWrp.Morph().NotarySignAndInvokeTX(nr.MainTransaction, false) if err != nil { rp.log.Warn("can't send approval tx for reputation value", diff --git a/pkg/morph/client/notary.go b/pkg/morph/client/notary.go index 4424ca975b..92fc31394e 100644 --- a/pkg/morph/client/notary.go +++ b/pkg/morph/client/notary.go @@ -334,7 +334,7 @@ func (c *Client) NotaryInvokeNotAlpha(contract util.Uint160, fee fixedn.Fixed8, // Notary service. // NOTE: does not fallback to simple `Invoke()`. Expected to be used only for // TXs retrieved from the received notary requests. -func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error { +func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction, await bool) error { var conn = c.conn.Load() if conn == nil { @@ -424,6 +424,9 @@ func (c *Client) NotarySignAndInvokeTX(mainTx *transaction.Transaction) error { fbTx.Nonce = binary.BigEndian.Uint32(mainH[:]) mainH, fbH, untilActual, err := nAct.SendRequest(mainTx, fbTx) + if await { + _, err = nAct.Wait(mainH, fbH, untilActual, err) + } if err != nil && !alreadyOnChainError(err) { return err } From cefdbd94b2ccaf67c9df807bc180c04ccc470207 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 11 Dec 2024 00:07:27 +0300 Subject: [PATCH 04/13] ir: maintain container placement in container contract After https://github.com/nspcc-dev/neofs-contract/pull/438 Container contract now has API for container-side placement/verification operations. But building placement vectors and updating when needed is still a complex operation that should be done on a Go application side. This commit adds such a responsibility for Alphabet nodes. For simplicity, updating is done every epoch and once for _every_ container if netmap has been changed. Additional optimization can be considered. Signed-off-by: Pavel Karpy --- pkg/innerring/processors/container.go | 37 ++++++++++++ .../processors/container/process_container.go | 43 +++++++++++--- .../processors/container/processor.go | 4 ++ .../processors/netmap/cleanup_table.go | 16 +++++- .../processors/netmap/process_epoch.go | 56 ++++++++++++++++++- 5 files changed, 146 insertions(+), 10 deletions(-) create mode 100644 pkg/innerring/processors/container.go diff --git a/pkg/innerring/processors/container.go b/pkg/innerring/processors/container.go new file mode 100644 index 0000000000..59d78e7755 --- /dev/null +++ b/pkg/innerring/processors/container.go @@ -0,0 +1,37 @@ +package processors + +import ( + "fmt" + + cnrcli "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" +) + +// UpdatePlacementVectors updates placement vectors after a container placement +// change in Container contract. Empty vectors drops container vectors from the +// contract. +func UpdatePlacementVectors(cID cid.ID, cnrCli *cnrcli.Client, vectors [][]netmap.NodeInfo, replicas []uint32) error { + for i, vector := range vectors { + err := cnrCli.AddNextEpochNodes(cID, i, pubKeys(vector)) + if err != nil { + return fmt.Errorf("can't add %d placement vector to Container contract: %w", i, err) + } + } + + err := cnrCli.CommitContainerListUpdate(cID, replicas) + if err != nil { + return fmt.Errorf("can't commit container list to Container contract: %w", err) + } + + return nil +} + +func pubKeys(nodes []netmap.NodeInfo) [][]byte { + res := make([][]byte, 0, len(nodes)) + for _, node := range nodes { + res = append(res, node.PublicKey()) + } + + return res +} diff --git a/pkg/innerring/processors/container/process_container.go b/pkg/innerring/processors/container/process_container.go index 7a1d5ef0fc..fd2ee59c16 100644 --- a/pkg/innerring/processors/container/process_container.go +++ b/pkg/innerring/processors/container/process_container.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/nspcc-dev/neo-go/pkg/network/payload" + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors" cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" "github.com/nspcc-dev/neofs-node/pkg/morph/event" containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" @@ -26,7 +27,10 @@ type putEvent interface { type putContainerContext struct { e putEvent - d containerSDK.Domain + // must be filled when verifying raw data from e + cID cid.ID + cnr containerSDK.Container + d containerSDK.Domain } // Process a new container from the user by checking the container sanity @@ -55,15 +59,15 @@ func (cp *Processor) processContainerPut(put putEvent) { func (cp *Processor) checkPutContainer(ctx *putContainerContext) error { binCnr := ctx.e.Container() - var cnr containerSDK.Container + ctx.cID = cid.NewFromMarshalledContainer(binCnr) - err := cnr.Unmarshal(binCnr) + err := ctx.cnr.Unmarshal(binCnr) if err != nil { return fmt.Errorf("invalid binary container: %w", err) } err = cp.verifySignature(signatureVerificationData{ - ownerContainer: cnr.Owner(), + ownerContainer: ctx.cnr.Owner(), verb: session.VerbContainerPut, binTokenSession: ctx.e.SessionToken(), binPublicKey: ctx.e.PublicKey(), @@ -75,13 +79,13 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error { } // check homomorphic hashing setting - err = checkHomomorphicHashing(cp.netState, cnr) + err = checkHomomorphicHashing(cp.netState, ctx.cnr) if err != nil { return fmt.Errorf("incorrect homomorphic hashing setting: %w", err) } // check native name and zone - err = checkNNS(ctx, cnr) + err = checkNNS(ctx, ctx.cnr) if err != nil { return fmt.Errorf("NNS: %w", err) } @@ -104,12 +108,37 @@ func (cp *Processor) approvePutContainer(ctx *putContainerContext) { prm.SetZone(ctx.d.Zone()) nr := e.NotaryRequest() - err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, false) + err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, true) if err != nil { cp.log.Error("could not approve put container", zap.Error(err), ) + return + } + + nm, err := cp.netState.NetMap() + if err != nil { + cp.log.Error("could not get netmap for Container contract update", zap.Stringer("cid", ctx.cID), zap.Error(err)) + return + } + + policy := ctx.cnr.PlacementPolicy() + vectors, err := nm.ContainerNodes(policy, ctx.cID) + if err != nil { + cp.log.Error("could not build placement for Container contract update", zap.Stringer("cid", ctx.cID), zap.Error(err)) + return + } + + replicas := make([]uint32, 0, policy.NumberOfReplicas()) + for i := range vectors { + replicas = append(replicas, policy.ReplicaNumberByIndex(i)) + } + + err = processors.UpdatePlacementVectors(ctx.cID, cp.cnrClient, vectors, replicas) + if err != nil { + cp.log.Error("could not update Container contract", zap.Stringer("cid", ctx.cID), zap.Error(err)) + return } } diff --git a/pkg/innerring/processors/container/processor.go b/pkg/innerring/processors/container/processor.go index f27cd77bb4..fccb6128c3 100644 --- a/pkg/innerring/processors/container/processor.go +++ b/pkg/innerring/processors/container/processor.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/client/neofsid" "github.com/nspcc-dev/neofs-node/pkg/morph/event" containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -55,6 +56,9 @@ type NetworkState interface { // // which did not allow reading the value. HomomorphicHashDisabled() (bool, error) + + // NetMap must return actual network map. + NetMap() (*netmap.NetMap, error) } // New creates a container contract processor instance. diff --git a/pkg/innerring/processors/netmap/cleanup_table.go b/pkg/innerring/processors/netmap/cleanup_table.go index 226a7d4471..d3cd023146 100644 --- a/pkg/innerring/processors/netmap/cleanup_table.go +++ b/pkg/innerring/processors/netmap/cleanup_table.go @@ -2,6 +2,7 @@ package netmap import ( "bytes" + "slices" "sync" "github.com/nspcc-dev/neofs-sdk-go/netmap" @@ -13,6 +14,8 @@ type ( enabled bool threshold uint64 lastAccess map[string]epochStampWithNodeInfo + + prev netmap.NetMap } epochStamp struct { @@ -36,8 +39,9 @@ func newCleanupTable(enabled bool, threshold uint64) cleanupTable { } } -// Update cleanup table based on on-chain information about netmap. -func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) { +// Update cleanup table based on on-chain information about netmap. Returned +// value indicates if the composition of network map memebers has changed. +func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) bool { c.Lock() defer c.Unlock() @@ -64,6 +68,14 @@ func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) { } c.lastAccess = newMap + + // order is expected to be the same from epoch to epoch + mapChanged := !slices.EqualFunc(c.prev.Nodes(), nmNodes, func(i1 netmap.NodeInfo, i2 netmap.NodeInfo) bool { + return bytes.Equal(i1.PublicKey(), i2.PublicKey()) + }) + c.prev = snapshot + + return mapChanged } // updates last access time of the netmap node by string public key. diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index 01e63de45c..ab7d85551e 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -1,11 +1,15 @@ package netmap import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/innerring/processors" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/governance" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement" cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "go.uber.org/zap" ) @@ -65,7 +69,15 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { } } - np.netmapSnapshot.update(*networkMap, epoch) + if np.netmapSnapshot.update(*networkMap, epoch) { + l.Debug("updating placements in Container contract...") + err = np.updatePlacementInContract(*networkMap, l) + if err != nil { + l.Error("can't update placements in Container contract", zap.Error(err)) + } else { + l.Debug("updated placements in Container contract") + } + } np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()}) np.handleNewAudit(audit.NewAuditStartEvent(epoch)) np.handleAuditSettlements(settlement.NewAuditEvent(epoch)) @@ -73,6 +85,48 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) { np.handleNotaryDeposit(ev) } +func (np *Processor) updatePlacementInContract(nm netmap.NetMap, l *zap.Logger) error { + // TODO: https://github.com/nspcc-dev/neofs-node/issues/3045 + cids, err := np.containerWrp.List(nil) + if err != nil { + return fmt.Errorf("can't get containers list: %w", err) + } + + for _, cID := range cids { + l := l.With(zap.Stringer("cid", cID)) + l.Debug("updating container placement in Container contract...") + + cnr, err := np.containerWrp.Get(cID[:]) + if err != nil { + l.Error("can't get container to update its placement in Container contract", zap.Error(err)) + continue + } + + policy := cnr.Value.PlacementPolicy() + + vectors, err := nm.ContainerNodes(policy, cID) + if err != nil { + l.Error("can't build placement vectors for update in Container contract", zap.Error(err)) + continue + } + + replicas := make([]uint32, 0, policy.NumberOfReplicas()) + for i := range vectors { + replicas = append(replicas, policy.ReplicaNumberByIndex(i)) + } + + err = processors.UpdatePlacementVectors(cID, np.containerWrp, vectors, replicas) + if err != nil { + l.Error("can't put placement vectors to Container contract", zap.Error(err)) + continue + } + + l.Debug("updated container placement in Container contract") + } + + return nil +} + // Process new epoch tick by invoking new epoch method in network map contract. func (np *Processor) processNewEpochTick() { if !np.alphabetState.IsAlphabet() { From befd00d1dcd27f58eeb2632a5018bead1dbaa7f1 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 3 Dec 2024 12:26:59 +0300 Subject: [PATCH 05/13] morph/container support new meta-data method Follows https://github.com/nspcc-dev/neofs-contract/issues/414. Signed-off-by: Pavel Karpy --- pkg/morph/client/container/client.go | 1 + pkg/morph/client/container/meta.go | 27 +++++++++++++++++++++++++++ 2 files changed, 28 insertions(+) create mode 100644 pkg/morph/client/container/meta.go diff --git a/pkg/morph/client/container/client.go b/pkg/morph/client/container/client.go index e7feca62e2..32d956d435 100644 --- a/pkg/morph/client/container/client.go +++ b/pkg/morph/client/container/client.go @@ -42,6 +42,7 @@ const ( addNextEpochNodes = "addNextEpochNodes" commitContainerListUpdate = "commitContainerListUpdate" + submitObjectPutMethod = "submitObjectPut" ) var ( diff --git a/pkg/morph/client/container/meta.go b/pkg/morph/client/container/meta.go new file mode 100644 index 0000000000..2afe4283f1 --- /dev/null +++ b/pkg/morph/client/container/meta.go @@ -0,0 +1,27 @@ +package container + +import ( + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/morph/client" +) + +// SubmitObjectPut puts object meta information. +// +// Returns any error encountered that caused the saving to interrupt. +func (c *Client) SubmitObjectPut(meta []byte, sigs [][]byte) error { + if len(meta) == 0 || len(sigs) == 0 { + return errNilArgument + } + + var prm client.InvokePrm + prm.SetMethod(submitObjectPutMethod) + prm.SetArgs(meta, sigs) + + err := c.client.Invoke(prm) + if err != nil { + return fmt.Errorf("could not invoke method (%s): %w", submitObjectPutMethod, err) + } + + return nil +} From 1af79c43e6225fcdaa5f187d747c67181a1617dd Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Tue, 3 Dec 2024 19:26:39 +0300 Subject: [PATCH 06/13] node/object/put: push object's meta to contract Refs https://github.com/nspcc-dev/neofs-contract/issues/414. Push meta only if consistency policy asks for it Signed-off-by: Pavel Karpy --- cmd/neofs-node/object.go | 1 + pkg/services/object/put/distributed.go | 30 ++++++++++++++++++++++++-- pkg/services/object/put/service.go | 9 ++++++++ pkg/services/object/put/streamer.go | 15 +++++++++---- 4 files changed, 49 insertions(+), 6 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 21d8e10b2e..c683074503 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -261,6 +261,7 @@ func initObjectService(c *cfg) { putsvc.WithNetworkMagic(mNumber), putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(putConstructor), + putsvc.WithContainerClient(c.cCli), putsvc.WithMaxSizeSource(newCachedMaxObjectSizeSource(c)), putsvc.WithObjectStorage(storageEngine{engine: ls}), putsvc.WithContainerSource(c.cfgObject.cnrSource), diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 930c2c0012..4e732397e8 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -10,6 +10,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" + chaincontainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" "github.com/nspcc-dev/neofs-node/pkg/network" svcutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/util" @@ -31,7 +32,13 @@ type distributedTarget struct { obj *objectSDK.Object objMeta object.ContentMeta networkMagicNumber uint32 - objSharedMeta []byte + + cnrClient *chaincontainer.Client + metainfoConsistencyAttr string + + metaMtx sync.RWMutex + objSharedMeta []byte + collectedSignatures [][]byte localNodeInContainer bool localNodeSigner neofscrypto.Signer @@ -143,7 +150,22 @@ func (t *distributedTarget) Close() (oid.ID, error) { t.objSharedMeta = object.EncodeReplicationMetaInfo(t.obj.GetContainerID(), t.obj.GetID(), t.obj.PayloadSize(), deletedObjs, lockedObjs, t.obj.CreationEpoch(), t.networkMagicNumber) id := t.obj.GetID() - return id, t.placementIterator.iterateNodesForObject(id, t.sendObject) + err := t.placementIterator.iterateNodesForObject(id, t.sendObject) + if err != nil { + return oid.ID{}, err + } + + if t.localNodeInContainer && (t.metainfoConsistencyAttr == "strict" || t.metainfoConsistencyAttr == "optimistic") { + t.metaMtx.RLock() + defer t.metaMtx.RUnlock() + + err = t.cnrClient.SubmitObjectPut(t.objSharedMeta, t.collectedSignatures) + if err != nil { + t.placementIterator.log.Info("failed to submit", zap.Stringer("oid", id), zap.Error(err)) + } + } + + return id, nil } func (t *distributedTarget) sendObject(node nodeDesc) error { @@ -181,6 +203,10 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { if !sig.Verify(t.objSharedMeta) { l.Info("meta signature verification failed", zap.String("node", network.StringifyGroup(node.info.AddressGroup()))) } + + t.metaMtx.Lock() + t.collectedSignatures = append(t.collectedSignatures, sig.Value()) + t.metaMtx.Unlock() } return nil diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index e5b2414eb1..faa87e02c0 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/core/object" + chaincontainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/util" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -102,6 +103,8 @@ type cfg struct { log *zap.Logger networkMagic uint32 + + cnrClient *chaincontainer.Client } func defaultCfg() *cfg { @@ -200,6 +203,12 @@ func WithClientConstructor(v ClientConstructor) Option { } } +func WithContainerClient(v *chaincontainer.Client) Option { + return func(c *cfg) { + c.cnrClient = v + } +} + func WithLogger(l *zap.Logger) Option { return func(c *cfg) { c.log = l diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index a1ffaeba4d..fbaa631eaa 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" + "github.com/nspcc-dev/neofs-sdk-go/container" neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/nspcc-dev/neofs-sdk-go/user" @@ -237,10 +238,12 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { return rt }, - relay: relay, - fmt: p.fmtValidator, - localNodeInContainer: prm.localNodeInContainer, - localNodeSigner: prm.localNodeSigner, + relay: relay, + fmt: p.fmtValidator, + localNodeInContainer: prm.localNodeInContainer, + localNodeSigner: prm.localNodeSigner, + cnrClient: p.cfg.cnrClient, + metainfoConsistencyAttr: metaAttribute(prm.cnr), } } @@ -270,3 +273,7 @@ func (p *Streamer) Close() (*PutResponse, error) { id: id, }, nil } + +func metaAttribute(cnr container.Container) string { + return cnr.Attribute("__NEOFS__METAINFO_CONSISTENCY") +} From 212a031e76521b29ccd44dc187cf529e8bbc0117 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 11 Dec 2024 11:26:47 +0300 Subject: [PATCH 07/13] morph,node/object/put: support "strict" meta-data policy According to https://github.com/nspcc-dev/neofs-api/pull/309, "strict" must wait for meta-data handling on the contract-side and every PUT request is responded only with transaction acceptance. Signed-off-by: Pavel Karpy --- pkg/morph/client/container/meta.go | 8 ++++++-- pkg/services/object/put/distributed.go | 15 +++++++++++++-- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/pkg/morph/client/container/meta.go b/pkg/morph/client/container/meta.go index 2afe4283f1..a4769464bf 100644 --- a/pkg/morph/client/container/meta.go +++ b/pkg/morph/client/container/meta.go @@ -6,10 +6,11 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/morph/client" ) -// SubmitObjectPut puts object meta information. +// SubmitObjectPut puts object meta information. With awaitTX it blocks until +// TX is accepted in chain or is expired. // // Returns any error encountered that caused the saving to interrupt. -func (c *Client) SubmitObjectPut(meta []byte, sigs [][]byte) error { +func (c *Client) SubmitObjectPut(awaitTX bool, meta []byte, sigs [][]byte) error { if len(meta) == 0 || len(sigs) == 0 { return errNilArgument } @@ -17,6 +18,9 @@ func (c *Client) SubmitObjectPut(meta []byte, sigs [][]byte) error { var prm client.InvokePrm prm.SetMethod(submitObjectPutMethod) prm.SetArgs(meta, sigs) + if awaitTX { + prm.Await() + } err := c.client.Invoke(prm) if err != nil { diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 4e732397e8..9a34db73c1 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -155,11 +155,22 @@ func (t *distributedTarget) Close() (oid.ID, error) { return oid.ID{}, err } - if t.localNodeInContainer && (t.metainfoConsistencyAttr == "strict" || t.metainfoConsistencyAttr == "optimistic") { + if t.localNodeInContainer && t.metainfoConsistencyAttr != "" { t.metaMtx.RLock() defer t.metaMtx.RUnlock() - err = t.cnrClient.SubmitObjectPut(t.objSharedMeta, t.collectedSignatures) + var await bool + switch t.metainfoConsistencyAttr { + // TODO: there was no constant in SDK at the code creation moment + case "strict": + await = true + case "optimistic": + await = false + default: + return id, nil + } + + err = t.cnrClient.SubmitObjectPut(await, t.objSharedMeta, t.collectedSignatures) if err != nil { t.placementIterator.log.Info("failed to submit", zap.Stringer("oid", id), zap.Error(err)) } From 95bbed87cabb22fe8a5ff767ca68567a21fc1899 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 11 Dec 2024 10:46:49 +0300 Subject: [PATCH 08/13] morph/static: make it possible to wait for TX inclusion Will be helpful for new meta-data handling policies. Signed-off-by: Pavel Karpy --- pkg/innerring/processors/alphabet/process_emit.go | 2 +- pkg/morph/client/client.go | 5 ++++- pkg/morph/client/notary.go | 4 ++-- pkg/morph/client/static.go | 9 +++++++++ 4 files changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/innerring/processors/alphabet/process_emit.go b/pkg/innerring/processors/alphabet/process_emit.go index e8213ab2d5..23a791441e 100644 --- a/pkg/innerring/processors/alphabet/process_emit.go +++ b/pkg/innerring/processors/alphabet/process_emit.go @@ -27,7 +27,7 @@ func (ap *Processor) processEmit() { } // there is no signature collecting, so we don't need extra fee - err := ap.morphClient.Invoke(contract, 0, emitMethod) + err := ap.morphClient.Invoke(contract, false, 0, emitMethod) if err != nil { ap.log.Warn("can't invoke alphabet emit method", zap.Error(err)) diff --git a/pkg/morph/client/client.go b/pkg/morph/client/client.go index 6ec0a93155..8bd8429b66 100644 --- a/pkg/morph/client/client.go +++ b/pkg/morph/client/client.go @@ -294,7 +294,7 @@ func (e *notHaltStateError) Error() string { // Invoke invokes contract method by sending transaction into blockchain. // Supported args types: int64, string, util.Uint160, []byte and bool. -func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, args ...any) error { +func (c *Client) Invoke(contract util.Uint160, await bool, fee fixedn.Fixed8, method string, args ...any) error { var conn = c.conn.Load() if conn == nil { @@ -302,6 +302,9 @@ func (c *Client) Invoke(contract util.Uint160, fee fixedn.Fixed8, method string, } txHash, vub, err := conn.rpcActor.SendTunedCall(contract, method, nil, addFeeCheckerModifier(int64(fee)), args...) + if await { + _, err = conn.rpcActor.Wait(txHash, vub, err) + } if err != nil { return fmt.Errorf("could not invoke %s: %w", method, err) } diff --git a/pkg/morph/client/notary.go b/pkg/morph/client/notary.go index 92fc31394e..ad4d1f2fbb 100644 --- a/pkg/morph/client/notary.go +++ b/pkg/morph/client/notary.go @@ -311,7 +311,7 @@ func (c *Client) UpdateNeoFSAlphabetList(alphas keys.PublicKeys, txHash util.Uin // `nonce` and `vub` are used only if notary is enabled. func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, nonce uint32, vub *uint32, method string, args ...any) error { if c.notary == nil { - return c.Invoke(contract, fee, method, args...) + return c.Invoke(contract, false, fee, method, args...) } return c.notaryInvoke(false, true, contract, nonce, vub, method, args...) @@ -324,7 +324,7 @@ func (c *Client) NotaryInvoke(contract util.Uint160, fee fixedn.Fixed8, nonce ui // Considered to be used by non-IR nodes. func (c *Client) NotaryInvokeNotAlpha(contract util.Uint160, fee fixedn.Fixed8, method string, args ...any) error { if c.notary == nil { - return c.Invoke(contract, fee, method, args...) + return c.Invoke(contract, false, fee, method, args...) } return c.notaryInvoke(false, false, contract, rand.Uint32(), nil, method, args...) diff --git a/pkg/morph/client/static.go b/pkg/morph/client/static.go index 4ac535f67e..c0a01eddfd 100644 --- a/pkg/morph/client/static.go +++ b/pkg/morph/client/static.go @@ -74,10 +74,18 @@ func (s StaticClient) Morph() *Client { type InvokePrm struct { TestInvokePrm + await bool + // optional parameters InvokePrmOptional } +// Await makes invokation block until TX is included in chain OR +// Valid Until Block is reached. Works _only_ for non-notary requests. +func (i *InvokePrm) Await() { + i.await = true +} + // InvokePrmOptional groups optional parameters of the Invoke operation. type InvokePrmOptional struct { // hash is an optional hash of the transaction @@ -145,6 +153,7 @@ func (s StaticClient) Invoke(prm InvokePrm) error { return s.client.Invoke( s.scScriptHash, + prm.await, fee, prm.method, prm.args..., From b6689c32bc21e76f1013a6cb60b601d46fa2c2de Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 11 Dec 2024 17:13:50 +0300 Subject: [PATCH 09/13] ir, morph/container: put container placement changes in a single TX It is the most secure way to ensure changes are done in sync and either all at once, or none of them. It is still can be a problem to update too long placement vectors in a single TX since there is always a certain limit, but it can be reconsidered separately. Signed-off-by: Pavel Karpy --- pkg/innerring/processors/container.go | 37 ---------- .../processors/container/process_container.go | 3 +- .../processors/netmap/process_epoch.go | 3 +- pkg/morph/client/container/nodes.go | 73 +++++++++++++------ pkg/morph/client/notary.go | 52 +++++++++++++ pkg/morph/client/static.go | 10 +++ 6 files changed, 115 insertions(+), 63 deletions(-) delete mode 100644 pkg/innerring/processors/container.go diff --git a/pkg/innerring/processors/container.go b/pkg/innerring/processors/container.go deleted file mode 100644 index 59d78e7755..0000000000 --- a/pkg/innerring/processors/container.go +++ /dev/null @@ -1,37 +0,0 @@ -package processors - -import ( - "fmt" - - cnrcli "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" - cid "github.com/nspcc-dev/neofs-sdk-go/container/id" - "github.com/nspcc-dev/neofs-sdk-go/netmap" -) - -// UpdatePlacementVectors updates placement vectors after a container placement -// change in Container contract. Empty vectors drops container vectors from the -// contract. -func UpdatePlacementVectors(cID cid.ID, cnrCli *cnrcli.Client, vectors [][]netmap.NodeInfo, replicas []uint32) error { - for i, vector := range vectors { - err := cnrCli.AddNextEpochNodes(cID, i, pubKeys(vector)) - if err != nil { - return fmt.Errorf("can't add %d placement vector to Container contract: %w", i, err) - } - } - - err := cnrCli.CommitContainerListUpdate(cID, replicas) - if err != nil { - return fmt.Errorf("can't commit container list to Container contract: %w", err) - } - - return nil -} - -func pubKeys(nodes []netmap.NodeInfo) [][]byte { - res := make([][]byte, 0, len(nodes)) - for _, node := range nodes { - res = append(res, node.PublicKey()) - } - - return res -} diff --git a/pkg/innerring/processors/container/process_container.go b/pkg/innerring/processors/container/process_container.go index fd2ee59c16..073a98a2e4 100644 --- a/pkg/innerring/processors/container/process_container.go +++ b/pkg/innerring/processors/container/process_container.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/nspcc-dev/neo-go/pkg/network/payload" - "github.com/nspcc-dev/neofs-node/pkg/innerring/processors" cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" "github.com/nspcc-dev/neofs-node/pkg/morph/event" containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container" @@ -135,7 +134,7 @@ func (cp *Processor) approvePutContainer(ctx *putContainerContext) { replicas = append(replicas, policy.ReplicaNumberByIndex(i)) } - err = processors.UpdatePlacementVectors(ctx.cID, cp.cnrClient, vectors, replicas) + err = cp.cnrClient.UpdateContainerPlacement(ctx.cID, vectors, replicas) if err != nil { cp.log.Error("could not update Container contract", zap.Stringer("cid", ctx.cID), zap.Error(err)) return diff --git a/pkg/innerring/processors/netmap/process_epoch.go b/pkg/innerring/processors/netmap/process_epoch.go index ab7d85551e..e91d640e6b 100644 --- a/pkg/innerring/processors/netmap/process_epoch.go +++ b/pkg/innerring/processors/netmap/process_epoch.go @@ -3,7 +3,6 @@ package netmap import ( "fmt" - "github.com/nspcc-dev/neofs-node/pkg/innerring/processors" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/governance" "github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement" @@ -115,7 +114,7 @@ func (np *Processor) updatePlacementInContract(nm netmap.NetMap, l *zap.Logger) replicas = append(replicas, policy.ReplicaNumberByIndex(i)) } - err = processors.UpdatePlacementVectors(cID, np.containerWrp, vectors, replicas) + err = np.containerWrp.UpdateContainerPlacement(cID, vectors, replicas) if err != nil { l.Error("can't put placement vectors to Container contract", zap.Error(err)) continue diff --git a/pkg/morph/client/container/nodes.go b/pkg/morph/client/container/nodes.go index 195264507d..a8a38db1d5 100644 --- a/pkg/morph/client/container/nodes.go +++ b/pkg/morph/client/container/nodes.go @@ -2,44 +2,49 @@ package container import ( "fmt" + "slices" + "strings" + "github.com/nspcc-dev/neo-go/pkg/smartcontract" "github.com/nspcc-dev/neofs-node/pkg/morph/client" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/netmap" ) -// AddNextEpochNodes registers public keys as a container's placement vector -// with specified index. Registration must be finished with final -// [Client.CommitContainerListUpdate] call. Always sends a notary request with -// Alphabet multi-signature. -func (c *Client) AddNextEpochNodes(cid cid.ID, placementIndex int, nodesKeys [][]byte) error { - if len(nodesKeys) == 0 { - return errNilArgument +// UpdateContainerPlacement registers public keys as a container's placement +// vectors. Always sends a notary request with Alphabet multi-signature. +// Number of vectors must equal number of replicas. Empty vectors removes +// container placement from the contract. +func (c *Client) UpdateContainerPlacement(cid cid.ID, vectors [][]netmap.NodeInfo, replicas []uint32) error { + if len(vectors) == 0 { + return c.dropPlacement(cid[:]) } - prm := client.InvokePrm{} - prm.SetMethod(addNextEpochNodes) - prm.SetArgs(cid, placementIndex, nodesKeys) - prm.RequireAlphabetSignature() + cnrHash := c.client.ContractAddress() + b := smartcontract.NewBuilder() - err := c.client.Invoke(prm) + for i, vector := range vectors { + b.InvokeMethod(cnrHash, addNextEpochNodes, cid[:], i, toAnySlice(pubKeys(vector))) + } + b.InvokeMethod(cnrHash, commitContainerListUpdate, cid[:], toAnySlice(replicas)) + + script, err := b.Script() if err != nil { - return fmt.Errorf("could not invoke method (%s): %w", addNextEpochNodes, err) + return fmt.Errorf("building TX script: %w", err) + } + + err = c.client.RunAlphabetNotaryScript(script) + if err != nil { + return fmt.Errorf("could not invoke alphabet script: %w", err) } return nil } -// CommitContainerListUpdate finishes container placement updates for the current -// epoch made by former [Client.AddNextEpochNodes] calls. Always sends a notary -// request with Alphabet multi-signature. -func (c *Client) CommitContainerListUpdate(cid cid.ID, replicas []uint32) error { - if len(replicas) == 0 { - return errNilArgument - } - +func (c *Client) dropPlacement(cid []byte) error { prm := client.InvokePrm{} prm.SetMethod(commitContainerListUpdate) - prm.SetArgs(cid, replicas) + prm.SetArgs(cid, nil) prm.RequireAlphabetSignature() err := c.client.Invoke(prm) @@ -49,3 +54,27 @@ func (c *Client) CommitContainerListUpdate(cid cid.ID, replicas []uint32) error return nil } + +func pubKeys(nodes []netmap.NodeInfo) [][]byte { + res := make([][]byte, 0, len(nodes)) + for _, node := range nodes { + res = append(res, node.PublicKey()) + } + + // arrays take parts in transaction that should be multi-singed, so order + // is important to be the same + slices.SortFunc(res, func(a, b []byte) int { + return strings.Compare(string(a), string(b)) + }) + + return res +} + +func toAnySlice[T any](vv []T) []any { + res := make([]any, 0, len(vv)) + for _, v := range vv { + res = append(res, v) + } + + return res +} diff --git a/pkg/morph/client/notary.go b/pkg/morph/client/notary.go index ad4d1f2fbb..54a8ad5dc6 100644 --- a/pkg/morph/client/notary.go +++ b/pkg/morph/client/notary.go @@ -501,6 +501,58 @@ func (c *Client) notaryInvoke(committee, invokedByAlpha bool, contract util.Uint return nil } +func (c *Client) runAlphabetNotaryScript(script []byte, nonce uint32) error { + if c.notary == nil { + panic("notary support is not enabled") + } + + var conn = c.conn.Load() + if conn == nil { + return ErrConnectionLost + } + + alphabetList, err := c.notary.alphabetSource() // prepare arguments for test invocation + if err != nil { + return err + } + + until, err := c.notaryTxValidationLimit(conn) + if err != nil { + return err + } + + cosigners, err := c.notaryCosigners(false, alphabetList, false) + if err != nil { + return err + } + + nAct, err := notary.NewActor(conn.client, cosigners, c.acc) + if err != nil { + return err + } + + mainH, fbH, untilActual, err := nAct.Notarize(nAct.MakeTunedRun(script, nil, func(r *result.Invoke, t *transaction.Transaction) error { + if r.State != vmstate.Halt.String() { + return ¬HaltStateError{state: r.State, exception: r.FaultException} + } + + t.ValidUntilBlock = until + t.Nonce = nonce + + return nil + })) + if err != nil && !alreadyOnChainError(err) { + return err + } + + c.logger.Debug("notary request based on script invoked", + zap.Uint32("valid_until_block", untilActual), + zap.String("tx_hash", mainH.StringLE()), + zap.String("fallback_hash", fbH.StringLE())) + + return nil +} + func (c *Client) notaryCosigners(invokedByAlpha bool, ir []*keys.PublicKey, committee bool) ([]actor.SignerAccount, error) { multiaddrAccount, err := c.notaryMultisigAccount(ir, committee, invokedByAlpha) if err != nil { diff --git a/pkg/morph/client/static.go b/pkg/morph/client/static.go index c0a01eddfd..787af70245 100644 --- a/pkg/morph/client/static.go +++ b/pkg/morph/client/static.go @@ -160,6 +160,16 @@ func (s StaticClient) Invoke(prm InvokePrm) error { ) } +// RunAlphabetNotaryScript invokes script by sending tx to notary contract in +// blockchain. Fallback tx is a `RET`. Panics if Notary support is not enabled. +// TX is signed with internal key, 2/3+1 multisigners are expected. +func (s StaticClient) RunAlphabetNotaryScript(sc []byte) error { + // default nonce for Alphabet transactions that must be send asynchronous; + // it is chosen to be the same as in Invoke method + const nonce = 1 + return s.client.runAlphabetNotaryScript(sc, nonce) +} + // TestInvokePrm groups parameters of the TestInvoke operation. type TestInvokePrm struct { method string From ba2f72017bf0dd9cf39e73cb8677dae201a91159 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 11 Dec 2024 18:22:30 +0300 Subject: [PATCH 10/13] ir/container: drop useless parameter vars Signed-off-by: Pavel Karpy --- pkg/innerring/processors/container/process_container.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/pkg/innerring/processors/container/process_container.go b/pkg/innerring/processors/container/process_container.go index 073a98a2e4..5b8855310e 100644 --- a/pkg/innerring/processors/container/process_container.go +++ b/pkg/innerring/processors/container/process_container.go @@ -97,15 +97,6 @@ func (cp *Processor) approvePutContainer(ctx *putContainerContext) { var err error - prm := cntClient.PutPrm{} - - prm.SetContainer(e.Container()) - prm.SetKey(e.PublicKey()) - prm.SetSignature(e.Signature()) - prm.SetToken(e.SessionToken()) - prm.SetName(ctx.d.Name()) - prm.SetZone(ctx.d.Zone()) - nr := e.NotaryRequest() err = cp.cnrClient.Morph().NotarySignAndInvokeTX(nr.MainTransaction, true) From c6c6aea5f6d85d7b6eea3152681c2772d49814e4 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 11 Dec 2024 19:15:43 +0300 Subject: [PATCH 11/13] morph: support overloads for container.Put contract method It now may have additional arg that enables meta-on-chain support and be named, see https://github.com/nspcc-dev/neofs-contract/pull/451. Closes #2877. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/morph/client/container/put.go | 46 ++++++++-------- pkg/morph/event/container/put.go | 3 ++ pkg/morph/event/container/put_notary.go | 71 ++++++++++++++++++++++--- pkg/morph/event/opcodes.go | 12 +++++ 5 files changed, 105 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c27196377..9d52da6242 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Changelog for NeoFS Node ## [Unreleased] ### Added +- Initial support for meta-on-chain for objects (#2877) ### Fixed diff --git a/pkg/morph/client/container/put.go b/pkg/morph/client/container/put.go index aec2665fd2..fa8205ed1c 100644 --- a/pkg/morph/client/container/put.go +++ b/pkg/morph/client/container/put.go @@ -5,6 +5,7 @@ import ( containercore "github.com/nspcc-dev/neofs-node/pkg/core/container" "github.com/nspcc-dev/neofs-node/pkg/morph/client" + containerSDK "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" ) @@ -21,6 +22,10 @@ func Put(c *Client, cnr containercore.Container) (*cid.ID, error) { prm.SetContainer(data) prm.SetName(d.Name()) prm.SetZone(d.Zone()) + switch metaAttribute(cnr.Value) { + case "optimistic", "strict": + prm.EnableMeta() + } if cnr.Session != nil { prm.SetToken(cnr.Session.Marshal()) @@ -41,12 +46,13 @@ func Put(c *Client, cnr containercore.Container) (*cid.ID, error) { // PutPrm groups parameters of Put operation. type PutPrm struct { - cnr []byte - key []byte - sig []byte - token []byte - name string - zone string + cnr []byte + key []byte + sig []byte + token []byte + name string + zone string + enableMetaOnChain bool client.InvokePrmOptional } @@ -81,6 +87,11 @@ func (p *PutPrm) SetZone(zone string) { p.zone = zone } +// EnableMeta enables meta-on-chain. +func (p *PutPrm) EnableMeta() { + p.enableMetaOnChain = true +} + // Put saves binary container with its session token, key and signature // in NeoFS system through Container contract call. // @@ -91,21 +102,10 @@ func (c *Client) Put(p PutPrm) error { return errNilArgument } - var ( - method string - prm client.InvokePrm - ) - - if p.name != "" { - method = putNamedMethod - prm.SetArgs(p.cnr, p.sig, p.key, p.token, p.name, p.zone) - } else { - method = putMethod - prm.SetArgs(p.cnr, p.sig, p.key, p.token) - } - - prm.SetMethod(method) + var prm client.InvokePrm + prm.SetMethod(putMethod) prm.InvokePrmOptional = p.InvokePrmOptional + prm.SetArgs(p.cnr, p.sig, p.key, p.token, p.name, p.zone, p.enableMetaOnChain) // no magic bugs with notary requests anymore, this operation should // _always_ be notary signed so make it one more time even if it is @@ -114,7 +114,11 @@ func (c *Client) Put(p PutPrm) error { err := c.client.Invoke(prm) if err != nil { - return fmt.Errorf("could not invoke method (%s): %w", method, err) + return fmt.Errorf("could not invoke method (%s): %w", putMethod, err) } return nil } + +func metaAttribute(cnr containerSDK.Container) string { + return cnr.Attribute("__NEOFS__METAINFO_CONSISTENCY") +} diff --git a/pkg/morph/event/container/put.go b/pkg/morph/event/container/put.go index ea2394e770..a22828f1c6 100644 --- a/pkg/morph/event/container/put.go +++ b/pkg/morph/event/container/put.go @@ -16,6 +16,9 @@ type Put struct { signature []byte publicKey []byte token []byte + name string + zone string + metaOnChain bool // For notary notifications only. // Contains raw transactions of notary request. diff --git a/pkg/morph/event/container/put_notary.go b/pkg/morph/event/container/put_notary.go index bf5d186464..0dba322ac1 100644 --- a/pkg/morph/event/container/put_notary.go +++ b/pkg/morph/event/container/put_notary.go @@ -32,6 +32,18 @@ func (p *Put) setToken(v []byte) { } } +func (p *Put) setName(v string) { + p.name = v +} + +func (p *Put) setZone(v string) { + p.zone = v +} + +func (p *Put) setMetaOnChain(v bool) { + p.metaOnChain = v +} + var putFieldSetters = []func(*Put, []byte){ // order on stack is reversed (*Put).setToken, @@ -56,6 +68,34 @@ func parsePutNotary(ev *Put, raw *payload.P2PNotaryRequest, ops []event.Op) erro fieldNum = 0 ) + switch len(ops) { + case expectedItemNumPut + 3: + enableMeta, err := event.BoolFromOpcode(ops[0]) + if err != nil { + return fmt.Errorf("parse arg meta: %w", err) + } + ev.setMetaOnChain(enableMeta) + + ops = ops[1:] + + err = parseNamedArgs(ev, ops) + if err != nil { + return err + } + + ops = ops[2:] + case expectedItemNumPut + 2: + err := parseNamedArgs(ev, ops) + if err != nil { + return err + } + + ops = ops[2:] + case expectedItemNumPut: + default: + return fmt.Errorf("unknown number of args: %d", len(ops)) + } + for _, op := range ops { currentOp = op.Code() @@ -104,14 +144,9 @@ func ParsePutNamedNotary(ne event.NotaryEvent) (event.Event, error) { err error ) - ev.zone, err = event.StringFromOpcode(ops[0]) + err = parseNamedArgs(&ev, ops) if err != nil { - return nil, fmt.Errorf("parse arg zone: %w", err) - } - - ev.name, err = event.StringFromOpcode(ops[1]) - if err != nil { - return nil, fmt.Errorf("parse arg name: %w", err) + return nil, err } err = parsePutNotary(&ev.Put, ne.Raw(), ops[putNamedAdditionalArgs:]) @@ -121,3 +156,25 @@ func ParsePutNamedNotary(ne event.NotaryEvent) (event.Event, error) { return ev, nil } + +type putEvNamed interface { + setName(v string) + setZone(v string) +} + +func parseNamedArgs(p putEvNamed, ops []event.Op) error { + zone, err := event.StringFromOpcode(ops[0]) + if err != nil { + return fmt.Errorf("parse arg zone: %w", err) + } + + name, err := event.StringFromOpcode(ops[1]) + if err != nil { + return fmt.Errorf("parse arg name: %w", err) + } + + p.setZone(zone) + p.setName(name) + + return nil +} diff --git a/pkg/morph/event/opcodes.go b/pkg/morph/event/opcodes.go index 3385c2eace..069748ba70 100644 --- a/pkg/morph/event/opcodes.go +++ b/pkg/morph/event/opcodes.go @@ -37,6 +37,18 @@ func BytesFromOpcode(op Op) ([]byte, error) { } } +// BoolFromOpcode tries to retrieve bool from Op. +func BoolFromOpcode(op Op) (bool, error) { + switch code := op.Code(); code { + case opcode.PUSHT: + return true, nil + case opcode.PUSHF: + return false, nil + default: + return false, fmt.Errorf("unexpected ByteArray opcode %s", code) + } +} + // IntFromOpcode tries to retrieve int from Op. func IntFromOpcode(op Op) (int64, error) { switch code := op.Code(); { From 70bd9ea1056551c706bfb64ff8307782d34b5702 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 18 Dec 2024 19:32:23 +0300 Subject: [PATCH 12/13] node/object/put: improve metadata logging Add logs about successful operation. Also, log empty signatures and do not try to continue sending when there are no signatures. Signed-off-by: Pavel Karpy --- pkg/services/object/put/distributed.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index 9a34db73c1..c4582bf4fd 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -159,6 +159,11 @@ func (t *distributedTarget) Close() (oid.ID, error) { t.metaMtx.RLock() defer t.metaMtx.RUnlock() + if len(t.collectedSignatures) == 0 { + t.placementIterator.log.Info("skip chain meta information subbmit, zero signatures were collected", zap.Stringer("oid", id)) + return id, nil + } + var await bool switch t.metainfoConsistencyAttr { // TODO: there was no constant in SDK at the code creation moment @@ -172,8 +177,11 @@ func (t *distributedTarget) Close() (oid.ID, error) { err = t.cnrClient.SubmitObjectPut(await, t.objSharedMeta, t.collectedSignatures) if err != nil { - t.placementIterator.log.Info("failed to submit", zap.Stringer("oid", id), zap.Error(err)) + t.placementIterator.log.Info("failed to submit object meta information", zap.Stringer("oid", id), zap.Error(err)) + return id, nil } + + t.placementIterator.log.Debug("submitted object meta information", zap.Stringer("oid", id)) } return id, nil From ae6e37d72e1d6b32352e79a5383d30da21edc4d2 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 18 Dec 2024 19:35:45 +0300 Subject: [PATCH 13/13] node/object/put: use block as `validUntil`, not epoches It is more natural for on-chain operations. Also, it requires handling sync problems when different blocks are used for the same objects on different nodes: two signatures are always attached with epoch duration difference. Refs https://github.com/nspcc-dev/neofs-contract/pull/451. Signed-off-by: Pavel Karpy --- cmd/neofs-node/morph.go | 2 + cmd/neofs-node/netmap.go | 12 ++- cmd/neofs-node/object.go | 2 +- cmd/neofs-node/reputation.go | 2 + cmd/neofs-node/transport.go | 27 ++---- pkg/core/netmap/state.go | 7 ++ pkg/core/object/replicate.go | 9 +- pkg/core/object/replicate_test.go | 2 +- .../transport/object/grpc/replication.go | 45 ++++++++-- .../transport/object/grpc/replication_test.go | 51 ++++++++--- pkg/network/transport/object/grpc/service.go | 5 +- pkg/services/object/put/distributed.go | 88 +++++++++++++++---- pkg/services/object/put/local.go | 3 +- pkg/services/object/put/remote.go | 7 +- pkg/services/object/put/service.go | 7 +- pkg/services/object/put/streamer.go | 4 +- 16 files changed, 194 insertions(+), 79 deletions(-) diff --git a/cmd/neofs-node/morph.go b/cmd/neofs-node/morph.go index cc40c1331a..b87ca464bf 100644 --- a/cmd/neofs-node/morph.go +++ b/cmd/neofs-node/morph.go @@ -97,6 +97,8 @@ func listenMorphNotifications(c *cfg) { registerBlockHandler(lis, func(block *block.Block) { c.log.Debug("new block", zap.Uint32("index", block.Index)) + c.networkState.block.Store(block.Index) + err = c.persistate.SetUInt32(persistateFSChainLastBlockKey, block.Index) if err != nil { c.log.Warn("can't update persistent state", diff --git a/cmd/neofs-node/netmap.go b/cmd/neofs-node/netmap.go index e03c2ecf5d..3a197ef08b 100644 --- a/cmd/neofs-node/netmap.go +++ b/cmd/neofs-node/netmap.go @@ -24,7 +24,9 @@ import ( // primary solution of local network state dump. type networkState struct { - epoch atomic.Uint64 + epoch atomic.Uint64 + block atomic.Uint32 + epochDuration atomic.Uint64 controlNetStatus atomic.Value // control.NetmapStatus @@ -46,6 +48,14 @@ func (s *networkState) CurrentEpoch() uint64 { return s.epoch.Load() } +func (s *networkState) CurrentBlock() uint32 { + return s.block.Load() +} + +func (s *networkState) CurrentEpochDuration() uint64 { + return s.epochDuration.Load() +} + func (s *networkState) setCurrentEpoch(v uint64) { s.epoch.Store(v) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index c683074503..de391b3304 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -350,7 +350,7 @@ func initObjectService(c *cfg) { firstSvc := objectService.NewMetricCollector(signSvc, c.metricsCollector) - server := objectTransportGRPC.New(firstSvc, mNumber, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey)) + server := objectTransportGRPC.New(firstSvc, mNumber, objNode, neofsecdsa.SignerRFC6979(c.shared.basics.key.PrivateKey), c.cfgNetmap.state) for _, srv := range c.cfgGRPC.servers { objectGRPC.RegisterObjectServiceServer(srv, server) diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index 37e2e9fd84..d2ea3ad09f 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -230,6 +230,8 @@ func initReputationService(c *cfg) { return } + c.networkState.epochDuration.Store(duration) + iterations, err := c.cfgNetmap.wrapper.EigenTrustIterations() if err != nil { log.Debug("could not fetch iteration number", zap.Error(err)) diff --git a/cmd/neofs-node/transport.go b/cmd/neofs-node/transport.go index 156be9a54c..2382e17242 100644 --- a/cmd/neofs-node/transport.go +++ b/cmd/neofs-node/transport.go @@ -5,7 +5,6 @@ import ( "fmt" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" - "github.com/nspcc-dev/neofs-api-go/v2/refs" rawclient "github.com/nspcc-dev/neofs-api-go/v2/rpc/client" "github.com/nspcc-dev/neofs-api-go/v2/rpc/common" "github.com/nspcc-dev/neofs-api-go/v2/rpc/grpc" @@ -13,7 +12,6 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/status" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" - neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" ) type transport struct { @@ -22,7 +20,7 @@ type transport struct { // SendReplicationRequestToNode connects to described node and sends prepared // replication request message to it. -func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) (*neofscrypto.Signature, error) { +func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte, node coreclient.NodeInfo) ([]byte, error) { c, err := x.clients.Get(node) if err != nil { return nil, fmt.Errorf("connect to remote node: %w", err) @@ -40,12 +38,12 @@ func (x *transport) SendReplicationRequestToNode(ctx context.Context, req []byte } return resp.err }) - return resp.sig, err + return resp.sigs, err } type replicateResponse struct { - sig *neofscrypto.Signature - err error + sigs []byte + err error } func (x replicateResponse) ToGRPCMessage() grpc.Message { return new(objectGRPC.ReplicateResponse) } @@ -70,22 +68,7 @@ func (x *replicateResponse) FromGRPCMessage(gm grpc.Message) error { return nil } - sig := m.GetObjectSignature() - if sig == nil { - return nil - } - - sigV2 := new(refs.Signature) - err := sigV2.Unmarshal(sig) - if err != nil { - return fmt.Errorf("decoding signature from proto message: %w", err) - } - - x.sig = new(neofscrypto.Signature) - err = x.sig.ReadFromV2(*sigV2) - if err != nil { - return fmt.Errorf("invalid signature: %w", err) - } + x.sigs = m.GetObjectSignature() return nil } diff --git a/pkg/core/netmap/state.go b/pkg/core/netmap/state.go index 9b5b441a10..6482dc332e 100644 --- a/pkg/core/netmap/state.go +++ b/pkg/core/netmap/state.go @@ -5,3 +5,10 @@ type State interface { // CurrentEpoch returns the number of the current NeoFS epoch. CurrentEpoch() uint64 } + +// StateDetailed groups block, epoch and its duration information about FS chain. +type StateDetailed interface { + State + CurrentBlock() uint32 + CurrentEpochDuration() uint64 +} diff --git a/pkg/core/object/replicate.go b/pkg/core/object/replicate.go index 194b99cd0b..05d62b423a 100644 --- a/pkg/core/object/replicate.go +++ b/pkg/core/object/replicate.go @@ -9,8 +9,7 @@ import ( ) const ( - validInterval = 10 // in epochs - currentVersion = 7 // it is also a number of fields + currentVersion = 7 // it is also a number of fields ) const ( @@ -34,11 +33,11 @@ const ( // "size": payload size // "deleted": array of _raw_ object IDs // "locked": array of _raw_ object IDs -// "validuntil": last valid epoch number for meta information +// "validuntil": last valid block number for meta information // // Last valid epoch is object's creation epoch + 10. func EncodeReplicationMetaInfo(cID cid.ID, oID oid.ID, pSize uint64, - deleted, locked []oid.ID, createdAt uint64, magicNumber uint32) []byte { + deleted, locked []oid.ID, vub uint64, magicNumber uint32) []byte { kvs := []stackitem.MapElement{ kv(networkMagicKey, magicNumber), kv(cidKey, cID[:]), @@ -46,7 +45,7 @@ func EncodeReplicationMetaInfo(cID cid.ID, oID oid.ID, pSize uint64, kv(sizeKey, pSize), oidsKV(deletedKey, deleted), oidsKV(lockedKey, locked), - kv(validUntilKey, createdAt+validInterval), + kv(validUntilKey, vub), } result, err := stackitem.Serialize(stackitem.NewMapWithValue(kvs)) diff --git a/pkg/core/object/replicate_test.go b/pkg/core/object/replicate_test.go index 3f14669163..22ad851916 100644 --- a/pkg/core/object/replicate_test.go +++ b/pkg/core/object/replicate_test.go @@ -50,7 +50,7 @@ func TestMetaInfo(t *testing.T) { require.Equal(t, locked, stackItemToOIDs(t, mm[5].Value)) require.Equal(t, validUntilKey, string(mm[6].Key.Value().([]byte))) - require.Equal(t, validUntil+validInterval, mm[6].Value.Value().(*big.Int).Uint64()) + require.Equal(t, validUntil, mm[6].Value.Value().(*big.Int).Uint64()) } func stackItemToOIDs(t *testing.T, value stackitem.Item) []oid.ID { diff --git a/pkg/network/transport/object/grpc/replication.go b/pkg/network/transport/object/grpc/replication.go index e73f89c1ba..4c99e5cf89 100644 --- a/pkg/network/transport/object/grpc/replication.go +++ b/pkg/network/transport/object/grpc/replication.go @@ -3,6 +3,7 @@ package object import ( "bytes" "context" + "encoding/binary" "errors" "fmt" @@ -228,17 +229,47 @@ func (s *Server) metaInfoSignature(o object.Object) ([]byte, error) { default: } - metaInfo := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, - o.CreationEpoch(), s.mNumber) + currentBlock := s.nmState.CurrentBlock() + currentEpochDuration := s.nmState.CurrentEpochDuration() + firstBlock := (uint64(currentBlock)/currentEpochDuration + 1) * currentEpochDuration + secondBlock := firstBlock + currentEpochDuration + thirdBlock := secondBlock + currentEpochDuration - var sig neofscrypto.Signature - err := sig.Calculate(s.signer, metaInfo) + firstMeta := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, firstBlock, s.mNumber) + secondMeta := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, secondBlock, s.mNumber) + thirdMeta := objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), deleted, locked, thirdBlock, s.mNumber) + + var firstSig neofscrypto.Signature + var secondSig neofscrypto.Signature + var thirdSig neofscrypto.Signature + + err := firstSig.Calculate(s.signer, firstMeta) + if err != nil { + return nil, fmt.Errorf("signature failure: %w", err) + } + err = secondSig.Calculate(s.signer, secondMeta) if err != nil { return nil, fmt.Errorf("signature failure: %w", err) } + err = thirdSig.Calculate(s.signer, thirdMeta) + if err != nil { + return nil, fmt.Errorf("signature failure: %w", err) + } + + firstSigV2 := new(refsv2.Signature) + firstSig.WriteToV2(firstSigV2) + secondSigV2 := new(refsv2.Signature) + secondSig.WriteToV2(secondSigV2) + thirdSigV2 := new(refsv2.Signature) + thirdSig.WriteToV2(thirdSigV2) - sigV2 := new(refsv2.Signature) - sig.WriteToV2(sigV2) + res := make([]byte, 0, 4+firstSigV2.StableSize()+4+secondSigV2.StableSize()+4+thirdSigV2.StableSize()) + res = binary.LittleEndian.AppendUint32(res, uint32(firstSigV2.StableSize())) + res = append(res, firstSigV2.StableMarshal(nil)...) + res = binary.LittleEndian.AppendUint32(res, uint32(secondSigV2.StableSize())) + res = append(res, secondSigV2.StableMarshal(nil)...) + res = binary.LittleEndian.AppendUint32(res, uint32(thirdSigV2.StableSize())) + res = append(res, thirdSigV2.StableMarshal(nil)...) - return sigV2.StableMarshal(nil), nil + return res, nil } diff --git a/pkg/network/transport/object/grpc/replication_test.go b/pkg/network/transport/object/grpc/replication_test.go index ba4c31e2ec..6dbd1dbae7 100644 --- a/pkg/network/transport/object/grpc/replication_test.go +++ b/pkg/network/transport/object/grpc/replication_test.go @@ -6,6 +6,7 @@ import ( "crypto/ecdsa" "crypto/elliptic" "crypto/rand" + "encoding/binary" "errors" "testing" @@ -164,7 +165,7 @@ func anyValidRequest(tb testing.TB, signer neofscrypto.Signer, cnr cid.ID, objID func TestServer_Replicate(t *testing.T) { var noCallNode noCallTestNode var noCallObjSvc noCallObjectService - noCallSrv := New(noCallObjSvc, 0, &noCallNode, neofscryptotest.Signer()) + noCallSrv := New(noCallObjSvc, 0, &noCallNode, neofscryptotest.Signer(), netmapStateDetailed{}) clientSigner := neofscryptotest.Signer() clientPubKey := neofscrypto.PublicKeyBytes(clientSigner.Public()) serverPubKey := neofscrypto.PublicKeyBytes(neofscryptotest.Signer().Public()) @@ -328,7 +329,7 @@ func TestServer_Replicate(t *testing.T) { t.Run("apply storage policy failure", func(t *testing.T) { node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer()) + srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{}) node.cnrErr = errors.New("any error") @@ -340,7 +341,7 @@ func TestServer_Replicate(t *testing.T) { t.Run("client or server mismatches object's storage policy", func(t *testing.T) { node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer()) + srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{}) node.serverOutsideCnr = true node.clientOutsideCnr = true @@ -360,7 +361,7 @@ func TestServer_Replicate(t *testing.T) { t.Run("local storage failure", func(t *testing.T) { node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer()) + srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{}) node.storeErr = errors.New("any error") @@ -375,7 +376,7 @@ func TestServer_Replicate(t *testing.T) { signer := neofscryptotest.Signer() reqForSignature, o := anyValidRequest(t, clientSigner, cnr, objID) node := newTestNode(t, serverPubKey, clientPubKey, cnr, reqForSignature.Object) - srv := New(noCallObjSvc, mNumber, node, signer) + srv := New(noCallObjSvc, mNumber, node, signer, netmapStateDetailed{}) t.Run("signature not requested", func(t *testing.T) { resp, err := srv.Replicate(context.Background(), reqForSignature) @@ -394,20 +395,30 @@ func TestServer_Replicate(t *testing.T) { require.Empty(t, resp.GetStatus().GetMessage()) require.NotNil(t, resp.GetObjectSignature()) - var sigV2 refsv2.Signature - require.NoError(t, sigV2.Unmarshal(resp.GetObjectSignature())) + sigsRaw := resp.GetObjectSignature() - var sig neofscrypto.Signature - require.NoError(t, sig.ReadFromV2(sigV2)) + for i := range 1 { + var sigV2 refsv2.Signature + l := binary.LittleEndian.Uint32(sigsRaw) - require.Equal(t, signer.PublicKeyBytes, sig.PublicKeyBytes()) - require.True(t, sig.Verify(objectcore.EncodeReplicationMetaInfo(o.GetContainerID(), o.GetID(), o.PayloadSize(), nil, nil, o.CreationEpoch(), mNumber))) + require.NoError(t, sigV2.Unmarshal(sigsRaw[4:4+l])) + + var sig neofscrypto.Signature + require.NoError(t, sig.ReadFromV2(sigV2)) + + require.Equal(t, signer.PublicKeyBytes, sig.PublicKeyBytes()) + require.True(t, sig.Verify(objectcore.EncodeReplicationMetaInfo( + o.GetContainerID(), o.GetID(), o.PayloadSize(), nil, nil, + uint64((123+1+i)*240), mNumber))) + + sigsRaw = sigsRaw[:4+l] + } }) }) t.Run("OK", func(t *testing.T) { node := newTestNode(t, serverPubKey, clientPubKey, cnr, req.Object) - srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer()) + srv := New(noCallObjSvc, 0, node, neofscryptotest.Signer(), netmapStateDetailed{}) resp, err := srv.Replicate(context.Background(), req) require.NoError(t, err) @@ -416,6 +427,20 @@ func TestServer_Replicate(t *testing.T) { }) } +type netmapStateDetailed struct{} + +func (n netmapStateDetailed) CurrentEpoch() uint64 { + return 123 +} + +func (n netmapStateDetailed) CurrentBlock() uint32 { + return 123 * 240 +} + +func (n netmapStateDetailed) CurrentEpochDuration() uint64 { + return 240 +} + type nopNode struct{} func (x nopNode) ForEachContainerNodePublicKeyInLastTwoEpochs(cid.ID, func(pubKey []byte) bool) error { @@ -434,7 +459,7 @@ func BenchmarkServer_Replicate(b *testing.B) { ctx := context.Background() var node nopNode - srv := New(nil, 0, node, neofscryptotest.Signer()) + srv := New(nil, 0, node, neofscryptotest.Signer(), netmapStateDetailed{}) for _, tc := range []struct { name string diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go index 6ebe90c5c5..7f3d770cc6 100644 --- a/pkg/network/transport/object/grpc/service.go +++ b/pkg/network/transport/object/grpc/service.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" status "github.com/nspcc-dev/neofs-api-go/v2/status/grpc" + "github.com/nspcc-dev/neofs-node/pkg/core/netmap" objectSvc "github.com/nspcc-dev/neofs-node/pkg/services/object" "github.com/nspcc-dev/neofs-node/pkg/services/util" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -51,15 +52,17 @@ type Server struct { node Node signer neofscrypto.Signer mNumber uint32 + nmState netmap.StateDetailed } // New creates, initializes and returns Server instance. -func New(c objectSvc.ServiceServer, magicNumber uint32, node Node, signer neofscrypto.Signer) *Server { +func New(c objectSvc.ServiceServer, magicNumber uint32, node Node, signer neofscrypto.Signer, nmState netmap.StateDetailed) *Server { return &Server{ srv: c, node: node, signer: signer, mNumber: magicNumber, + nmState: nmState, } } diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index c4582bf4fd..ce5d26f3ae 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -2,12 +2,14 @@ package putsvc import ( "bytes" + "encoding/binary" "fmt" "math" "slices" "sync" "sync/atomic" + "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/core/object" chaincontainer "github.com/nspcc-dev/neofs-node/pkg/morph/client/container" @@ -23,15 +25,17 @@ import ( type preparedObjectTarget interface { WriteObject(*objectSDK.Object, object.ContentMeta, encodedObject) error - Close() (oid.ID, *neofscrypto.Signature, error) + Close() (oid.ID, []byte, error) } type distributedTarget struct { placementIterator placementIterator - obj *objectSDK.Object - objMeta object.ContentMeta - networkMagicNumber uint32 + obj *objectSDK.Object + objMeta object.ContentMeta + networkMagicNumber uint32 + currentBlock uint32 + currentEpochDuration uint64 cnrClient *chaincontainer.Client metainfoConsistencyAttr string @@ -147,8 +151,9 @@ func (t *distributedTarget) Close() (oid.ID, error) { default: } + expectedVUB := (uint64(t.currentBlock)/t.currentEpochDuration + 2) * t.currentEpochDuration t.objSharedMeta = object.EncodeReplicationMetaInfo(t.obj.GetContainerID(), t.obj.GetID(), t.obj.PayloadSize(), deletedObjs, - lockedObjs, t.obj.CreationEpoch(), t.networkMagicNumber) + lockedObjs, expectedVUB, t.networkMagicNumber) id := t.obj.GetID() err := t.placementIterator.iterateNodesForObject(id, t.sendObject) if err != nil { @@ -199,7 +204,7 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { return fmt.Errorf("could not write header: %w", err) } - _, sig, err := target.Close() + _, sigsRaw, err := target.Close() if err != nil { return fmt.Errorf("could not close object stream: %w", err) } @@ -207,28 +212,77 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { if t.localNodeInContainer && !node.local { // These should technically be errors, but we don't have // a complete implementation now, so errors are substituted with logs. - var l = t.placementIterator.log.With(zap.Stringer("oid", t.obj.GetID())) + var l = t.placementIterator.log.With(zap.Stringer("oid", t.obj.GetID()), + zap.String("node", network.StringifyGroup(node.info.AddressGroup()))) - if sig == nil { - l.Info("missing object meta signature") + sigs, err := decodeSignatures(sigsRaw) + if err != nil { + l.Info("failed to decode signatures", zap.Error(err)) return nil } - if !bytes.Equal(sig.PublicKeyBytes(), node.info.PublicKey()) { - l.Info("public key differs in object meta signature") + for i, sig := range sigs { + if !bytes.Equal(sig.PublicKeyBytes(), node.info.PublicKey()) { + l.Info("public key differs in object meta signature", zap.Int("signature index", i)) + continue + } + + if !sig.Verify(t.objSharedMeta) { + continue + } + + t.metaMtx.Lock() + t.collectedSignatures = append(t.collectedSignatures, sig.Value()) + t.metaMtx.Unlock() + return nil } - if !sig.Verify(t.objSharedMeta) { - l.Info("meta signature verification failed", zap.String("node", network.StringifyGroup(node.info.AddressGroup()))) + l.Info("metadata: verification failed: no valid signatures received") + } + + return nil +} + +func decodeSignatures(b []byte) ([]neofscrypto.Signature, error) { + res := make([]neofscrypto.Signature, 3) + for i := range res { + var offset int + var err error + + res[i], offset, err = decodeSignature(b) + if err != nil { + return nil, fmt.Errorf("decoding %d signature from proto message: %w", i, err) } - t.metaMtx.Lock() - t.collectedSignatures = append(t.collectedSignatures, sig.Value()) - t.metaMtx.Unlock() + b = b[offset:] } - return nil + return res, nil +} + +func decodeSignature(b []byte) (neofscrypto.Signature, int, error) { + if len(b) < 4 { + return neofscrypto.Signature{}, 0, fmt.Errorf("unexpected signature format: len: %d", len(b)) + } + l := int(binary.LittleEndian.Uint32(b[:4])) + if len(b) < 4+l { + return neofscrypto.Signature{}, 0, fmt.Errorf("unexpected signature format: len: %d, len claimed: %d", len(b), l) + } + + sig := new(refs.Signature) + err := sig.Unmarshal(b[4 : 4+l]) + if err != nil { + return neofscrypto.Signature{}, 0, fmt.Errorf("decoding signature from proto message: %w", err) + } + + var res neofscrypto.Signature + err = res.ReadFromV2(*sig) + if err != nil { + return neofscrypto.Signature{}, 0, fmt.Errorf("invalid signature: %w", err) + } + + return res, 4 + l, nil } type errNotEnoughNodes struct { diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go index bf156d715a..77508e69b2 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/put/local.go @@ -8,7 +8,6 @@ import ( objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-sdk-go/checksum" - neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/tzhash/tz" @@ -48,7 +47,7 @@ func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMet return nil } -func (t *localTarget) Close() (oid.ID, *neofscrypto.Signature, error) { +func (t *localTarget) Close() (oid.ID, []byte, error) { err := putObjectLocally(t.storage, t.obj, t.meta, &t.enc) if err != nil { return oid.ID{}, nil, err diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index a484822741..256f7931dd 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -10,7 +10,6 @@ import ( objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" internalclient "github.com/nspcc-dev/neofs-node/pkg/services/object/internal/client" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" - neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -54,13 +53,13 @@ func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta, return nil } -func (t *remoteTarget) Close() (oid.ID, *neofscrypto.Signature, error) { +func (t *remoteTarget) Close() (oid.ID, []byte, error) { if t.enc.hdrOff > 0 { - sig, err := t.transport.SendReplicationRequestToNode(t.ctx, t.enc.b, t.nodeInfo) + sigs, err := t.transport.SendReplicationRequestToNode(t.ctx, t.enc.b, t.nodeInfo) if err != nil { return oid.ID{}, nil, fmt.Errorf("replicate object to remote node (key=%x): %w", t.nodeInfo.PublicKey(), err) } - return t.obj.GetID(), sig, nil + return t.obj.GetID(), sigs, nil } var sessionInfo *util.SessionInfo diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index faa87e02c0..8b7b6e72c0 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -11,7 +11,6 @@ import ( objutil "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/util" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" - neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.uber.org/zap" @@ -37,7 +36,7 @@ type Option func(*cfg) type Transport interface { // SendReplicationRequestToNode sends a prepared replication request message to // the specified remote node. - SendReplicationRequestToNode(ctx context.Context, req []byte, node client.NodeInfo) (*neofscrypto.Signature, error) + SendReplicationRequestToNode(ctx context.Context, req []byte, node client.NodeInfo) ([]byte, error) } type ClientConstructor interface { @@ -96,7 +95,7 @@ type cfg struct { fmtValidatorOpts []object.FormatValidatorOption - networkState netmap.State + networkState netmap.StateDetailed clientConstructor ClientConstructor @@ -178,7 +177,7 @@ func WithWorkerPools(remote, local util.WorkerPool) Option { } } -func WithNetworkState(v netmap.State) Option { +func WithNetworkState(v netmap.StateDetailed) Option { return func(c *cfg) { c.networkState = v c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithNetState(v)) diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index fbaa631eaa..cec1c31b32 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -208,7 +208,9 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) internal.Target { withBroadcast := !localOnly && (typ == object.TypeTombstone || typ == object.TypeLock) return &distributedTarget{ - networkMagicNumber: p.networkMagic, + currentBlock: p.networkState.CurrentBlock(), + currentEpochDuration: p.networkState.CurrentEpochDuration(), + networkMagicNumber: p.networkMagic, placementIterator: placementIterator{ log: p.log, neoFSNet: p.neoFSNet,