Skip to content

Commit

Permalink
Merge pull request #40 from SkynetLabs/ivo/schedule
Browse files Browse the repository at this point in the history
Implement a sweeping schedule.
  • Loading branch information
ro-tex authored Jul 28, 2022
2 parents 97da18e + b2976c2 commit b7dd636
Show file tree
Hide file tree
Showing 14 changed files with 509 additions and 201 deletions.
18 changes: 4 additions & 14 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"

"github.com/julienschmidt/httprouter"
"github.com/skynetlabs/pinner/database"
"github.com/skynetlabs/pinner/logger"
"github.com/skynetlabs/pinner/skyd"
"github.com/skynetlabs/pinner/sweeper"
"gitlab.com/NebulousLabs/errors"
"gitlab.com/SkynetLabs/skyd/build"
)
Expand All @@ -23,17 +22,7 @@ type (
staticLogger logger.ExtFieldLogger
staticRouter *httprouter.Router
staticSkydClient skyd.Client

latestSweepStatus SweepStatus
latestSweepStatusMu sync.Mutex
}

// SweepStatus represents the status of a sweep.
SweepStatus struct {
InProgress bool
Error error
StartTime time.Time
EndTime time.Time
staticSweeper *sweeper.Sweeper
}

// errorWrap is a helper type for converting an `error` struct to JSON.
Expand All @@ -43,7 +32,7 @@ type (
)

// New returns a new initialised API.
func New(serverName string, db *database.DB, logger logger.ExtFieldLogger, skydClient skyd.Client) (*API, error) {
func New(serverName string, db *database.DB, logger logger.ExtFieldLogger, skydClient skyd.Client, sweeper *sweeper.Sweeper) (*API, error) {
if db == nil {
return nil, errors.New("no DB provided")
}
Expand All @@ -59,6 +48,7 @@ func New(serverName string, db *database.DB, logger logger.ExtFieldLogger, skydC
staticLogger: logger,
staticRouter: router,
staticSkydClient: skydClient,
staticSweeper: sweeper,
}
apiInstance.buildHTTPRoutes()
return apiInstance, nil
Expand Down
115 changes: 3 additions & 112 deletions api/handlers.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
package api

import (
"context"
"encoding/json"
"net/http"
"sync"
"time"

"github.com/julienschmidt/httprouter"
"github.com/skynetlabs/pinner/conf"
"github.com/skynetlabs/pinner/database"
"gitlab.com/NebulousLabs/errors"
"gitlab.com/SkynetLabs/skyd/build"
"gitlab.com/SkynetLabs/skyd/skymodules"
)

Expand Down Expand Up @@ -64,7 +60,7 @@ func (api *API) pinPOST(w http.ResponseWriter, req *http.Request, _ httprouter.P
// If the skylink already exists, add this server to its list of servers and
// mark the skylink as pinned.
if errors.Contains(err, database.ErrSkylinkExists) {
err = api.staticDB.AddServerForSkylink(req.Context(), sl, api.staticServerName, true)
err = api.staticDB.AddServerForSkylinks(req.Context(), []string{sl.String()}, api.staticServerName, true)
}
if err != nil {
api.WriteError(w, err, http.StatusInternalServerError)
Expand Down Expand Up @@ -105,25 +101,13 @@ func (api *API) unpinPOST(w http.ResponseWriter, req *http.Request, _ httprouter
// already running. The response is 202 Accepted and the response body contains
// an endpoint link on which the caller can check the status of the sweep.
func (api *API) sweepPOST(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
api.latestSweepStatusMu.Lock()
// If there is no sweep in progress - kick one off.
if !api.latestSweepStatus.InProgress {
go api.threadedPerformSweep()
}
api.latestSweepStatusMu.Unlock()
// TODO If we want to be able to uniquely identify sweeps we can issue ids
// for them and keep their statuses in a map. This would be the appropriate
// RESTful approach. I am not sure we need that because all we care about
// is to be able to kick off one and wait for it to end and this
// implementation is sufficient for that.
api.staticSweeper.Sweep()
api.WriteJSONCustomStatus(w, SweepPOSTResponse{"/sweep/status"}, http.StatusAccepted)
}

// sweepStatusGET responds with the status of the latest sweep.
func (api *API) sweepStatusGET(w http.ResponseWriter, _ *http.Request, _ httprouter.Params) {
api.latestSweepStatusMu.Lock()
defer api.latestSweepStatusMu.Unlock()
api.WriteJSON(w, api.latestSweepStatus)
api.WriteJSON(w, api.staticSweeper.Status())
}

// parseAndResolve parses the given string representation of a skylink and
Expand All @@ -146,96 +130,3 @@ func (api *API) parseAndResolve(skylink string) (skymodules.Skylink, error) {
}
return sl, nil
}

// threadedPerformSweep performs the actual sweep operation. It asks skyd to
// rebuild its cache, fetches the skylinks the database lists for this server,
// diffs the two sets and then updates the database: this server is removed
// from every "unknown" skylink and added to every "missing" one.
//
// The progress and the outcome of the sweep are recorded in
// api.latestSweepStatus. If a sweep is already in progress the method returns
// immediately without starting another one.
//
// TODO This can be moved to a separate `sweeper` type which would also hold the
// status of the latest sweep as well as the relevant mutex. This can
// streamline the structure of API. I recommend this as a F/U.
func (api *API) threadedPerformSweep() {
	api.latestSweepStatusMu.Lock()
	// Double-check for parallel sweeps.
	if api.latestSweepStatus.InProgress {
		api.latestSweepStatusMu.Unlock()
		return
	}
	// Initialise the status to "a sweep is running".
	api.latestSweepStatus = SweepStatus{
		InProgress: true,
		Error:      nil,
		StartTime:  time.Now().UTC(),
		EndTime:    time.Time{},
	}
	api.latestSweepStatusMu.Unlock()
	// Define an error variable which will represent the success of the scan.
	// Every exit path below stores its failure in this variable, so that the
	// deferred function can publish it.
	var err error
	// Ensure that we'll finalize the sweep on returning from this method.
	defer func() {
		api.latestSweepStatusMu.Lock()
		api.latestSweepStatus.InProgress = false
		api.latestSweepStatus.EndTime = time.Now().UTC()
		api.latestSweepStatus.Error = err
		api.latestSweepStatusMu.Unlock()
	}()

	// Perform the actual sweep. The cache rebuild runs in the background
	// while we fetch the skylinks from the database.
	wg := sync.WaitGroup{}
	wg.Add(1)
	var cacheErr error
	go func() {
		defer wg.Done()
		res := api.staticSkydClient.RebuildCache()
		<-res.ErrAvail
		cacheErr = res.ExternErr
	}()

	// We use an independent context because we are not strictly bound to a
	// specific API call. Also, this operation can take significant amount of
	// time and we don't want it to fail because of a timeout. Only the initial
	// fetch below is bounded by a deadline; the updates use the plain ctx.
	ctx := context.Background()
	dbCtx, cancel := context.WithDeadline(ctx, time.Now().UTC().Add(database.MongoDefaultTimeout))
	defer cancel()

	// Get pinned skylinks from the DB
	dbSkylinks, err := api.staticDB.SkylinksForServer(dbCtx, api.staticServerName)
	if err != nil {
		err = errors.AddContext(err, "failed to fetch skylinks for server")
		return
	}
	// Wait for the cache rebuild to finish before reading its result.
	wg.Wait()
	if cacheErr != nil {
		err = errors.AddContext(cacheErr, "failed to rebuild skyd cache")
		return
	}

	unknown, missing := api.staticSkydClient.DiffPinnedSkylinks(dbSkylinks)

	// Remove all unknown skylinks from the database.
	// NOTE(review): when an invalid skylink is skipped via `continue`, err
	// stays set. If that was the last entry processed, the sweep is reported
	// as failed with that error - confirm this is intended.
	var s skymodules.Skylink
	for _, sl := range unknown {
		s, err = database.SkylinkFromString(sl)
		if err != nil {
			err = errors.AddContext(err, "invalid skylink found in DB")
			build.Critical(err)
			continue
		}
		err = api.staticDB.RemoveServerFromSkylink(ctx, s, api.staticServerName)
		if err != nil {
			err = errors.AddContext(err, "failed to unpin skylink")
			return
		}
	}
	// Add all missing skylinks to the database.
	for _, sl := range missing {
		s, err = database.SkylinkFromString(sl)
		if err != nil {
			err = errors.AddContext(err, "invalid skylink reported by skyd")
			build.Critical(err)
			continue
		}
		err = api.staticDB.AddServerForSkylink(ctx, s, api.staticServerName, false)
		if err != nil {
			// This is the "add" path, so don't report it as an unpin failure.
			err = errors.AddContext(err, "failed to add server for skylink")
			return
		}
	}
}
34 changes: 17 additions & 17 deletions database/skylink.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,20 @@ func (db *DB) MarkUnpinned(ctx context.Context, skylink skymodules.Skylink) erro
return err
}

// AddServerForSkylink adds a new server to the list of servers known to be pinning
// this skylink. If the skylink does not already exist in the database it will
// be inserted. This operation is idempotent.
// AddServerForSkylinks adds a new server to the list of servers known to be
// pinning these skylinks. If a skylink does not already exist in the database
// it will be inserted. This operation is idempotent.
//
// The `markPinned` flag sets the `unpin` field of the skylink to false when
// The `markPinned` flag sets the `unpin` field of a skylink to false when
// raised but it doesn't set it to false when not raised. The reason for that is
// that it accommodates a specific use case - adding a server to the list of
// pinners of a given skylink will set the unpin field to false if we are doing
// that because we know that a user is pinning it but not so if we are running
// a server sweep and documenting which skylinks are pinned by this server.
func (db *DB) AddServerForSkylink(ctx context.Context, skylink skymodules.Skylink, server string, markPinned bool) error {
db.staticLogger.Tracef("Entering AddServerForSkylink. Skylink: '%s', server: '%s'", skylink, server)
defer db.staticLogger.Tracef("Exiting AddServerForSkylink. Skylink: '%s', server: '%s'", skylink, server)
filter := bson.M{"skylink": skylink.String()}
func (db *DB) AddServerForSkylinks(ctx context.Context, skylinks []string, server string, markPinned bool) error {
db.staticLogger.Tracef("Entering AddServerForSkylinks. Skylink: '%v', server: '%s'", skylinks, server)
defer db.staticLogger.Tracef("Exiting AddServerForSkylinks. Skylink: '%v', server: '%s'", skylinks, server)
filter := bson.M{"skylink": bson.M{"$in": skylinks}}
var update bson.M
if markPinned {
update = bson.M{
Expand All @@ -137,22 +137,22 @@ func (db *DB) AddServerForSkylink(ctx context.Context, skylink skymodules.Skylin
update = bson.M{"$addToSet": bson.M{"servers": server}}
}
opts := options.Update().SetUpsert(true)
_, err := db.staticDB.Collection(collSkylinks).UpdateOne(ctx, filter, update, opts)
_, err := db.staticDB.Collection(collSkylinks).UpdateMany(ctx, filter, update, opts)
return err
}

// RemoveServerFromSkylink removes a server to the list of servers known to be
// pinning this skylink. If the skylink does not exist in the database it will
// not be inserted.
func (db *DB) RemoveServerFromSkylink(ctx context.Context, skylink skymodules.Skylink, server string) error {
db.staticLogger.Tracef("Entering RemoveServerFromSkylink. Skylink: '%s', server: '%s'", skylink, server)
defer db.staticLogger.Tracef("Exiting RemoveServerFromSkylink. Skylink: '%s', server: '%s'", skylink, server)
// RemoveServerFromSkylinks removes a server from the list of servers known to
// be pinning these skylinks. If a skylink does not exist in the database it
// will not be inserted.
func (db *DB) RemoveServerFromSkylinks(ctx context.Context, skylinks []string, server string) error {
db.staticLogger.Tracef("Entering RemoveServerFromSkylinks. Skylink: '%v', server: '%s'", skylinks, server)
defer db.staticLogger.Tracef("Exiting RemoveServerFromSkylinks. Skylink: '%v', server: '%s'", skylinks, server)
filter := bson.M{
"skylink": skylink.String(),
"skylink": bson.M{"$in": skylinks},
"servers": server,
}
update := bson.M{"$pull": bson.M{"servers": server}}
_, err := db.staticDB.Collection(collSkylinks).UpdateOne(ctx, filter, update)
_, err := db.staticDB.Collection(collSkylinks).UpdateMany(ctx, filter, update)
return err
}

Expand Down
6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/skynetlabs/pinner/database"
"github.com/skynetlabs/pinner/logger"
"github.com/skynetlabs/pinner/skyd"
"github.com/skynetlabs/pinner/sweeper"
"github.com/skynetlabs/pinner/workers"
"gitlab.com/NebulousLabs/errors"
)
Expand Down Expand Up @@ -48,9 +49,12 @@ func main() {
if err != nil {
log.Fatal(errors.AddContext(err, "failed to start Scanner"))
}
swpr := sweeper.New(db, skydClient, cfg.ServerName, logger)
// Schedule a regular sweep.
swpr.UpdateSchedule(sweeper.SweepInterval)

// Initialise the server.
server, err := api.New(cfg.ServerName, db, logger, skydClient)
server, err := api.New(cfg.ServerName, db, logger, skydClient, swpr)
if err != nil {
log.Fatal(errors.AddContext(err, "failed to build the api"))
}
Expand Down
13 changes: 12 additions & 1 deletion skyd/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *ClientMock) Pin(skylink string) (skymodules.SiaPath, error) {
func (c *ClientMock) RebuildCache() RebuildCacheResult {
closedCh := make(chan struct{})
close(closedCh)
// Do some work. There are tests which rely on this value to be above 50ms.
// Do some work. There are tests which rely on this value being above 50ms.
time.Sleep(100 * time.Millisecond)
return RebuildCacheResult{
errAvail: closedCh,
Expand Down Expand Up @@ -262,3 +262,14 @@ func (c *ClientMock) MockFilesystem() []string {

return []string{slR0, slA1, slA2, slC0, slC1, slB0}
}

// Skylinks returns a list of all skylinks being held by this mock. The
// returned slice is freshly allocated on every call, so the caller is free to
// modify it.
func (c *ClientMock) Skylinks() []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	// Start with zero length and pre-sized capacity. Passing the size as the
	// length with a capacity of zero makes `make` panic for any non-empty
	// collection, because the length may never exceed the capacity.
	sls := make([]string, 0, len(c.skylinks))
	for sl := range c.skylinks {
		sls = append(sls, sl)
	}
	return sls
}
69 changes: 69 additions & 0 deletions sweeper/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package sweeper

import (
"sync"
"time"
)

type (
	// schedule defines how often, if at all, we sweep this server automatically.
	schedule struct {
		// period is the interval most recently passed to Update.
		period time.Duration
		// cancelCh is closed in order to stop the goroutine which serves the
		// current schedule. It is nil until the first schedule is established.
		cancelCh chan struct{}
		// mu guards period and cancelCh.
		mu sync.Mutex
	}
)

// Close cancels any running sweeper thread. Returns true if it closed a running
// thread and false otherwise.
func (s *schedule) Close() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if isOpen(s.cancelCh) {
		close(s.cancelCh)
		return true
	}
	return false
}

// Update schedules a new series of sweeps to be run, using the given Sweeper.
// If there are already scheduled sweeps, that schedule is cancelled (running
// sweeps are not interrupted) and a new schedule is established.
//
// A non-positive period cancels the current schedule without establishing a
// new one, i.e. it turns automatic sweeping off.
func (s *schedule) Update(period time.Duration, sweeper *Sweeper) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if isOpen(s.cancelCh) {
		close(s.cancelCh)
	}

	s.period = period
	// time.NewTicker panics when given a non-positive duration and that panic
	// would happen in the background goroutine, taking the process down.
	if period <= 0 {
		return
	}

	// The goroutine must work with its own copies of the channel and the
	// period. Reading them from `s` would be an unsynchronised access and,
	// worse, a goroutine which is busy sweeping while the schedule is updated
	// would afterwards wait on the channel of the new schedule, miss its own
	// cancellation and keep running alongside the new goroutine.
	cancelCh := make(chan struct{})
	s.cancelCh = cancelCh

	go func() {
		t := time.NewTicker(period)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				sweeper.Sweep()
			case <-cancelCh:
				return
			}
		}
	}()
}

// isOpen reports whether the given channel may still be closed, i.e. whether
// it is neither nil nor already closed. The check never blocks.
//
// Note that the check performs a receive, so a value which happens to be
// waiting on the channel is consumed by it.
func isOpen(ch chan struct{}) bool {
	// A nil channel is never ready in a select, so it has to be ruled out
	// explicitly.
	stillOpen := ch != nil
	if stillOpen {
		select {
		case _, ok := <-ch:
			// The receive succeeded right away: either the channel is closed
			// (ok is false) or a value was pending on an open channel.
			stillOpen = ok
		default:
			// Nothing to receive, which only happens on an open channel.
		}
	}
	return stillOpen
}
Loading

0 comments on commit b7dd636

Please sign in to comment.