Skip to content

Commit

Permalink
Merge pull request #3 from SiaFoundation/nate/directory-support
Browse files Browse the repository at this point in the history
Directory support
  • Loading branch information
n8maninger authored Nov 19, 2023
2 parents f21a96a + 0992f0a commit 09189f6
Show file tree
Hide file tree
Showing 17 changed files with 841 additions and 347 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
filestore
config.yml
data
bin
bin
.DS_Store
Binary file removed ape0.png
Binary file not shown.
36 changes: 30 additions & 6 deletions cmd/fsd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
shttp "go.sia.tech/fsd/http"
"go.sia.tech/fsd/ipfs"
"go.sia.tech/fsd/persist/badger"
"go.sia.tech/fsd/sia"
"go.sia.tech/jape"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand All @@ -40,6 +41,9 @@ var (
API: config.API{
Address: ":8081",
},
Log: config.Log{
Level: "info",
},
}
)

Expand Down Expand Up @@ -91,6 +95,24 @@ func main() {

mustLoadConfig(dir, log)

var level zap.AtomicLevel
switch cfg.Log.Level {
case "debug":
level = zap.NewAtomicLevelAt(zap.DebugLevel)
case "info":
level = zap.NewAtomicLevelAt(zap.InfoLevel)
case "warn":
level = zap.NewAtomicLevelAt(zap.WarnLevel)
case "error":
level = zap.NewAtomicLevelAt(zap.ErrorLevel)
default:
log.Fatal("invalid log level", zap.String("level", cfg.Log.Level))
}

log = log.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewCore(consoleEncoder, zapcore.Lock(os.Stdout), level)
}))

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
defer cancel()

Expand Down Expand Up @@ -118,13 +140,15 @@ func main() {
log.Fatal("failed to generate private key", zap.Error(err))
}
}
store := ipfs.NewRenterdBlockStore(db, cfg.Renterd, log.Named("blockstore"))
store := sia.NewBlockStore(db, cfg.Renterd, log.Named("blockstore"))

node, err := ipfs.NewNode(ctx, privateKey, cfg.IPFS, store)
inode, err := ipfs.NewNode(ctx, privateKey, cfg.IPFS, store)
if err != nil {
log.Fatal("failed to start ipfs node", zap.Error(err))
}
defer node.Close()
defer inode.Close()

snode := sia.New(db, cfg.Renterd, log.Named("sia"))

apiListener, err := net.Listen("tcp", cfg.API.Address)
if err != nil {
Expand All @@ -139,12 +163,12 @@ func main() {
defer gatewayListener.Close()

apiServer := &http.Server{
Handler: jape.BasicAuth(cfg.API.Password)(shttp.NewAPIHandler(node, db, cfg, log.Named("api"))),
Handler: jape.BasicAuth(cfg.API.Password)(shttp.NewAPIHandler(inode, snode, cfg, log.Named("api"))),
}
defer apiServer.Close()

gatewayServer := &http.Server{
Handler: shttp.NewIPFSHandler(node, db, cfg, log.Named("gateway")),
Handler: shttp.NewIPFSGatewayHandler(inode, snode, cfg, log.Named("gateway")),
}
defer gatewayServer.Close()

Expand All @@ -167,7 +191,7 @@ func main() {
prettyKey := "ed25519:" + hex.EncodeToString(buf)

log.Info("fsd started",
zap.Stringer("peerID", node.PeerID()),
zap.Stringer("peerID", inode.PeerID()),
zap.String("privateKey", prettyKey),
zap.String("apiAddress", apiListener.Addr().String()),
zap.String("gatewayAddress", gatewayListener.Addr().String()),
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@ type (
Password string `yaml:"password"`
}

// Log contains the log settings
Log struct {
Level string `yaml:"level"`
}

// Config contains the configuration for fsd
Config struct {
Renterd Renterd `yaml:"renterd"`
IPFS IPFS `yaml:"ipfs"`
API API `yaml:"api"`
Log Log `yaml:"log"`
}
)
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ go 1.21

require (
github.com/dgraph-io/badger/v4 v4.2.0
github.com/gogo/protobuf v1.3.2
github.com/ipfs/boxo v0.13.2-0.20231005100652-b3886904df5d
github.com/ipfs/go-block-format v0.1.2
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-ipld-format v0.5.0
github.com/libp2p/go-libp2p v0.31.0
github.com/libp2p/go-libp2p-kad-dht v0.24.4
github.com/multiformats/go-multiaddr v0.11.0
github.com/multiformats/go-multihash v0.2.3
go.sia.tech/jape v0.9.1-0.20230525021720-ecf031ecbffb
go.sia.tech/renterd v0.6.1-0.20231005151658-e450d5902e31
go.uber.org/zap v1.25.0
Expand Down Expand Up @@ -46,7 +46,6 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
Expand Down Expand Up @@ -115,6 +114,7 @@ require (
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multihash v0.2.3 // indirect
github.com/multiformats/go-multistream v0.4.1 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
Expand Down
84 changes: 14 additions & 70 deletions http/api.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,35 @@
package http

import (
"context"
"io"
"net/http"

"github.com/ipfs/go-cid"
"go.sia.tech/fsd/config"
"go.sia.tech/fsd/ipfs"
"go.sia.tech/fsd/sia"
"go.sia.tech/jape"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/worker"
"go.uber.org/zap"
)

type (
// A Store is a persistent store for IPFS blocks
Store interface {
GetBlock(context.Context, cid.Cid) (ipfs.Block, error)
AddBlocks(context.Context, []ipfs.Block) error
}

apiServer struct {
node *ipfs.Node
ipfs *ipfs.Node
sia *sia.Node
worker *worker.Client
store Store
log *zap.Logger

renterd config.Renterd
}
)

func (as *apiServer) handleCalculate(jc jape.Context) {
ctx := jc.Request.Context()

body := jc.Request.Body
defer body.Close()

blocks, err := ipfs.BuildBalancedCID("test", body)
blocks, err := as.sia.CalculateBlocks(ctx, body)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
Expand All @@ -55,39 +49,14 @@ func (as *apiServer) handlePin(jc jape.Context) {
return
}

r, err := as.node.DownloadCID(ctx, cid)
r, err := as.ipfs.DownloadCID(ctx, cid, nil)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
defer r.Close()

pr, pw := io.Pipe()
tr := io.TeeReader(r, pw)
uploadErr := make(chan error, 1)

go func() {
defer pw.Close()
defer close(uploadErr)

_, err = as.worker.UploadObject(ctx, tr, cid.Hash().B58String(), api.UploadWithBucket(as.renterd.Bucket))
if err != nil {
uploadErr <- err
}
}()

blocks, err := ipfs.BuildBalancedCID(cidStr, pr)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}

if err := <-uploadErr; err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}

if err := as.store.AddBlocks(ctx, blocks); err != nil {
if err := as.sia.UploadCID(ctx, cid, r); err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
Expand All @@ -108,50 +77,25 @@ func (as *apiServer) handleUpload(jc jape.Context) {
body := jc.Request.Body
defer body.Close()

pr, pw := io.Pipe()
r := io.TeeReader(body, pw)
uploadErr := make(chan error, 1)

go func() {
defer pw.Close()
defer close(uploadErr)

_, err = as.worker.UploadObject(ctx, r, cid.Hash().B58String(), api.UploadWithBucket(as.renterd.Bucket))
if err != nil {
uploadErr <- err
}
}()

blocks, err := ipfs.BuildBalancedCID(cidStr, pr)
err = as.sia.UploadCID(ctx, cid, body)
if err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}

if err := <-uploadErr; err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}

if err := as.store.AddBlocks(ctx, blocks); err != nil {
jc.Error(err, http.StatusInternalServerError)
return
}
// the root cid is the first block
rootCID := blocks[0].CID
jc.Encode(rootCID.Hash().B58String())
as.log.Info("uploaded cid", zap.String("rootCID", rootCID.Hash().B58String()), zap.Int("blocks", len(blocks)))
jc.Encode(cid.Hash().B58String())
}

// NewAPIHandler returns a new http.Handler that handles requests to the api
func NewAPIHandler(node *ipfs.Node, store Store, cfg config.Config, log *zap.Logger) http.Handler {
func NewAPIHandler(ipfs *ipfs.Node, sia *sia.Node, cfg config.Config, log *zap.Logger) http.Handler {
s := &apiServer{
worker: worker.NewClient(cfg.Renterd.Address, cfg.Renterd.Password),
renterd: cfg.Renterd,

node: node,
store: store,
log: log,
ipfs: ipfs,
sia: sia,
log: log,
}
return jape.Mux(map[string]jape.Handler{
"POST /api/cid/calculate": s.handleCalculate,
Expand Down
Loading

0 comments on commit 09189f6

Please sign in to comment.