Skip to content

Commit

Permalink
Add list endpoints.
Browse files Browse the repository at this point in the history
  • Loading branch information
q-uint committed May 17, 2024
1 parent 2c05823 commit bc4c645
Show file tree
Hide file tree
Showing 15 changed files with 5,453 additions and 455 deletions.
61 changes: 60 additions & 1 deletion agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"google.golang.org/protobuf/proto"
"net/url"
"reflect"
"time"
Expand Down Expand Up @@ -158,6 +159,33 @@ func (a Agent) Call(canisterID principal.Principal, methodName string, args []an
return call.CallAndWait(values...)
}

// CallProto calls a method on a canister and unmarshals the result into the given proto message.
func (a Agent) CallProto(canisterID principal.Principal, methodName string, in, out proto.Message) error {
payload, err := proto.Marshal(in)
if err != nil {
return err
}
requestID, data, err := a.sign(Request{
Type: RequestTypeCall,
Sender: a.Sender(),
IngressExpiry: a.expiryDate(),
CanisterID: canisterID,
MethodName: methodName,
Arguments: payload,
})
if err != nil {
return err
}
if _, err := a.call(canisterID, data); err != nil {
return err
}
raw, err := a.poll(canisterID, *requestID)
if err != nil {
return err
}
return proto.Unmarshal(raw, out)
}

// Client returns the underlying Client of the Agent.
func (a Agent) Client() *Client {
return &a.client
Expand Down Expand Up @@ -301,14 +329,45 @@ func (a Agent) GetRootKey() []byte {
}

// Query calls a method on a canister and unmarshals the result into the given values.
func (a Agent) Query(canisterID principal.Principal, methodName string, args []any, values []any) error {
func (a Agent) Query(canisterID principal.Principal, methodName string, args, values []any) error {
query, err := a.CreateQuery(canisterID, methodName, args...)
if err != nil {
return err
}
return query.Query(values...)
}

// QueryProto calls a method on a canister and unmarshals the result into the given proto message.
func (a Agent) QueryProto(canisterID principal.Principal, methodName string, in, out proto.Message) error {
payload, err := proto.Marshal(in)
if err != nil {
return err
}
_, data, err := a.sign(Request{
Type: RequestTypeQuery,
Sender: a.Sender(),
IngressExpiry: a.expiryDate(),
CanisterID: canisterID,
MethodName: methodName,
Arguments: payload,
})
if err != nil {
return err
}
resp, err := a.client.Query(canisterID, data)
if err != nil {
return err
}
var response Response
if err := cbor.Unmarshal(resp, &response); err != nil {
return err
}
if response.Status != "replied" {
return fmt.Errorf("status: %s", response.Status)
}
return proto.Unmarshal(response.Reply["arg"], out)
}

// ReadStateCertificate reads the certificate state of the given canister at the given path.
func (a Agent) ReadStateCertificate(canisterID principal.Principal, path [][]hashtree.Label) (hashtree.Node, error) {
c, err := a.readStateCertificate(canisterID, path)
Expand Down
91 changes: 91 additions & 0 deletions registry/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package registry

import (
"bytes"
"fmt"
"github.com/aviate-labs/agent-go/principal"
v1 "github.com/aviate-labs/agent-go/registry/proto/v1"
"google.golang.org/protobuf/proto"
"sort"
"strings"
)

type Client struct {
dp *DataProvider
deltas []*v1.RegistryDelta
}

func New() (*Client, error) {
dp, err := NewDataProvider()
if err != nil {
return nil, err
}
deltas, _, err := dp.GetChangesSince(0)
if err != nil {
return nil, err
}
sort.Slice(deltas, func(i, j int) bool {
return 0 < bytes.Compare(deltas[i].Key, deltas[j].Key)
})
return &Client{
dp: dp,
deltas: deltas,
}, nil
}

func (c *Client) GetNNSSubnetID() (*principal.Principal, error) {
v, _, err := c.dp.GetValueUpdate([]byte("nns_subnet_id"), nil)
if err != nil {
return nil, err
}
var nnsSubnetID v1.SubnetId
if err := proto.Unmarshal(v, &nnsSubnetID); err != nil {
return nil, err
}
return &principal.Principal{Raw: nnsSubnetID.PrincipalId.Raw}, nil
}

func (c *Client) GetNodeList() ([]*v1.NodeRecord, error) {
var nodes []*v1.NodeRecord
for _, delta := range c.deltas {
key := string(delta.Key)
if strings.HasPrefix(key, "node_record_") {
for _, value := range delta.Values {
record := new(v1.NodeRecord)
if err := proto.Unmarshal(value.Value, record); err != nil {
return nil, err
}
nodes = append(nodes, record)
}
}
}
return nodes, nil
}

func (c *Client) GetSubnetDetails(subnetID principal.Principal) (*v1.SubnetRecord, error) {
v, _, err := c.dp.GetValueUpdate([]byte(fmt.Sprintf("subnet_record_%s", subnetID)), nil)
if err != nil {
return nil, err
}
var record v1.SubnetRecord
if err := proto.Unmarshal(v, &record); err != nil {
return nil, err
}
return &record, nil
}

func (c *Client) GetSubnetIDs() ([]principal.Principal, error) {
v, _, err := c.dp.GetValueUpdate([]byte("subnet_list"), nil)
if err != nil {
return nil, err
}
var list v1.SubnetListRecord
if err := proto.Unmarshal(v, &list); err != nil {
return nil, err
}
var subnets []principal.Principal
for _, subnet := range list.Subnets {
subnets = append(subnets, principal.Principal{Raw: subnet})
}
return subnets, nil
}
36 changes: 36 additions & 0 deletions registry/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package registry

import (
"fmt"
"github.com/aviate-labs/agent-go/principal"
"testing"
)

var client, _ = New()

func TestClient_GetNodeList(t *testing.T) {
nodes, err := client.GetNodeList()
if err != nil {
t.Fatal(err)
}
if len(nodes) == 0 {
t.Fatal("no nodes")
}
for _, node := range nodes {
fmt.Println(node.GetXnet(), principal.Principal{Raw: node.NodeOperatorId})
}
}

func TestClient_GetSubnetIDs(t *testing.T) {
subnetIDs, err := client.GetSubnetIDs()
if err != nil {
t.Fatal(err)
}
if len(subnetIDs) == 0 {
t.Fatal("no subnet IDs")
}
subnetID := subnetIDs[0]
if _, err := client.GetSubnetDetails(subnetID); err != nil {
t.Fatal(err)
}
}
149 changes: 61 additions & 88 deletions registry/dataprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,115 +2,88 @@ package registry

import (
"fmt"
"sort"
"sync"
"time"

"github.com/aviate-labs/agent-go"
"github.com/aviate-labs/agent-go/ic"
"github.com/aviate-labs/agent-go/identity"
"github.com/aviate-labs/agent-go/principal"
"github.com/aviate-labs/agent-go/registry/proto/v1"
"github.com/fxamacker/cbor/v2"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"
)

type DataProvider struct {
sync.RWMutex
Records []v1.ProtoRegistryRecord
}

func (d *DataProvider) Add(
key string,
version uint64,
value []byte,
) error {
if version < 1 {
return fmt.Errorf("version must be greater than 0")
}

d.Lock()
defer d.Unlock()

var ok bool
idx := sort.Search(len(d.Records), func(i int) bool {
if d.Records[i].Key == key {
if d.Records[i].Version == version {
ok = true // Record already exists.
}
return d.Records[i].Version >= version
}
return d.Records[i].Key >= key
})
if ok {
// Key and version already exist.
return fmt.Errorf("record already exists: %s@%d", key, version)
}
d.Records = append(d.Records, v1.ProtoRegistryRecord{})
copy(d.Records[idx+1:], d.Records[idx:]) // Shift right.
d.Records[idx] = v1.ProtoRegistryRecord{
Key: key,
Version: version,
Value: wrapperspb.Bytes(value),
}
return nil
a *agent.Agent
}

func (d *DataProvider) GetChangesSince(version uint64) ([]*v1.RegistryDelta, uint64, error) {
func NewDataProvider() (*DataProvider, error) {
a, err := agent.New(agent.DefaultConfig)
if err != nil {
return nil, 0, err
return nil, err
}
payload, err := proto.Marshal(&v1.RegistryGetChangesSinceRequest{
Version: version,
})
if err != nil {
return &DataProvider{a: a}, nil
}

// GetChangesSince returns the changes since the given version.
func (d DataProvider) GetChangesSince(version uint64) ([]*v1.RegistryDelta, uint64, error) {
var resp v1.RegistryGetChangesSinceResponse
if err := d.a.QueryProto(
ic.REGISTRY_PRINCIPAL,
"get_changes_since",
&v1.RegistryGetChangesSinceRequest{
Version: version,
},
&resp,
); err != nil {
return nil, 0, err
}
request := agent.Request{
Type: agent.RequestTypeQuery,
Sender: principal.AnonymousID,
IngressExpiry: uint64(time.Now().Add(5 * time.Minute).UnixNano()),
CanisterID: ic.REGISTRY_PRINCIPAL,
MethodName: "get_changes_since",
Arguments: payload,
if resp.Error != nil {
return nil, 0, fmt.Errorf("error: %s", resp.Error.String())
}
requestID := agent.NewRequestID(request)
id := new(identity.AnonymousIdentity)
data, err := cbor.Marshal(agent.Envelope{
Content: request,
SenderPubKey: id.PublicKey(),
SenderSig: requestID.Sign(id),
})
if err != nil {
return nil, 0, err
}
resp, err := a.Client().Query(ic.REGISTRY_PRINCIPAL, data)
if err != nil {
return nil, 0, err
return resp.Deltas, resp.Version, nil
}

// GetValue returns the value of the given key and its version.
// If version is nil, the latest version is returned.
func (d DataProvider) GetValue(key []byte, version *uint64) ([]byte, uint64, error) {
var v *wrapperspb.UInt64Value
if version != nil {
v = wrapperspb.UInt64(*version)
}
var response agent.Response
if err := cbor.Unmarshal(resp, &response); err != nil {
var resp v1.RegistryGetValueResponse
if err := d.a.QueryProto(
ic.REGISTRY_PRINCIPAL,
"get_value",
&v1.RegistryGetValueRequest{
Key: key,
Version: v,
},
&resp,
); err != nil {
return nil, 0, err
}
if response.Status != "replied" {
return nil, 0, fmt.Errorf("status: %s", response.Status)
if resp.Error != nil {
return nil, 0, fmt.Errorf("error: %s", resp.Error.String())
}
return resp.Value, resp.Version, nil
}

changesResponse := new(v1.RegistryGetChangesSinceResponse)
if err := proto.Unmarshal(response.Reply["arg"], changesResponse); err != nil {
// GetValueUpdate returns the value of the given key and its version.
func (d DataProvider) GetValueUpdate(key []byte, version *uint64) ([]byte, uint64, error) {
var v *wrapperspb.UInt64Value
if version != nil {
v = wrapperspb.UInt64(*version)
}
var resp v1.RegistryGetValueResponse
if err := d.a.CallProto(
ic.REGISTRY_PRINCIPAL,
"get_value",
&v1.RegistryGetValueRequest{
Key: key,
Version: v,
},
&resp,
); err != nil {
return nil, 0, err
}
if changesResponse.Error != nil {
return nil, 0, fmt.Errorf("error: %s", changesResponse.Error.String())
if resp.Error != nil {
return nil, 0, fmt.Errorf("error: %s", resp.Error.String())
}
return changesResponse.Deltas, changesResponse.Version, nil
}

func (d *DataProvider) IsEmpty() bool {
d.RLock()
defer d.RUnlock()

return len(d.Records) == 0
return resp.Value, resp.Version, nil
}
Loading

0 comments on commit bc4c645

Please sign in to comment.