Skip to content

Commit

Permalink
Merge pull request #45 from SkynetLabs/ivo/limit_repin
Browse files Browse the repository at this point in the history
Limit repinning underpinned skylinks to underutilized servers
  • Loading branch information
ro-tex authored Aug 5, 2022
2 parents bffc0c0 + 162a389 commit 72b3a87
Show file tree
Hide file tree
Showing 18 changed files with 650 additions and 74 deletions.
44 changes: 36 additions & 8 deletions api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,29 @@ package api

import (
"encoding/json"
"gitlab.com/SkynetLabs/skyd/build"
"net/http"
"time"

"github.com/julienschmidt/httprouter"
"github.com/skynetlabs/pinner/conf"
"github.com/skynetlabs/pinner/database"
"github.com/skynetlabs/pinner/lib"
"gitlab.com/NebulousLabs/errors"
"gitlab.com/SkynetLabs/skyd/skymodules"
)

var (
// SleepBeforeForcedScan is the delay we add to the current time when we
// schedule a scan because something important happened with the cluster,
// i.e. a server was marked as dead or new empty servers were added and we
// want them to start repinning ASAP. The value depends on the build type:
// one hour for standard builds, ten seconds for dev builds and one second
// for testing builds.
SleepBeforeForcedScan = build.Select(build.Var{
Standard: time.Hour,
Dev: 10 * time.Second,
Testing: time.Second,
}).(time.Duration)
)

type (
// ServerRemoveRequest describes a payload that marks a server as dead.
ServerRemoveRequest struct {
Expand Down Expand Up @@ -119,19 +132,34 @@ func (api *API) serverRemovePOST(w http.ResponseWriter, req *http.Request, _ htt
api.WriteError(w, errors.New("no server found in request body"), http.StatusBadRequest)
return
}
ctx := req.Context()
// Schedule a scan for underpinned skylinks in an hour (unless one is
// already pending), so all of them can be repinned ASAP but also all
// servers in the cluster will have enough time to get the memo for the scan.
t := lib.Now().Add(SleepBeforeForcedScan)
t0, err := conf.NextScan(ctx, api.staticDB, api.staticLogger)
// We just set it when we encounter an error because we can get such an
// error in two cases - there is no next scan scheduled or there is a
// problem with the DB. In the first case we want to schedule one and in the
// second we'll get the error again with the next operation.
if err != nil || t0.After(t) {
err1 := conf.SetNextScan(ctx, api.staticDB, t)
if err != nil {
err = errors.Compose(err1, errors.AddContext(err, "failed to fetch next scan"))
api.WriteError(w, errors.AddContext(err, "failed to schedule a scan"), http.StatusInternalServerError)
return
}
}
// Remove the server as pinner.
n, err := api.staticDB.RemoveServer(req.Context(), body.Server)
n, err := api.staticDB.RemoveServer(ctx, body.Server)
if err != nil {
api.WriteError(w, errors.AddContext(err, "failed to remove server"), http.StatusInternalServerError)
return
}
// Schedule a scan for underpinned skylinks in an hour, so all of them can
// be repinned ASAP but also all servers in the cluster will have enough
// time to get the memo for the scan.
t := time.Now().UTC().Add(time.Hour)
err = conf.SetNextScan(req.Context(), api.staticDB, t)
if err != nil {
api.WriteError(w, errors.AddContext(err, "failed to schedule a scan"), http.StatusInternalServerError)
// Remove the server's load.
err = api.staticDB.DeleteServerLoad(ctx, body.Server)
if err != nil && !errors.Contains(err, database.ErrServerLoadNotFound) {
api.WriteError(w, errors.AddContext(err, "failed to clean up server's load records"), http.StatusInternalServerError)
return
}
resp := ServerRemoveResponse{
Expand Down
11 changes: 6 additions & 5 deletions conf/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package conf
import (
"context"
"fmt"
"github.com/skynetlabs/pinner/lib"
"log"
"os"
"strconv"
Expand Down Expand Up @@ -62,7 +63,7 @@ const (

const (
// TimeFormat defines the time format we'll use throughout the service.
TimeFormat = time.RFC3339
TimeFormat = time.RFC3339Nano
)

var (
Expand Down Expand Up @@ -248,7 +249,7 @@ func NextScan(ctx context.Context, db *database.DB, logger logger.Logger) (time.
if errors.Contains(err, mongo.ErrNoDocuments) {
logger.Infof("Missing database value for '%s', setting a new one.", ConfNextScan)
// No scan has been scheduled. Schedule one in an hour.
scanTime := time.Now().Add(DefaultNextScanOffset).UTC()
scanTime := lib.Now().Add(DefaultNextScanOffset)
err = SetNextScan(ctx, db, scanTime)
if err != nil {
return time.Time{}, err
Expand All @@ -264,19 +265,19 @@ func NextScan(ctx context.Context, db *database.DB, logger logger.Logger) (time.
logger.Error(errMsg)
build.Critical(errors.AddContext(err, "potential programmer error"))
// The value in the database is unusable. Schedule a scan in an hour.
scanTime := time.Now().Add(DefaultNextScanOffset).UTC()
scanTime := lib.Now().Add(DefaultNextScanOffset)
err = SetNextScan(ctx, db, scanTime)
if err != nil {
return time.Time{}, err
}
return scanTime, nil
}
return t, nil
return t.UTC().Truncate(time.Millisecond), nil
}

// SetNextScan sets the time of the next cluster-wide scan for underpinned files.
func SetNextScan(ctx context.Context, db *database.DB, t time.Time) error {
if t.Before(time.Now().UTC().Add(SleepBetweenChecksForScan)) {
if t.Before(lib.Now().Add(SleepBetweenChecksForScan)) {
return ErrTimeTooSoon
}
return db.SetConfigValue(ctx, ConfNextScan, t.UTC().Format(TimeFormat))
Expand Down
4 changes: 4 additions & 0 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ var (
// collConfig defines the name of the collection which will hold the
// cluster-wide service configuration.
collConfig = "configuration"
// collServerLoad defines the name of the collection which will hold
// information about each server's load in terms of amount of data it's
// pinning
collServerLoad = "server_load"
// collSkylinks defines the name of the collection which will hold
// information about skylinks
collSkylinks = "skylinks"
Expand Down
22 changes: 16 additions & 6 deletions database/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,22 @@ import (
// databases and are iterating over the schema at the same time.
func schema() map[string][]mongo.IndexModel {
return map[string][]mongo.IndexModel{
collConfig: {
{
Keys: bson.D{{"key", 1}},
Options: options.Index().SetName("key").SetUnique(true),
},
},
collServerLoad: {
{
Keys: bson.D{{"server_name", 1}},
Options: options.Index().SetName("server_name").SetUnique(true),
},
{
Keys: bson.D{{"load", 1}},
Options: options.Index().SetName("load"),
},
},
collSkylinks: {
{
Keys: bson.D{{"skylink", 1}},
Expand All @@ -36,11 +52,5 @@ func schema() map[string][]mongo.IndexModel {
Options: options.Index().SetName("pinned"),
},
},
collConfig: {
{
Keys: bson.D{{"key", 1}},
Options: options.Index().SetName("key").SetUnique(true),
},
},
}
}
96 changes: 96 additions & 0 deletions database/serverload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package database

import (
"context"
"gitlab.com/NebulousLabs/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

// ErrServerLoadNotFound is the sentinel error reported when the database holds
// no load record for the requested server.
var ErrServerLoadNotFound = errors.New("server load not found")

// DeleteServerLoad removes the load info for this server. We should use this
// when we mark a server as dead.
//
// It returns ErrServerLoadNotFound when no record was deleted, and the
// underlying database error when the delete operation itself fails.
func (db *DB) DeleteServerLoad(ctx context.Context, server string) error {
	filter := bson.M{"server_name": server}
	dr, err := db.staticDB.Collection(collServerLoad).DeleteOne(ctx, filter)
	if err == mongo.ErrNoDocuments {
		return ErrServerLoadNotFound
	}
	// Any other error must be reported before touching the result: the
	// result is not usable when the operation failed, and swallowing the
	// error would make a failed delete look like a success.
	if err != nil {
		return err
	}
	if dr.DeletedCount == 0 {
		return ErrServerLoadNotFound
	}
	return nil
}

// ServerLoad looks up the load record of the given server and returns the
// stored load value. ErrServerLoadNotFound is returned when there is no record
// for that server; any other lookup or decoding error is returned as is.
func (db *DB) ServerLoad(ctx context.Context, server string) (int64, error) {
	result := db.staticDB.Collection(collServerLoad).FindOne(ctx, bson.M{"server_name": server})
	switch lookupErr := result.Err(); {
	case lookupErr == mongo.ErrNoDocuments:
		return 0, ErrServerLoadNotFound
	case lookupErr != nil:
		return 0, lookupErr
	}
	// Only the load field of the record is of interest here.
	var record struct {
		Load int64
	}
	if decodeErr := result.Decode(&record); decodeErr != nil {
		return 0, decodeErr
	}
	return record.Load, nil
}

// ServerLoadPosition returns the position of the server in the list of servers
// based on its load. It also returns the total number of servers.
//
// The list is ordered by load in descending order and the position is
// zero-based, so position 0 is the server with the highest load. If there is
// no load record for the server, ErrServerLoadNotFound is returned.
func (db *DB) ServerLoadPosition(ctx context.Context, server string) (int, int, error) {
	// Fetch all server loads, ordered by load in descending order.
	opts := &options.FindOptions{
		Sort: bson.M{"load": -1},
	}
	c, err := db.staticDB.Collection(collServerLoad).Find(ctx, bson.M{}, opts)
	if err != nil {
		return 0, 0, err
	}
	// Release the cursor on every return path. The close error is ignored on
	// purpose: by then we either have the full result or a more relevant
	// error to report.
	defer func() {
		_ = c.Close(ctx)
	}()

	// Walk the entire result set. We count the records ourselves instead of
	// relying on the cursor's remaining batch length, because that only
	// covers the current batch and not the documents still on the server.
	position := -1
	total := 0
	for c.Next(ctx) {
		var cur struct {
			ServerName string `bson:"server_name"`
			Load       int64  `bson:"load"`
		}
		err = c.Decode(&cur)
		if err != nil {
			return 0, 0, err
		}
		if position < 0 && cur.ServerName == server {
			position = total
		}
		total++
	}
	// Next returns false on both exhaustion and failure, so make sure the
	// iteration did not stop because of an error.
	if err = c.Err(); err != nil {
		return 0, 0, err
	}
	if position < 0 {
		return 0, 0, ErrServerLoadNotFound
	}
	return position, total, nil
}

// SetServerLoad records the given load for the server, creating the record if
// it does not exist yet and overwriting it otherwise. An empty server name is
// rejected with an error.
func (db *DB) SetServerLoad(ctx context.Context, server string, load int64) error {
	if len(server) == 0 {
		return errors.New("invalid server name")
	}
	coll := db.staticDB.Collection(collServerLoad)
	fields := bson.M{
		"server_name": server,
		"load":        load,
	}
	// Upsert, so the first report of a server creates its record.
	upsert := options.Update().SetUpsert(true)
	_, err := coll.UpdateOne(ctx, bson.M{"server_name": server}, bson.M{"$set": fields}, upsert)
	return err
}
5 changes: 3 additions & 2 deletions database/skylink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package database

import (
"context"
"github.com/skynetlabs/pinner/lib"
"time"

"gitlab.com/NebulousLabs/errors"
Expand Down Expand Up @@ -197,13 +198,13 @@ func (db *DB) FindAndLockUnderpinned(ctx context.Context, server string, minPinn
// Unlocked.
"$or": bson.A{
bson.M{"lock_expires": bson.M{"$exists": false}},
bson.M{"lock_expires": bson.M{"$lt": time.Now().UTC().Truncate(time.Millisecond)}},
bson.M{"lock_expires": bson.M{"$lt": lib.Now()}},
},
}
update := bson.M{
"$set": bson.M{
"locked_by": server,
"lock_expires": time.Now().UTC().Add(LockDuration).Truncate(time.Millisecond),
"lock_expires": lib.Now().Add(LockDuration),
},
}
opts := options.FindOneAndUpdate().SetReturnDocument(options.After)
Expand Down
12 changes: 12 additions & 0 deletions database/units.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package database

// Binary size units, each one 1024 times larger than the previous one.
const (
	// KiB is the number of bytes in a kibibyte (2^10).
	KiB = 1 << (10 * (iota + 1))
	// MiB is the number of bytes in a mebibyte (2^20).
	MiB
	// GiB is the number of bytes in a gibibyte (2^30).
	GiB
	// TiB is the number of bytes in a tebibyte (2^40).
	TiB
)
8 changes: 8 additions & 0 deletions lib/lib.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package lib

import "time"

// Now reports the current moment as a UTC timestamp with everything below
// millisecond precision cut off.
func Now() time.Time {
	utcNow := time.Now().UTC()
	return utcNow.Truncate(time.Millisecond)
}
15 changes: 15 additions & 0 deletions skyd/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type (
// ClientMock is a mock of skyd.Client
ClientMock struct {
contractData uint64
fileHealth map[skymodules.SiaPath]float64
filesystemMock map[skymodules.SiaPath]rdReturnType
metadata map[string]skymodules.SkyfileMetadata
Expand Down Expand Up @@ -41,6 +42,20 @@ func NewSkydClientMock() *ClientMock {
}
}

// ContractData returns the contract data value currently configured on the
// mock. It never returns an error.
func (c *ClientMock) ContractData() (uint64, error) {
	c.mu.Lock()
	data := c.contractData
	c.mu.Unlock()
	return data, nil
}

// SetContractData configures the value that subsequent calls to ContractData
// on this mock will return.
func (c *ClientMock) SetContractData(n uint64) {
	c.mu.Lock()
	c.contractData = n
	c.mu.Unlock()
}

// DiffPinnedSkylinks is a carbon copy of PinnedSkylinksCache's version of the
// method.
func (c *ClientMock) DiffPinnedSkylinks(skylinks []string) (unknown []string, missing []string) {
Expand Down
18 changes: 18 additions & 0 deletions skyd/skyd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ var (
type (
// Client describes the interface exposed by client.
Client interface {
// ContractData returns the total data from Active and Passive contracts.
ContractData() (uint64, error)
// DiffPinnedSkylinks returns two lists of skylinks - the ones that
// belong to the given list but are not pinned by skyd (unknown) and the
// ones that are pinned by skyd but are not on the list (missing).
Expand Down Expand Up @@ -68,6 +70,22 @@ func NewClient(host, port, password string, cache *PinnedSkylinksCache, logger l
}
}

// ContractData fetches the renter's contracts and returns the combined size of
// all active and passive contracts. If the contracts cannot be fetched, the
// error is returned together with a zero size.
func (c *client) ContractData() (uint64, error) {
	rcs, err := c.staticClient.RenterContractsGet()
	if err != nil {
		return 0, err
	}
	var total uint64
	for i := range rcs.ActiveContracts {
		total += rcs.ActiveContracts[i].Size
	}
	for i := range rcs.PassiveContracts {
		total += rcs.PassiveContracts[i].Size
	}
	return total, nil
}

// DiffPinnedSkylinks returns two lists of skylinks - the ones that belong to
// the given list but are not pinned by skyd (unknown) and the ones that are
// pinned by skyd but are not on the list (missing).
Expand Down
5 changes: 3 additions & 2 deletions sweeper/status.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sweeper

import (
"github.com/skynetlabs/pinner/lib"
"github.com/skynetlabs/pinner/logger"
"sync"
"time"
Expand Down Expand Up @@ -36,7 +37,7 @@ func (st *status) Start() {
// Initialise the status to "a sweep is running".
st.status.InProgress = true
st.status.Error = nil
st.status.StartTime = time.Now().UTC()
st.status.StartTime = lib.Now()
st.status.EndTime = time.Time{}
st.mu.Unlock()
st.staticLogger.Info("Started a sweep.")
Expand All @@ -58,7 +59,7 @@ func (st *status) Finalize(err error) {
return
}
st.status.InProgress = false
st.status.EndTime = time.Now().UTC()
st.status.EndTime = lib.Now()
st.status.Error = err
st.mu.Unlock()
st.staticLogger.Info("Finalized a sweep.")
Expand Down
Loading

0 comments on commit 72b3a87

Please sign in to comment.