Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate CID #9

Merged
merged 14 commits into from
Nov 30, 2023
6 changes: 3 additions & 3 deletions cmd/fsd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ var (
dir = "."
cfg = config.Config{
Renterd: config.Renterd{
Address: "http://localhost:9980/api/worker",
Password: "password",
Bucket: "ipfs",
WorkerAddress: "http://localhost:9980/api/worker",
BusAddress: "http://localhost:9980/api/bus",
Bucket: "ipfs",
},
IPFS: config.IPFS{
Gateway: config.HTTPGateway{
Expand Down
8 changes: 5 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import "github.com/ipfs/go-cid"
type (
// Renterd contains the address, password, and bucket on the renterd worker
Renterd struct {
Address string `yaml:"address"`
Password string `yaml:"password"`
Bucket string `yaml:"bucket"`
WorkerAddress string `yaml:"workerAddress"`
WorkerPassword string `yaml:"workerPassword"`
BusAddress string `yaml:"busAddress"`
BusPassword string `yaml:"busPassword"`
Bucket string `yaml:"bucket"`
}

// RemoteFetch contains settings for enabling/disabling remote IPFS block
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ require (
github.com/multiformats/go-multiaddr v0.11.0
github.com/multiformats/go-multicodec v0.9.0
github.com/multiformats/go-multihash v0.2.3
go.sia.tech/jape v0.11.0
go.sia.tech/renterd v0.6.1-0.20231117113258-d4afcc97a585
go.sia.tech/jape v0.11.1
go.sia.tech/renterd v0.6.1-0.20231129193254-22f903d6143a
go.uber.org/zap v1.25.0
gopkg.in/yaml.v3 v3.0.1
lukechampine.com/frand v1.4.2
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -707,12 +707,12 @@ go.sia.tech/core v0.1.12-0.20231011172826-6ca0ac7b3b6b h1:gHnhRiY1SMWCEFu+1Xo0a9
go.sia.tech/core v0.1.12-0.20231011172826-6ca0ac7b3b6b/go.mod h1:3EoY+rR78w1/uGoXXVqcYdwSjSJKuEMI5bL7WROA27Q=
go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2 h1:ulzfJNjxN5DjXHClkW2pTiDk+eJ+0NQhX87lFDZ03t0=
go.sia.tech/gofakes3 v0.0.0-20231109151325-e0d47c10dce2/go.mod h1:PlsiVCn6+wssrR7bsOIlZm0DahsVrDydrlbjY4F14sg=
go.sia.tech/jape v0.11.0 h1:S2JTONZ4FGl5JFmh3VFGkieuFB1wXRLImkHc859V0FY=
go.sia.tech/jape v0.11.0/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4=
go.sia.tech/jape v0.11.1 h1:M7IP+byXL7xOqzxcHUQuXW+q3sYMkYzmMlMw+q8ZZw0=
go.sia.tech/jape v0.11.1/go.mod h1:4QqmBB+t3W7cNplXPj++ZqpoUb2PeiS66RLpXmEGap4=
go.sia.tech/mux v1.2.0 h1:ofa1Us9mdymBbGMY2XH/lSpY8itFsKIo/Aq8zwe+GHU=
go.sia.tech/mux v1.2.0/go.mod h1:Yyo6wZelOYTyvrHmJZ6aQfRoer3o4xyKQ4NmQLJrBSo=
go.sia.tech/renterd v0.6.1-0.20231117113258-d4afcc97a585 h1:CsnWOTGyqn01ZbUwnbTW/1QW++2wRkxIC03CqlPCouY=
go.sia.tech/renterd v0.6.1-0.20231117113258-d4afcc97a585/go.mod h1:wJGnnLUsNhN82DkA45aRUcb++xGm2AjOcpDG0SqQ06M=
go.sia.tech/renterd v0.6.1-0.20231129193254-22f903d6143a h1:qR+IAfJs3PAu59HdeP4OTLhfUogmfZks5Zlo3HEBiP0=
go.sia.tech/renterd v0.6.1-0.20231129193254-22f903d6143a/go.mod h1:5eX9/Ezl+F0mkDs7q6aU4yrHsh/uhWdKeeq37E+Li9M=
go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca h1:aZMg2AKevn7jKx+wlusWQfwSM5pNU9aGtRZme29q3O4=
go.sia.tech/siad v1.5.10-0.20230228235644-3059c0b930ca/go.mod h1:h/1afFwpxzff6/gG5i1XdAgPK7dEY6FaibhK7N5F86Y=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
123 changes: 15 additions & 108 deletions http/api.go
Original file line number Diff line number Diff line change
@@ -1,57 +1,25 @@
package http

import (
"bufio"
"net/http"

"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"go.sia.tech/fsd/config"
"go.sia.tech/fsd/ipfs"
"go.sia.tech/fsd/sia"
"go.sia.tech/jape"
"go.sia.tech/renterd/worker"
"go.uber.org/zap"
)

type (
apiServer struct {
ipfs *ipfs.Node
sia *sia.Node
worker *worker.Client
log *zap.Logger

renterd config.Renterd
ipfs *ipfs.Node
sia *sia.Node
log *zap.Logger
}
)

func (as *apiServer) handleCalculate(jc jape.Context) {
ctx := jc.Request.Context()

body := jc.Request.Body
defer body.Close()

opts := sia.CIDOptions{
CIDBuilder: cid.V1Builder{Codec: uint64(multicodec.DagPb), MhType: multihash.SHA2_256},
RawLeaves: true,
}

if err := jc.DecodeForm("rawLeaves", &opts.RawLeaves); err != nil {
return
} else if err := jc.DecodeForm("maxLinks", &opts.MaxLinks); err != nil {
return
} else if err := jc.DecodeForm("blockSize", &opts.BlockSize); err != nil {
return
}

blocks, err := as.sia.CalculateBlocks(ctx, body, opts)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
jc.Encode(blocks)
}

func (as *apiServer) handlePin(jc jape.Context) {
ctx := jc.Request.Context()
var cidStr string
Expand All @@ -65,106 +33,45 @@ func (as *apiServer) handlePin(jc jape.Context) {
}

// TODO: break this out for better support, the current implementation will
// not handle anything but standard unixfs files with the default block size
r, err := as.ipfs.DownloadCID(ctx, c, nil)
// not properly handle anything but standard unixfs files with the default
// block size
rr, err := as.ipfs.DownloadCID(ctx, c, nil)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
defer r.Close()
defer rr.Close()

var opts sia.CIDOptions
var opts sia.UnixFSOptions
switch c.Version() {
case 1:
prefix := c.Prefix()
opts.CIDBuilder = cid.V1Builder{Codec: prefix.Codec, MhType: prefix.MhType, MhLength: prefix.MhLength}
opts.RawLeaves = true
case 0:
opts.CIDBuilder = cid.V0Builder{}
}

if err := as.sia.UploadCID(ctx, c, r, opts); err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
}

func (as *apiServer) handleUpload(jc jape.Context) {
ctx := jc.Request.Context()
var cidStr string
if err := jc.DecodeParam("cid", &cidStr); err != nil {
return
}
c, err := cid.Parse(cidStr)
if err != nil {
jc.Error(err, http.StatusBadRequest)
return
}

body := jc.Request.Body
defer body.Close()

var opts sia.CIDOptions

prefix := c.Prefix()
switch prefix.Version {
case 0:
opts.CIDBuilder = cid.V0Builder{}
case 1:
opts.CIDBuilder = cid.V1Builder{Codec: prefix.Codec, MhType: prefix.MhType, MhLength: prefix.MhLength}
opts.RawLeaves = true
}

if err := jc.DecodeForm("rawLeaves", &opts.RawLeaves); err != nil {
return
} else if err := jc.DecodeForm("maxLinks", &opts.MaxLinks); err != nil {
return
} else if err := jc.DecodeForm("blockSize", &opts.BlockSize); err != nil {
return
}

err = as.sia.UploadCID(ctx, c, body, opts)
br := bufio.NewReaderSize(rr, 256<<20) // 256 MiB
c, err = as.sia.UploadFile(jc.Request.Context(), br, opts)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}

// the root cid is the first block
// return the calculated cid
jc.Encode(c.String())
}

func (as *apiServer) handleVerifyCID(jc jape.Context) {
ctx := jc.Request.Context()
var cidStr string
if err := jc.DecodeParam("cid", &cidStr); err != nil {
return
}
cid, err := cid.Parse(cidStr)
if err != nil {
jc.Error(err, http.StatusBadRequest)
return
}

if err := as.sia.VerifyCID(ctx, cid); err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
}

// NewAPIHandler returns a new http.Handler that handles requests to the api
func NewAPIHandler(ipfs *ipfs.Node, sia *sia.Node, cfg config.Config, log *zap.Logger) http.Handler {
s := &apiServer{
worker: worker.NewClient(cfg.Renterd.Address, cfg.Renterd.Password),
renterd: cfg.Renterd,

ipfs: ipfs,
sia: sia,
log: log,
}
return jape.Mux(map[string]jape.Handler{
"POST /api/cid/calculate": s.handleCalculate,
"POST /api/cid/verify/:cid": s.handleVerifyCID,
"POST /api/upload/:cid": s.handleUpload,
"POST /api/pin/:cid": s.handlePin,
"POST /api/unixfs/calculate": s.handleUnixFSCalculate,
"POST /api/unixfs/upload": s.handleUnixFSUpload,
"POST /api/pin/:cid": s.handlePin,
})
}
85 changes: 85 additions & 0 deletions http/unixfs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package http

import (
"bufio"
"fmt"
"net/http"

chunker "github.com/ipfs/boxo/chunker"
ihelpers "github.com/ipfs/boxo/ipld/unixfs/importer/helpers"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
"go.sia.tech/fsd/sia"
"go.sia.tech/jape"
)

func (as *apiServer) handleUnixFSCalculate(jc jape.Context) {
ctx := jc.Request.Context()

body := jc.Request.Body
defer body.Close()

opts, ok := parseUnixFSOptions(jc)
if !ok {
return // error already handled
}

br := bufio.NewReaderSize(body, 256<<20) // 256 MiB
blocks, err := as.sia.CalculateBlocks(ctx, br, opts)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
jc.Encode(blocks)
}

func (as *apiServer) handleUnixFSUpload(jc jape.Context) {
body := jc.Request.Body
defer body.Close()

opts, ok := parseUnixFSOptions(jc)
if !ok {
return // error already handled
}

br := bufio.NewReaderSize(body, 256<<20) // 256 MiB
c, err := as.sia.UploadFile(jc.Request.Context(), br, opts)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}

// return the calculated cid
jc.Encode(c.String())
}

func parseUnixFSOptions(jc jape.Context) (sia.UnixFSOptions, bool) {
cidVersion := 1
if err := jc.DecodeForm("version", &cidVersion); err != nil {
return sia.UnixFSOptions{}, false
}

opts := sia.UnixFSOptions{
MaxLinks: ihelpers.DefaultLinksPerBlock,
BlockSize: chunker.DefaultBlockSize,
}
switch cidVersion {
case 0:
opts.CIDBuilder = cid.V0Builder{}
case 1:
opts.CIDBuilder = cid.V1Builder{Codec: uint64(multicodec.DagPb), MhType: multihash.SHA2_256}
default:
jc.Error(fmt.Errorf("unsupported CID version: %d", cidVersion), http.StatusBadRequest)
return sia.UnixFSOptions{}, false
}

if err := jc.DecodeForm("rawLeaves", &opts.RawLeaves); err != nil {
return sia.UnixFSOptions{}, false
} else if err := jc.DecodeForm("maxLinks", &opts.MaxLinks); err != nil {
return sia.UnixFSOptions{}, false
} else if err := jc.DecodeForm("blockSize", &opts.BlockSize); err != nil {
return sia.UnixFSOptions{}, false
}
return opts, true
}
6 changes: 6 additions & 0 deletions ipfs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/ipfs/boxo/ipld/merkledag"
fsio "github.com/ipfs/boxo/ipld/unixfs/io"
"github.com/ipfs/boxo/provider"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
format "github.com/ipfs/go-ipld-format"
Expand Down Expand Up @@ -59,6 +60,11 @@ func (n *Node) Close() error {
return nil
}

// GetBlock fetches a block from the IPFS network
func (n *Node) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
return n.blockService.GetBlock(ctx, c)
}

// DownloadCID downloads a CID from IPFS
func (n *Node) DownloadCID(ctx context.Context, c cid.Cid, path []string) (io.ReadSeekCloser, error) {
dagSess := merkledag.NewSession(ctx, n.dagService)
Expand Down
4 changes: 2 additions & 2 deletions sia/renterd.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

// downloadPartialData range requests in the worker client are broken
func downloadPartialData(cfg config.Renterd, key string, offset, length uint64) ([]byte, error) {
u, err := url.Parse(cfg.Address + "/objects/" + key)
u, err := url.Parse(cfg.WorkerAddress + "/objects/" + key)
if err != nil {
return nil, fmt.Errorf("failed to parse url: %w", err)
}
Expand All @@ -25,7 +25,7 @@ func downloadPartialData(cfg config.Renterd, key string, offset, length uint64)
return nil, fmt.Errorf("failed to create request: %w", err)
}

req.SetBasicAuth("", cfg.Password)
req.SetBasicAuth("", cfg.WorkerPassword)
req.Header.Add("Range", fmt.Sprintf("bytes=%v-%v", offset, offset+length-1))

resp, err := http.DefaultClient.Do(req)
Expand Down
Loading