Skip to content

Commit

Permalink
Add Status and Sweeper tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
ro-tex committed Jul 26, 2022
1 parent cb39cf4 commit b2976c2
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 55 deletions.
13 changes: 12 additions & 1 deletion skyd/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c *ClientMock) Pin(skylink string) (skymodules.SiaPath, error) {
func (c *ClientMock) RebuildCache() RebuildCacheResult {
closedCh := make(chan struct{})
close(closedCh)
// Do some work. There are tests which rely on this value to be above 50ms.
// Do some work. There are tests which rely on this value being above 50ms.
time.Sleep(100 * time.Millisecond)
return RebuildCacheResult{
errAvail: closedCh,
Expand Down Expand Up @@ -262,3 +262,14 @@ func (c *ClientMock) MockFilesystem() []string {

return []string{slR0, slA1, slA2, slC0, slC1, slB0}
}

// Skylinks returns a list of all skylinks being held by this mock.
func (c *ClientMock) Skylinks() []string {
c.mu.Lock()
sls := make([]string, len(c.skylinks), 0)
for sl := range c.skylinks {
sls = append(sls, sl)
}
c.mu.Unlock()
return sls
}
13 changes: 13 additions & 0 deletions sweeper/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ type (
}
)

// Close cancels any running sweeper thread.Returns true if it closed a running
// thread and false otherwise.
func (s *schedule) Close() bool {
s.mu.Lock()
defer s.mu.Unlock()
if isOpen(s.cancelCh) {
close(s.cancelCh)
return true
}
return false
}

// Update schedules a new series of sweeps to be run, using the given Sweeper.
// If there are already scheduled sweeps, that schedule is cancelled (running
// sweeps are not interrupted) and a new schedule is established.
Expand All @@ -30,6 +42,7 @@ func (s *schedule) Update(period time.Duration, sweeper *Sweeper) {

go func() {
t := time.NewTicker(s.period)
defer t.Stop()
for {
select {
case <-t.C:
Expand Down
64 changes: 64 additions & 0 deletions sweeper/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package sweeper

import (
"github.com/skynetlabs/pinner/logger"
"sync"
"time"
)

type (
// Status represents the status of a sweep.
Status struct {
InProgress bool
Error error
StartTime time.Time
EndTime time.Time
}
// status is the internal status type that allows thread-safe updates.
status struct {
mu sync.Mutex
staticLogger logger.ExtFieldLogger
status Status
}
)

// Start marks the start of a new process, unless one is already in progress.
// If there is a process in progress then Start returns without any action.
func (st *status) Start() {
st.mu.Lock()
// Double-check for parallel sweeps.
if st.status.InProgress {
st.mu.Unlock()
st.staticLogger.Debug("Attempted to start a sweep while another one was already ongoing.")
return
}
// Initialise the status to "a sweep is running".
st.status.InProgress = true
st.status.Error = nil
st.status.StartTime = time.Now().UTC()
st.status.EndTime = time.Time{}
st.mu.Unlock()
st.staticLogger.Info("Started a sweep.")
}

// Status returns a copy of the current status.
func (st *status) Status() Status {
st.mu.Lock()
s := st.status
st.mu.Unlock()
return s
}

// Finalize marks a run as completed with the given error.
func (st *status) Finalize(err error) {
st.mu.Lock()
if !st.status.InProgress {
st.mu.Unlock()
return
}
st.status.InProgress = false
st.status.EndTime = time.Now().UTC()
st.status.Error = err
st.mu.Unlock()
st.staticLogger.Info("Finalized a sweep.")
}
69 changes: 69 additions & 0 deletions sweeper/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package sweeper

import (
"github.com/sirupsen/logrus"
"gitlab.com/NebulousLabs/errors"
"io/ioutil"
"testing"
"time"
)

// TestStatus ensures the basic operation of the status type.
func TestStatus(t *testing.T) {
logger := logrus.New()
logger.Out = ioutil.Discard

s := &status{staticLogger: logger}
sentinelErr := errors.New("this should not get set")

// isEmpty is a helper that returns true when the given status has its zero
// value.
isEmpty := func(st Status) bool {
return !(st.InProgress || st.Error != nil || (st.EndTime != time.Time{}) || (st.StartTime != time.Time{}))
}

// Check the status, expect not in progress.
st := s.Status()
if !isEmpty(st) {
t.Fatalf("Status not empty: %+v", st)
}
// Try to finalize before starting, expect nothing to happen.
s.Finalize(sentinelErr)
st = s.Status()
if !isEmpty(st) {
t.Fatalf("Status not empty: %+v", st)
}
// Start and verify.
s.Start()
st = s.Status()
if !st.InProgress || st.StartTime.After(time.Now().UTC()) {
t.Fatalf("Unexpected status: %+v", st)
}
// Store the start time and verify that attempting to start again will not
// change it.
startTime := st.StartTime
s.Start()
st = s.Status()
if st.StartTime != startTime {
t.Fatalf("Expected start time '%s', got '%s'", startTime, st.StartTime)
}
// Finalize and verify.
s.Finalize(sentinelErr)
st = s.Status()
if st.InProgress || !errors.Contains(st.Error, sentinelErr) || (st.EndTime == time.Time{}) {
t.Fatalf("Unexpected status: %+v", st)
}
// Save end time and verify that finalising again has no effect.
endTime := st.EndTime
s.Finalize(nil)
st = s.Status()
if st.EndTime != endTime || !errors.Contains(st.Error, sentinelErr) {
t.Fatalf("Unexpected status: %+v", st)
}
// Start the same status again.
s.Start()
st = s.Status()
if !st.InProgress || st.Error != nil {
t.Fatalf("Unexpected status: %+v", st)
}
}
53 changes: 6 additions & 47 deletions sweeper/sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package sweeper

import (
"context"
"sync"
"time"

"github.com/skynetlabs/pinner/database"
Expand All @@ -17,19 +16,6 @@ const (
)

type (
// Status represents the status of a sweep.
Status struct {
InProgress bool
Error error
StartTime time.Time
EndTime time.Time
}
// status is the internal type we use when we want to be able to modify it.
status struct {
Status
mu sync.Mutex
staticLogger logger.ExtFieldLogger
}
// Sweeper takes care of sweeping the files pinned by the local skyd server
// and marks them as pinned by the local server.
Sweeper struct {
Expand All @@ -56,12 +42,14 @@ func New(db *database.DB, skydc skyd.Client, serverName string, logger logger.Ex
}
}

// Close any running Sweeper thread. Return true if a thread was closed.
func (s *Sweeper) Close() bool {
return s.staticSchedule.Close()
}

// Status returns a copy of the status of the current sweep.
func (s *Sweeper) Status() Status {
s.staticStatus.mu.Lock()
st := s.staticStatus.Status
s.staticStatus.mu.Unlock()
return st
return s.staticStatus.Status()
}

// Sweep starts a new skyd sweep, unless one is already underway.
Expand Down Expand Up @@ -130,32 +118,3 @@ func (s *Sweeper) threadedPerformSweep() {
return
}
}

// Start marks the start of a new process, unless one is already in progress.
// If there is a process in progress then Start returns without any action.
func (st *status) Start() {
st.mu.Lock()
// Double-check for parallel sweeps.
if st.InProgress {
st.mu.Unlock()
st.staticLogger.Debug("Attempted to start a sweep while another one was already ongoing.")
return
}
// Initialise the status to "a sweep is running".
st.InProgress = true
st.Error = nil
st.StartTime = time.Now().UTC()
st.EndTime = time.Time{}
st.mu.Unlock()
st.staticLogger.Info("Started a sweep.")
}

// Finalize marks a run as completed with the given error.
func (st *status) Finalize(err error) {
st.mu.Lock()
st.InProgress = false
st.EndTime = time.Now().UTC()
st.Error = err
st.mu.Unlock()
st.staticLogger.Info("Finalized a sweep.")
}
63 changes: 63 additions & 0 deletions test/sweeper/sweeper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package sweeper

import (
"context"
"github.com/skynetlabs/pinner/skyd"
"github.com/skynetlabs/pinner/sweeper"
"github.com/skynetlabs/pinner/test"
"testing"
"time"
)

// TestSweeper ensures that Sweeper properly scans skyd and updates the DB.
func TestSweeper(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
t.Parallel()

ctx := context.Background()
db, err := test.NewDatabase(ctx, t.Name())
if err != nil {
t.Fatal(err)
}
skydc := skyd.NewSkydClientMock()
serverName := t.Name()
logger := test.NewDiscardLogger()

// Ensure there are no skylinks pinned by this server, according to the DB.
sls, err := db.SkylinksForServer(ctx, serverName)
if err != nil {
t.Fatal(err)
}
if len(sls) > 0 {
t.Fatalf("Expected no skylinks marked as pinned by this server, got %d", len(sls))
}

testSweepPeriod := 100 * time.Millisecond
swpr := sweeper.New(db, skydc, serverName, logger)
swpr.UpdateSchedule(testSweepPeriod)
defer swpr.Close()

// Wait for the sweep to come and go through.
// Rebuilding the cache will take 100ms.
time.Sleep(300 * time.Millisecond)

// Ensure the sweep passed and there are skylinks in the DB marked as pinned
// by this server.
sls, err = db.SkylinksForServer(ctx, serverName)
if err != nil {
t.Fatal(err)
}
// Grab the skylinks available in the mock. We expect to see these in the DB.
mockSkylinks := skydc.Skylinks()
if len(sls) != len(mockSkylinks) {
t.Fatalf("Expected %d skylinks, got %d", len(mockSkylinks), len(sls))
}
// Ensure all skylinks are there.
for _, s := range mockSkylinks {
if !test.Contains(sls, s) {
t.Fatalf("Missing skylink '%s'", s)
}
}
}
4 changes: 2 additions & 2 deletions workers/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
- pin it locally and add the current server to its list
- unlock it
PHASE 2:
PHASE 2: <TODO>
- calculate server load by getting the total number and size of files pinned by each server
- only pin underpinned files if the current server is in the lowest 20% of servers, otherwise exit before scanning further
PHASE 3:
PHASE 3: <TODO>
- add a second scanner which looks for skylinks which should be unpinned and unpins them from the local skyd.
*/

Expand Down
5 changes: 0 additions & 5 deletions workers/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ const (
cyclesToWait = 5
)

var (
// maxSleepBetweenScans is the maximum time we might sleep between scans.
maxSleepBetweenScans = time.Duration(float64(sleepBetweenScans) * (1 + sleepVariationFactor))
)

// TestScanner ensures that Scanner does its job.
func TestScanner(t *testing.T) {
if testing.Short() {
Expand Down

0 comments on commit b2976c2

Please sign in to comment.