Skip to content

Commit

Permalink
Merge pull request #30 from SkynetLabs/ivo/dry_run
Browse files Browse the repository at this point in the history
Dry Run
  • Loading branch information
ro-tex authored May 30, 2022
2 parents bc16e91 + 04b7e43 commit 8a8ad92
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
run: make lint
- name: Run unit tests
run: make test
# TODO: Run long tests once we get mongo working in CI.

# Make a release if this is a manually trigger job, i.e. workflow_dispatch
release:
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ all: release
count = 1
# pkgs changes which packages the makefile calls operate on. run changes which
# tests are run during testing.
pkgs = ./ ./api ./database ./skyd
pkgs = ./ ./api ./database ./skyd ./test ./test/workers

# integration-pkgs defines the packages which contain integration tests
integration-pkgs = ./test ./test/api ./test/database

# run determines which tests run when running any variation of 'make test'.
run = .

# fmt calls go fmt on all packages.
fmt:
gofmt -s -l -w $(pkgs)
Expand Down
24 changes: 24 additions & 0 deletions conf/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ const (
// Cluster-wide configuration variable names.
// Stored in the database.
const (
// ConfDryRun holds the name of the configuration setting which defines
// whether we execute pin/unpin calls against skyd or not. Note that all
// database operations will still be executed, i.e. skylinks records will
// be updated. After using this option you will need to prune the database
// before being able to use the service in "actual mode".
ConfDryRun = "dry_run"
// ConfMinPinners holds the name of the configuration setting which defines
// the minimum number of pinners we want to ensure for each skyfile.
ConfMinPinners = "min_pinners"
Expand Down Expand Up @@ -135,6 +141,24 @@ func LoadConfig() (Config, error) {
return cfg, nil
}

// DryRun returns the cluster-wide value of the dry_run switch. This switch
// tells Pinner to omit the pin/unpin calls to skyd and assume they were
// successful.
func DryRun(ctx context.Context, db *database.DB) (bool, error) {
val, err := db.ConfigValue(ctx, ConfDryRun)
if errors.Contains(err, mongo.ErrNoDocuments) {
return false, nil
}
if err != nil {
return false, err
}
dr, err := strconv.ParseBool(val)
if err != nil {
return false, err
}
return dr, nil
}

// MinPinners returns the cluster-wide value of the minimum number of servers we
// expect to be pinning each skylink.
func MinPinners(ctx context.Context, db *database.DB) (int, error) {
Expand Down
98 changes: 95 additions & 3 deletions test/workers/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/skynetlabs/pinner/conf"
"github.com/skynetlabs/pinner/skyd"
"github.com/skynetlabs/pinner/test"
"github.com/skynetlabs/pinner/workers"
Expand Down Expand Up @@ -58,9 +59,100 @@ func TestScanner(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// Wait for one cycle - the skylink should be picked up and pinned on the
// local skyd.
time.Sleep(workers.SleepBetweenScans())

// Wait - the skylink should be picked up and pinned on the local skyd.
time.Sleep(3 * workers.SleepBetweenScans())

// Make sure the skylink is pinned on the local (mock) skyd.
if !skydcm.IsPinning(sl.String()) {
t.Fatal("We expected skyd to be pinning this.")
}
}

// TestScannerDryRun ensures that dry_run works as expected.
func TestScannerDryRun(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
// Don't run this test in parallel since we set "dry_run". mongo is shared
// by the tests.

ctx := context.Background()
db, err := test.NewDatabase(ctx, t.Name())
if err != nil {
t.Fatal(err)
}
// Set dry_run: true.
err = db.SetConfigValue(ctx, conf.ConfDryRun, "true")
if err != nil {
t.Fatal(err)
}
defer func() {
err = db.SetConfigValue(ctx, conf.ConfDryRun, "false")
if err != nil {
t.Fatal(err)
}
}()

cfg, err := test.LoadTestConfig()
if err != nil {
t.Fatal(err)
}
skydcm := skyd.NewSkydClientMock()
scanner := workers.NewScanner(db, test.NewDiscardLogger(), cfg.MinPinners, cfg.ServerName, skydcm)
defer func() {
if e := scanner.Close(); e != nil {
t.Error(errors.AddContext(e, "failed to close threadgroup"))
}
}()
err = scanner.Start()
if err != nil {
t.Fatal(err)
}

// Trigger a pin event.
//
// Add a skylink from the name of a different server.
sl := test.RandomSkylink()
otherServer := "other server"
_, err = db.CreateSkylink(ctx, sl, otherServer)
if err != nil {
t.Fatal(err)
}
// Sleep for two cycles.
time.Sleep(2 * workers.SleepBetweenScans())
// Make sure the skylink isn't pinned on the local (mock) skyd.
if skydcm.IsPinning(sl.String()) {
t.Fatal("We didn't expect skyd to be pinning this.")
}
// Remove the other server, making the file underpinned.
err = db.RemoveServerFromSkylink(ctx, sl, otherServer)
if err != nil {
t.Fatal(err)
}

// Wait - the skylink should not be picked up and pinned on the local skyd.
time.Sleep(3 * workers.SleepBetweenScans())

// Verify skyd doesn't have the pin.
//
// Make sure the skylink is not pinned on the local (mock) skyd.
if skydcm.IsPinning(sl.String()) {
t.Fatal("We did not expect skyd to be pinning this.")
}

// Turn off dry run.
err = db.SetConfigValue(ctx, conf.ConfDryRun, "false")
if err != nil {
t.Fatal(err)
}

// Verify skyd gets the pin.
//
// Wait enough time for the pinner to pick up the new value for dry_run and
// rescan.
time.Sleep(10 * workers.SleepBetweenScans())

// Make sure the skylink is pinned on the local (mock) skyd.
if !skydcm.IsPinning(sl.String()) {
t.Fatal("We expected skyd to be pinning this.")
Expand Down
42 changes: 35 additions & 7 deletions workers/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type (
staticSkydClient skyd.Client
staticTG *threadgroup.ThreadGroup

dryRun bool
minPinners int
mu sync.Mutex
}
Expand Down Expand Up @@ -146,8 +147,11 @@ func (s *Scanner) threadedScanAndPin() {
s.staticLogger.Warn(errors.AddContext(res.ExternErr, "failed to rebuild skyd client cache"))
}
}

s.refreshDryRun()
s.refreshMinPinners()
s.pinUnderpinnedSkylinks()

// Sleep between database scans.
select {
case <-time.After(SleepBetweenScans()):
Expand All @@ -171,7 +175,7 @@ func (s *Scanner) pinUnderpinnedSkylinks() {
default:
}

skylink, sp, continueScanning, err := s.findAndPinOneUnderpinnedSkylink()
skylink, sp, continueScanning, err := s.managedFindAndPinOneUnderpinnedSkylink()
if !continueScanning {
return
}
Expand All @@ -188,7 +192,7 @@ func (s *Scanner) pinUnderpinnedSkylinks() {
// avoid a tight(ish) loop of errors when we either fail to pin or
// fail to mark as pinned. Note that this only happens when we want
// to continue scanning, otherwise we would have exited right after
// findAndPinOneUnderpinnedSkylink.
// managedFindAndPinOneUnderpinnedSkylink.
select {
case <-s.staticTG.StopChan():
s.staticLogger.Trace("Stop channel closed.")
Expand All @@ -198,16 +202,21 @@ func (s *Scanner) pinUnderpinnedSkylinks() {
}
}

// findAndPinOneUnderpinnedSkylink scans the database for one skylinks which is
// managedFindAndPinOneUnderpinnedSkylink scans the database for one skylinks which is
// either locked by the current server or underpinned. If it finds such a
// skylink, it pins it to the local skyd. The method returns true until it finds
// no further skylinks to process or until it encounters an unrecoverable error,
// such as bad credentials, dead skyd, etc.
func (s *Scanner) findAndPinOneUnderpinnedSkylink() (skylink skymodules.Skylink, sf skymodules.SiaPath, continueScanning bool, err error) {
s.staticLogger.Trace("Entering findAndPinOneUnderpinnedSkylink")
defer s.staticLogger.Trace("Exiting findAndPinOneUnderpinnedSkylink")
func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.Skylink, sf skymodules.SiaPath, continueScanning bool, err error) {
s.staticLogger.Trace("Entering managedFindAndPinOneUnderpinnedSkylink")
defer s.staticLogger.Trace("Exiting managedFindAndPinOneUnderpinnedSkylink")

s.mu.Lock()
dryRun := s.dryRun
minPinners := s.minPinners
s.mu.Unlock()

sl, err := s.staticDB.FindAndLockUnderpinned(context.TODO(), s.staticServerName, s.minPinners)
sl, err := s.staticDB.FindAndLockUnderpinned(context.TODO(), s.staticServerName, minPinners)
if database.IsNoSkylinksNeedPinning(err) {
return skymodules.Skylink{}, skymodules.SiaPath{}, false, err
}
Expand All @@ -222,6 +231,12 @@ func (s *Scanner) findAndPinOneUnderpinnedSkylink() (skylink skymodules.Skylink,
}
}()

// Check for a dry run.
if dryRun {
s.staticLogger.Infof("[DRY RUN] Successfully pinned '%s'", sl)
return skymodules.Skylink{}, skymodules.SiaPath{}, false, errors.New("dry run")
}

sf, err = s.staticSkydClient.Pin(sl.String())
if errors.Contains(err, skyd.ErrSkylinkAlreadyPinned) {
s.staticLogger.Info(err)
Expand Down Expand Up @@ -279,6 +294,19 @@ func (s *Scanner) estimateTimeToFull(skylink skymodules.Skylink) time.Duration {
return time.Duration(secondsRemaining) * time.Second
}

// refreshDryRun makes sure the local value of dry_run matches the one
// in the database.
func (s *Scanner) refreshDryRun() {
dr, err := conf.DryRun(context.TODO(), s.staticDB)
if err != nil {
s.staticLogger.Warn(errors.AddContext(err, "failed to fetch the DB value for dry_run"))
return
}
s.mu.Lock()
s.dryRun = dr
s.mu.Unlock()
}

// refreshMinPinners makes sure the local value of min pinners matches the one
// in the database.
func (s *Scanner) refreshMinPinners() {
Expand Down

0 comments on commit 8a8ad92

Please sign in to comment.