Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Directory support #3

Merged
merged 4 commits into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
filestore
config.yml
data
bin
bin
.DS_Store
Binary file removed ape0.png
Binary file not shown.
36 changes: 30 additions & 6 deletions cmd/fsd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
shttp "go.sia.tech/fsd/http"
"go.sia.tech/fsd/ipfs"
"go.sia.tech/fsd/persist/badger"
"go.sia.tech/fsd/sia"
"go.sia.tech/jape"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -40,6 +41,9 @@ var (
API: config.API{
Address: ":8081",
},
Log: config.Log{
Level: "info",
},
}
)

Expand Down Expand Up @@ -91,6 +95,24 @@ func main() {

mustLoadConfig(dir, log)

var level zap.AtomicLevel
switch cfg.Log.Level {
case "debug":
level = zap.NewAtomicLevelAt(zap.DebugLevel)
case "info":
level = zap.NewAtomicLevelAt(zap.InfoLevel)
case "warn":
level = zap.NewAtomicLevelAt(zap.WarnLevel)
case "error":
level = zap.NewAtomicLevelAt(zap.ErrorLevel)
default:
log.Fatal("invalid log level", zap.String("level", cfg.Log.Level))
}

log = log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), level)
}))

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
defer cancel()

Expand Down Expand Up @@ -118,13 +140,15 @@ func main() {
log.Fatal("failed to generate private key", zap.Error(err))
}
}
store := ipfs.NewRenterdBlockStore(db, cfg.Renterd, log.Named("blockstore"))
store := sia.NewBlockStore(db, cfg.Renterd, log.Named("blockstore"))

node, err := ipfs.NewNode(ctx, privateKey, cfg.IPFS, store)
inode, err := ipfs.NewNode(ctx, privateKey, cfg.IPFS, store)
if err != nil {
log.Fatal("failed to start ipfs node", zap.Error(err))
}
defer node.Close()
defer inode.Close()

snode := sia.New(db, cfg.Renterd, log.Named("sia"))

apiListener, err := net.Listen("tcp", cfg.API.Address)
if err != nil {
Expand All @@ -139,12 +163,12 @@ func main() {
defer gatewayListener.Close()

apiServer := &http.Server{
Handler: jape.BasicAuth(cfg.API.Password)(shttp.NewAPIHandler(node, db, cfg, log.Named("api"))),
Handler: jape.BasicAuth(cfg.API.Password)(shttp.NewAPIHandler(inode, snode, cfg, log.Named("api"))),
}
defer apiServer.Close()

gatewayServer := &http.Server{
Handler: shttp.NewIPFSHandler(node, db, cfg, log.Named("gateway")),
Handler: shttp.NewIPFSGatewayHandler(inode, snode, cfg, log.Named("gateway")),
}
defer gatewayServer.Close()

Expand All @@ -167,7 +191,7 @@ func main() {
prettyKey := "ed25519:" + hex.EncodeToString(buf)

log.Info("fsd started",
zap.Stringer("peerID", node.PeerID()),
zap.Stringer("peerID", inode.PeerID()),
zap.String("privateKey", prettyKey),
zap.String("apiAddress", apiListener.Addr().String()),
zap.String("gatewayAddress", gatewayListener.Addr().String()),
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ type (
Password string `yaml:"password"`
}

// Log contains the log settings
Log struct {
Level string `yaml:"level"`
}

// Config contains the configuration for fsd
Config struct {
Renterd Renterd `yaml:"renterd"`
IPFS IPFS `yaml:"ipfs"`
API API `yaml:"api"`
Log Log `yaml:"log"`
}
)
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ go 1.21

require (
github.com/dgraph-io/badger/v4 v4.2.0
github.com/gogo/protobuf v1.3.2
github.com/ipfs/boxo v0.13.2-0.20231005100652-b3886904df5d
github.com/ipfs/go-block-format v0.1.2
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-ipld-format v0.5.0
github.com/libp2p/go-libp2p v0.31.0
github.com/libp2p/go-libp2p-kad-dht v0.24.4
github.com/multiformats/go-multiaddr v0.11.0
github.com/multiformats/go-multihash v0.2.3
go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb
go.sia.tech/renterd v0.6.1-0.20231005151658-e450d5902e31
go.uber.org/zap v1.25.0
Expand Down Expand Up @@ -46,7 +46,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
Expand Down Expand Up @@ -115,6 +114,7 @@ require (
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
Expand Down
84 changes: 14 additions & 70 deletions http/api.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,35 @@
package http

import (
"context"
"io"
"net/http"

"github.com/ipfs/go-cid"
"go.sia.tech/fsd/config"
"go.sia.tech/fsd/ipfs"
"go.sia.tech/fsd/sia"
"go.sia.tech/jape"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/worker"
"go.uber.org/zap"
)

type (
// A Store is a persistent store for IPFS blocks
Store interface {
GetBlock(context.Context, cid.Cid) (ipfs.Block, error)
AddBlocks(context.Context, []ipfs.Block) error
}

apiServer struct {
node *ipfs.Node
ipfs *ipfs.Node
sia *sia.Node
worker *worker.Client
store Store
log *zap.Logger

renterd config.Renterd
}
)

func (as *apiServer) handleCalculate(jc jape.Context) {
ctx := jc.Request.Context()

body := jc.Request.Body
defer body.Close()

blocks, err := ipfs.BuildBalancedCID("test", body)
blocks, err := as.sia.CalculateBlocks(ctx, body)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
Expand All @@ -55,39 +49,14 @@ func (as *apiServer) handlePin(jc jape.Context) {
return
}

r, err := as.node.DownloadCID(ctx, cid)
r, err := as.ipfs.DownloadCID(ctx, cid, nil)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
defer r.Close()

pr, pw := io.Pipe()
tr := io.TeeReader(r, pw)
uploadErr := make(chan error, 1)

go func() {
defer pw.Close()
defer close(uploadErr)

_, err = as.worker.UploadObject(ctx, tr, cid.Hash().B58String(), api.UploadWithBucket(as.renterd.Bucket))
if err != nil {
uploadErr <- err
}
}()

blocks, err := ipfs.BuildBalancedCID(cidStr, pr)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}

if err := <-uploadErr; err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}

if err := as.store.AddBlocks(ctx, blocks); err != nil {
if err := as.sia.UploadCID(ctx, cid, r); err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
Expand All @@ -108,50 +77,25 @@ func (as *apiServer) handleUpload(jc jape.Context) {
body := jc.Request.Body
defer body.Close()

pr, pw := io.Pipe()
r := io.TeeReader(body, pw)
uploadErr := make(chan error, 1)

go func() {
defer pw.Close()
defer close(uploadErr)

_, err = as.worker.UploadObject(ctx, r, cid.Hash().B58String(), api.UploadWithBucket(as.renterd.Bucket))
if err != nil {
uploadErr <- err
}
}()

blocks, err := ipfs.BuildBalancedCID(cidStr, pr)
err = as.sia.UploadCID(ctx, cid, body)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}

if err := <-uploadErr; err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}

if err := as.store.AddBlocks(ctx, blocks); err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
// the root cid is the first block
rootCID := blocks[0].CID
jc.Encode(rootCID.Hash().B58String())
as.log.Info("uploaded cid", zap.String("rootCID", rootCID.Hash().B58String()), zap.Int("blocks", len(blocks)))
jc.Encode(cid.Hash().B58String())
}

// NewAPIHandler returns a new http.Handler that handles requests to the api
func NewAPIHandler(node *ipfs.Node, store Store, cfg config.Config, log *zap.Logger) http.Handler {
func NewAPIHandler(ipfs *ipfs.Node, sia *sia.Node, cfg config.Config, log *zap.Logger) http.Handler {
s := &apiServer{
worker: worker.NewClient(cfg.Renterd.Address, cfg.Renterd.Password),
renterd: cfg.Renterd,

node: node,
store: store,
log: log,
ipfs: ipfs,
sia: sia,
log: log,
}
return jape.Mux(map[string]jape.Handler{
"POST /api/cid/calculate": s.handleCalculate,
Expand Down
Loading