diff --git a/go.mod b/go.mod index 695984d..d86e998 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.20 require ( github.com/ipfs/boxo v0.13.1 github.com/ipfs/kubo v0.23.0 + github.com/libp2p/go-libp2p v0.31.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -85,7 +86,6 @@ require ( github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-doh-resolver v0.4.0 // indirect github.com/libp2p/go-flow-metrics v0.1.0 // indirect - github.com/libp2p/go-libp2p v0.31.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect github.com/libp2p/go-libp2p-kad-dht v0.24.4 // indirect github.com/libp2p/go-libp2p-kbucket v0.6.3 // indirect diff --git a/main.go b/main.go index 14f5c0b..50be4b1 100644 --- a/main.go +++ b/main.go @@ -10,12 +10,15 @@ import ( "os" "os/signal" "path/filepath" + "time" iface "github.com/ipfs/boxo/coreiface" "github.com/ipfs/boxo/coreiface/options" + "github.com/ipfs/boxo/coreiface/path" "github.com/ipfs/boxo/files" "github.com/ipfs/kubo/core" "github.com/ipfs/kubo/core/coreapi" + "github.com/libp2p/go-libp2p/core/host" "gopkg.in/yaml.v2" ) @@ -40,32 +43,32 @@ func readConfig() (*Config, error) { func uploadHandler(api iface.CoreAPI, cfg *Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { - http.Error(w, "Only POST allowed", http.StatusMethodNotAllowed) + http.Error(w, "only POST allowed", http.StatusMethodNotAllowed) return } err := r.ParseMultipartForm(2 << 30) // 2GB limit if err != nil { - http.Error(w, "Couldn't parse multipart form", http.StatusBadRequest) + http.Error(w, "couldn't parse multipart form", http.StatusBadRequest) return } fileHeader := r.MultipartForm.File["file"] if len(fileHeader) == 0 { - http.Error(w, "No file provided", http.StatusBadRequest) + http.Error(w, "no file provided", http.StatusBadRequest) return } file, err := fileHeader[0].Open() if err != nil { - http.Error(w, "Couldn't open file", http.StatusInternalServerError) + http.Error(w, "couldn't open file", http.StatusInternalServerError) return } defer file.Close() data, err := io.ReadAll(file) if err != nil { - http.Error(w, "Unable to read the file", http.StatusBadRequest) + http.Error(w, "unable to read the file", http.StatusBadRequest) return } @@ -74,7 +77,7 @@ func uploadHandler(api iface.CoreAPI, cfg *Config) http.HandlerFunc { ctx := context.Background() cidFile, err := api.Unixfs().Add(ctx, files.NewBytesFile(data), options.Unixfs.Pin(false)) if err != nil { - http.Error(w, "Failed to add file", http.StatusInternalServerError) + http.Error(w, "failed to add file", http.StatusInternalServerError) return } @@ -85,17 +88,18 @@ func uploadHandler(api iface.CoreAPI, cfg *Config) http.HandlerFunc { } fmt.Fprintf(w, "CID: %s\n", cidFile.Cid()) - fmt.Fprintf(w, "%s/ipfs/%s\n", cfg.Domain, cidFile.Cid()) + fmt.Fprintf(w, "URL: %s/ipfs/%s\n", cfg.Domain, cidFile.Cid()) } } -func downloadHandler(cfg *Config) http.HandlerFunc { +func downloadHandler(api iface.CoreAPI, cfg *Config) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cid := filepath.Base(r.URL.Path) + // Try to fetch the file from renterd req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/worker/objects/%s?bucket=%s", cfg.RenterdURL, cid, cfg.RenterdBucket), nil) if err != nil { - http.Error(w, "Failed to create request", http.StatusInternalServerError) + http.Error(w, "failed to create request", http.StatusInternalServerError) return } @@ -105,23 +109,64 @@ func downloadHandler(cfg *Config) http.HandlerFunc { client := &http.Client{} resp, err := client.Do(req) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - defer resp.Body.Close() - for key, values := range resp.Header { - for _, value := range values { - w.Header().Set(key, value) + if err == nil && resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusPartialContent { + // If the file was found in renterd, serve it to the client + defer resp.Body.Close() + + for key, values := range resp.Header { + for _, value := range values { + w.Header().Set(key, value) + } } + + if r.Header.Get("Range") != "" { + w.WriteHeader(http.StatusPartialContent) + } + + fmt.Println("retreived from renterd: ", cid) + io.Copy(w, resp.Body) + return } - if r.Header.Get("Range") != "" { - w.WriteHeader(http.StatusPartialContent) + // If the file wasn't found in renterd or there was an error, try fetching from IPFS + ctx := context.Background() + // TODO: fetch from IPFS with range requests + data, err := api.Unixfs().Get(ctx, path.New(cid)) + if err != nil { + message := fmt.Sprintf("failed to retrieve from IPFS: %s", cid) + fmt.Println(message) + http.Error(w, message, http.StatusInternalServerError) + return } - io.Copy(w, resp.Body) + switch f := data.(type) { + case files.File: + bytes, err := io.ReadAll(f) + go func() { + if err != nil { + fmt.Println("failed to pin to renterd: ", cid) + return + } + err = saveFileToRenterd(bytes, cid, cfg) + if err != nil { + fmt.Println("failed to pin to renterd: ", cid) + return + } + fmt.Println("pinned to renterd: ", cid) + }() + w.Write(bytes) + case files.Directory: + message := fmt.Sprintf("resolves to a directory, not a file: %s", cid) + fmt.Println(message) + http.Error(w, message, http.StatusBadRequest) + return + default: + message := fmt.Sprintf("unknown data type: %s", cid) + fmt.Println(message) + http.Error(w, message, http.StatusInternalServerError) + return + } } } @@ -141,7 +186,7 @@ func saveFileToRenterd(data []byte, cid string, cfg *Config) error { return err } if resp.StatusCode != http.StatusOK { - return fmt.Errorf("failed to save file to renterd: %s", resp.Status) + return fmt.Errorf("failed to save to renterd: %s", resp.Status) } defer resp.Body.Close() @@ -152,44 +197,66 @@ func basicAuth(handler http.HandlerFunc, password string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { user, pass, ok := r.BasicAuth() if !ok || user != "" || pass != password { - http.Error(w, "Unauthorized", http.StatusUnauthorized) + http.Error(w, "unauthorized", http.StatusUnauthorized) return } handler(w, r) } } +func printConnectedPeersEveryMinute(ctx context.Context, h host.Host) { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + peerInfos := h.Network().Peers() + fmt.Printf("Connected to %d peers.\n", len(peerInfos)) + // for _, peerInfo := range peerInfos { + // fmt.Println(peerInfo) + // } + case <-ctx.Done(): + return + } + } +} + func main() { ctx := context.Background() cfg, err := readConfig() if err != nil { - fmt.Println("Failed to read config:", err) + fmt.Println("failed to read config:", err) return } fmt.Printf("renterd:\t%s\n", cfg.RenterdURL) fmt.Printf("bucket:\t\t%s\n", cfg.RenterdBucket) node, err := core.NewNode(ctx, &core.BuildCfg{ - Online: false, + Online: true, }) if err != nil { - fmt.Println("Failed to start IPFS node:", err) + fmt.Println("failed to start IPFS node:", err) return } defer node.Close() api, err := coreapi.NewCoreAPI(node) if err != nil { - fmt.Println("Failed to create core API") + fmt.Println("failed to create core API") return } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go printConnectedPeersEveryMinute(ctx, node.PeerHost) + http.HandleFunc("/upload", basicAuth(uploadHandler(api, cfg), cfg.UploadPassword)) - http.HandleFunc("/ipfs/", downloadHandler(cfg)) + http.HandleFunc("/ipfs/", downloadHandler(api, cfg)) server := &http.Server{Addr: ":8080"} go func() { - fmt.Println("Server running on :8080") + fmt.Println("server running on :8080") if err := server.ListenAndServe(); err != http.ErrServerClosed { fmt.Printf("ListenAndServe(): %s\n", err) } @@ -200,8 +267,8 @@ func main() { signal.Notify(stop, os.Interrupt) <-stop - fmt.Println("\nShutting down server...") + fmt.Println("\nshutting down server...") if err := server.Shutdown(ctx); err != nil { - fmt.Printf("Error shutting down: %s\n", err) + fmt.Printf("error shutting down: %s\n", err) } }