Skip to content

Commit

Permalink
feat: ability to fetch from ipfs, auto pin to renterd
Browse files Browse the repository at this point in the history
  • Loading branch information
alexfreska committed Oct 6, 2023
1 parent 2e1f4d2 commit 1d6d6ae
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 31 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
github.com/ipfs/boxo v0.13.1
github.com/ipfs/kubo v0.23.0
github.com/libp2p/go-libp2p v0.31.0
gopkg.in/yaml.v2 v2.4.0
)

Expand Down Expand Up @@ -85,7 +86,6 @@ require (
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-doh-resolver v0.4.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.31.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.24.4 // indirect
github.com/libp2p/go-libp2p-kbucket v0.6.3 // indirect
Expand Down
127 changes: 97 additions & 30 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ import (
"os"
"os/signal"
"path/filepath"
"time"

iface "github.com/ipfs/boxo/coreiface"
"github.com/ipfs/boxo/coreiface/options"
"github.com/ipfs/boxo/coreiface/path"
"github.com/ipfs/boxo/files"
"github.com/ipfs/kubo/core"
"github.com/ipfs/kubo/core/coreapi"
"github.com/libp2p/go-libp2p/core/host"
"gopkg.in/yaml.v2"
)

Expand All @@ -40,32 +43,32 @@ func readConfig() (*Config, error) {
func uploadHandler(api iface.CoreAPI, cfg *Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Only POST allowed", http.StatusMethodNotAllowed)
http.Error(w, "only POST allowed", http.StatusMethodNotAllowed)
return
}

err := r.ParseMultipartForm(2 << 30) // 2GB limit
if err != nil {
http.Error(w, "Couldn't parse multipart form", http.StatusBadRequest)
http.Error(w, "couldn't parse multipart form", http.StatusBadRequest)
return
}

fileHeader := r.MultipartForm.File["file"]
if len(fileHeader) == 0 {
http.Error(w, "No file provided", http.StatusBadRequest)
http.Error(w, "no file provided", http.StatusBadRequest)
return
}

file, err := fileHeader[0].Open()
if err != nil {
http.Error(w, "Couldn't open file", http.StatusInternalServerError)
http.Error(w, "couldn't open file", http.StatusInternalServerError)
return
}
defer file.Close()

data, err := io.ReadAll(file)
if err != nil {
http.Error(w, "Unable to read the file", http.StatusBadRequest)
http.Error(w, "unable to read the file", http.StatusBadRequest)
return
}

Expand All @@ -74,7 +77,7 @@ func uploadHandler(api iface.CoreAPI, cfg *Config) http.HandlerFunc {
ctx := context.Background()
cidFile, err := api.Unixfs().Add(ctx, files.NewBytesFile(data), options.Unixfs.Pin(false))
if err != nil {
http.Error(w, "Failed to add file", http.StatusInternalServerError)
http.Error(w, "failed to add file", http.StatusInternalServerError)
return
}

Expand All @@ -85,17 +88,18 @@ func uploadHandler(api iface.CoreAPI, cfg *Config) http.HandlerFunc {
}

fmt.Fprintf(w, "CID: %s\n", cidFile.Cid())
fmt.Fprintf(w, "%s/ipfs/%s\n", cfg.Domain, cidFile.Cid())
fmt.Fprintf(w, "URL: %s/ipfs/%s\n", cfg.Domain, cidFile.Cid())
}
}

func downloadHandler(cfg *Config) http.HandlerFunc {
func downloadHandler(api iface.CoreAPI, cfg *Config) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
cid := filepath.Base(r.URL.Path)

// Try to fetch the file from renterd
req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/worker/objects/%s?bucket=%s", cfg.RenterdURL, cid, cfg.RenterdBucket), nil)
if err != nil {
http.Error(w, "Failed to create request", http.StatusInternalServerError)
http.Error(w, "failed to create request", http.StatusInternalServerError)
return
}

Expand All @@ -105,23 +109,64 @@ func downloadHandler(cfg *Config) http.HandlerFunc {

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer resp.Body.Close()

for key, values := range resp.Header {
for _, value := range values {
w.Header().Set(key, value)
if err == nil && resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusPartialContent {
// If the file was found in renterd, serve it to the client
defer resp.Body.Close()

for key, values := range resp.Header {
for _, value := range values {
w.Header().Set(key, value)
}
}

if r.Header.Get("Range") != "" {
w.WriteHeader(http.StatusPartialContent)
}

fmt.Println("retreived from renterd: ", cid)
io.Copy(w, resp.Body)
return
}

if r.Header.Get("Range") != "" {
w.WriteHeader(http.StatusPartialContent)
// If the file wasn't found in renterd or there was an error, try fetching from IPFS
ctx := context.Background()
// TODO: fetch from IPFS with range requests
data, err := api.Unixfs().Get(ctx, path.New(cid))
if err != nil {
message := fmt.Sprintf("failed to retrieve from IPFS: %s", cid)
fmt.Println(message)
http.Error(w, message, http.StatusInternalServerError)
return
}

io.Copy(w, resp.Body)
switch f := data.(type) {
case files.File:
bytes, err := io.ReadAll(f)
go func() {
if err != nil {
fmt.Println("failed to pin to renterd: ", cid)
return
}
err = saveFileToRenterd(bytes, cid, cfg)
if err != nil {
fmt.Println("failed to pin to renterd: ", cid)
return
}
fmt.Println("pinned to renterd: ", cid)
}()
w.Write(bytes)
case files.Directory:
message := fmt.Sprintf("resolves to a directory, not a file: %s", cid)
fmt.Println(message)
http.Error(w, message, http.StatusBadRequest)
return
default:
message := fmt.Sprintf("unknown data type: %s", cid)
fmt.Println(message)
http.Error(w, message, http.StatusInternalServerError)
return
}
}
}

Expand All @@ -141,7 +186,7 @@ func saveFileToRenterd(data []byte, cid string, cfg *Config) error {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("failed to save file to renterd: %s", resp.Status)
return fmt.Errorf("failed to save to renterd: %s", resp.Status)
}
defer resp.Body.Close()

Expand All @@ -152,44 +197,66 @@ func basicAuth(handler http.HandlerFunc, password string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
user, pass, ok := r.BasicAuth()
if !ok || user != "" || pass != password {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
handler(w, r)
}
}

func printConnectedPeersEveryMinute(ctx context.Context, h host.Host) {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()

for {
select {
case <-ticker.C:
peerInfos := h.Network().Peers()
fmt.Printf("Connected to %d peers.\n", len(peerInfos))
// for _, peerInfo := range peerInfos {
// fmt.Println(peerInfo)
// }
case <-ctx.Done():
return
}
}
}

func main() {
ctx := context.Background()
cfg, err := readConfig()
if err != nil {
fmt.Println("Failed to read config:", err)
fmt.Println("failed to read config:", err)
return
}
fmt.Printf("renterd:\t%s\n", cfg.RenterdURL)
fmt.Printf("bucket:\t\t%s\n", cfg.RenterdBucket)
node, err := core.NewNode(ctx, &core.BuildCfg{
Online: false,
Online: true,
})
if err != nil {
fmt.Println("Failed to start IPFS node:", err)
fmt.Println("failed to start IPFS node:", err)
return
}
defer node.Close()

api, err := coreapi.NewCoreAPI(node)
if err != nil {
fmt.Println("Failed to create core API")
fmt.Println("failed to create core API")
return
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go printConnectedPeersEveryMinute(ctx, node.PeerHost)

http.HandleFunc("/upload", basicAuth(uploadHandler(api, cfg), cfg.UploadPassword))
http.HandleFunc("/ipfs/", downloadHandler(cfg))
http.HandleFunc("/ipfs/", downloadHandler(api, cfg))

server := &http.Server{Addr: ":8080"}

go func() {
fmt.Println("Server running on :8080")
fmt.Println("server running on :8080")
if err := server.ListenAndServe(); err != http.ErrServerClosed {
fmt.Printf("ListenAndServe(): %s\n", err)
}
Expand All @@ -200,8 +267,8 @@ func main() {
signal.Notify(stop, os.Interrupt)

<-stop
fmt.Println("\nShutting down server...")
fmt.Println("\nshutting down server...")
if err := server.Shutdown(ctx); err != nil {
fmt.Printf("Error shutting down: %s\n", err)
fmt.Printf("error shutting down: %s\n", err)
}
}

0 comments on commit 1d6d6ae

Please sign in to comment.