diff --git a/.gitignore b/.gitignore index 19eb4cc..cbab71d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ filestore config.yml data -bin \ No newline at end of file +bin +.DS_Store \ No newline at end of file diff --git a/ape0.png b/ape0.png deleted file mode 100644 index b205e8a..0000000 Binary files a/ape0.png and /dev/null differ diff --git a/cmd/fsd/main.go b/cmd/fsd/main.go index 2f05f38..33285dc 100644 --- a/cmd/fsd/main.go +++ b/cmd/fsd/main.go @@ -19,6 +19,7 @@ import ( shttp "go.sia.tech/fsd/http" "go.sia.tech/fsd/ipfs" "go.sia.tech/fsd/persist/badger" + "go.sia.tech/fsd/sia" "go.sia.tech/jape" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -139,13 +140,15 @@ func main() { log.Fatal("failed to generate private key", zap.Error(err)) } } - store := ipfs.NewRenterdBlockStore(db, cfg.Renterd, log.Named("blockstore")) + store := sia.NewBlockStore(db, cfg.Renterd, log.Named("blockstore")) - node, err := ipfs.NewNode(ctx, privateKey, cfg.IPFS, store) + inode, err := ipfs.NewNode(ctx, privateKey, cfg.IPFS, store) if err != nil { log.Fatal("failed to start ipfs node", zap.Error(err)) } - defer node.Close() + defer inode.Close() + + snode := sia.New(db, cfg.Renterd, log.Named("sia")) apiListener, err := net.Listen("tcp", cfg.API.Address) if err != nil { @@ -160,12 +163,12 @@ func main() { defer gatewayListener.Close() apiServer := &http.Server{ - Handler: jape.BasicAuth(cfg.API.Password)(shttp.NewAPIHandler(node, db, cfg, log.Named("api"))), + Handler: jape.BasicAuth(cfg.API.Password)(shttp.NewAPIHandler(inode, snode, cfg, log.Named("api"))), } defer apiServer.Close() gatewayServer := &http.Server{ - Handler: shttp.NewIPFSHandler(node, db, cfg, log.Named("gateway")), + Handler: shttp.NewIPFSGatewayHandler(inode, snode, cfg, log.Named("gateway")), } defer gatewayServer.Close() @@ -188,7 +191,7 @@ func main() { prettyKey := "ed25519:" + hex.EncodeToString(buf) log.Info("fsd started", - zap.Stringer("peerID", node.PeerID()), + zap.Stringer("peerID", inode.PeerID()), zap.String("privateKey", prettyKey), zap.String("apiAddress", apiListener.Addr().String()), zap.String("gatewayAddress", gatewayListener.Addr().String()), diff --git a/go.mod b/go.mod index 6998d31..7695899 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21 require ( github.com/dgraph-io/badger/v4 v4.2.0 + github.com/gogo/protobuf v1.3.2 github.com/ipfs/boxo v0.13.2-0.20231005100652-b3886904df5d github.com/ipfs/go-block-format v0.1.2 github.com/ipfs/go-cid v0.4.1 @@ -11,7 +12,6 @@ require ( github.com/libp2p/go-libp2p v0.31.0 github.com/libp2p/go-libp2p-kad-dht v0.24.4 github.com/multiformats/go-multiaddr v0.11.0 - github.com/multiformats/go-multihash v0.2.3 go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb go.sia.tech/renterd v0.6.1-0.20231005151658-e450d5902e31 go.uber.org/zap v1.25.0 @@ -46,7 +46,6 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect - github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/mock v1.6.0 // indirect @@ -115,6 +114,7 @@ require ( github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect + github.com/multiformats/go-multihash v0.2.3 // indirect github.com/multiformats/go-multistream v0.4.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/onsi/ginkgo/v2 v2.11.0 // indirect diff --git a/http/api.go b/http/api.go index cf664a4..8de5c60 100644 --- a/http/api.go +++ b/http/api.go @@ -1,30 +1,22 @@ package http import ( - "context" - "io" "net/http" "github.com/ipfs/go-cid" "go.sia.tech/fsd/config" "go.sia.tech/fsd/ipfs" + "go.sia.tech/fsd/sia" "go.sia.tech/jape" - "go.sia.tech/renterd/api" "go.sia.tech/renterd/worker" "go.uber.org/zap" ) type ( - // A Store is a persistent store for IPFS blocks - Store interface { - GetBlock(context.Context, cid.Cid) (ipfs.Block, error) - AddBlocks(context.Context, []ipfs.Block) error - } - apiServer struct { - node *ipfs.Node + ipfs *ipfs.Node + sia *sia.Node worker *worker.Client - store Store log *zap.Logger renterd config.Renterd @@ -32,10 +24,12 @@ type ( ) func (as *apiServer) handleCalculate(jc jape.Context) { + ctx := jc.Request.Context() + body := jc.Request.Body defer body.Close() - blocks, err := ipfs.BuildBalancedCID("test", body) + blocks, err := as.sia.CalculateBlocks(ctx, body) if err != nil { jc.Error(err, http.StatusInternalServerError) return @@ -55,39 +49,14 @@ func (as *apiServer) handlePin(jc jape.Context) { return } - r, err := as.node.DownloadCID(ctx, cid, nil) + r, err := as.ipfs.DownloadCID(ctx, cid, nil) if err != nil { jc.Error(err, http.StatusInternalServerError) return } defer r.Close() - pr, pw := io.Pipe() - tr := io.TeeReader(r, pw) - uploadErr := make(chan error, 1) - - go func() { - defer pw.Close() - defer close(uploadErr) - - _, err = as.worker.UploadObject(ctx, tr, cid.Hash().B58String(), api.UploadWithBucket(as.renterd.Bucket)) - if err != nil { - uploadErr <- err - } - }() - - blocks, err := ipfs.BuildBalancedCID(cidStr, pr) - if err != nil { - jc.Error(err, http.StatusInternalServerError) - return - } - - if err := <-uploadErr; err != nil { - jc.Error(err, http.StatusInternalServerError) - return - } - - if err := as.store.AddBlocks(ctx, blocks); err != nil { + if err := as.sia.UploadCID(ctx, cid, r); err != nil { jc.Error(err, http.StatusInternalServerError) return } @@ -108,50 +77,25 @@ func (as *apiServer) handleUpload(jc jape.Context) { body := jc.Request.Body defer body.Close() - pr, pw := io.Pipe() - r := io.TeeReader(body, pw) - uploadErr := make(chan error, 1) - - go func() { - defer pw.Close() - defer close(uploadErr) - - _, err = as.worker.UploadObject(ctx, r, cid.Hash().B58String(), api.UploadWithBucket(as.renterd.Bucket)) - if err != nil { - uploadErr <- err - } - }() - - blocks, err := ipfs.BuildBalancedCID(cidStr, pr) + err = as.sia.UploadCID(ctx, cid, body) if err != nil { jc.Error(err, http.StatusInternalServerError) return } - if err := <-uploadErr; err != nil { - jc.Error(err, http.StatusInternalServerError) - return - } - - if err := as.store.AddBlocks(ctx, blocks); err != nil { - jc.Error(err, http.StatusInternalServerError) - return - } // the root cid is the first block - rootCID := blocks[0].CID - jc.Encode(rootCID.Hash().B58String()) - as.log.Info("uploaded cid", zap.String("rootCID", rootCID.Hash().B58String()), zap.Int("blocks", len(blocks))) + jc.Encode(cid.Hash().B58String()) } // NewAPIHandler returns a new http.Handler that handles requests to the api -func NewAPIHandler(node *ipfs.Node, store Store, cfg config.Config, log *zap.Logger) http.Handler { +func NewAPIHandler(ipfs *ipfs.Node, sia *sia.Node, cfg config.Config, log *zap.Logger) http.Handler { s := &apiServer{ worker: worker.NewClient(cfg.Renterd.Address, cfg.Renterd.Password), renterd: cfg.Renterd, - node: node, - store: store, - log: log, + ipfs: ipfs, + sia: sia, + log: log, } return jape.Mux(map[string]jape.Handler{ "POST /api/cid/calculate": s.handleCalculate, diff --git a/http/ipfs.go b/http/ipfs.go index 6e82027..4d72943 100644 --- a/http/ipfs.go +++ b/http/ipfs.go @@ -7,23 +7,22 @@ import ( "strings" "github.com/ipfs/go-cid" - format "github.com/ipfs/go-ipld-format" "go.sia.tech/fsd/config" "go.sia.tech/fsd/ipfs" + "go.sia.tech/fsd/sia" "go.sia.tech/jape" "go.uber.org/zap" ) -type ipfsServer struct { - store Store - node *ipfs.Node - log *zap.Logger +type ipfsGatewayServer struct { + ipfs *ipfs.Node + sia *sia.Node + log *zap.Logger - ipfs config.IPFS - renterd config.Renterd + config config.Config } -func (is *ipfsServer) handleIPFS(jc jape.Context) { +func (is *ipfsGatewayServer) handleIPFS(jc jape.Context) { ctx := jc.Request.Context() var pathStr string @@ -51,10 +50,10 @@ func (is *ipfsServer) handleIPFS(jc jape.Context) { return } - block, err := is.store.GetBlock(ctx, cid) - if format.IsNotFound(err) && is.ipfs.FetchRemote { + err = is.sia.ProxyHTTPDownload(cid, jc.Request, jc.ResponseWriter) + if errors.Is(err, sia.ErrNotFound) && is.config.IPFS.FetchRemote { is.log.Info("downloading from ipfs", zap.String("cid", cid.Hash().B58String())) - r, err := is.node.DownloadCID(ctx, cid, path) + r, err := is.ipfs.DownloadCID(ctx, cid, path) if err != nil { jc.Error(err, http.StatusInternalServerError) is.log.Error("failed to download cid", zap.Error(err)) @@ -69,31 +68,16 @@ func (is *ipfsServer) handleIPFS(jc jape.Context) { is.log.Error("failed to get block", zap.Error(err)) return } - - is.log.Info("downloading from renterd", zap.String("cid", cid.Hash().B58String()), zap.String("key", block.Key), zap.Uint64("offset", block.Offset), zap.Uint64("length", block.Length)) - reader, err := downloadObject(ctx, is.renterd, block.Key, block.Offset, block.Length) - if err != nil { - jc.Error(err, http.StatusInternalServerError) - is.log.Error("failed to download object", zap.Error(err)) - return - } - defer reader.Close() - - if _, err := io.Copy(jc.ResponseWriter, reader); err != nil { - is.log.Error("failed to copy file", zap.Error(err)) - return - } } -// NewIPFSHandler creates a new http.Handler for the IPFS gateway. -func NewIPFSHandler(node *ipfs.Node, store Store, cfg config.Config, log *zap.Logger) http.Handler { - s := &ipfsServer{ - node: node, - store: store, - log: log, +// NewIPFSGatewayHandler creates a new http.Handler for the IPFS gateway. +func NewIPFSGatewayHandler(ipfs *ipfs.Node, sia *sia.Node, cfg config.Config, log *zap.Logger) http.Handler { + s := &ipfsGatewayServer{ + ipfs: ipfs, + sia: sia, + log: log, - ipfs: cfg.IPFS, - renterd: cfg.Renterd, + config: cfg, } return jape.Mux(map[string]jape.Handler{ diff --git a/http/renterd.go b/http/renterd.go deleted file mode 100644 index 43b5324..0000000 --- a/http/renterd.go +++ /dev/null @@ -1,36 +0,0 @@ -package http - -import ( - "context" - "errors" - "fmt" - "io" - "net/http" - "net/url" - - "go.sia.tech/fsd/config" -) - -func downloadObject(ctx context.Context, renterd config.Renterd, key string, offset, length uint64) (io.ReadCloser, error) { - values := url.Values{} - values.Set("bucket", url.QueryEscape(renterd.Bucket)) - url := fmt.Sprintf("%s/objects/%s?%s", renterd.Address, key, values.Encode()) - - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, err - } - req.SetBasicAuth("", renterd.Password) - req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", offset, offset+length)) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - if resp.StatusCode != 200 && resp.StatusCode != 206 { - err, _ := io.ReadAll(resp.Body) - _ = resp.Body.Close() - return nil, errors.New(string(err)) - } - return resp.Body, err -} diff --git a/ipfs/cid.go b/ipfs/cid.go deleted file mode 100644 index 47b8ef0..0000000 --- a/ipfs/cid.go +++ /dev/null @@ -1,160 +0,0 @@ -package ipfs - -import ( - "fmt" - "io" - "sort" - - chunker "github.com/ipfs/boxo/chunker" - "github.com/ipfs/boxo/ipld/merkledag" - "github.com/ipfs/boxo/ipld/merkledag/dagutils" - "github.com/ipfs/boxo/ipld/unixfs" - ihelpers "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" - "github.com/ipfs/go-cid" - ipld "github.com/ipfs/go-ipld-format" - multihash "github.com/multiformats/go-multihash/core" -) - -// A Block is an IPFS chunk with metadata -// for renterd -type Block struct { - CID cid.Cid `json:"cid"` - Key string `json:"key"` - Offset uint64 `json:"offset"` - Length uint64 `json:"length"` - Links []cid.Cid `json:"links"` -} - -// BuildBalancedCID builds a balanced CID from the given reader It returns a -// slice of Block which contains the CID, the renterd object key, the offset, -// the length and the links of each block. -func BuildBalancedCID(key string, r io.Reader) ([]Block, error) { - prefix, err := merkledag.PrefixForCidVersion(0) - if err != nil { - return nil, fmt.Errorf("failed to get prefix: %w", err) - } - - prefix.MhType = multihash.SHA2_256 - - spl := chunker.NewSizeSplitter(r, chunker.DefaultBlockSize) - dbp := ihelpers.DagBuilderParams{ - Maxlinks: ihelpers.DefaultLinksPerBlock, - Dagserv: dagutils.NewMemoryDagService(), - } - db, err := dbp.New(spl) - if err != nil { - return nil, fmt.Errorf("failed to create dag builder: %w", err) - } - - var offset uint64 - blocks := make(map[string]Block) - - var fillNode func(db *ihelpers.DagBuilderHelper, node *ihelpers.FSNodeOverDag, depth int) (ipld.Node, uint64, error) - fillNode = func(db *ihelpers.DagBuilderHelper, node *ihelpers.FSNodeOverDag, depth int) (ipld.Node, uint64, error) { - if node == nil { - node = db.NewFSNodeOverDag(unixfs.TFile) - } - - var child ipld.Node - var childSize uint64 - - for node.NumChildren() < db.Maxlinks() && !db.Done() { - if depth == 1 { - child, childSize, err = db.NewLeafDataNode(unixfs.TFile) - if err != nil { - return nil, 0, fmt.Errorf("failed to create new leaf node: %w", err) - } - cid := child.Cid() - blocks[cid.String()] = Block{ - CID: cid, - Key: key, - Offset: offset, - Length: childSize, - } - offset += childSize - } else { - child, childSize, err = fillNode(db, nil, depth-1) - if err != nil { - return nil, 0, fmt.Errorf("failed to fill node: %w", err) - } - cc := child.Cid() - var links []cid.Cid - for _, link := range child.Links() { - links = append(links, link.Cid) - } - size, err := child.Size() - if err != nil { - return nil, 0, fmt.Errorf("failed to get size: %w", err) - } - blocks[cc.String()] = Block{ - CID: cc, - Key: key, - Offset: offset, - Length: size, - Links: links, - } - } - - if err = node.AddChild(child, childSize, db); err != nil { - return nil, 0, fmt.Errorf("failed to add child: %w", err) - } - } - - nodeSize := node.FileSize() - filledNode, err := node.Commit() - return filledNode, nodeSize, err - } - - root, size, err := db.NewLeafDataNode(unixfs.TFile) - if err != nil { - return nil, fmt.Errorf("failed to create new leaf node: %w", err) - } - rc := root.Cid() - blocks[rc.String()] = Block{ - CID: rc, - Key: key, - Offset: 0, - Length: size, - } - offset += size - - for depth := 1; !db.Done(); depth++ { - newRoot := db.NewFSNodeOverDag(unixfs.TFile) - if err := newRoot.AddChild(root, size, db); err != nil { - return nil, fmt.Errorf("failed to add child: %w", err) - } - - root, size, err = fillNode(db, newRoot, depth) - if err != nil { - return nil, fmt.Errorf("failed to fill node: %w", err) - } - } - - var links []cid.Cid - for _, link := range root.Links() { - links = append(links, link.Cid) - } - rc = root.Cid() - blocks[rc.String()] = Block{ - CID: rc, - Key: key, - Offset: 0, - Length: size, - Links: links, - } - - var blockSlice = make([]Block, 0, len(blocks)) - for _, block := range blocks { - blockSlice = append(blockSlice, block) - } - sort.Slice(blockSlice, func(i, j int) bool { - if blockSlice[i].Offset < blockSlice[j].Offset { - return true - } - if blockSlice[i].Offset > blockSlice[j].Offset { - return false - } - return len(blockSlice[j].Links) < len(blockSlice[i].Links) - }) - return blockSlice, nil -} diff --git a/ipfs/node.go b/ipfs/node.go index 68ad656..3288a49 100644 --- a/ipfs/node.go +++ b/ipfs/node.go @@ -55,7 +55,7 @@ func (n *Node) Close() error { return nil } -// DownloadCID downloads the CID +// DownloadCID downloads a CID from IPFS func (n *Node) DownloadCID(ctx context.Context, c cid.Cid, path []string) (io.ReadSeekCloser, error) { dagSess := merkledag.NewSession(ctx, n.dagService) rootNode, err := dagSess.Get(ctx, c) diff --git a/ipfs/node_test.go b/ipfs/node_test.go new file mode 100644 index 0000000..3819f05 --- /dev/null +++ b/ipfs/node_test.go @@ -0,0 +1,236 @@ +package ipfs_test + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "io" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" + "github.com/libp2p/go-libp2p/core/crypto" + "go.sia.tech/fsd/config" + "go.sia.tech/fsd/ipfs" + "go.sia.tech/fsd/persist/badger" + "go.uber.org/zap" + "go.uber.org/zap/zaptest" + "lukechampine.com/frand" +) + +type memoryBlockStore struct { + mu sync.Mutex + blocks map[cid.Cid][]byte +} + +// DeleteBlock removes a given block from the blockstore. +// note: this is not implemented +func (ms *memoryBlockStore) DeleteBlock(ctx context.Context, c cid.Cid) error { + ms.mu.Lock() + defer ms.mu.Unlock() + delete(ms.blocks, c) + return nil +} + +// Has returns whether or not a given block is in the blockstore. +func (ms *memoryBlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) { + ms.mu.Lock() + defer ms.mu.Unlock() + _, ok := ms.blocks[c] + return ok, nil +} + +// Get returns a block by CID +func (ms *memoryBlockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + ms.mu.Lock() + defer ms.mu.Unlock() + if b, ok := ms.blocks[c]; ok { + return blocks.NewBlockWithCid(b, c) + } + return nil, format.ErrNotFound{Cid: c} +} + +// GetSize returns the CIDs mapped BlockSize +func (ms *memoryBlockStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + ms.mu.Lock() + defer ms.mu.Unlock() + if b, ok := ms.blocks[c]; ok { + return len(b), nil + } + return 0, format.ErrNotFound{Cid: c} +} + +// Put puts a given block to the underlying datastore +func (ms *memoryBlockStore) Put(ctx context.Context, b blocks.Block) error { + ms.mu.Lock() + defer ms.mu.Unlock() + ms.blocks[b.Cid()] = b.RawData() + return nil +} + +// PutMany puts a slice of blocks at the same time using batching +// capabilities of the underlying datastore whenever possible. +func (ms *memoryBlockStore) PutMany(ctx context.Context, blocks []blocks.Block) error { + for _, b := range blocks { + if err := ms.Put(ctx, b); err != nil { + return err + } + } + return nil +} + +// AllKeysChan returns a channel from which +// the CIDs in the Blockstore can be read. It should respect +// the given context, closing the channel if it becomes Done. +// +// AllKeysChan treats the underlying blockstore as a set, and returns that +// set in full. The only guarantee is that the consumer of AKC will +// encounter every CID in the underlying set, at least once. If the +// underlying blockstore supports duplicate CIDs it is up to the +// implementation to elect to return such duplicates or not. Similarly no +// guarantees are made regarding CID ordering. +// +// When underlying blockstore is operating on Multihash and codec information +// is not preserved, returned CIDs will use Raw (0x55) codec. +func (ms *memoryBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + ch := make(chan cid.Cid) + + go func() { + var keys []cid.Cid + ms.mu.Lock() + for c := range ms.blocks { + keys = append(keys, c) + } + ms.mu.Unlock() + + for _, c := range keys { + select { + case <-ctx.Done(): + return + case ch <- c: + } + } + close(ch) + }() + + return ch, nil +} + +// HashOnRead specifies if every read block should be +// rehashed to make sure it matches its CID. +func (ms *memoryBlockStore) HashOnRead(enabled bool) { + // TODO: implement +} + +func TestDownload(t *testing.T) { + log := zaptest.NewLogger(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + privateKey, _, err := crypto.GenerateEd25519Key(frand.Reader) + if err != nil { + t.Fatal(err) + } + + store, err := badger.OpenDatabase(filepath.Join(t.TempDir(), "fsd.badgerdb"), log.Named("badger")) + if err != nil { + log.Fatal("failed to open badger database", zap.Error(err)) + } + defer store.Close() + + memBlockStore := &memoryBlockStore{ + blocks: make(map[cid.Cid][]byte), + } + + node, err := ipfs.NewNode(ctx, privateKey, config.IPFS{}, memBlockStore) + if err != nil { + t.Fatal(err) + } + defer node.Close() + + time.Sleep(time.Second) + + t.Log(node.PeerID()) + t.Log(node.Peers()) + + path := strings.Split("1002 - Game AIs/1002 - Game AIs.png", "/") + c := cid.MustParse("QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm") + + downloadCtx, cancel := context.WithTimeout(ctx, 15*time.Second) + defer cancel() + + r, err := node.DownloadCID(downloadCtx, c, path) + if err != nil { + t.Fatal(err) + } + defer r.Close() + + h := sha256.New() + if _, err := io.Copy(h, r); err != nil { + t.Fatal(err) + } + + shaChecksum := hex.EncodeToString(h.Sum(nil)) + if shaChecksum != "bb783b7b53f4a36fd6076fbc8384ca860c20aecd6e57f29cb23ea06409808f31" { + t.Fatalf("unexpected hash: %x", h.Sum(nil)) + } +} + +/* +func TestPin(t *testing.T) { + log := zaptest.NewLogger(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + privateKey, _, err := crypto.GenerateEd25519Key(frand.Reader) + if err != nil { + t.Fatal(err) + } + + store, err := badger.OpenDatabase(filepath.Join(t.TempDir(), "fsd.badgerdb"), log.Named("badger")) + if err != nil { + log.Fatal("failed to open badger database", zap.Error(err)) + } + defer store.Close() + + memBlockStore := &memoryBlockStore{ + blocks: make(map[cid.Cid][]byte), + } + + node, err := ipfs.NewNode(ctx, privateKey, config.IPFS{}, memBlockStore) + if err != nil { + t.Fatal(err) + } + defer node.Close() + + time.Sleep(time.Second) + + c := cid.MustParse("QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm") + + _, err = node.PinCID(ctx, c, func(c cid.Cid, path string) error { + r, err := node.DownloadCID(ctx, c, nil) + if err != nil { + return err + } + defer r.Close() + h := sha256.New() + if _, err := io.Copy(h, r); err != nil { + return err + } + shaChecksum := hex.EncodeToString(h.Sum(nil)) + t.Log("pinned", c, path, shaChecksum) + return nil + }) + if err != nil { + t.Fatal(err) + } + t.Fail() +} +*/ diff --git a/persist/badger/cids.go b/persist/badger/cids.go index c3988c0..651b3dd 100644 --- a/persist/badger/cids.go +++ b/persist/badger/cids.go @@ -6,8 +6,7 @@ import ( "github.com/dgraph-io/badger/v4" "github.com/ipfs/go-cid" - format "github.com/ipfs/go-ipld-format" - "go.sia.tech/fsd/ipfs" + "go.sia.tech/fsd/sia" ) // HasBlock returns true if the CID is in the store @@ -27,12 +26,12 @@ func (s *Store) HasBlock(_ context.Context, c cid.Cid) (ok bool, err error) { } // GetBlock returns the block metadata for a given CID -func (s *Store) GetBlock(_ context.Context, c cid.Cid) (cm ipfs.Block, err error) { +func (s *Store) GetBlock(_ context.Context, c cid.Cid) (cm sia.Block, err error) { err = s.db.View(func(txn *badger.Txn) error { item, err := txn.Get([]byte(c.Bytes())) if err != nil { if err == badger.ErrKeyNotFound { - return format.ErrNotFound{Cid: c} + return sia.ErrNotFound } return err } @@ -44,7 +43,7 @@ func (s *Store) GetBlock(_ context.Context, c cid.Cid) (cm ipfs.Block, err error } // AddBlocks adds blocks to the store -func (s *Store) AddBlocks(_ context.Context, blocks []ipfs.Block) error { +func (s *Store) AddBlocks(_ context.Context, blocks []sia.Block) error { return s.db.Update(func(txn *badger.Txn) error { for _, block := range blocks { buf, err := json.Marshal(block) diff --git a/ipfs/renterd.go b/sia/renterd.go similarity index 98% rename from ipfs/renterd.go rename to sia/renterd.go index 3e24e5d..761088b 100644 --- a/ipfs/renterd.go +++ b/sia/renterd.go @@ -1,4 +1,4 @@ -package ipfs +package sia import ( "context" diff --git a/sia/sia.go b/sia/sia.go new file mode 100644 index 0000000..a210389 --- /dev/null +++ b/sia/sia.go @@ -0,0 +1,176 @@ +package sia + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "net/http/httputil" + "net/url" + "sync" + + chunker "github.com/ipfs/boxo/chunker" + "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" + ihelpers "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" + "github.com/ipfs/go-cid" + "go.sia.tech/fsd/config" + "go.sia.tech/renterd/api" + "go.sia.tech/renterd/worker" + "go.uber.org/zap" +) + +type ( + // A Store is a persistent store for IPFS blocks + Store interface { + AddBlocks(context.Context, []Block) error + GetBlock(ctx context.Context, c cid.Cid) (Block, error) + HasBlock(ctx context.Context, c cid.Cid) (bool, error) + AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) + } + Node struct { + store Store + log *zap.Logger + + renterd config.Renterd + } +) + +// ErrNotFound is returned when a CID is not found in the store +var ErrNotFound = errors.New("not found") + +// UploadCID uploads a CID to the renterd node +func (n *Node) UploadCID(ctx context.Context, c cid.Cid, r io.Reader) error { + log := n.log.Named("upload").With(zap.String("cid", c.Hash().B58String()), zap.String("bucket", n.renterd.Bucket)) + + dataKey := c.Hash().B58String() + metaKey := dataKey + ".meta" + + metaR, metaW := io.Pipe() + dataR, dataW := io.Pipe() + + client := worker.NewClient(n.renterd.Address, n.renterd.Password) + dagSvc := NewUnixFileUploader(c.Hash().B58String(), dataW, metaW, log) + + errCh := make(chan error, 2) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + defer metaW.Close() + + if _, err := client.UploadObject(ctx, dataR, dataKey, api.UploadWithBucket(n.renterd.Bucket)); err != nil { + errCh <- fmt.Errorf("failed to upload data: %w", err) + return + } + }() + + go func() { + defer wg.Done() + defer dataW.Close() + + if _, err := client.UploadObject(ctx, metaR, metaKey, api.UploadWithBucket(n.renterd.Bucket)); err != nil { + errCh <- fmt.Errorf("failed to upload metadata: %w", err) + return + } + }() + + spl := chunker.NewSizeSplitter(r, chunker.DefaultBlockSize) + + dbp := ihelpers.DagBuilderParams{ + Maxlinks: ihelpers.DefaultLinksPerBlock, + Dagserv: dagSvc, + } + db, err := dbp.New(spl) + if err != nil { + return fmt.Errorf("failed to create dag builder: %w", err) + } + + rootNode, err := balanced.Layout(db) + if err != nil { + return fmt.Errorf("failed to build dag: %w", err) + } + + // close the pipes to signal the uploaders that we're done + metaW.Close() + dataW.Close() + // wait for uploads to finish + wg.Wait() + + select { + case err := <-errCh: + return err + default: + } + + if rootNode.Cid().Hash().B58String() != c.Hash().B58String() { + return fmt.Errorf("unexpected root cid: %s", rootNode.Cid().Hash().B58String()) + } + return n.store.AddBlocks(ctx, dagSvc.Blocks()) +} + +// ProxyDownload proxies an http download request to the renterd node +func (n *Node) ProxyHTTPDownload(cid cid.Cid, r *http.Request, w http.ResponseWriter) error { + block, err := n.store.GetBlock(r.Context(), cid) + if err != nil { + return err + } else if block.Data.Offset != 0 { + return errors.New("cannot proxy partial downloads") + } + + target, err := url.Parse(n.renterd.Address + "/objects/" + cid.Hash().B58String()) + if err != nil { + panic(err) + } + target.RawQuery = url.Values{ + "bucket": []string{n.renterd.Bucket}, + }.Encode() + + rp := &httputil.ReverseProxy{ + Rewrite: func(r *httputil.ProxyRequest) { + r.Out.Method = http.MethodGet + r.Out.URL = target + r.Out.SetBasicAuth("", n.renterd.Password) + r.Out.Header.Set("Range", r.In.Header.Get("Range")) + + n.log.Debug("proxying request to", zap.Stringer("url", r.Out.URL)) + }, + } + + rp.ServeHTTP(w, r) + return nil +} + +// CalculateBlocks calculates the blocks for a given reader and returns them +func (n *Node) CalculateBlocks(ctx context.Context, r io.Reader) ([]Block, error) { + dagSvc := NewUnixFileUploader("", io.Discard, io.Discard, n.log.Named("calculate")) + + spl := chunker.NewSizeSplitter(r, chunker.DefaultBlockSize) + + dbp := ihelpers.DagBuilderParams{ + Maxlinks: ihelpers.DefaultLinksPerBlock, + Dagserv: dagSvc, + } + db, err := dbp.New(spl) + if err != nil { + return nil, fmt.Errorf("failed to create dag builder: %w", err) + } + + _, err = balanced.Layout(db) + if err != nil { + return nil, fmt.Errorf("failed to build dag: %w", err) + } + + return dagSvc.Blocks(), nil +} + +// New creates a new Sia IPFS store +func New(store Store, cfg config.Renterd, log *zap.Logger) *Node { + return &Node{ + store: store, + log: log, + + renterd: cfg, + } +} diff --git a/sia/sia_test.go b/sia/sia_test.go new file mode 100644 index 0000000..1d3257d --- /dev/null +++ b/sia/sia_test.go @@ -0,0 +1,79 @@ +package sia_test + +import ( + "bytes" + "os" + "testing" + + "github.com/gogo/protobuf/proto" + chunker "github.com/ipfs/boxo/chunker" + "github.com/ipfs/boxo/ipld/merkledag" + "github.com/ipfs/boxo/ipld/unixfs/importer/balanced" + ihelpers "github.com/ipfs/boxo/ipld/unixfs/importer/helpers" + pb "github.com/ipfs/boxo/ipld/unixfs/pb" + "go.sia.tech/fsd/sia" + "go.uber.org/zap/zaptest" +) + +func TestLargeFile(t *testing.T) { + f, err := os.Open("/Users/n8maninger/Downloads/Big Buck Bunny.webm") + if err != nil { + t.Fatal(err) + } + defer f.Close() + + var blockBuf, metaBuf bytes.Buffer + dagSvc := sia.NewUnixFileUploader("", &blockBuf, &metaBuf, zaptest.NewLogger(t)) + + spl := chunker.NewSizeSplitter(f, chunker.DefaultBlockSize) + + dbp := ihelpers.DagBuilderParams{ + Maxlinks: ihelpers.DefaultLinksPerBlock, + Dagserv: dagSvc, + } + db, err := dbp.New(spl) + if err != nil { + t.Fatal(err) + } + + rootNode, err := balanced.Layout(db) + if err != nil { + t.Fatal(err) + } + + if rootNode.Cid().Hash().B58String() != "QmbGtJg23skhvFmu9mJiePVByhfzu5rwo74MEkVDYAmF5T" { + t.Fatalf("unexpected root cid: %s", rootNode.Cid().Hash().B58String()) + } + + blocks := dagSvc.Blocks() + root := blocks[len(blocks)-1] + if root.CID.Hash().B58String() != "QmbGtJg23skhvFmu9mJiePVByhfzu5rwo74MEkVDYAmF5T" { + t.Fatalf("unexpected root cid: %s", root.CID) + } else if root.Data.FileSize != 167347949 { + t.Fatalf("unexpected root file size: %d", root.Data.FileSize) + } else if root.Data.Offset != 0 { + t.Fatalf("unexpected root offset: %d", root.Data.Offset) + } + + firstBlock := blocks[0] + + t.Log(firstBlock.Metadata.Offset, firstBlock.Metadata.Length) + metaStart, metaEnd := firstBlock.Metadata.Offset, firstBlock.Metadata.Offset+firstBlock.Metadata.Length + var meta pb.Data + if err := proto.Unmarshal(metaBuf.Bytes()[metaStart:metaEnd], &meta); err != nil { + t.Fatal(err) + } + dataStart, dataEnd := firstBlock.Data.Offset, firstBlock.Data.Offset+firstBlock.Data.BlockSize + t.Log(firstBlock.Data.Offset, firstBlock.Data.BlockSize, len(firstBlock.Links)) + meta.Data = blockBuf.Bytes()[dataStart:dataEnd] + + buf, err := proto.Marshal(&meta) + if err != nil { + t.Fatal(err) + } + + node := merkledag.NodeWithData(buf) + if !node.Cid().Equals(firstBlock.CID) { + t.Fatalf("unexpected cid: %s", node.Cid()) + } +} diff --git a/ipfs/store.go b/sia/store.go similarity index 52% rename from ipfs/store.go rename to sia/store.go index 532c465..2ae0be1 100644 --- a/ipfs/store.go +++ b/sia/store.go @@ -1,30 +1,29 @@ -package ipfs +package sia import ( "bytes" "context" + "errors" "fmt" "io" + "sync" + "github.com/gogo/protobuf/proto" + "github.com/ipfs/boxo/ipld/merkledag" + pb "github.com/ipfs/boxo/ipld/unixfs/pb" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" "go.sia.tech/fsd/config" "go.uber.org/zap" ) type ( - // A Store is a persistent store for IPFS blocks - Store interface { - GetBlock(ctx context.Context, c cid.Cid) (Block, error) - HasBlock(ctx context.Context, c cid.Cid) (bool, error) - AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) - } - - // A RenterdBlockStore is a blockstore backed by a renterd node IPFS blocks - // are stored in a local database and backed by a renterd node. The primary - // difference between this and a normal IPFS blockstore is that an object is - // stored on the renterd node in one piece and the offsets for each block - // are stored in the database. + // A RenterdBlockStore is a blockstore backed by a renterd node. IPFS blocks + // are stored in a local database and uploaded to a renterd node. The + // primary difference between this and a normal IPFS blockstore is that a + // file is stored on the renterd node in one piece and the offsets for + // rebuilding the block are stored in the database. RenterdBlockStore struct { store Store log *zap.Logger @@ -48,31 +47,94 @@ func (bs *RenterdBlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) { func (bs *RenterdBlockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { bs.log.Debug("get block", zap.String("cid", c.Hash().B58String())) cm, err := bs.store.GetBlock(ctx, c) - if err != nil { + if errors.Is(err, ErrNotFound) { + return nil, format.ErrNotFound{Cid: c} + } else if err != nil { return nil, fmt.Errorf("failed to get cid: %w", err) } - var buf bytes.Buffer - r, err := downloadObject(ctx, bs.renterd, cm.Key, cm.Offset, cm.Length) - if err != nil { - return nil, fmt.Errorf("failed to download object: %w", err) + errCh := make(chan error, 2) + defer close(errCh) + + var metaBuf, dataBuf bytes.Buffer + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + + if cm.Data.BlockSize == 0 { + return + } + + r, err := downloadObject(ctx, bs.renterd, cm.Data.Key, cm.Data.Offset, cm.Data.BlockSize) + if err != nil { + errCh <- fmt.Errorf("failed to download object: %w", err) + return + } + defer r.Close() + + if _, err := io.Copy(&dataBuf, r); err != nil { + errCh <- fmt.Errorf("failed to copy object: %w", err) + return + } + }() + + go func() { + defer wg.Done() + + if cm.Metadata.Length == 0 { + return + } + + r, err := downloadObject(ctx, bs.renterd, cm.Metadata.Key, cm.Metadata.Offset, cm.Metadata.Length) + if err != nil { + errCh <- fmt.Errorf("failed to download object: %w", err) + return + } + defer r.Close() + + if _, err := io.Copy(&metaBuf, r); err != nil { + errCh <- fmt.Errorf("failed to copy object: %w", err) + return + } + }() + + wg.Wait() + select { + case err := <-errCh: + return nil, err + default: } - defer r.Close() - if n, err := io.Copy(&buf, r); err != nil { - return nil, fmt.Errorf("failed to copy object: %w", err) - } else if n != int64(cm.Length) { - return nil, fmt.Errorf("failed to copy object: expected %d bytes, got %d", cm.Length, n) + var meta pb.Data + if err := proto.Unmarshal(metaBuf.Bytes(), &meta); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata: %w", err) } + meta.Data = dataBuf.Bytes() - return blocks.NewBlockWithCid(buf.Bytes(), c) + buf, err := proto.Marshal(&meta) + if err != nil { + return nil, fmt.Errorf("failed to marshal metadata: %w", err) + } + + node := merkledag.NodeWithData(buf) + if !node.Cid().Equals(c) { + panic("unexpected cid") // developer error + } + return node, nil } // GetSize returns the CIDs mapped BlockSize func (bs *RenterdBlockStore) GetSize(ctx context.Context, c cid.Cid) (int, error) { - bs.log.Debug("get size", zap.String("cid", c.Hash().B58String())) cm, err := bs.store.GetBlock(ctx, c) - return int(cm.Length), err + if errors.Is(err, ErrNotFound) { + return 0, format.ErrNotFound{Cid: c} + } else if err != nil { + return 0, fmt.Errorf("failed to get cid: %w", err) + } + bs.log.Debug("get block size", zap.String("cid", c.Hash().B58String()), zap.Uint64("size", cm.Data.BlockSize)) + return int(cm.Data.BlockSize + cm.Metadata.Length), nil } // Put puts a given block to the underlying datastore @@ -109,9 +171,9 @@ func (bs *RenterdBlockStore) HashOnRead(enabled bool) { // TODO: implement } -// NewRenterdBlockStore creates a new blockstore backed by the given +// NewBlockStore creates a new blockstore backed by the given // badger.Store and a renterd node -func NewRenterdBlockStore(store Store, renterd config.Renterd, log *zap.Logger) *RenterdBlockStore { +func NewBlockStore(store Store, renterd config.Renterd, log *zap.Logger) *RenterdBlockStore { return &RenterdBlockStore{ store: store, renterd: renterd, diff --git a/sia/types.go b/sia/types.go new file mode 100644 index 0000000..29fce56 --- /dev/null +++ b/sia/types.go @@ -0,0 +1,36 @@ +package sia + +import ( + "github.com/ipfs/go-cid" +) + +type ( + // A RenterdObject links an IPFS node to an object stored on a renterd node + RenterdData struct { + Key string + Offset uint64 + FileSize uint64 + BlockSize uint64 + } + + RenterdMeta struct { + Key string + Offset uint64 + Length uint64 + } + + // A Link is a link to another IPFS node + Link struct { + CID cid.Cid + Name string + Size uint64 + } + + // A Block is an IPFS chunk with metadata for renterd + Block struct { + CID cid.Cid `json:"cid"` + Data RenterdData `json:"data"` + Metadata RenterdMeta `json:"metadata"` + Links []Link `json:"links"` + } +) diff --git a/sia/uploader.go b/sia/uploader.go new file mode 100644 index 0000000..1bff630 --- /dev/null +++ b/sia/uploader.go @@ -0,0 +1,171 @@ +package sia + +import ( + "context" + "fmt" + "io" + + "github.com/gogo/protobuf/proto" + "github.com/ipfs/boxo/ipld/unixfs" + pb "github.com/ipfs/boxo/ipld/unixfs/pb" + "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" + "go.uber.org/zap" +) + +// A UnixFileUploader uploads a UnixFS DAG to a renterd node +type UnixFileUploader struct { + dataOffset uint64 + metaOffset uint64 + + key string + + data io.Writer + metadata io.Writer + + blocks []Block + + log *zap.Logger +} + +// Get retrieves nodes by CID. Depending on the NodeGetter +// implementation, this may involve fetching the Node from a remote +// machine; consider setting a deadline in the context. +func (ufs *UnixFileUploader) Get(ctx context.Context, c cid.Cid) (format.Node, error) { + panic("not implemented") +} + +// GetMany returns a channel of NodeOptions given a set of CIDs. +func (ufs *UnixFileUploader) GetMany(ctx context.Context, c []cid.Cid) <-chan *format.NodeOption { + panic("not implemented") +} + +// Add adds a node to this DAG. +func (ufs *UnixFileUploader) Add(ctx context.Context, node format.Node) error { + fsNode, err := unixfs.ExtractFSNode(node) + if err != nil { + return fmt.Errorf("failed to extract fs node: %w", err) + } + + switch fsNode.Type() { + case unixfs.TFile: + dataSize := uint64(len(fsNode.Data())) + fileSize := fsNode.FileSize() + dataOffset := ufs.dataOffset + dataSize - fileSize + + ufs.log.Debug("adding node", + zap.String("cid", node.Cid().Hash().B58String()), + zap.Stringer("type", fsNode.Type()), + zap.Uint64("filesize", fileSize), + zap.Uint64("dataOffset", dataOffset), + zap.Uint64("metaOffset", ufs.metaOffset), + zap.Uint64("datasize", dataSize), + zap.Int("links", len(node.Links()))) + + var links []Link + for _, link := range node.Links() { + links = append(links, Link{ + CID: link.Cid, + Name: link.Name, + Size: link.Size, + }) + } + + buf, err := fsNode.GetBytes() + if err != nil { + return fmt.Errorf("failed to get bytes: %w", err) + } + + var meta pb.Data + if err := proto.Unmarshal(buf, &meta); err != nil { + return fmt.Errorf("failed to unmarshal metadata: %w", err) + } + meta.Data = nil + metaBytes, err := proto.Marshal(&meta) + if err != nil { + return fmt.Errorf("failed to marshal metadata: %w", err) + } + + metaLen, err := ufs.metadata.Write(metaBytes) + if err != nil { + return fmt.Errorf("failed to write metadata: %w", err) + } + + _, err = ufs.data.Write(fsNode.Data()) + if err != nil { + return fmt.Errorf("failed to write data: %w", err) + } + + block := Block{ + CID: node.Cid(), + Links: links, + Data: RenterdData{ + Key: ufs.key, + Offset: dataOffset, + FileSize: fileSize, + BlockSize: dataSize, + }, + Metadata: RenterdMeta{ + Key: ufs.key + ".meta", + Offset: ufs.metaOffset, + Length: uint64(metaLen), + }, + } + ufs.dataOffset += dataSize + ufs.metaOffset += block.Metadata.Length + ufs.blocks = append(ufs.blocks, block) + default: + return fmt.Errorf("unsupported node type: %v", fsNode.Type()) + } + return nil +} + +// AddMany adds many nodes to this DAG. +// +// Consider using the Batch NodeAdder (`NewBatch`) if you make +// extensive use of this function. +func (ufs *UnixFileUploader) AddMany(ctx context.Context, nodes []format.Node) error { + for _, node := range nodes { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := ufs.Add(ctx, node); err != nil { + return err + } + } + return nil +} + +// Remove removes a node from this DAG. +// +// Remove returns no error if the requested node is not present in this DAG. +func (ufs *UnixFileUploader) Remove(context.Context, cid.Cid) error { + panic("not implemented") +} + +// RemoveMany removes many nodes from this DAG. +// +// It returns success even if the nodes were not present in the DAG. +func (ufs *UnixFileUploader) RemoveMany(context.Context, []cid.Cid) error { + panic("not implemented") +} + +// Blocks returns all blocks that were added to this DAG. They should be added +// to the blockstore +func (ufs *UnixFileUploader) Blocks() []Block { + return ufs.blocks +} + +// NewUnixFileUploader creates a new UnixFileUploader that uploads a UnixFS DAG +// to a renterd node. +func NewUnixFileUploader(key string, data, metadata io.Writer, log *zap.Logger) *UnixFileUploader { + return &UnixFileUploader{ + log: log, + + key: key, + data: data, + metadata: metadata, + } +}