Skip to content

Commit

Permalink
Merge pull request #44 from SkynetLabs/ivo/mark_dead
Browse files Browse the repository at this point in the history
Mark server as dead/removed
  • Loading branch information
ro-tex authored Aug 4, 2022
2 parents c8a3959 + bb73acb commit bffc0c0
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 0 deletions.
46 changes: 46 additions & 0 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"encoding/json"
"net/http"
"time"

"github.com/julienschmidt/httprouter"
"github.com/skynetlabs/pinner/conf"
Expand All @@ -12,6 +13,16 @@ import (
)

type (
// ServerRemoveRequest describes a payload that marks a server as dead.
ServerRemoveRequest struct {
Server string `json:"server"`
}
// ServerRemoveResponse returns the removed server and the number of
// skylinks it was pinning.
ServerRemoveResponse struct {
Server string `json:"server"`
NumSkylinks int64 `json:"numSkylinks"`
}
// HealthGET is the response type of GET /health
HealthGET struct {
DBAlive bool `json:"dbAlive"`
Expand Down Expand Up @@ -95,6 +106,41 @@ func (api *API) unpinPOST(w http.ResponseWriter, req *http.Request, _ httprouter
api.WriteSuccess(w)
}

// serverRemovePOST informs pinner that a given server is dead and should be removed as
// pinner from all skylinks it's marked as pinning.
func (api *API) serverRemovePOST(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
var body ServerRemoveRequest
err := json.NewDecoder(req.Body).Decode(&body)
if err != nil {
api.WriteError(w, err, http.StatusBadRequest)
return
}
if body.Server == "" {
api.WriteError(w, errors.New("no server found in request body"), http.StatusBadRequest)
return
}
// Remove the server as pinner.
n, err := api.staticDB.RemoveServer(req.Context(), body.Server)
if err != nil {
api.WriteError(w, errors.AddContext(err, "failed to remove server"), http.StatusInternalServerError)
return
}
// Schedule a scan for underpinned skylinks in an hour, so all of them can
// be repinned ASAP but also all servers in the cluster will have enough
// time to get the memo for the scan.
t := time.Now().UTC().Add(time.Hour)
err = conf.SetNextScan(req.Context(), api.staticDB, t)
if err != nil {
api.WriteError(w, errors.AddContext(err, "failed to schedule a scan"), http.StatusInternalServerError)
return
}
resp := ServerRemoveResponse{
Server: body.Server,
NumSkylinks: n,
}
api.WriteJSON(w, resp)
}

// sweepPOST instructs pinner to scan the list of skylinks pinned by skyd and
// update its database. This call is non-blocking, i.e. it will immediately
// return with a success and it will only start a new sweep if there isn't one
Expand Down
1 change: 1 addition & 0 deletions api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ func (api *API) buildHTTPRoutes() {

api.staticRouter.POST("/pin", api.pinPOST)
api.staticRouter.POST("/unpin", api.unpinPOST)
api.staticRouter.POST("/server/remove", api.serverRemovePOST)
api.staticRouter.POST("/sweep", api.sweepPOST)
api.staticRouter.GET("/sweep/status", api.sweepStatusGET)
}
15 changes: 15 additions & 0 deletions database/skylink.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,21 @@ func (db *DB) AddServerForSkylinks(ctx context.Context, skylinks []string, serve
return err
}

// RemoveServer removes the server as pinner from all skylinks in the database.
// Returns the number of skylinks from which the server was removed as pinner.
func (db *DB) RemoveServer(ctx context.Context, server string) (int64, error) {
filter := bson.M{"servers": server}
update := bson.M{"$pull": bson.M{"servers": server}}
ur, err := db.staticDB.Collection(collSkylinks).UpdateMany(ctx, filter, update)
if errors.Contains(err, mongo.ErrNoDocuments) {
return 0, nil
}
if err != nil {
return 0, err
}
return ur.ModifiedCount, nil
}

// RemoveServerFromSkylinks removes a server from the list of servers known to
// be pinning these skylinks. If a skylink does not exist in the database it
// will not be inserted.
Expand Down
69 changes: 69 additions & 0 deletions test/api/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func TestHandlers(t *testing.T) {
{name: "Health", test: testHandlerHealthGET},
{name: "Pin", test: testHandlerPinPOST},
{name: "Unpin", test: testHandlerUnpinPOST},
{name: "ServerRemove", test: testServerRemovePOST},
{name: "Sweep", test: testHandlerSweep},
}

Expand Down Expand Up @@ -161,6 +162,74 @@ func testHandlerUnpinPOST(t *testing.T, tt *test.Tester) {
}
}

// testServerRemovePOST tests "POST /server/remove"
func testServerRemovePOST(t *testing.T, tt *test.Tester) {
sl1 := test.RandomSkylink()
sl2 := test.RandomSkylink()
server := t.Name()

// Pass empty server name.
_, _, err := tt.ServerRemovePOST("")
if err == nil || !strings.Contains(err.Error(), "no server found in request body") {
t.Fatalf("Expected '%s', got '%s'", "no server found in request body", err)
}
// Remove a non-existent server. Expect no error, zero skylinks.
r, status, err := tt.ServerRemovePOST(server)
if err != nil || status != http.StatusOK || r.NumSkylinks != 0 {
t.Fatalf("Expected no error, status 200, and zero skylinks affected, got error '%v', status %d and %d skylinks afffected", err, status, r.NumSkylinks)
}
// Create skylinks and mark them as pinned by the server.
_, err = tt.DB.CreateSkylink(tt.Ctx, sl1, server)
if err != nil {
t.Fatal(err)
}
_, err = tt.DB.CreateSkylink(tt.Ctx, sl2, server)
if err != nil {
t.Fatal(err)
}
// Remove the server.
r, status, err = tt.ServerRemovePOST(server)
if err != nil || status != http.StatusOK {
t.Fatal(status, err)
}
// Make sure there's a scan scheduled for about an hour later.
t0, err := conf.NextScan(tt.Ctx, tt.DB, tt.Logger)
if err != nil {
t.Fatal(err)
}
timeTarget := time.Now().UTC().Add(time.Hour)
tolerance := time.Minute
if t0.Before(timeTarget.Add(-1*tolerance)) || t0.After(timeTarget.Add(tolerance)) {
t.Fatalf("Expected the next scan to be in one hour (%s), got %s", timeTarget.String(), t0.String())
}
// Make sure the response mentions two skylinks.
if r.NumSkylinks != 2 {
t.Fatalf("Expected 2 skylinks affected, got %d", r.NumSkylinks)
}
// Make sure the server is no longer marked as pinner for those two skylinks.
foundSl, err := tt.DB.FindSkylink(tt.Ctx, sl1)
if err != nil {
t.Fatal(err)
}
if foundSl.Skylink != sl1.String() {
t.Fatal("Unexpected skylink.")
}
if test.Contains(foundSl.Servers, server) {
t.Fatalf("Expected to not find '%s' in servers list, got '%v'", server, foundSl.Servers)
}
// Same for the second skylink.
foundSl, err = tt.DB.FindSkylink(tt.Ctx, sl2)
if err != nil {
t.Fatal(err)
}
if foundSl.Skylink != sl2.String() {
t.Fatal("Unexpected skylink.")
}
if test.Contains(foundSl.Servers, server) {
t.Fatalf("Expected to not find '%s' in servers list, got '%v'", server, foundSl.Servers)
}
}

// testHandlerSweep tests both "POST /sweep" and "GET /sweep/status"
func testHandlerSweep(t *testing.T, tt *test.Tester) {
// Prepare for the test by setting the state of skyd's mock.
Expand Down
10 changes: 10 additions & 0 deletions test/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,16 @@ func (t *Tester) SetFollowRedirects(f bool) {
t.FollowRedirects = f
}

// ServerRemovePOST removes a server as pinner.
func (t *Tester) ServerRemovePOST(server string) (api.ServerRemoveResponse, int, error) {
body, err := json.Marshal(api.ServerRemoveRequest{
Server: server,
})
var resp api.ServerRemoveResponse
r, err := t.Request(http.MethodPost, "/server/remove", nil, body, nil, &resp)
return resp, r.StatusCode, err
}

// SweepPOST kicks off a background process which gets all files pinned by skyd
// and marks them in the DB as pinned by the current server. It also goes over
// all files in the DB that are marked as pinned by the local skyd and unmarks
Expand Down

0 comments on commit bffc0c0

Please sign in to comment.