Skip to content

Commit

Permalink
Merge pull request #55 from SkynetLabs/ivo/parallel_repins
Browse files Browse the repository at this point in the history
Parallel repins
  • Loading branch information
ro-tex authored Aug 24, 2022
2 parents 352dbf3 + 65d38a8 commit e321395
Show file tree
Hide file tree
Showing 7 changed files with 157 additions and 85 deletions.
27 changes: 20 additions & 7 deletions conf/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
// Default configuration values.
// For individual descriptions see Config.
const (
defaultAccountsHost = "10.10.10.70"
defaultAccountsPort = "3000"
defaultLogFile = "" // disabled logging to file
defaultLogLevel = logrus.InfoLevel
defaultSiaAPIHost = "10.10.10.10"
defaultSiaAPIPort = "9980"
defaultMinPinners = 1
defaultAccountsHost = "10.10.10.70"
defaultAccountsPort = "3000"
defaultLogFile = "" // disabled logging to file
defaultLogLevel = logrus.InfoLevel
defaultScannerThreads = 5
defaultSiaAPIHost = "10.10.10.10"
defaultSiaAPIPort = "9980"
defaultMinPinners = 1
)

// Cluster-wide configuration variable names.
Expand Down Expand Up @@ -105,6 +106,9 @@ type (
// which a skylink needs in order to not be considered underpinned.
// Anything below this value requires more servers to pin the skylink.
MinPinners int
// ScannerThreads defines the number of scanning threads which might attempt
// to pin an underpinned skylink.
ScannerThreads int
// ServerName holds the name of the current server. This name will be
// used for identifying which servers are pinning a given skylink.
ServerName string
Expand Down Expand Up @@ -134,6 +138,7 @@ func LoadConfig() (Config, error) {
LogFile: defaultLogFile,
LogLevel: defaultLogLevel,
MinPinners: defaultMinPinners,
ScannerThreads: defaultScannerThreads,
SiaAPIHost: defaultSiaAPIHost,
SiaAPIPort: defaultSiaAPIPort,
SleepBetweenScans: 0, // This will be ignored by the scanner.
Expand Down Expand Up @@ -179,6 +184,14 @@ func LoadConfig() (Config, error) {
}
cfg.LogLevel = lvl
}
if val, ok = os.LookupEnv("PINNER_SCANNER_THREADS"); ok {
// Check for a bare number and interpret that as seconds.
st, err := strconv.ParseInt(val, 0, 0)
if err != nil {
log.Fatalf("PINNER_SCANNER_THREADS has an invalid value of '%s'", val)
}
cfg.ScannerThreads = int(st)
}
if val, ok = os.LookupEnv("PINNER_SLEEP_BETWEEN_SCANS"); ok {
// Check for a bare number and interpret that as seconds.
if _, err := strconv.ParseInt(val, 0, 0); err == nil {
Expand Down
19 changes: 10 additions & 9 deletions database/skylink.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,16 @@ func (db *DB) RemoveServerFromSkylinks(ctx context.Context, skylinks []string, s
// the given server.
//
// The MongoDB query is this:
// db.getCollection('skylinks').find({
// "pinned": { "$ne": false }},
// "$expr": { "$lt": [{ "$size": "$servers" }, 2 ]},
// "servers": { "$nin": [ "ro-tex.siasky.ivo.NOPE" ]},
// "$or": [
// { "lock_expires" : { "$exists": false }},
// { "lock_expires" : { "$lt": new Date() }}
// ]
// })
//
// db.getCollection('skylinks').find({
// "pinned": { "$ne": false }},
// "$expr": { "$lt": [{ "$size": "$servers" }, 2 ]},
// "servers": { "$nin": [ "ro-tex.siasky.ivo.NOPE" ]},
// "$or": [
// { "lock_expires" : { "$exists": false }},
// { "lock_expires" : { "$lt": new Date() }}
// ]
// })
func (db *DB) FindAndLockUnderpinned(ctx context.Context, server string, skipSkylinks []string, minPinners int) (skymodules.Skylink, error) {
if skipSkylinks == nil {
skipSkylinks = make([]string, 0)
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func main() {

// Start the background scanner.
skydClient := skyd.NewClient(cfg.SiaAPIHost, cfg.SiaAPIPort, cfg.SiaAPIPassword, skyd.NewCache(), logger)
scanner := workers.NewScanner(db, logger, cfg.MinPinners, cfg.ServerName, cfg.SleepBetweenScans, skydClient)
scanner := workers.NewScanner(db, logger, cfg.MinPinners, cfg.ScannerThreads, cfg.ServerName, cfg.SleepBetweenScans, skydClient)
err = scanner.Start()
if err != nil {
log.Fatal(errors.AddContext(err, "failed to start Scanner"))
Expand Down
35 changes: 21 additions & 14 deletions skyd/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,14 @@ func (c *ClientMock) DiffPinnedSkylinks(skylinks []string) (unknown []string, mi

// FileHealth returns the health of the given skylink.
// Note that the mock will return 0 (fully healthy) by default.
func (c *ClientMock) FileHealth(sl skymodules.SiaPath) (float64, error) {
func (c *ClientMock) FileHealth(sp skymodules.SiaPath) (float64, error) {
c.mu.Lock()
defer c.mu.Unlock()
return c.fileHealth[sl], nil
health, ok := c.fileHealth[sp]
if !ok {
return 1, nil
}
return health, nil
}

// IsPinning checks whether skyd is pinning the given skylink.
Expand Down Expand Up @@ -129,10 +133,12 @@ func (c *ClientMock) Pin(skylink string) (skymodules.SiaPath, error) {
return skymodules.SiaPath{}, ErrSkylinkAlreadyPinned
}
c.skylinks[skylink] = struct{}{}
sp := skymodules.SiaPath{
Path: skylink,
var sl skymodules.Skylink
err := sl.LoadString(skylink)
if err != nil {
return skymodules.SiaPath{}, err
}
return sp, nil
return sl.SiaPath()
}

// RebuildCache is a noop mock that takes at least 100ms.
Expand Down Expand Up @@ -226,15 +232,16 @@ func (c *ClientMock) SetUnpinError(e error) {
// The mocked structure is the following:
//
// SkynetFolder/ (three dirs, one file)
// dirA/ (two files, one skylink each)
// fileA1 (CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q_A1)
// fileA2 (CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q_A2)
// dirB/ (one file, one dir)
// dirC/ (one file, two skylinks)
// fileC (CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q_C1, C2_uSb3BpGxmSbRAg1xj5T8SdB4hiSFiEW2sEEzxt5MNkg)
// fileB (CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q__B)
// dirD/ (empty)
// file (CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q___)
//
// dirA/ (two files, one skylink each)
// fileA1 (CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q_A1)
// fileA2 (CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q_A2)
// dirB/ (one file, one dir)
// dirC/ (one file, two skylinks)
// fileC (CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q_C1, C2_uSb3BpGxmSbRAg1xj5T8SdB4hiSFiEW2sEEzxt5MNkg)
// fileB (CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q__B)
// dirD/ (empty)
// file (CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q___)
func (c *ClientMock) MockFilesystem() []string {
slR0 := "CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q___"
slA1 := "CAClyosjvI9Fg75N-LRylcfba79bam9Ljp-4qfxS08Q_A1"
Expand Down
5 changes: 3 additions & 2 deletions test/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ func NewTester(dbName string) (*Tester, error) {
// Start the HTTP server in a goroutine and gracefully stop it once the
// cancel function is called and the context is closed.
srv := &http.Server{
Addr: ":" + testPortalPort,
Handler: server,
Addr: ":" + testPortalPort,
Handler: server,
ReadHeaderTimeout: time.Second,
}
go func() {
_ = srv.ListenAndServe()
Expand Down
131 changes: 90 additions & 41 deletions workers/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/skynetlabs/pinner/lib"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/skynetlabs/pinner/conf"
Expand Down Expand Up @@ -109,11 +110,16 @@ type (
Scanner struct {
staticDB *database.DB
staticLogger logger.Logger
staticScannerThreads int
staticServerName string
staticSkydClient skyd.Client
staticSleepBetweenScans time.Duration
staticTG *threadgroup.ThreadGroup

// Stats variables:
atomicCountPinned uint32
scanStart time.Time

dryRun bool
minPinners int
// skipSkylinks is a list of skylinks which we want to skip during this
Expand All @@ -124,14 +130,15 @@ type (
)

// NewScanner creates a new Scanner instance.
func NewScanner(db *database.DB, logger logger.Logger, minPinners int, serverName string, customSleepBetweenScans time.Duration, skydClient skyd.Client) *Scanner {
func NewScanner(db *database.DB, logger logger.Logger, minPinners int, threads int, serverName string, customSleepBetweenScans time.Duration, skydClient skyd.Client) *Scanner {
sleep := customSleepBetweenScans
if sleep == 0 {
sleep = sleepBetweenScans
}
return &Scanner{
staticDB: db,
staticLogger: logger,
staticScannerThreads: threads,
staticServerName: serverName,
staticSkydClient: skydClient,
staticSleepBetweenScans: sleep,
Expand Down Expand Up @@ -238,7 +245,31 @@ func (s *Scanner) threadedScanAndPin() {
s.staticLogger.Tracef("Start scanning")
s.managedRefreshDryRun()
s.managedRefreshMinPinners()
s.managedPinUnderpinnedSkylinks()
s.managedResetSkippedSkylinks()
s.managedResetStats()

// Start a thread that will print intermediate scanning statistics.
statsCh := make(chan struct{})
err = s.staticTG.Add()
if err != nil {
return // the threadgroup is stopped
}
go s.threadedPrintStats(statsCh)

// Start N threads that will scan for underpinned skylinks and repin
// them. It's possible that at first all of those start pinning skylinks
// without properly respecting the MaxRepairingSkylinks limit. That's
// expected and chosen because of the simplicity of the implementation.
var wg sync.WaitGroup
for i := 0; i < s.staticScannerThreads; i++ {
wg.Add(1)
go func() {
defer wg.Done()
s.managedPinUnderpinnedSkylinks()
}()
}
wg.Wait()
close(statsCh)
s.staticLogger.Tracef("End scanning")

// Schedule the next scan, unless already scheduled:
Expand All @@ -249,6 +280,54 @@ func (s *Scanner) threadedScanAndPin() {
}
}

// threadedPrintStats prints regular updates on the scanning process plus a
// final overview of the pinned and skipped skylinks.
func (s *Scanner) threadedPrintStats(stopCh chan struct{}) {
defer s.staticTG.Done()
intermediateStatsTicker := time.NewTicker(printPinningStatisticsPeriod)
defer intermediateStatsTicker.Stop()

select {
case <-intermediateStatsTicker.C:
// Print intermediate statistics.
t1 := lib.Now()
s.mu.Lock()
numSkipped := len(s.skipSkylinks)
startTime := s.scanStart
s.mu.Unlock()
s.staticLogger.Infof("Time %s, runtime %s, pinned skylinks %d, skipped skylinks %d",
t1.Format(conf.TimeFormat), t1.Sub(startTime).String(), atomic.LoadUint32(&s.atomicCountPinned), numSkipped)
case <-stopCh:
// Print final statistics when finishing the method.
t1 := lib.Now()
s.mu.Lock()
skipped := s.skipSkylinks
startTime := s.scanStart
s.mu.Unlock()
s.staticLogger.Infof("Finished at %s, runtime %s, pinned skylinks %d, skipped skylinks %d",
t1.Format(conf.TimeFormat), t1.Sub(startTime).String(), atomic.LoadUint32(&s.atomicCountPinned), len(skipped))
s.staticLogger.Tracef("Skipped %d skylinks: %v", len(skipped), skipped)
case <-s.staticTG.StopChan():
s.staticLogger.Trace("Stop channel closed")
return
}
}

// managedResetSkippedSkylinks resets the skipped skylinks.
func (s *Scanner) managedResetSkippedSkylinks() {
s.mu.Lock()
s.skipSkylinks = []string{}
s.mu.Unlock()
}

// managedResetStats resets the scanning statistics.
func (s *Scanner) managedResetStats() {
s.mu.Lock()
s.scanStart = lib.Now()
s.mu.Unlock()
atomic.StoreUint32(&s.atomicCountPinned, 0)
}

// staticScheduleNextScan attempts to set the time of the next scan until either we
// succeed, another server succeeds, or Scanner's TG is stopped. Returns true
// when Scanner's TG is stopped.
Expand Down Expand Up @@ -292,26 +371,6 @@ func (s *Scanner) managedPinUnderpinnedSkylinks() {
s.staticLogger.Trace("Entering managedPinUnderpinnedSkylinks")
defer s.staticLogger.Trace("Exiting managedPinUnderpinnedSkylinks")

// Clear out the skipped skylinks from the previous run.
s.mu.Lock()
s.skipSkylinks = []string{}
s.mu.Unlock()

intermediateStatsTicker := time.NewTicker(printPinningStatisticsPeriod)
defer intermediateStatsTicker.Stop()
countPinned := 0
t0 := lib.Now()

// Print final statistics when finishing the method.
defer func() {
t1 := lib.Now()
s.mu.Lock()
skipped := s.skipSkylinks
s.mu.Unlock()
s.staticLogger.Infof("Finished at %s, runtime %s, pinned skylinks %d, skipped skylinks %d", t1.Format(conf.TimeFormat), t1.Sub(t0).String(), countPinned, len(skipped))
s.staticLogger.Tracef("Skipped %d skylinks: %v", len(skipped), skipped)
}()

for {
// Check for service shutdown before talking to the DB.
select {
Expand All @@ -321,21 +380,11 @@ func (s *Scanner) managedPinUnderpinnedSkylinks() {
default:
}

// Print intermediate statistics.
select {
case <-intermediateStatsTicker.C:
t1 := lib.Now()
s.mu.Lock()
numSkipped := len(s.skipSkylinks)
s.mu.Unlock()
s.staticLogger.Infof("Time %s, runtime %s, pinned skylinks %d, skipped skylinks %d", t1.Format(conf.TimeFormat), t1.Sub(t0).String(), countPinned, numSkipped)
default:
}

skylink, sp, continueScanning, err := s.managedFindAndPinOneUnderpinnedSkylink()
if !sp.IsEmpty() {
countPinned++
} else {
atomic.AddUint32(&s.atomicCountPinned, 1)
}
if err != nil {
s.staticLogger.Trace(err)
}
if !continueScanning {
Expand All @@ -345,7 +394,7 @@ func (s *Scanner) managedPinUnderpinnedSkylinks() {
// already logged and the only indication it gives us is whether we
// should wait for the file we pinned to become healthy or not. If there
// is an error, then there is nothing to wait for.
if err == nil && !sp.IsEmpty() {
if !sp.IsEmpty() {
// Block until the pinned skylink becomes healthy or until a timeout.
s.staticWaitUntilHealthy(skylink, sp)
continue
Expand Down Expand Up @@ -378,7 +427,6 @@ func (s *Scanner) managedFindAndPinOneUnderpinnedSkylink() (skylink skymodules.S
s.mu.Unlock()

ctx := context.TODO()

sl, err := s.staticDB.FindAndLockUnderpinned(ctx, s.staticServerName, skipSkylinks, minPinners)
if database.IsNoSkylinksNeedPinning(err) {
return skymodules.Skylink{}, skymodules.SiaPath{}, false, err
Expand Down Expand Up @@ -466,9 +514,9 @@ func (s *Scanner) managedSkipSkylink(sl skymodules.Skylink) {
// we pin another one. It returns a ballpark value.
//
// This method makes some assumptions for simplicity:
// * assumes lazy pinning, meaning that none of the fanout is uploaded
// * all skyfiles are assumed to be large files (base sector + fanout) and the
// metadata is assumed to fill up the base sector (to err on the safe side)
// - assumes lazy pinning, meaning that none of the fanout is uploaded
// - all skyfiles are assumed to be large files (base sector + fanout) and the
// metadata is assumed to fill up the base sector (to err on the safe side)
func (s *Scanner) staticEstimateTimeToFull(skylink skymodules.Skylink) time.Duration {
meta, err := s.staticSkydClient.Metadata(skylink.String())
if err != nil {
Expand Down Expand Up @@ -527,7 +575,8 @@ func (s *Scanner) staticEligibleToPin(ctx context.Context) (bool, error) {
pinnedData, err := s.staticDB.ServerLoad(ctx, s.staticServerName)
if errors.Contains(err, database.ErrServerLoadNotFound) {
// We don't know what the server's load is. Get that data.
load, err := s.staticSkydClient.ContractData()
var load uint64
load, err = s.staticSkydClient.ContractData()
if err != nil {
return false, errors.AddContext(err, "failed to fetch server's load")
}
Expand Down
Loading

0 comments on commit e321395

Please sign in to comment.