Skip to content

Commit

Permalink
feat: add grpcds support
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert committed Apr 3, 2023
1 parent 55587d8 commit 419cfe4
Show file tree
Hide file tree
Showing 8 changed files with 2,312 additions and 40 deletions.
19 changes: 10 additions & 9 deletions docs/examples/kubo-as-a-library/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ require (
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20221203041831-ce31453925ec // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/guseggert/go-ds-grpc v0.0.0-20230402190854-00fd80d37780 // indirect
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
Expand Down Expand Up @@ -181,16 +182,16 @@ require (
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.3.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
nhooyr.io/websocket v1.8.7 // indirect
Expand Down
1,031 changes: 1,022 additions & 9 deletions docs/examples/kubo-as-a-library/go.sum

Large diffs are not rendered by default.

24 changes: 13 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,14 @@ require (
go.uber.org/fx v1.18.2
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.6.0
golang.org/x/mod v0.7.0
golang.org/x/mod v0.8.0
golang.org/x/sync v0.1.0
golang.org/x/sys v0.6.0
google.golang.org/grpc v1.54.0
)

require github.com/guseggert/go-ds-grpc v0.0.0-20230402190854-00fd80d37780

require (
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/Kubuxu/go-os-helper v0.0.1 // indirect
Expand Down Expand Up @@ -125,12 +128,12 @@ require (
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20221203041831-ce31453925ec // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
Expand Down Expand Up @@ -213,16 +216,15 @@ require (
go.uber.org/multierr v1.9.0 // indirect
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/oauth2 v0.4.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.3.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
google.golang.org/genproto v0.0.0-20230331144136-dcfb400f0633 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/square/go-jose.v2 v2.5.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
1,029 changes: 1,018 additions & 11 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions plugin/loader/preload.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
pluginflatfs "github.com/ipfs/kubo/plugin/plugins/flatfs"
pluginfxtest "github.com/ipfs/kubo/plugin/plugins/fxtest"
pluginipldgit "github.com/ipfs/kubo/plugin/plugins/git"
plugingrpcds "github.com/ipfs/kubo/plugin/plugins/grpcds"
pluginlevelds "github.com/ipfs/kubo/plugin/plugins/levelds"
pluginpeerlog "github.com/ipfs/kubo/plugin/plugins/peerlog"
)
Expand All @@ -22,4 +23,5 @@ func init() {
Preload(pluginlevelds.Plugins...)
Preload(pluginpeerlog.Plugins...)
Preload(pluginfxtest.Plugins...)
Preload(plugingrpcds.Plugins...)
}
1 change: 1 addition & 0 deletions plugin/loader/preload_list
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ flatfs github.com/ipfs/kubo/plugin/plugins/flatfs *
levelds github.com/ipfs/kubo/plugin/plugins/levelds *
peerlog github.com/ipfs/kubo/plugin/plugins/peerlog *
fxtest github.com/ipfs/kubo/plugin/plugins/fxtest *
grpcds github.com/ipfs/kubo/plugin/plugins/grpcds *
160 changes: 160 additions & 0 deletions plugin/plugins/grpcds/grpcds.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package grpcds

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

grpcds "github.com/guseggert/go-ds-grpc"
pb "github.com/guseggert/go-ds-grpc/proto"
"github.com/ipfs/kubo/plugin"
"github.com/ipfs/kubo/repo"
"github.com/ipfs/kubo/repo/fsrepo"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
)

// Plugins is exported list of plugins that will be loaded
var Plugins = []plugin.Plugin{
&grpcdsPlugin{},
}

type grpcdsPlugin struct{}

var _ plugin.PluginDatastore = (*grpcdsPlugin)(nil)

func (*grpcdsPlugin) Name() string {
return "ds-grpc"
}

func (*grpcdsPlugin) Version() string {
return "0.1.0"
}

func (*grpcdsPlugin) Init(_ *plugin.Environment) error {
return nil
}

func (*grpcdsPlugin) DatastoreTypeName() string {
return "grpcds"
}

type datastoreConfig struct {
Name string `json:"name"`
Target string `json:"target"`
AllowInsecure bool `json:"allowInsecure"`
ConnectParams *connectParams `json:"connectParams"`
Compressor string `json:"compressor"`
DefaultServiceConfig json.RawMessage `json:"defaultServiceConfig"`
UserAgent string `json:"userAgent"`
}

type connectParams struct {
Backoff *backoffConfig `json:"backoff"`
MinConnectTimeoutMillis int64 `json:"minConnectTimeoutMillis"`
}

type backoffConfig struct {
BaseDelayMillis int64 `json:"baseDelayMillis"`
Multiplier float64 `json:"multiplier"`
Jitter float64 `json:"jitter"`
MaxDelayMillis int64 `json:"maxDelayMillis"`
}

func (b *backoffConfig) ToGRPCConfig() backoff.Config {
return backoff.Config{
BaseDelay: time.Duration(b.BaseDelayMillis) * time.Millisecond,
Multiplier: b.Multiplier,
Jitter: b.Jitter,
MaxDelay: time.Duration(b.MaxDelayMillis) * time.Millisecond,
}
}

func (*grpcdsPlugin) DatastoreConfigParser() fsrepo.ConfigFromMap {
return func(params map[string]interface{}) (fsrepo.DatastoreConfig, error) {

var c datastoreConfig
b, err := json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("marshaling grpcds config: %w", err)
}

err = json.Unmarshal(b, &c)
if err != nil {
return nil, fmt.Errorf("unmarshaling grpcds config: %w", err)
}

if c.Name == "" {
return nil, errors.New("'name' must be specified")
}

if c.Target == "" {
return nil, errors.New("'target' must be specified")
}

return &c, nil
}
}

func (c *datastoreConfig) DiskSpec() fsrepo.DiskSpec {
return map[string]interface{}{
"type": "grpcds",
"name": c.Name,
}
}

func (c *datastoreConfig) Create(path string) (repo.Datastore, error) {
dialOpts := []grpc.DialOption{}
callOpts := []grpc.CallOption{}

if c.AllowInsecure {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

if c.ConnectParams != nil {
backoffConfig := backoff.DefaultConfig
if c.ConnectParams.Backoff != nil {
backoffConfig = c.ConnectParams.Backoff.ToGRPCConfig()
}
dialOpts = append(dialOpts, grpc.WithConnectParams(grpc.ConnectParams{
Backoff: backoffConfig,
MinConnectTimeout: time.Duration(c.ConnectParams.MinConnectTimeoutMillis),
}))
}

if c.DefaultServiceConfig != nil {
dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(string(c.DefaultServiceConfig)))
}

if c.UserAgent != "" {
dialOpts = append(dialOpts, grpc.WithUserAgent(c.UserAgent))
}

if c.Compressor != "" {
callOpts = append(callOpts, grpc.UseCompressor(c.Compressor))
}

dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(callOpts...))

conn, err := grpc.Dial(c.Target, dialOpts...)
if err != nil {
return nil, fmt.Errorf("initial dialing of grpcds target '%s': %w", c.Target, err)
}
client := pb.NewDatastoreClient(conn)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
ds, err := grpcds.New(ctx, client)
if err != nil {
return nil, fmt.Errorf("building grpcds: %w", err)
}
repoDS, ok := ds.(repo.Datastore)
if !ok {
return nil, fmt.Errorf("remote gRPC datastore must be a repo datastore")
}

return repoDS, nil
}
86 changes: 86 additions & 0 deletions test/cli/grpc_ds_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package cli

import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"testing"

pb "github.com/guseggert/go-ds-grpc/proto"
"github.com/guseggert/go-ds-grpc/server"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/kubo/config"
"github.com/ipfs/kubo/test/cli/harness"
"github.com/ipfs/kubo/test/cli/testutils"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

const grpcDatastoreSpec = `
{
"type": "grpcds",
"name": "grpc-datastore",
"target": "%s",
"allowInsecure": true
}
`

func TestGRPCDatastore(t *testing.T) {
// we init the node to get the default config, then modify it, then re-init the node
node := harness.NewT(t).NewNode().Init()

// run grpc datastore server
ds := dssync.MutexWrap(datastore.NewMapDatastore())
dsServer := server.New(ds)
grpcServer := grpc.NewServer()
pb.RegisterDatastoreServer(grpcServer, dsServer)

l, err := net.Listen("tcp", "127.0.0.1:")
require.NoError(t, err)
go func() {
if err := grpcServer.Serve(l); err != nil {
t.Logf("grpc server error: %s", err)
}
}()
defer grpcServer.Stop()

// update the config
spec := fmt.Sprintf(grpcDatastoreSpec, l.Addr().String())
fmt.Printf("using spec: \n%s\n", spec)
specMap := map[string]interface{}{}
err = json.Unmarshal([]byte(spec), &specMap)
require.NoError(t, err)
node.UpdateConfig(func(cfg *config.Config) {
cfg.Datastore.Spec = specMap
})

// copy config to a new file and re-init the repo to initialize the datastore
config := node.ReadFile(node.ConfigFile())
require.NoError(t, os.RemoveAll(node.Dir))
require.NoError(t, os.Mkdir(node.Dir, 0777))
node.WriteBytes("config-backup", []byte(config))
node.IPFS("init", filepath.Join(node.Dir, "config-backup"))

node.StartDaemon()

randStr := string(testutils.RandomBytes(100))
node.IPFSAddStr(randStr)

// verify the datastore has stuff in it
keys := map[string]bool{}
results, err := ds.Query(context.Background(), query.Query{})
require.NoError(t, err)
for res := range results.Next() {
keys[res.Entry.Key] = true
}
assert.True(t, keys["/pins/state/dirty"])

// TODO ensure daemon won't launch when grpc server is not running
}

0 comments on commit 419cfe4

Please sign in to comment.