From 10a63c088455e2906b9dd36a68a3e21148bbf43d Mon Sep 17 00:00:00 2001 From: Nate Maninger Date: Sat, 2 Mar 2024 19:25:39 -0800 Subject: [PATCH] all: restructure --- cmd/fsd/main.go | 44 +++-- go.mod | 13 +- go.sum | 18 +- http/api.go | 77 +-------- http/ipfs.go | 49 +++--- http/unixfs.go | 83 +++++---- ipfs/node.go | 51 +----- ipfs/node_test.go | 9 +- ipfs/unixfs.go | 115 +++++++++++++ persist/badger/badger.go | 26 --- persist/badger/cids.go | 87 ---------- renterd/downloader/downloader.go | 225 ++++++++++++++++++++++++ renterd/options.go | 60 +++++++ renterd/store.go | 205 ++++++++++++++++++++++ sia/sia.go | 285 ------------------------------- sia/store.go | 215 ----------------------- sia/types.go | 41 ----- sia/uploader.go | 195 --------------------- 18 files changed, 710 insertions(+), 1088 deletions(-) create mode 100644 ipfs/unixfs.go delete mode 100644 persist/badger/badger.go delete mode 100644 persist/badger/cids.go create mode 100644 renterd/downloader/downloader.go create mode 100644 renterd/options.go create mode 100644 renterd/store.go delete mode 100644 sia/sia.go delete mode 100644 sia/store.go delete mode 100644 sia/types.go delete mode 100644 sia/uploader.go diff --git a/cmd/fsd/main.go b/cmd/fsd/main.go index c42c8e3..132524c 100644 --- a/cmd/fsd/main.go +++ b/cmd/fsd/main.go @@ -19,9 +19,11 @@ import ( "go.sia.tech/fsd/config" shttp "go.sia.tech/fsd/http" "go.sia.tech/fsd/ipfs" - "go.sia.tech/fsd/persist/badger" - "go.sia.tech/fsd/sia" + "go.sia.tech/fsd/renterd" + "go.sia.tech/fsd/renterd/downloader" "go.sia.tech/jape" + "go.sia.tech/renterd/bus" + "go.sia.tech/renterd/worker" "go.uber.org/zap" "go.uber.org/zap/zapcore" "gopkg.in/yaml.v3" @@ -119,13 +121,8 @@ func main() { ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT) defer cancel() - db, err := badger.OpenDatabase(filepath.Join(dir, "fsd.badgerdb"), log.Named("badger")) - if err != nil { - log.Fatal("failed to open badger database", 
zap.Error(err)) - } - defer db.Close() - var privateKey crypto.PrivKey + var err error if cfg.IPFS.PrivateKey != "" { buf, err := hex.DecodeString(strings.TrimPrefix(cfg.IPFS.PrivateKey, "ed25519:")) if err != nil { @@ -150,15 +147,30 @@ func main() { } defer ds.Close() - bs := sia.NewBlockStore(db, cfg.Renterd, log.Named("blockstore")) + workerClient := worker.NewClient(cfg.Renterd.WorkerAddress, cfg.Renterd.WorkerPassword) + busClient := bus.NewClient(cfg.Renterd.BusAddress, cfg.Renterd.BusPassword) - inode, err := ipfs.NewNode(ctx, privateKey, cfg.IPFS, ds, bs) + bd, err := downloader.NewBlockDownloader(cfg.Renterd.Bucket, 4096, workerClient, log.Named("downloader")) if err != nil { - log.Fatal("failed to start ipfs node", zap.Error(err)) + log.Fatal("failed to create block downloader", zap.Error(err)) + } + bd.StartWorkers(ctx, 1000) + + bs, err := renterd.NewBlockStore( + renterd.WithBucket(cfg.Renterd.Bucket), + renterd.WithWorker(workerClient), + renterd.WithBus(busClient), + renterd.WithDownloader(bd), + renterd.WithLog(log.Named("blockstore"))) + if err != nil { + log.Fatal("failed to create blockstore", zap.Error(err)) } - defer inode.Close() - snode := sia.New(db, inode, cfg.Renterd, log.Named("sia")) + ipfs, err := ipfs.NewNode(ctx, privateKey, cfg.IPFS, ds, bs) + if err != nil { + log.Fatal("failed to start ipfs node", zap.Error(err)) + } + defer ipfs.Close() apiListener, err := net.Listen("tcp", cfg.API.Address) if err != nil { @@ -173,12 +185,12 @@ func main() { defer gatewayListener.Close() apiServer := &http.Server{ - Handler: jape.BasicAuth(cfg.API.Password)(shttp.NewAPIHandler(inode, snode, cfg, log.Named("api"))), + Handler: jape.BasicAuth(cfg.API.Password)(shttp.NewAPIHandler(ipfs, cfg, log.Named("api"))), } defer apiServer.Close() gatewayServer := &http.Server{ - Handler: shttp.NewIPFSGatewayHandler(inode, snode, cfg, log.Named("gateway")), + Handler: shttp.NewIPFSGatewayHandler(ipfs, cfg, log.Named("gateway")), } defer gatewayServer.Close() 
@@ -201,7 +213,7 @@ func main() { prettyKey := "ed25519:" + hex.EncodeToString(buf) log.Info("fsd started", - zap.Stringer("peerID", inode.PeerID()), + zap.Stringer("peerID", ipfs.PeerID()), zap.String("privateKey", prettyKey), zap.String("apiAddress", apiListener.Addr().String()), zap.String("gatewayAddress", gatewayListener.Addr().String()), diff --git a/go.mod b/go.mod index aa5f3e3..a4ca840 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,7 @@ module go.sia.tech/fsd go 1.21 require ( - github.com/dgraph-io/badger/v4 v4.2.0 - github.com/gogo/protobuf v1.3.2 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/ipfs/boxo v0.15.0 github.com/ipfs/go-block-format v0.1.2 github.com/ipfs/go-cid v0.4.1 @@ -17,7 +16,7 @@ require ( github.com/multiformats/go-multicodec v0.9.0 github.com/multiformats/go-multihash v0.2.3 go.sia.tech/jape v0.11.1 - go.sia.tech/renterd v1.0.5 + go.sia.tech/renterd v1.0.6-0.20240228125621-9a935c3b8f30 go.uber.org/zap v1.26.0 gopkg.in/yaml.v3 v3.0.1 lukechampine.com/frand v1.4.2 @@ -37,10 +36,7 @@ require ( github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/dchest/threefish v0.0.0-20120919164726-3ecf4c494abf // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect - github.com/dgraph-io/ristretto v0.1.1 // indirect - github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/docker/go-units v0.5.0 // indirect - github.com/dustin/go-humanize v1.0.1 // indirect github.com/elastic/gosigar v0.14.2 // indirect github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect @@ -49,11 +45,9 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect - github.com/golang/glog v1.1.0 // indirect - github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect + github.com/gogo/protobuf v1.3.2 // indirect 
github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/google/flatbuffers v1.12.1 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect github.com/google/uuid v1.5.0 // indirect @@ -62,7 +56,6 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/huin/goupnp v1.3.0 // indirect github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect diff --git a/go.sum b/go.sum index 55a2457..b9c8d2b 100644 --- a/go.sum +++ b/go.sum @@ -45,7 +45,6 @@ github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46f github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs= @@ -85,21 +84,13 @@ github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPc github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/badger/v4 v4.2.0 
h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs= -github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= -github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= -github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= -github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= -github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= -github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/elastic/gosigar v0.12.0/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= github.com/elastic/gosigar v0.14.2 h1:Dg80n8cr90OZ7x+bAax/QjoW/XqTI11RmA79ZwIm9/4= github.com/elastic/gosigar v0.14.2/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= @@ -148,8 +139,6 @@ github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog 
v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE= -github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ= github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -176,8 +165,6 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= -github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -687,8 +674,8 @@ go.sia.tech/jape v0.11.1 h1:M7IP+byXL7xOqzxcHUQuXW+q3sYMkYzmMlMw+q8ZZw0= go.sia.tech/jape v0.11.1/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4= go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU= go.sia.tech/mux v1.2.0/go.mod h1:Yyo6wZelOYTyvrHmJZ6aQfRoer3o4xyKQ4NmQLJrBSo= -go.sia.tech/renterd v1.0.5 h1:qb7tLBthF5ocHaSqgy+yGBkdFKOT4NAEEx8BIX3zzCs= -go.sia.tech/renterd v1.0.5/go.mod h1:0y3agucuCIp1L7W2tRjksvcJIsTZfoC9TZc3h8ie1kU= +go.sia.tech/renterd v1.0.6-0.20240228125621-9a935c3b8f30 
h1:ptKXoEbZkCyHUVJs/fGDVb2BixKiEwLlvHHXnA36+Xc= +go.sia.tech/renterd v1.0.6-0.20240228125621-9a935c3b8f30/go.mod h1:0y3agucuCIp1L7W2tRjksvcJIsTZfoC9TZc3h8ie1kU= go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca h1:aZMg2AKevn7jKx+wlusWQfwSM5pNU9aGtRZme29q3O4= go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca/go.mod h1:h/1afFwpxzff6/gG5i1XdAgPK7dEY6FaibhK7N5F86Y= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -837,7 +824,6 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/http/api.go b/http/api.go index 9da9c5f..357a038 100644 --- a/http/api.go +++ b/http/api.go @@ -1,14 +1,11 @@ package http import ( - "bufio" "net/http" - "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/peer" "go.sia.tech/fsd/config" "go.sia.tech/fsd/ipfs" - "go.sia.tech/fsd/sia" "go.sia.tech/jape" "go.uber.org/zap" ) @@ -16,71 +13,10 @@ import ( type ( apiServer struct { ipfs *ipfs.Node - sia *sia.Node log *zap.Logger } ) -func (as *apiServer) handlePin(jc jape.Context) { - ctx := jc.Request.Context() - var cidStr string - if err := jc.DecodeParam("cid", &cidStr); err != nil { - return - } - c, err := cid.Parse(cidStr) - if err != nil { - jc.Error(err, http.StatusBadRequest) - return - } - - // TODO: break this out for better support, the 
current implementation will - // not properly handle anything but standard unixfs files with the default - // block size - rr, err := as.ipfs.DownloadCID(ctx, c, nil) - if err != nil { - jc.Error(err, http.StatusInternalServerError) - return - } - defer rr.Close() - - var opts sia.UnixFSOptions - switch c.Version() { - case 1: - prefix := c.Prefix() - opts.CIDBuilder = cid.V1Builder{Codec: prefix.Codec, MhType: prefix.MhType, MhLength: prefix.MhLength} - case 0: - opts.CIDBuilder = cid.V0Builder{} - } - - br := bufio.NewReaderSize(rr, 256<<20) // 256 MiB - c, err = as.sia.UploadFile(jc.Request.Context(), br, opts) - if err != nil { - jc.Error(err, http.StatusInternalServerError) - return - } - - // return the calculated cid - jc.Encode(c.String()) -} - -func (as *apiServer) handleCIDVerify(jc jape.Context) { - ctx := jc.Request.Context() - var cidStr string - if err := jc.DecodeParam("cid", &cidStr); err != nil { - return - } - c, err := cid.Parse(cidStr) - if err != nil { - jc.Error(err, http.StatusBadRequest) - return - } - - if err := as.sia.VerifyCID(ctx, c); err != nil { - jc.Error(err, http.StatusInternalServerError) - return - } -} - func (as *apiServer) handleListPeers(jc jape.Context) { jc.Encode(as.ipfs.Peers()) } @@ -94,18 +30,15 @@ func (as *apiServer) handleAddPeer(jc jape.Context) { } // NewAPIHandler returns a new http.Handler that handles requests to the api -func NewAPIHandler(ipfs *ipfs.Node, sia *sia.Node, cfg config.Config, log *zap.Logger) http.Handler { +func NewAPIHandler(ipfs *ipfs.Node, cfg config.Config, log *zap.Logger) http.Handler { s := &apiServer{ ipfs: ipfs, - sia: sia, log: log, } return jape.Mux(map[string]jape.Handler{ - "POST /api/cid/:cid/verify": s.handleCIDVerify, - "POST /api/unixfs/calculate": s.handleUnixFSCalculate, - "POST /api/unixfs/upload": s.handleUnixFSUpload, - "POST /api/pin/:cid": s.handlePin, - "GET /api/peers": s.handleListPeers, - "PUT /api/peers": s.handleAddPeer, + "POST /api/unixfs/upload": 
s.handleUnixFSUpload, + // "POST /api/pin/:cid": s.handlePin, + "GET /api/peers": s.handleListPeers, + "PUT /api/peers": s.handleAddPeer, }) } diff --git a/http/ipfs.go b/http/ipfs.go index f6458ba..0990276 100644 --- a/http/ipfs.go +++ b/http/ipfs.go @@ -1,22 +1,21 @@ package http import ( + "context" "errors" - "io" "net/http" "net/url" "strings" + "time" "github.com/ipfs/go-cid" "go.sia.tech/fsd/config" "go.sia.tech/fsd/ipfs" - "go.sia.tech/fsd/sia" "go.uber.org/zap" ) type ipfsGatewayServer struct { ipfs *ipfs.Node - sia *sia.Node log *zap.Logger config config.Config @@ -76,14 +75,18 @@ func redirectPathCID(w http.ResponseWriter, r *http.Request, c cid.Cid, path []s http.Redirect(w, r, u.String(), http.StatusMovedPermanently) } -func (is *ipfsGatewayServer) allowRemoteFetch(c cid.Cid) bool { +func (is *ipfsGatewayServer) fetchAllowed(ctx context.Context, c cid.Cid) bool { + // check if the file is locally pinned + if has, err := is.ipfs.HasBlock(ctx, c); err != nil { + is.log.Error("failed to check block existence", zap.Error(err)) + } else if has { + return true + } + if !is.config.IPFS.Gateway.Fetch.Enabled { return false // deny all - } else if len(is.config.IPFS.Gateway.Fetch.AllowList) == 0 { - return true // allow all } - // allowlist check for _, match := range is.config.IPFS.Gateway.Fetch.AllowList { if c.Equals(match) { return true @@ -112,34 +115,28 @@ func (is *ipfsGatewayServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - is.log.Info("serving", zap.Stringer("cid", c), zap.Strings("path", path)) + if !is.fetchAllowed(ctx, c) { + http.Error(w, "", http.StatusNotFound) + is.log.Info("remote fetch denied", zap.Stringer("cid", c)) + return + } - // TODO: support paths in Sia proxied downloads - err = is.sia.ProxyHTTPDownload(c, r, w) - if errors.Is(err, sia.ErrNotFound) && is.allowRemoteFetch(c) { - is.log.Info("downloading from ipfs", zap.Stringer("cid", c), zap.Strings("path", path)) - r, err := is.ipfs.DownloadCID(ctx, c, path) - if 
err != nil { - http.Error(w, "", http.StatusNotFound) - is.log.Error("failed to download cid", zap.Error(err)) - return - } - defer r.Close() + is.log.Info("serving", zap.Stringer("cid", c), zap.Strings("path", path)) - io.Copy(w, r) - } else if errors.Is(err, sia.ErrNotFound) { + rsc, err := is.ipfs.DownloadUnixFile(ctx, c, path) + if err != nil { http.Error(w, "", http.StatusNotFound) - } else if err != nil { - http.Error(w, "", http.StatusInternalServerError) - is.log.Error("failed to get block", zap.Error(err)) + is.log.Error("failed to download cid", zap.Error(err)) + return } + defer rsc.Close() + http.ServeContent(w, r, "", time.Now(), rsc) } // NewIPFSGatewayHandler creates a new http.Handler for the IPFS gateway. -func NewIPFSGatewayHandler(ipfs *ipfs.Node, sia *sia.Node, cfg config.Config, log *zap.Logger) http.Handler { +func NewIPFSGatewayHandler(ipfs *ipfs.Node, cfg config.Config, log *zap.Logger) http.Handler { return &ipfsGatewayServer{ ipfs: ipfs, - sia: sia, log: log, config: cfg, diff --git a/http/unixfs.go b/http/unixfs.go index c5c32ad..7e0ec08 100644 --- a/http/unixfs.go +++ b/http/unixfs.go @@ -2,7 +2,10 @@ package http import ( "bufio" + "crypto/sha256" + "encoding/hex" "fmt" + "io" "net/http" chunker "github.com/ipfs/boxo/chunker" @@ -10,76 +13,66 @@ import ( "github.com/ipfs/go-cid" "github.com/multiformats/go-multicodec" "github.com/multiformats/go-multihash" - "go.sia.tech/fsd/sia" + "go.sia.tech/fsd/ipfs" "go.sia.tech/jape" + "go.uber.org/zap" ) -func (as *apiServer) handleUnixFSCalculate(jc jape.Context) { - ctx := jc.Request.Context() - - body := jc.Request.Body - defer body.Close() - - opts, ok := parseUnixFSOptions(jc) - if !ok { - return // error already handled - } - - br := bufio.NewReaderSize(body, 256<<20) // 256 MiB - blocks, err := as.sia.CalculateBlocks(ctx, br, opts) - if err != nil { - jc.Error(err, http.StatusInternalServerError) - return - } - jc.Encode(blocks) -} - func (as *apiServer) handleUnixFSUpload(jc jape.Context) 
{ - body := jc.Request.Body - defer body.Close() + defer jc.Request.Body.Close() - opts, ok := parseUnixFSOptions(jc) - if !ok { + opts := parseUnixFSOptions(jc) + if opts == nil { return // error already handled } - br := bufio.NewReaderSize(body, 256<<20) // 256 MiB - c, err := as.sia.UploadFile(jc.Request.Context(), br, opts) + h := sha256.New() + br := bufio.NewReaderSize(io.TeeReader(jc.Request.Body, h), 256<<20) // 256 MiB + root, err := as.ipfs.UploadUnixFile(jc.Request.Context(), br, opts...) if err != nil { jc.Error(err, http.StatusInternalServerError) return } - + as.log.Debug("uploaded UnixFS file", zap.Stringer("cid", root.Cid()), zap.String("checksum", hex.EncodeToString(h.Sum(nil)))) // return the calculated cid - jc.Encode(c.String()) + jc.Encode(root.Cid().String()) } -func parseUnixFSOptions(jc jape.Context) (sia.UnixFSOptions, bool) { +func parseUnixFSOptions(jc jape.Context) (opts []ipfs.UnixFSOption) { var cidVersion int if err := jc.DecodeForm("version", &cidVersion); err != nil { - return sia.UnixFSOptions{}, false + return nil } - opts := sia.UnixFSOptions{ - MaxLinks: ihelpers.DefaultLinksPerBlock, - BlockSize: chunker.DefaultBlockSize, - } + var builder cid.Builder switch cidVersion { case 0: - opts.CIDBuilder = cid.V0Builder{} + builder = cid.V0Builder{} case 1: - opts.CIDBuilder = cid.V1Builder{Codec: uint64(multicodec.DagPb), MhType: multihash.SHA2_256} + builder = cid.V1Builder{Codec: uint64(multicodec.DagPb), MhType: multihash.SHA2_256} default: jc.Error(fmt.Errorf("unsupported CID version: %d", cidVersion), http.StatusBadRequest) - return sia.UnixFSOptions{}, false + return nil } + opts = append(opts, ipfs.UnixFSWithCIDBuilder(builder)) - if err := jc.DecodeForm("rawLeaves", &opts.RawLeaves); err != nil { - return sia.UnixFSOptions{}, false - } else if err := jc.DecodeForm("maxLinks", &opts.MaxLinks); err != nil { - return sia.UnixFSOptions{}, false - } else if err := jc.DecodeForm("blockSize", &opts.BlockSize); err != nil { - return 
sia.UnixFSOptions{}, false + var rawLeaves bool + if err := jc.DecodeForm("rawLeaves", &rawLeaves); err != nil { + return nil } - return opts, true + opts = append(opts, ipfs.UnixFSWithRawLeaves(rawLeaves)) + + maxlinks := ihelpers.DefaultLinksPerBlock + if err := jc.DecodeForm("maxLinks", &maxlinks); err != nil { + return nil + } + opts = append(opts, ipfs.UnixFSWithMaxLinks(maxlinks)) + + blocksize := chunker.DefaultBlockSize + if err := jc.DecodeForm("blockSize", &blocksize); err != nil { + return nil + } + opts = append(opts, ipfs.UnixFSWithBlockSize(blocksize)) + + return opts } diff --git a/ipfs/node.go b/ipfs/node.go index 3427476..4e05ccd 100644 --- a/ipfs/node.go +++ b/ipfs/node.go @@ -3,8 +3,6 @@ package ipfs import ( "context" "fmt" - "io" - "strings" "time" "github.com/ipfs/boxo/bitswap" @@ -12,7 +10,6 @@ import ( "github.com/ipfs/boxo/blockservice" "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/ipld/merkledag" - fsio "github.com/ipfs/boxo/ipld/unixfs/io" "github.com/ipfs/boxo/provider" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" @@ -45,7 +42,6 @@ type Node struct { host host.Host frt *fullrt.FullRT - blockstore blockstore.Blockstore blockService blockservice.BlockService dagService format.DAGService bitswap *bitswap.Bitswap @@ -67,44 +63,9 @@ func (n *Node) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { return n.blockService.GetBlock(ctx, c) } -// DownloadCID downloads a CID from IPFS -func (n *Node) DownloadCID(ctx context.Context, c cid.Cid, path []string) (io.ReadSeekCloser, error) { - dagSess := merkledag.NewSession(ctx, n.dagService) - rootNode, err := dagSess.Get(ctx, c) - if err != nil { - return nil, fmt.Errorf("failed to get root node: %w", err) - } - - var traverse func(context.Context, format.Node, []string) (format.Node, error) - traverse = func(ctx context.Context, parent format.Node, path []string) (format.Node, error) { - if len(path) == 0 { - return parent, nil - } - - childLink, rem, err 
:= parent.Resolve(path) - if err != nil { - return nil, fmt.Errorf("failed to resolve path %q: %w", strings.Join(path, "/"), err) - } - - switch v := childLink.(type) { - case *format.Link: - childNode, err := dagSess.Get(ctx, v.Cid) - if err != nil { - return nil, fmt.Errorf("failed to get child node %q: %w", v.Cid, err) - } - return traverse(ctx, childNode, rem) - default: - return nil, fmt.Errorf("expected link node, got %T", childLink) - } - } - - node, err := traverse(ctx, rootNode, path) - if err != nil { - return nil, fmt.Errorf("failed to traverse path: %w", err) - } - - dr, err := fsio.NewDagReader(ctx, node, dagSess) - return dr, err +// HasBlock checks if a block is locally pinned +func (n *Node) HasBlock(ctx context.Context, c cid.Cid) (bool, error) { + return n.blockService.Blockstore().Has(ctx, c) } // PeerID returns the peer ID of the node @@ -213,12 +174,11 @@ func NewNode(ctx context.Context, privateKey crypto.PrivKey, cfg config.IPFS, ds blockServ := blockservice.New(bs, bitswap) dagService := merkledag.NewDAGService(blockServ) - bsp := provider.NewBlockstoreProvider(bs) providerOpts := []provider.Option{ - provider.KeyProvider(bsp), + provider.KeyProvider(provider.NewBlockstoreProvider(bs)), provider.Online(frt), - provider.ReproviderInterval(10 * time.Hour), + provider.ReproviderInterval(6 * time.Hour), } prov, err := provider.New(ds, providerOpts...) 
@@ -242,7 +202,6 @@ func NewNode(ctx context.Context, privateKey crypto.PrivKey, cfg config.IPFS, ds return &Node{ frt: frt, host: host, - blockstore: bs, bitswap: bitswap, blockService: blockServ, dagService: dagService, diff --git a/ipfs/node_test.go b/ipfs/node_test.go index af281f4..4b58074 100644 --- a/ipfs/node_test.go +++ b/ipfs/node_test.go @@ -18,7 +18,6 @@ import ( "github.com/libp2p/go-libp2p/core/crypto" "go.sia.tech/fsd/config" "go.sia.tech/fsd/ipfs" - "go.sia.tech/fsd/persist/badger" "go.uber.org/zap" "go.uber.org/zap/zaptest" "lukechampine.com/frand" @@ -139,12 +138,6 @@ func TestDownload(t *testing.T) { t.Fatal(err) } - store, err := badger.OpenDatabase(filepath.Join(t.TempDir(), "fsd.badgerdb"), log.Named("badger")) - if err != nil { - log.Fatal("failed to open badger database", zap.Error(err)) - } - defer store.Close() - ds, err := levelds.NewDatastore(filepath.Join(t.TempDir(), "fsdds.leveldb"), nil) if err != nil { log.Fatal("failed to open leveldb datastore", zap.Error(err)) @@ -172,7 +165,7 @@ func TestDownload(t *testing.T) { downloadCtx, cancel := context.WithTimeout(ctx, 15*time.Second) defer cancel() - r, err := node.DownloadCID(downloadCtx, c, path) + r, err := node.DownloadUnixFile(downloadCtx, c, path) if err != nil { t.Fatal(err) } diff --git a/ipfs/unixfs.go b/ipfs/unixfs.go new file mode 100644 index 0000000..5dc3e36 --- /dev/null +++ b/ipfs/unixfs.go @@ -0,0 +1,115 @@ +package ipfs + +import ( + "context" + "fmt" + "io" + "strings" + + chunker "github.com/ipfs/boxo/chunker" + "github.com/ipfs/boxo/ipld/merkledag" + "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" + ihelpers "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" + fsio "github.com/ipfs/boxo/ipld/unixfs/io" + "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" +) + +type ( + unixFSOptions struct { + CIDBuilder cid.Builder + RawLeaves bool + MaxLinks int + BlockSize int64 + } + UnixFSOption func(*unixFSOptions) +) + +func UnixFSWithCIDBuilder(b 
cid.Builder) UnixFSOption { + return func(u *unixFSOptions) { + u.CIDBuilder = b + } +} + +func UnixFSWithRawLeaves(b bool) UnixFSOption { + return func(u *unixFSOptions) { + u.RawLeaves = b + } +} + +func UnixFSWithMaxLinks(b int) UnixFSOption { + return func(u *unixFSOptions) { + u.MaxLinks = b + } +} + +func UnixFSWithBlockSize(b int64) UnixFSOption { + return func(u *unixFSOptions) { + u.BlockSize = b + } +} + +// DownloadUnixFile downloads a UnixFS CID from IPFS +func (n *Node) DownloadUnixFile(ctx context.Context, c cid.Cid, path []string) (io.ReadSeekCloser, error) { + dagSess := merkledag.NewSession(ctx, n.dagService) + rootNode, err := dagSess.Get(ctx, c) + if err != nil { + return nil, fmt.Errorf("failed to get root node: %w", err) + } + + var traverse func(context.Context, format.Node, []string) (format.Node, error) + traverse = func(ctx context.Context, parent format.Node, path []string) (format.Node, error) { + if len(path) == 0 { + return parent, nil + } + + childLink, rem, err := parent.Resolve(path) + if err != nil { + return nil, fmt.Errorf("failed to resolve path %q: %w", strings.Join(path, "/"), err) + } + + switch v := childLink.(type) { + case *format.Link: + childNode, err := dagSess.Get(ctx, v.Cid) + if err != nil { + return nil, fmt.Errorf("failed to get child node %q: %w", v.Cid, err) + } + return traverse(ctx, childNode, rem) + default: + return nil, fmt.Errorf("expected link node, got %T", childLink) + } + } + + node, err := traverse(ctx, rootNode, path) + if err != nil { + return nil, fmt.Errorf("failed to traverse path: %w", err) + } + + dr, err := fsio.NewDagReader(ctx, node, dagSess) + return dr, err +} + +func (n *Node) UploadUnixFile(ctx context.Context, r io.Reader, opts ...UnixFSOption) (format.Node, error) { + opt := unixFSOptions{ + MaxLinks: ihelpers.DefaultLinksPerBlock, + BlockSize: chunker.DefaultBlockSize, + } + for _, o := range opts { + o(&opt) + } + + params := ihelpers.DagBuilderParams{ + Dagserv: 
merkledag.NewDAGService(n.blockService), + CidBuilder: opt.CIDBuilder, + RawLeaves: opt.RawLeaves, + Maxlinks: opt.MaxLinks, + } + + spl := chunker.NewSizeSplitter(r, opt.BlockSize) + db, err := params.New(spl) + if err != nil { + return nil, fmt.Errorf("failed to create dag builder: %w", err) + } + + return balanced.Layout(db) +} diff --git a/persist/badger/badger.go b/persist/badger/badger.go deleted file mode 100644 index 8d175cc..0000000 --- a/persist/badger/badger.go +++ /dev/null @@ -1,26 +0,0 @@ -package badger - -import ( - "github.com/dgraph-io/badger/v4" - "go.uber.org/zap" -) - -// A Store is a badger-backed store. -type Store struct { - db *badger.DB - log *zap.Logger -} - -// Close closes the underlying database -func (s *Store) Close() error { - return s.db.Close() -} - -// OpenDatabase opens a badger database at the given path. -func OpenDatabase(path string, log *zap.Logger) (*Store, error) { - db, err := badger.Open(badger.DefaultOptions(path)) - if err != nil { - return nil, err - } - return &Store{db: db, log: log}, nil -} diff --git a/persist/badger/cids.go b/persist/badger/cids.go deleted file mode 100644 index 7cb0040..0000000 --- a/persist/badger/cids.go +++ /dev/null @@ -1,87 +0,0 @@ -package badger - -import ( - "context" - "encoding/json" - - "github.com/dgraph-io/badger/v4" - "github.com/ipfs/go-cid" - "go.sia.tech/fsd/sia" - "go.uber.org/zap" -) - -// HasBlock returns true if the CID is in the store -func (s *Store) HasBlock(_ context.Context, c cid.Cid) (ok bool, err error) { - err = s.db.View(func(txn *badger.Txn) error { - _, err := txn.Get([]byte(c.Hash())) - if err != nil { - if err == badger.ErrKeyNotFound { - return nil - } - return err - } - ok = true - return nil - }) - return -} - -// GetBlock returns the block metadata for a given CID -func (s *Store) GetBlock(_ context.Context, c cid.Cid) (cm sia.Block, err error) { - err = s.db.View(func(txn *badger.Txn) error { - item, err := txn.Get([]byte(c.Hash())) - if err != nil { - if 
err == badger.ErrKeyNotFound { - return sia.ErrNotFound - } - return err - } - return item.Value(func(val []byte) error { - return json.Unmarshal(val, &cm) - }) - }) - return -} - -// AddBlocks adds blocks to the store -func (s *Store) AddBlocks(_ context.Context, blocks []sia.Block) error { - return s.db.Update(func(txn *badger.Txn) error { - for _, block := range blocks { - buf, err := json.Marshal(block) - if err != nil { - return err - } else if err := txn.Set([]byte(block.CID.Hash()), buf); err != nil { - return err - } - } - return nil - }) -} - -// AllKeysChan returns a channel of all CIDs in the store -func (s *Store) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - ch := make(chan cid.Cid) - - go func() { - log := s.log.Named("allKeysChan") - _ = s.db.View(func(txn *badger.Txn) error { - it := txn.NewIterator(badger.IteratorOptions{}) - defer it.Close() - - for it.Rewind(); it.Valid(); it.Next() { - cid, err := cid.Parse(it.Item().Key()) - if err != nil { - log.Error("failed to parse cid", zap.Error(err)) - continue - } - select { - case <-ctx.Done(): - return ctx.Err() - case ch <- cid: - } - } - return nil - }) - }() - return ch, nil -} diff --git a/renterd/downloader/downloader.go b/renterd/downloader/downloader.go new file mode 100644 index 0000000..69adb8a --- /dev/null +++ b/renterd/downloader/downloader.go @@ -0,0 +1,225 @@ +// package downloader contains a cache for downloading blocks from a renterd node. +// A cache optimizes the number of in-flight requests to avoid overloading the +// node and caches blocks to avoid redundant downloads. 
+package downloader
+
+import (
+	"bytes"
+	"container/heap"
+	"context"
+	"sync"
+	"time"
+
+	lru "github.com/hashicorp/golang-lru/v2"
+	"github.com/ipfs/boxo/ipld/merkledag"
+	blocks "github.com/ipfs/go-block-format"
+	"github.com/ipfs/go-cid"
+	"go.sia.tech/renterd/api"
+	"go.sia.tech/renterd/worker"
+	"go.uber.org/zap"
+)
+
+const (
+	downloadPriorityLow = iota + 1
+	downloadPriorityMedium
+	downloadPriorityHigh
+)
+
+type (
+	blockResponse struct {
+		ch  chan struct{}
+		b   []byte
+		err error
+
+		key      string
+		priority int
+		index    int
+	}
+
+	priorityQueue []*blockResponse
+
+	// BlockDownloader is a cache for downloading blocks from a renterd node.
+	// It limits the number of in-flight requests to avoid overloading the node
+	// and caches blocks to avoid redundant downloads.
+	//
+	// For UnixFS nodes, it also prefetches linked blocks.
+	BlockDownloader struct {
+		workerClient *worker.Client
+		log          *zap.Logger
+
+		bucket string
+
+		ch chan struct{}
+
+		mu    sync.Mutex // protects the fields below
+		cache *lru.TwoQueueCache[string, *blockResponse]
+		queue *priorityQueue
+	}
+)
+
+func (h priorityQueue) Len() int { return len(h) }
+func (h priorityQueue) Less(i, j int) bool {
+	// pop higher-priority tasks first (downloadPriorityHigh before downloadPriorityLow)
+	return h[i].priority > h[j].priority
+}
+func (h priorityQueue) Swap(i, j int) {
+	h[i], h[j] = h[j], h[i]
+	h[i].index = i
+	h[j].index = j
+}
+func (h *priorityQueue) Push(t any) {
+	n := len(*h)
+	task := t.(*blockResponse)
+	task.index = n
+	*h = append(*h, task)
+}
+func (h *priorityQueue) Pop() any {
+	old := *h
+	n := len(old)
+	item := old[n-1]
+	item.index = -1 // for safety
+	*h = old[0 : n-1]
+	return item
+}
+
+var _ heap.Interface = &priorityQueue{}
+
+func (br *blockResponse) block(ctx context.Context, c cid.Cid) (blocks.Block, error) {
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-br.ch:
+	}
+	if br.err != nil {
+		return nil, br.err
+	}
+	return blocks.NewBlockWithCid(br.b, c)
+}
+
+func (bd *BlockDownloader) doDownloadTask(task *blockResponse, log *zap.Logger) {
+	defer func() {
+		close(task.ch)
+	}()
+	log.Debug("downloading block")
+	start := time.Now()
+	blockBuf := bytes.NewBuffer(make([]byte, 0, 1<<20))
+
+	err := bd.workerClient.DownloadObject(context.Background(), blockBuf, bd.bucket, task.key, api.DownloadObjectOptions{
+		Range: api.DownloadRange{
+			Offset: 0,
+			Length: 1 << 20,
+		},
+	})
+	if err != nil {
+		log.Debug("failed to download block", zap.Error(err))
+		task.err = err
+		bd.cache.Remove(task.key)
+		return
+	}
+
+	task.b = blockBuf.Bytes()
+
+	log.Debug("downloaded block", zap.Duration("elapsed", time.Since(start)))
+	pn, err := merkledag.DecodeProtobuf(task.b)
+	if err != nil {
+		log.Debug("block is not a ProtoNode", zap.Error(err))
+		return
+	}
+	if len(pn.Links()) == 0 {
+		return
+	}
+
+	// prefetch linked blocks
+	links := pn.Links()
+	if len(links) > 2 {
+		// prioritize first and last linked blocks
+		firstLink := links[0]
+		lastLink := links[len(links)-1]
+		links = links[1 : len(links)-1]
+
+		bd.getResponse(firstLink.Cid, downloadPriorityMedium)
+		bd.getResponse(lastLink.Cid, downloadPriorityMedium)
+	}
+	for _, link := range links {
+		bd.getResponse(link.Cid, downloadPriorityLow)
+		log.Debug("queued linked blocks", zap.Stringer("cid", link.Cid), zap.String("key", cidKey(link.Cid)))
+	}
+}
+
+func (bd *BlockDownloader) getResponse(c cid.Cid, priority int) *blockResponse {
+	bd.mu.Lock()
+	defer bd.mu.Unlock()
+	key := cidKey(c)
+
+	if task, ok := bd.cache.Get(key); ok {
+		bd.log.Debug("cache hit", zap.String("key", key))
+		return task
+	}
+	task := &blockResponse{
+		key:      key,
+		priority: priority,
+		ch:       make(chan struct{}),
+	}
+	bd.cache.Add(key, task)
+	heap.Push(bd.queue, task)
+	bd.ch <- struct{}{}
+	return task
+}
+
+func (bd *BlockDownloader) downloadWorker(ctx context.Context, n int) {
+	log := bd.log.Named("worker").With(zap.Int("n", n))
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-bd.ch:
+		}
+
+		for {
+			bd.mu.Lock()
+			if bd.queue.Len() == 0 {
+				bd.mu.Unlock()
+				break
+			}
+
+			task := heap.Pop(bd.queue).(*blockResponse)
+ bd.mu.Unlock() + bd.doDownloadTask(task, log.With(zap.String("key", task.key))) + } + } +} + +func (bd *BlockDownloader) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + return bd.getResponse(c, downloadPriorityHigh).block(ctx, c) +} + +// StartWorkers starts n workers to download blocks. +func (bd *BlockDownloader) StartWorkers(ctx context.Context, n int) { + for i := 0; i < n; i++ { + go bd.downloadWorker(ctx, i) + } +} + +func cidKey(c cid.Cid) string { + return cid.NewCidV1(c.Type(), c.Hash()).String() +} + +// NewBlockDownloader creates a new BlockDownloader. +func NewBlockDownloader(bucket string, cacheSize int, workerClient *worker.Client, log *zap.Logger) (*BlockDownloader, error) { + cache, err := lru.New2Q[string, *blockResponse](cacheSize) + if err != nil { + return nil, err + } + bd := &BlockDownloader{ + workerClient: workerClient, + log: log, + cache: cache, + queue: &priorityQueue{}, + + ch: make(chan struct{}, 1000), + bucket: bucket, + } + heap.Init(bd.queue) + return bd, nil +} diff --git a/renterd/options.go b/renterd/options.go new file mode 100644 index 0000000..3f0e85c --- /dev/null +++ b/renterd/options.go @@ -0,0 +1,60 @@ +package renterd + +import ( + "go.sia.tech/renterd/bus" + "go.sia.tech/renterd/worker" + "go.uber.org/zap" +) + +type options struct { + Bucket string + CacheSize int + + Downloader BlockDownloader + Worker *worker.Client + Bus *bus.Client + Log *zap.Logger +} + +type Option func(*options) + +// WithBucket sets the bucket name. +func WithBucket(bucket string) Option { + return func(o *options) { + o.Bucket = bucket + } +} + +// WithLog sets the logger. +func WithLog(l *zap.Logger) Option { + return func(o *options) { + o.Log = l + } +} + +// WithWorker sets the worker client. +func WithWorker(w *worker.Client) Option { + return func(o *options) { + o.Worker = w + } +} + +// WithBus sets the bus client. 
+func WithBus(b *bus.Client) Option { + return func(o *options) { + o.Bus = b + } +} + +func WithDownloader(bd BlockDownloader) Option { + return func(o *options) { + o.Downloader = bd + } +} + +// WithCacheSize sets the size of the block cache. +func WithCacheSize(size int) Option { + return func(o *options) { + o.CacheSize = size + } +} diff --git a/renterd/store.go b/renterd/store.go new file mode 100644 index 0000000..5158b2c --- /dev/null +++ b/renterd/store.go @@ -0,0 +1,205 @@ +package renterd + +import ( + "bytes" + "context" + "fmt" + "strings" + "time" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/bus" + "go.sia.tech/renterd/worker" + "go.uber.org/zap" +) + +type ( + // A BlockDownloader downloads blocks from a renterd node. + BlockDownloader interface { + Get(ctx context.Context, c cid.Cid) (blocks.Block, error) + } + + // A BlockStore is a blockstore backed by a renterd node. + BlockStore struct { + log *zap.Logger + + bucket string + + workerClient *worker.Client + busClient *bus.Client + + downloader BlockDownloader + } +) + +func cidKey(c cid.Cid) string { + return cid.NewCidV1(c.Type(), c.Hash()).String() +} + +// DeleteBlock removes a given block from the blockstore. +func (bs *BlockStore) DeleteBlock(ctx context.Context, c cid.Cid) error { + key := cidKey(c) + log := bs.log.Named("DeleteBlock").With(zap.Stack("stack"), zap.Stringer("cid", c), zap.String("key", key)) + + start := time.Now() + if err := bs.busClient.DeleteObject(ctx, bs.bucket, key, api.DeleteObjectOptions{}); err != nil { + log.Debug("failed to delete block", zap.Error(err)) + } + log.Debug("deleted block", zap.Duration("elapsed", time.Since(start))) + return nil +} + +// Has returns whether or not a given block is in the blockstore. 
+func (bs *BlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) { + key := cidKey(c) + log := bs.log.Named("Has").With(zap.Stringer("cid", c), zap.String("key", key)) + + start := time.Now() + _, err := bs.busClient.Object(ctx, bs.bucket, key, api.GetObjectOptions{}) + if err != nil { + if strings.Contains(err.Error(), "object not found") { + log.Debug("block does not exist", zap.Duration("elapsed", time.Since(start))) + return false, nil + } + log.Debug("failed to get block", zap.Error(err)) + return false, fmt.Errorf("failed to check block existence: %w", err) + } + log.Debug("block exists", zap.Duration("elapsed", time.Since(start))) + return true, nil +} + +// Get returns a block by CID +func (bs *BlockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + return bs.downloader.Get(ctx, c) +} + +// GetSize returns the CIDs mapped BlockSize +func (bs *BlockStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + key := cidKey(c) + log := bs.log.Named("GetSize").With(zap.Stringer("cid", c), zap.String("key", key)) + stat, err := bs.busClient.Object(ctx, bs.bucket, key, api.GetObjectOptions{}) + if err != nil { + if !strings.Contains(err.Error(), "object not found") { + log.Debug("failed to get block size", zap.Error(err)) + } + return 0, fmt.Errorf("failed to get block size: %w", err) + } + log.Debug("got block size", zap.Int("size", int(stat.Object.Size))) + return int(stat.Object.Size), nil +} + +// Put puts a given block to the underlying datastore +func (bs *BlockStore) Put(ctx context.Context, b blocks.Block) error { + key := cidKey(b.Cid()) + log := bs.log.Named("Put").With(zap.Stringer("cid", b.Cid()), zap.String("key", key), zap.Int("size", len(b.RawData()))) + start := time.Now() + _, err := bs.workerClient.UploadObject(ctx, bytes.NewReader(b.RawData()), bs.bucket, key, api.UploadObjectOptions{}) + log.Debug("put block", zap.Duration("duration", time.Since(start)), zap.Error(err)) + return err +} + +// PutMany puts a slice of 
blocks at the same time using batching +// capabilities of the underlying datastore whenever possible. +func (bs *BlockStore) PutMany(ctx context.Context, blocks []blocks.Block) error { + log := bs.log.Named("PutMany").With(zap.Int("blocks", len(blocks))) + + for _, block := range blocks { + log.Debug("putting block", zap.Stringer("cid", block.Cid())) + if err := bs.Put(ctx, block); err != nil { + return fmt.Errorf("failed to put block %q: %w", block.Cid(), err) + } + } + + return nil +} + +// AllKeysChan returns a channel from which +// the CIDs in the Blockstore can be read. It should respect +// the given context, closing the channel if it becomes Done. +// +// AllKeysChan treats the underlying blockstore as a set, and returns that +// set in full. The only guarantee is that the consumer of AKC will +// encounter every CID in the underlying set, at least once. If the +// underlying blockstore supports duplicate CIDs it is up to the +// implementation to elect to return such duplicates or not. Similarly no +// guarantees are made regarding CID ordering. +// +// When underlying blockstore is operating on Multihash and codec information +// is not preserved, returned CIDs will use Raw (0x55) codec. 
+func (bs *BlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + log := bs.log.Named("AllKeysChan") + ch := make(chan cid.Cid) + go func() { + var marker string + for { + resp, err := bs.busClient.ListObjects(ctx, bs.bucket, api.ListObjectOptions{ + Marker: marker, + Limit: 100, + }) + if err != nil { + close(ch) + return + } else if len(resp.Objects) == 0 { + close(ch) + return + } + + for _, obj := range resp.Objects { + name := obj.Name + if p := strings.Split(obj.Name, "/"); len(p) > 1 { + name = p[len(p)-1] + } + c, err := cid.Parse(name) + if err != nil { + log.Debug("skipping invalid key", zap.String("key", obj.Name), zap.String("name", name)) + continue + } + log.Debug("found key", zap.Stringer("cid", c)) + select { + case ch <- c: + case <-ctx.Done(): + close(ch) + return + } + marker = obj.Name + } + } + }() + return ch, nil +} + +// HashOnRead specifies if every read block should be +// rehashed to make sure it matches its CID. +func (bs *BlockStore) HashOnRead(enabled bool) { + // TODO: implement +} + +// NewBlockStore creates a new blockstore backed by a renterd node +func NewBlockStore(opts ...Option) (*BlockStore, error) { + options := &options{ + Log: zap.NewNop(), + Bucket: "ipfs", + CacheSize: 1024, // 1GiB assuming 1MiB blocks + } + for _, opt := range opts { + opt(options) + } + + if options.Bus == nil { + return nil, fmt.Errorf("bus client is required") + } else if options.Worker == nil { + return nil, fmt.Errorf("worker client is required") + } else if options.Downloader == nil { + return nil, fmt.Errorf("block downloader is required") + } + + return &BlockStore{ + log: options.Log, + busClient: options.Bus, + workerClient: options.Worker, + bucket: options.Bucket, + downloader: options.Downloader, + }, nil +} diff --git a/sia/sia.go b/sia/sia.go deleted file mode 100644 index b10b04a..0000000 --- a/sia/sia.go +++ /dev/null @@ -1,285 +0,0 @@ -package sia - -import ( - "context" - "encoding/hex" - "errors" - "fmt" - "io" - 
"net/http" - "net/http/httputil" - "net/url" - "sync" - - chunker "github.com/ipfs/boxo/chunker" - "github.com/ipfs/boxo/ipld/merkledag" - "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" - ihelpers "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - "go.sia.tech/fsd/config" - "go.sia.tech/renterd/api" - "go.sia.tech/renterd/bus" - "go.sia.tech/renterd/worker" - "go.uber.org/zap" - "lukechampine.com/frand" -) - -type ( - // A Store is a persistent store for IPFS blocks - Store interface { - AddBlocks(context.Context, []Block) error - GetBlock(ctx context.Context, c cid.Cid) (Block, error) - HasBlock(ctx context.Context, c cid.Cid) (bool, error) - AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) - } - - // An IPFSProvider broadcasts CIDs to the IPFS network - IPFSProvider interface { - // Provide broadcasts a CID to the IPFS network - Provide(cid.Cid) error - // GetBlock retrieves a block from the IPFS network - GetBlock(context.Context, cid.Cid) (blocks.Block, error) - } - - // A Node is a specialized IPFS gateway that retrieves data from a renterd - // node - Node struct { - store Store - ipfs IPFSProvider - log *zap.Logger - - worker *worker.Client - bus *bus.Client - - renterd config.Renterd - } - - // UnixFSOptions holds configuration options for UnixFS uploads - UnixFSOptions struct { - CIDBuilder cid.Builder - RawLeaves bool - MaxLinks int - BlockSize int64 - } -) - -// ErrNotFound is returned when a CID is not found in the store -var ErrNotFound = errors.New("not found") - -func setDefaultCIDOpts(opts *UnixFSOptions) { - if opts.MaxLinks <= 0 { - opts.MaxLinks = ihelpers.DefaultLinksPerBlock - } - - if opts.BlockSize <= 0 { - opts.BlockSize = chunker.DefaultBlockSize - } -} - -// UploadFile uploads a unixfs file to the renterd node and returns the root CID -func (n *Node) UploadFile(ctx context.Context, r io.Reader, opts UnixFSOptions) (cid.Cid, error) { - log := 
n.log.Named("upload").With(zap.String("bucket", n.renterd.Bucket)) - - uploadID := hex.EncodeToString(frand.Bytes(32)) - tmpDataKey := uploadID + ".tmp" - tmpMetaKey := uploadID + ".meta.tmp" - - metaR, metaW := io.Pipe() - dataR, dataW := io.Pipe() - - dagSvc := NewUnixFileUploader(dataW, metaW, log) - - errCh := make(chan error, 2) - - var wg sync.WaitGroup - wg.Add(2) - go func() { - defer wg.Done() - defer metaW.Close() - - if _, err := n.worker.UploadObject(ctx, dataR, n.renterd.Bucket, tmpDataKey, api.UploadObjectOptions{}); err != nil { - errCh <- fmt.Errorf("failed to upload data: %w", err) - return - } - }() - - go func() { - defer wg.Done() - defer dataW.Close() - - if _, err := n.worker.UploadObject(ctx, metaR, n.renterd.Bucket, tmpMetaKey, api.UploadObjectOptions{}); err != nil { - errCh <- fmt.Errorf("failed to upload metadata: %w", err) - return - } - }() - - setDefaultCIDOpts(&opts) - spl := chunker.NewSizeSplitter(r, opts.BlockSize) - dbp := ihelpers.DagBuilderParams{ - Dagserv: dagSvc, - CidBuilder: opts.CIDBuilder, - RawLeaves: opts.RawLeaves, - Maxlinks: opts.MaxLinks, - } - db, err := dbp.New(spl) - if err != nil { - return cid.Cid{}, fmt.Errorf("failed to create dag builder: %w", err) - } - - rootNode, err := balanced.Layout(db) - if err != nil { - return cid.Cid{}, fmt.Errorf("failed to build dag: %w", err) - } - - // close the pipes to signal the uploaders that we're done - metaW.Close() - dataW.Close() - // wait for uploads to finish - wg.Wait() - - select { - case err := <-errCh: - return cid.Cid{}, err - default: - } - - dataKey := rootNode.Cid().String() - metaKey := dataKey + ".meta" - - if err = n.bus.RenameObject(ctx, n.renterd.Bucket, "/"+tmpDataKey, "/"+dataKey, true); err != nil { - return cid.Cid{}, fmt.Errorf("failed to rename tmp data: %w", err) - } else if err = n.bus.RenameObject(ctx, n.renterd.Bucket, "/"+tmpMetaKey, "/"+metaKey, true); err != nil { - return cid.Cid{}, fmt.Errorf("failed to rename tmp metadata: %w", err) - } - 
- // get the blocks from the dag service - blocks := dagSvc.Blocks() - // set the renterd bucket and object key for the blocks - for i := range blocks { - blocks[i].Data.Bucket = n.renterd.Bucket - blocks[i].Data.Key = dataKey - blocks[i].Metadata.Bucket = n.renterd.Bucket - blocks[i].Metadata.Key = metaKey - } - - // add the blocks to the store - if err := n.store.AddBlocks(ctx, blocks); err != nil { - return cid.Cid{}, fmt.Errorf("failed to add blocks to store: %w", err) - } - - // broadcast the CIDs to the IPFS network - for _, b := range blocks { - if err := n.ipfs.Provide(b.CID); err != nil { - return cid.Cid{}, fmt.Errorf("failed to provide block: %w", err) - } - } - return rootNode.Cid(), nil -} - -// ProxyHTTPDownload proxies an http download request to the renterd node -func (n *Node) ProxyHTTPDownload(c cid.Cid, r *http.Request, w http.ResponseWriter) error { - block, err := n.store.GetBlock(r.Context(), c) - if err != nil { - return err - } else if block.Data.Offset != 0 { - return errors.New("cannot proxy partial downloads") - } - - target, err := url.Parse(n.renterd.WorkerAddress + "/objects/" + block.Data.Key) - if err != nil { - panic(err) - } - target.RawQuery = url.Values{ - "bucket": []string{block.Data.Bucket}, - }.Encode() - - rp := &httputil.ReverseProxy{ - Rewrite: func(r *httputil.ProxyRequest) { - r.Out.Method = http.MethodGet - r.Out.URL = target - r.Out.SetBasicAuth("", n.renterd.WorkerPassword) - r.Out.Header.Set("Range", r.In.Header.Get("Range")) - - n.log.Debug("proxying request to", zap.Stringer("url", r.Out.URL)) - }, - } - - rp.ServeHTTP(w, r) - return nil -} - -// CalculateBlocks calculates the blocks for a given reader and returns them -func (n *Node) CalculateBlocks(ctx context.Context, r io.Reader, opts UnixFSOptions) ([]Block, error) { - dagSvc := NewUnixFileUploader(io.Discard, io.Discard, n.log.Named("calculate")) - - setDefaultCIDOpts(&opts) - spl := chunker.NewSizeSplitter(r, opts.BlockSize) - dbp := 
ihelpers.DagBuilderParams{ - Dagserv: dagSvc, - CidBuilder: opts.CIDBuilder, - RawLeaves: opts.RawLeaves, - Maxlinks: opts.MaxLinks, - } - db, err := dbp.New(spl) - if err != nil { - return nil, fmt.Errorf("failed to create dag builder: %w", err) - } - - _, err = balanced.Layout(db) - if err != nil { - return nil, fmt.Errorf("failed to build dag: %w", err) - } - - return dagSvc.Blocks(), nil -} - -// VerifyCID verifies that a CID is correctly stored in the renterd node -func (n *Node) VerifyCID(ctx context.Context, c cid.Cid) error { - rbs := NewBlockStore(n.store, n.renterd, n.log.Named("verify")) - - var recursiveVerifyCid func(ctx context.Context, c cid.Cid) error - recursiveVerifyCid = func(ctx context.Context, c cid.Cid) error { - block, err := rbs.Get(ctx, c) - if err != nil { - return fmt.Errorf("failed to get block: %w", err) - } else if block.Cid().Hash().String() != c.Hash().String() { - return fmt.Errorf("unexpected multihash: %s != %s", block.Cid(), c) - } - - switch node := block.(type) { - case *merkledag.ProtoNode: - links := node.Links() - for _, link := range links { - if err := recursiveVerifyCid(ctx, link.Cid); err != nil { - return err - } - } - case *merkledag.RawNode: - links := node.Links() - for _, link := range links { - if err := recursiveVerifyCid(ctx, link.Cid); err != nil { - return err - } - } - } - - return nil - } - - return recursiveVerifyCid(ctx, c) -} - -// New creates a new Sia IPFS store -func New(store Store, ipfs IPFSProvider, cfg config.Renterd, log *zap.Logger) *Node { - return &Node{ - store: store, - ipfs: ipfs, - log: log, - - worker: worker.NewClient(cfg.WorkerAddress, cfg.WorkerPassword), - bus: bus.NewClient(cfg.BusAddress, cfg.BusPassword), - - renterd: cfg, - } -} diff --git a/sia/store.go b/sia/store.go deleted file mode 100644 index 5ae01be..0000000 --- a/sia/store.go +++ /dev/null @@ -1,215 +0,0 @@ -package sia - -import ( - "bytes" - "context" - "errors" - "fmt" - "sync" - "time" - - 
"github.com/gogo/protobuf/proto" - "github.com/ipfs/boxo/ipld/merkledag" - pb "github.com/ipfs/boxo/ipld/unixfs/pb" - blocks "github.com/ipfs/go-block-format" - "github.com/ipfs/go-cid" - format "github.com/ipfs/go-ipld-format" - "go.sia.tech/fsd/config" - "go.sia.tech/renterd/api" - "go.sia.tech/renterd/bus" - "go.sia.tech/renterd/worker" - "go.uber.org/zap" -) - -type ( - // A RenterdBlockStore is a blockstore backed by a renterd node. IPFS blocks - // are stored in a local database and uploaded to a renterd node. The - // primary difference between this and a normal IPFS blockstore is that a - // file is stored on the renterd node in one piece and the offsets for - // rebuilding the block are stored in the database. - RenterdBlockStore struct { - store Store - log *zap.Logger - workerClient *worker.Client - busClient *bus.Client - } -) - -// DeleteBlock removes a given block from the blockstore. -// note: this is not implemented -func (bs *RenterdBlockStore) DeleteBlock(context.Context, cid.Cid) error { - return nil -} - -// Has returns whether or not a given block is in the blockstore. 
-func (bs *RenterdBlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) { - bs.log.Debug("has block", zap.Stringer("cid", c)) - return bs.store.HasBlock(ctx, c) -} - -func (bs *RenterdBlockStore) downloadBlock(ctx context.Context, cm Block) (blocks.Block, error) { - dataBuf := bytes.NewBuffer(make([]byte, 0, cm.Data.BlockSize)) - metadataBuf := bytes.NewBuffer(make([]byte, 0, cm.Metadata.Length)) - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - var wg sync.WaitGroup - errCh := make(chan error, 2) - if cm.Data.BlockSize != 0 { - wg.Add(1) - go func() { - defer wg.Done() - - err := bs.workerClient.DownloadObject(ctx, dataBuf, cm.Data.Bucket, cm.Data.Key, api.DownloadObjectOptions{ - Range: api.DownloadRange{ - Offset: int64(cm.Data.Offset), - Length: int64(cm.Data.BlockSize), - }, - }) - if err != nil { - bs.log.Error("failed to download block data", zap.Error(err), zap.Stringer("cid", cm.CID)) - errCh <- fmt.Errorf("failed to download block data: %w", err) - } - }() - } - - if cm.Metadata.Length != 0 { - wg.Add(1) - go func() { - defer wg.Done() - - err := bs.workerClient.DownloadObject(ctx, metadataBuf, cm.Metadata.Bucket, cm.Metadata.Key, api.DownloadObjectOptions{ - Range: api.DownloadRange{ - Offset: int64(cm.Metadata.Offset), - Length: int64(cm.Metadata.Length), - }, - }) - if err != nil { - bs.log.Error("failed to download block metadata", zap.Error(err), zap.Stringer("cid", cm.CID)) - errCh <- fmt.Errorf("failed to download block metadata: %w", err) - } - }() - } - - // Wait for all downloads to complete. 
- go func() { - wg.Wait() - close(errCh) - }() - - for err := range errCh { - return nil, err - } - - data, metadata := dataBuf.Bytes(), metadataBuf.Bytes() - if n := len(data); n != int(cm.Data.BlockSize) { - return nil, fmt.Errorf("unexpected data size: requested %d got %d", cm.Data.BlockSize, n) - } else if n := len(metadata); n != int(cm.Metadata.Length) { - return nil, fmt.Errorf("unexpected metadata size: requested %d got %d", cm.Metadata.Length, n) - } - - var meta pb.Data - if err := proto.Unmarshal(metadata, &meta); err != nil { - return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) - } - - // note: non-nil block data encodes to 2 bytes - if cm.Data.BlockSize != 0 { - meta.Data = data - } - - buf, err := proto.Marshal(&meta) - if err != nil { - return nil, fmt.Errorf("failed to marshal metadata: %w", err) - } - - node := merkledag.NodeWithData(buf) - for _, link := range cm.Links { - node.AddRawLink(link.Name, &format.Link{ - Name: link.Name, - Size: link.Size, - Cid: link.CID, - }) - } - return node, nil -} - -// Get returns a block by CID -func (bs *RenterdBlockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { - cm, err := bs.store.GetBlock(ctx, c) - if errors.Is(err, ErrNotFound) { - return nil, format.ErrNotFound{Cid: c} - } else if err != nil { - return nil, fmt.Errorf("failed to get cid: %w", err) - } - - log := bs.log.With(zap.Stringer("cid", c), zap.String("dataBucket", cm.Data.Bucket), zap.String("dataKey", cm.Data.Key), zap.String("metaBucket", cm.Metadata.Bucket), zap.String("metaKey", cm.Metadata.Key), zap.Uint64("blockSize", cm.Data.BlockSize), zap.Uint64("blockOffset", cm.Data.Offset), zap.Uint64("metadataSize", cm.Metadata.Length), zap.Uint64("metadataOffset", cm.Metadata.Offset)) - log.Debug("downloading block") - - start := time.Now() - block, err := bs.downloadBlock(ctx, cm) - if err != nil { - log.Error("failed to download block", zap.Error(err)) - } - log.Debug("block downloaded", zap.Duration("elapsed", 
time.Since(start))) - return block, err -} - -// GetSize returns the CIDs mapped BlockSize -func (bs *RenterdBlockStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { - cm, err := bs.store.GetBlock(ctx, c) - if errors.Is(err, ErrNotFound) { - return 0, format.ErrNotFound{Cid: c} - } else if err != nil { - return 0, fmt.Errorf("failed to get cid: %w", err) - } - bs.log.Debug("get block size", zap.Stringer("cid", c), zap.Uint64("size", cm.Data.BlockSize)) - return int(cm.Data.BlockSize + cm.Metadata.Length), nil -} - -// Put puts a given block to the underlying datastore -func (bs *RenterdBlockStore) Put(context.Context, blocks.Block) error { - return nil -} - -// PutMany puts a slice of blocks at the same time using batching -// capabilities of the underlying datastore whenever possible. -func (bs *RenterdBlockStore) PutMany(context.Context, []blocks.Block) error { - return nil -} - -// AllKeysChan returns a channel from which -// the CIDs in the Blockstore can be read. It should respect -// the given context, closing the channel if it becomes Done. -// -// AllKeysChan treats the underlying blockstore as a set, and returns that -// set in full. The only guarantee is that the consumer of AKC will -// encounter every CID in the underlying set, at least once. If the -// underlying blockstore supports duplicate CIDs it is up to the -// implementation to elect to return such duplicates or not. Similarly no -// guarantees are made regarding CID ordering. -// -// When underlying blockstore is operating on Multihash and codec information -// is not preserved, returned CIDs will use Raw (0x55) codec. -func (bs *RenterdBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { - return bs.store.AllKeysChan(ctx) -} - -// HashOnRead specifies if every read block should be -// rehashed to make sure it matches its CID. 
-func (bs *RenterdBlockStore) HashOnRead(enabled bool) { - // TODO: implement -} - -// NewBlockStore creates a new blockstore backed by the given -// badger.Store and a renterd node -func NewBlockStore(store Store, renterd config.Renterd, log *zap.Logger) *RenterdBlockStore { - return &RenterdBlockStore{ - store: store, - log: log, - - workerClient: worker.NewClient(renterd.WorkerAddress, renterd.WorkerPassword), - busClient: bus.NewClient(renterd.BusAddress, renterd.BusPassword), - } -} diff --git a/sia/types.go b/sia/types.go deleted file mode 100644 index 87ff40f..0000000 --- a/sia/types.go +++ /dev/null @@ -1,41 +0,0 @@ -package sia - -import ( - "github.com/ipfs/go-cid" -) - -type ( - // A RenterdData links IPFS block data to an object stored on a renterd node - RenterdData struct { - Bucket string `json:"bucket"` - Key string `json:"key"` - Offset uint64 `json:"offset"` - FileSize uint64 `json:"filesize"` - BlockSize uint64 `json:"blocksize"` - } - - // RenterdMeta links IPFS block metadata to an object stored on a renterd - // node - RenterdMeta struct { - Bucket string `json:"bucket"` - Key string `json:"key"` - Offset uint64 `json:"offset"` - Length uint64 `json:"length"` - } - - // A Link is a link to another IPFS node - Link struct { - CID cid.Cid `json:"cid"` - Name string `json:"name"` - Size uint64 `json:"size"` - } - - // A Block is an IPFS chunk with metadata for efficient storage and - // retrieval from a renterd object - Block struct { - CID cid.Cid `json:"cid"` - Data RenterdData `json:"data"` - Metadata RenterdMeta `json:"metadata"` - Links []Link `json:"links"` - } -) diff --git a/sia/uploader.go b/sia/uploader.go deleted file mode 100644 index b767543..0000000 --- a/sia/uploader.go +++ /dev/null @@ -1,195 +0,0 @@ -package sia - -import ( - "context" - "fmt" - "io" - - "github.com/gogo/protobuf/proto" - "github.com/ipfs/boxo/ipld/merkledag" - "github.com/ipfs/boxo/ipld/unixfs" - pb "github.com/ipfs/boxo/ipld/unixfs/pb" - 
"github.com/ipfs/go-cid" - format "github.com/ipfs/go-ipld-format" - "go.uber.org/zap" -) - -// A UnixFileUploader uploads a UnixFS DAG to a renterd node -type UnixFileUploader struct { - dataOffset uint64 - metaOffset uint64 - fileSize uint64 - - data io.Writer - metadata io.Writer - - blocks []Block - - log *zap.Logger -} - -// Get retrieves nodes by CID. Depending on the NodeGetter -// implementation, this may involve fetching the Node from a remote -// machine; consider setting a deadline in the context. -func (ufs *UnixFileUploader) Get(ctx context.Context, c cid.Cid) (format.Node, error) { - panic("not implemented") -} - -// GetMany returns a channel of NodeOptions given a set of CIDs. -func (ufs *UnixFileUploader) GetMany(ctx context.Context, c []cid.Cid) <-chan *format.NodeOption { - panic("not implemented") -} - -func (ufs *UnixFileUploader) uploadProtoNode(_ context.Context, node *merkledag.ProtoNode) (data, metadata []byte, _ error) { - fsNode, err := unixfs.ExtractFSNode(node) - if err != nil { - return nil, nil, fmt.Errorf("failed to extract fs node: %w", err) - } - - switch fsNode.Type() { - case unixfs.TFile: - data = fsNode.Data() - - buf, err := fsNode.GetBytes() - if err != nil { - return nil, nil, fmt.Errorf("failed to get bytes: %w", err) - } - - var meta pb.Data - if err := proto.Unmarshal(buf, &meta); err != nil { - return nil, nil, fmt.Errorf("failed to unmarshal metadata: %w", err) - } - meta.Data = nil - metadata, err = proto.Marshal(&meta) - if err != nil { - return nil, nil, fmt.Errorf("failed to marshal metadata: %w", err) - } - default: - return nil, nil, fmt.Errorf("unsupported unixfs node type: %T", fsNode) - } - return -} - -func (ufs *UnixFileUploader) uploadRawNode(_ context.Context, node *merkledag.RawNode) (data, metadata []byte, _ error) { - return node.RawData(), nil, nil -} - -// Add adds a node to this DAG. 
-func (ufs *UnixFileUploader) Add(ctx context.Context, node format.Node) error { - var data, metadata []byte - var err error - switch node := node.(type) { - case *merkledag.ProtoNode: - data, metadata, err = ufs.uploadProtoNode(ctx, node) - case *merkledag.RawNode: - data, metadata, err = ufs.uploadRawNode(ctx, node) - default: - return fmt.Errorf("unsupported node type: %T", node) - } - - if err != nil { - return err - } - - var links []Link - for _, link := range node.Links() { - links = append(links, Link{ - CID: link.Cid, - Name: link.Name, - Size: link.Size, - }) - } - - dataSize := uint64(len(data)) - fileSize := ufs.fileSize + dataSize - dataOffset := ufs.dataOffset - if dataSize == 0 { - dataOffset = 0 - } - - ufs.log.Debug("adding node", - zap.Stringer("cid", node.Cid()), - zap.Uint64("filesize", fileSize), - zap.Uint64("dataOffset", dataOffset), - zap.Uint64("metaOffset", ufs.metaOffset), - zap.Uint64("datasize", dataSize), - zap.Int("links", len(node.Links()))) - - metaLen, err := ufs.metadata.Write(metadata) - if err != nil { - return fmt.Errorf("failed to write metadata: %w", err) - } - - _, err = ufs.data.Write(data) - if err != nil { - return fmt.Errorf("failed to write data: %w", err) - } - - block := Block{ - CID: node.Cid(), - Links: links, - Data: RenterdData{ - Offset: dataOffset, - FileSize: fileSize, - BlockSize: dataSize, - }, - Metadata: RenterdMeta{ - Offset: ufs.metaOffset, - Length: uint64(metaLen), - }, - } - ufs.dataOffset += dataSize - ufs.fileSize += dataSize - ufs.metaOffset += block.Metadata.Length - ufs.blocks = append(ufs.blocks, block) - return nil -} - -// AddMany adds many nodes to this DAG. -// -// Consider using the Batch NodeAdder (`NewBatch`) if you make -// extensive use of this function. 
-func (ufs *UnixFileUploader) AddMany(ctx context.Context, nodes []format.Node) error { - for _, node := range nodes { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - if err := ufs.Add(ctx, node); err != nil { - return err - } - } - return nil -} - -// Remove removes a node from this DAG. -// -// Remove returns no error if the requested node is not present in this DAG. -func (ufs *UnixFileUploader) Remove(context.Context, cid.Cid) error { - panic("not implemented") -} - -// RemoveMany removes many nodes from this DAG. -// -// It returns success even if the nodes were not present in the DAG. -func (ufs *UnixFileUploader) RemoveMany(context.Context, []cid.Cid) error { - panic("not implemented") -} - -// Blocks returns all blocks that were added to this DAG. They should be added -// to the blockstore -func (ufs *UnixFileUploader) Blocks() []Block { - return ufs.blocks -} - -// NewUnixFileUploader creates a new UnixFileUploader that uploads a UnixFS DAG -// to a renterd node. -func NewUnixFileUploader(data, metadata io.Writer, log *zap.Logger) *UnixFileUploader { - return &UnixFileUploader{ - log: log, - - data: data, - metadata: metadata, - } -}