diff --git a/explorer/explorer.go b/explorer/explorer.go index 9425b34..fdb6bf6 100644 --- a/explorer/explorer.go +++ b/explorer/explorer.go @@ -67,7 +67,7 @@ type Store interface { SiafundElements(ids []types.SiafundOutputID) (result []SiafundOutput, err error) Hosts(pks []types.PublicKey) ([]Host, error) - HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) ([]Host, error) + HostsForScanning(maxLastScan, minLastAnnouncement time.Time, limit uint64) ([]Host, error) } // Explorer implements a Sia explorer. diff --git a/explorer/scan.go b/explorer/scan.go index 36288a8..d45add3 100644 --- a/explorer/scan.go +++ b/explorer/scan.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "net" - "sync" "time" crhpv2 "go.sia.tech/core/rhp/v2" @@ -159,70 +158,6 @@ func (e *Explorer) scanV2Host(locator geoip.Locator, host Host) (HostScan, error }, nil } -func (e *Explorer) addHostScans(hosts chan Host) { - // use default included ip2location database - locator, err := geoip.NewIP2LocationLocator("") - if err != nil { - e.log.Error("Failed to create geoip database", zap.Error(err)) - return - } - defer locator.Close() - - worker := func() { - var scans []HostScan - for host := range hosts { - if e.isClosed() { - break - } - - var scan HostScan - var addr string - var ok bool - var err error - - if host.IsV2() { - addr, ok = host.V2SiamuxAddr() - if !ok { - e.log.Debug("Host did not have any v2 siamux net addresses in its announcement, unable to scan", zap.Stringer("pk", host.PublicKey)) - continue - } - scan, err = e.scanV2Host(locator, host) - } else { - scan, err = e.scanV1Host(locator, host) - } - if err != nil { - scans = append(scans, HostScan{ - PublicKey: host.PublicKey, - Success: false, - Timestamp: types.CurrentTimestamp(), - }) - e.log.Debug("Scanning host failed", zap.String("addr", addr), zap.Stringer("pk", host.PublicKey), zap.Error(err)) - continue - } - - e.log.Debug("Scanning host succeeded", zap.String("addr", addr), zap.Stringer("pk", host.PublicKey)) - scans = append(scans, scan) - } - - if err := e.s.AddHostScans(scans); err != nil { - e.log.Error("Failed to add host scans to DB", zap.Error(err)) - } - } - - // launch all workers - var wg sync.WaitGroup - for t := 0; t < e.scanCfg.Threads; t++ { - wg.Add(1) - go func() { - defer wg.Done() - worker() - }() - } - - // wait until they're done - wg.Wait() -} - func (e *Explorer) isClosed() bool { select { case <-e.ctx.Done(): @@ -232,33 +167,6 @@ func (e *Explorer) isClosed() bool { } } -func (e *Explorer) fetchHosts(hosts chan Host) { - var exhausted bool - offset := 0 - - t := types.CurrentTimestamp() - cutoff := t.Add(-e.scanCfg.MaxLastScan) - lastAnnouncement := t.Add(-e.scanCfg.MinLastAnnouncement) - - for !exhausted && !e.isClosed() { - batch, err := e.s.HostsForScanning(cutoff, lastAnnouncement, uint64(offset), scanBatchSize) - if err != nil { - e.log.Error("failed to get hosts for scanning", zap.Error(err)) - return - } else if len(batch) < scanBatchSize { - exhausted = true - } - - for _, host := range batch { - select { - case <-e.ctx.Done(): - return - case hosts <- host: - } - } - } -} - func (e *Explorer) scanHosts() { e.log.Info("Waiting for syncing to complete before scanning hosts") // don't scan hosts till we're at least nearly done with syncing @@ -268,40 +176,59 @@ func (e *Explorer) scanHosts() { } e.log.Info("Syncing complete, will begin scanning hosts") + locator, err := geoip.NewIP2LocationLocator("") + if err != nil { + e.log.Info("failed to create geoip database:", zap.Error(err)) + return + } + defer locator.Close() + for !e.isClosed() { - // fetch hosts - hosts := make(chan Host, scanBatchSize) - e.wg.Add(1) - go func() { - defer e.wg.Done() - e.fetchHosts(hosts) - close(hosts) - }() - - // scan hosts - e.wg.Add(1) - go func() { - defer e.wg.Done() - e.addHostScans(hosts) - }() - - // wait for scans to complete - waitChan := make(chan struct{}) - go func() { - e.wg.Wait() - close(waitChan) - }() - select { - case <-waitChan: - case <-e.ctx.Done(): + lastScanCutoff := time.Now().Add(-e.scanCfg.MaxLastScan) + lastAnnouncementCutoff := time.Now().Add(-e.scanCfg.MinLastAnnouncement) + + batch, err := e.s.HostsForScanning(lastScanCutoff, lastAnnouncementCutoff, scanBatchSize) + if err != nil { + e.log.Info("failed to get hosts for scanning:", zap.Error(err)) return + } else if len(batch) == 0 { + select { + case <-e.ctx.Done(): + e.log.Info("shutdown:", zap.Error(e.ctx.Err())) + return + case <-time.After(15 * time.Second): + continue // check again + } } - // pause - select { - case <-e.ctx.Done(): + results := make([]HostScan, len(batch)) + for i, host := range batch { + e.wg.Add(1) + go func(i int, host Host) { + defer e.wg.Done() + + var err error + if len(host.V2NetAddresses) > 0 { + results[i], err = e.scanV2Host(locator, host) + } else { + results[i], err = e.scanV1Host(locator, host) + } + if err != nil { + e.log.Debug("host scan failed", zap.Stringer("pk", host.PublicKey), zap.Error(err)) + results[i] = HostScan{ + PublicKey: host.PublicKey, + Success: false, + Timestamp: types.CurrentTimestamp(), + } + return + } + }(i, host) + } + e.wg.Wait() + + if err := e.s.AddHostScans(results); err != nil { + e.log.Info("failed to add host scans to DB:", zap.Error(err)) return - case <-time.After(30 * time.Second): } } } diff --git a/persist/sqlite/addresses.go b/persist/sqlite/addresses.go index 760c908..edb90a5 100644 --- a/persist/sqlite/addresses.go +++ b/persist/sqlite/addresses.go @@ -140,9 +140,9 @@ func (st *Store) Hosts(pks []types.PublicKey) (result []explorer.Host, err error // HostsForScanning returns hosts ordered by the transaction they were created in. // Note that only the PublicKey, NetAddress, and V2NetAddresses fields are // populated. -func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) (result []explorer.Host, err error) { +func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, limit uint64) (result []explorer.Host, err error) { err = s.transaction(func(tx *txn) error { - rows, err := tx.Query(`SELECT public_key, net_address FROM host_info WHERE last_scan <= ? AND last_announcement >= ? ORDER BY last_scan ASC LIMIT ? OFFSET ?`, encode(maxLastScan), encode(minLastAnnouncement), limit, offset) + rows, err := tx.Query(`SELECT public_key, net_address FROM host_info WHERE last_scan <= ? AND last_announcement >= ? ORDER BY last_scan ASC LIMIT ?`, encode(maxLastScan), encode(minLastAnnouncement), limit) if err != nil { return err } diff --git a/persist/sqlite/consensus_test.go b/persist/sqlite/consensus_test.go index f5dffb7..eebd9f5 100644 --- a/persist/sqlite/consensus_test.go +++ b/persist/sqlite/consensus_test.go @@ -1651,7 +1651,7 @@ func TestHostAnnouncement(t *testing.T) { } ts := time.Unix(0, 0) - hosts, err := db.HostsForScanning(ts, ts, 0, 100) + hosts, err := db.HostsForScanning(ts, ts, 100) if err != nil { t.Fatal(err) } diff --git a/persist/sqlite/scan_test.go b/persist/sqlite/scan_test.go index b8797b7..b08ab27 100644 --- a/persist/sqlite/scan_test.go +++ b/persist/sqlite/scan_test.go @@ -207,6 +207,45 @@ func TestScan(t *testing.T) { t.Fatal(err) } + for { + tip, err := e.Tip() + if err != nil { + t.Fatal(err) + } + if tip != cm.Tip() { + time.Sleep(time.Second) + } else { + break + } + } + + { + lastScanCutoff := time.Now().Add(-cfg.MaxLastScan) + lastAnnouncementCutoff := time.Now().Add(-cfg.MinLastAnnouncement) + + dbHosts, err := db.HostsForScanning(lastScanCutoff, lastAnnouncementCutoff, 100) + if err != nil { + t.Fatal(err) + } + testutil.Equal(t, "len(hostsForScanning)", 3, len(dbHosts)) + + sort.Slice(dbHosts, func(i, j int) bool { + return dbHosts[i].NetAddress < dbHosts[j].NetAddress + }) + + host1 := dbHosts[0] + testutil.Equal(t, "host1.V2NetAddresses", ha3, host1.V2NetAddresses) + testutil.Equal(t, "host1.PublicKey", pubkey3, host1.PublicKey) + + host2 := dbHosts[1] + testutil.Equal(t, "host2.NetAddress", ha2.NetAddress, host2.NetAddress) + testutil.Equal(t, "host2.PublicKey", ha2.PublicKey, host2.PublicKey) + + host3 := dbHosts[2] + testutil.Equal(t, "host3.NetAddress", "sia1.euregiohosting.nl:9982", host3.NetAddress) + testutil.Equal(t, "host3.PublicKey", pubkey1, host3.PublicKey) + } + time.Sleep(4 * cfg.Timeout) { @@ -302,4 +341,15 @@ func TestScan(t *testing.T) { log.Fatal("SectorSize = 0 on host that's supposed to be active") } } + + { + lastScanCutoff := time.Now().Add(-cfg.MaxLastScan) + lastAnnouncementCutoff := time.Now().Add(-cfg.MinLastAnnouncement) + + hosts, err := db.HostsForScanning(lastScanCutoff, lastAnnouncementCutoff, 100) + if err != nil { + t.Fatal(err) + } + testutil.Equal(t, "len(hostsForScanning)", 0, len(hosts)) + } }