diff --git a/api/handlers.go b/api/handlers.go index 9a385af..9d3a044 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -3,6 +3,7 @@ package api import ( "encoding/json" "net/http" + "time" "github.com/julienschmidt/httprouter" "github.com/skynetlabs/pinner/conf" @@ -12,6 +13,16 @@ import ( ) type ( + // ServerRemoveRequest describes a payload that marks a server as dead. + ServerRemoveRequest struct { + Server string `json:"server"` + } + // ServerRemoveResponse returns the removed server and the number of + // skylinks it was pinning. + ServerRemoveResponse struct { + Server string `json:"server"` + NumSkylinks int64 `json:"numSkylinks"` + } // HealthGET is the response type of GET /health HealthGET struct { DBAlive bool `json:"dbAlive"` @@ -95,6 +106,41 @@ func (api *API) unpinPOST(w http.ResponseWriter, req *http.Request, _ httprouter api.WriteSuccess(w) } +// serverRemovePOST informs pinner that a given server is dead and should be removed as +// pinner from all skylinks it's marked as pinning. +func (api *API) serverRemovePOST(w http.ResponseWriter, req *http.Request, _ httprouter.Params) { + var body ServerRemoveRequest + err := json.NewDecoder(req.Body).Decode(&body) + if err != nil { + api.WriteError(w, err, http.StatusBadRequest) + return + } + if body.Server == "" { + api.WriteError(w, errors.New("no server found in request body"), http.StatusBadRequest) + return + } + // Remove the server as pinner. + n, err := api.staticDB.RemoveServer(req.Context(), body.Server) + if err != nil { + api.WriteError(w, errors.AddContext(err, "failed to remove server"), http.StatusInternalServerError) + return + } + // Schedule a scan for underpinned skylinks in an hour, so all of them can + // be repinned ASAP but also all servers in the cluster will have enough + // time to get the memo for the scan. + t := time.Now().UTC().Add(time.Hour) + err = conf.SetNextScan(req.Context(), api.staticDB, t) + if err != nil { + api.WriteError(w, errors.AddContext(err, "failed to schedule a scan"), http.StatusInternalServerError) + return + } + resp := ServerRemoveResponse{ + Server: body.Server, + NumSkylinks: n, + } + api.WriteJSON(w, resp) +} + // sweepPOST instructs pinner to scan the list of skylinks pinned by skyd and // update its database. This call is non-blocking, i.e. it will immediately // return with a success and it will only start a new sweep if there isn't one diff --git a/api/routes.go b/api/routes.go index 605366d..f199a21 100644 --- a/api/routes.go +++ b/api/routes.go @@ -6,6 +6,7 @@ func (api *API) buildHTTPRoutes() { api.staticRouter.POST("/pin", api.pinPOST) api.staticRouter.POST("/unpin", api.unpinPOST) + api.staticRouter.POST("/server/remove", api.serverRemovePOST) api.staticRouter.POST("/sweep", api.sweepPOST) api.staticRouter.GET("/sweep/status", api.sweepStatusGET) } diff --git a/database/skylink.go b/database/skylink.go index 7543bad..b7868f6 100644 --- a/database/skylink.go +++ b/database/skylink.go @@ -141,6 +141,21 @@ func (db *DB) AddServerForSkylinks(ctx context.Context, skylinks []string, serve return err } +// RemoveServer removes the server as pinner from all skylinks in the database. +// Returns the number of skylinks from which the server was removed as pinner. +func (db *DB) RemoveServer(ctx context.Context, server string) (int64, error) { + filter := bson.M{"servers": server} + update := bson.M{"$pull": bson.M{"servers": server}} + ur, err := db.staticDB.Collection(collSkylinks).UpdateMany(ctx, filter, update) + if errors.Contains(err, mongo.ErrNoDocuments) { + return 0, nil + } + if err != nil { + return 0, err + } + return ur.ModifiedCount, nil +} + // RemoveServerFromSkylinks removes a server from the list of servers known to // be pinning these skylinks. If a skylink does not exist in the database it // will not be inserted. diff --git a/test/api/handlers_test.go b/test/api/handlers_test.go index 23e8ace..e6826f5 100644 --- a/test/api/handlers_test.go +++ b/test/api/handlers_test.go @@ -43,6 +43,7 @@ func TestHandlers(t *testing.T) { {name: "Health", test: testHandlerHealthGET}, {name: "Pin", test: testHandlerPinPOST}, {name: "Unpin", test: testHandlerUnpinPOST}, + {name: "ServerRemove", test: testServerRemovePOST}, {name: "Sweep", test: testHandlerSweep}, } @@ -161,6 +162,74 @@ func testHandlerUnpinPOST(t *testing.T, tt *test.Tester) { } } +// testServerRemovePOST tests "POST /server/remove" +func testServerRemovePOST(t *testing.T, tt *test.Tester) { + sl1 := test.RandomSkylink() + sl2 := test.RandomSkylink() + server := t.Name() + + // Pass empty server name. + _, _, err := tt.ServerRemovePOST("") + if err == nil || !strings.Contains(err.Error(), "no server found in request body") { + t.Fatalf("Expected '%s', got '%s'", "no server found in request body", err) + } + // Remove a non-existent server. Expect no error, zero skylinks. + r, status, err := tt.ServerRemovePOST(server) + if err != nil || status != http.StatusOK || r.NumSkylinks != 0 { + t.Fatalf("Expected no error, status 200, and zero skylinks affected, got error '%v', status %d and %d skylinks afffected", err, status, r.NumSkylinks) + } + // Create skylinks and mark them as pinned by the server. + _, err = tt.DB.CreateSkylink(tt.Ctx, sl1, server) + if err != nil { + t.Fatal(err) + } + _, err = tt.DB.CreateSkylink(tt.Ctx, sl2, server) + if err != nil { + t.Fatal(err) + } + // Remove the server. + r, status, err = tt.ServerRemovePOST(server) + if err != nil || status != http.StatusOK { + t.Fatal(status, err) + } + // Make sure there's a scan scheduled for about an hour later. + t0, err := conf.NextScan(tt.Ctx, tt.DB, tt.Logger) + if err != nil { + t.Fatal(err) + } + timeTarget := time.Now().UTC().Add(time.Hour) + tolerance := time.Minute + if t0.Before(timeTarget.Add(-1*tolerance)) || t0.After(timeTarget.Add(tolerance)) { + t.Fatalf("Expected the next scan to be in one hour (%s), got %s", timeTarget.String(), t0.String()) + } + // Make sure the response mentions two skylinks. + if r.NumSkylinks != 2 { + t.Fatalf("Expected 2 skylinks affected, got %d", r.NumSkylinks) + } + // Make sure the server is no longer marked as pinner for those two skylinks. + foundSl, err := tt.DB.FindSkylink(tt.Ctx, sl1) + if err != nil { + t.Fatal(err) + } + if foundSl.Skylink != sl1.String() { + t.Fatal("Unexpected skylink.") + } + if test.Contains(foundSl.Servers, server) { + t.Fatalf("Expected to not find '%s' in servers list, got '%v'", server, foundSl.Servers) + } + // Same for the second skylink. + foundSl, err = tt.DB.FindSkylink(tt.Ctx, sl2) + if err != nil { + t.Fatal(err) + } + if foundSl.Skylink != sl2.String() { + t.Fatal("Unexpected skylink.") + } + if test.Contains(foundSl.Servers, server) { + t.Fatalf("Expected to not find '%s' in servers list, got '%v'", server, foundSl.Servers) + } +} + // testHandlerSweep tests both "POST /sweep" and "GET /sweep/status" func testHandlerSweep(t *testing.T, tt *test.Tester) { // Prepare for the test by setting the state of skyd's mock. diff --git a/test/tester.go b/test/tester.go index fe60e83..d01549f 100644 --- a/test/tester.go +++ b/test/tester.go @@ -213,6 +213,16 @@ func (t *Tester) SetFollowRedirects(f bool) { t.FollowRedirects = f } +// ServerRemovePOST removes a server as pinner. +func (t *Tester) ServerRemovePOST(server string) (api.ServerRemoveResponse, int, error) { + body, err := json.Marshal(api.ServerRemoveRequest{ + Server: server, + }) + var resp api.ServerRemoveResponse + r, err := t.Request(http.MethodPost, "/server/remove", nil, body, nil, &resp) + return resp, r.StatusCode, err +} + // SweepPOST kicks off a background process which gets all files pinned by skyd // and marks them in the DB as pinned by the current server. It also goes over // all files in the DB that are marked as pinned by the local skyd and unmarks