diff --git a/init.sql b/init.sql index 7623c3c..7ce65b6 100644 --- a/init.sql +++ b/init.sql @@ -81,6 +81,7 @@ DROP TABLE IF EXISTS pt_accounts; DROP TABLE IF EXISTS pt_stats; DROP TABLE IF EXISTS pt_credits; DROP TABLE IF EXISTS pt_announcement; +DROP TABLE IF EXISTS pt_tip; CREATE TABLE pt_accounts ( id INT NOT NULL AUTO_INCREMENT, @@ -131,6 +132,13 @@ CREATE TABLE pt_announcement ( PRIMARY KEY (id) ); +CREATE TABLE pt_tip ( + id INT NOT NULL AUTO_INCREMENT, + height BIGINT UNSIGNED NOT NULL, + bid BINARY(32) NOT NULL, + PRIMARY KEY (id) +); + /* manager */ DROP TABLE IF EXISTS mg_email; @@ -140,6 +148,7 @@ DROP TABLE IF EXISTS mg_spendings; DROP TABLE IF EXISTS mg_balances; DROP TABLE IF EXISTS mg_prices; DROP TABLE IF EXISTS mg_maintenance; +DROP TABLE IF EXISTS mg_tip; CREATE TABLE mg_email ( id INT NOT NULL AUTO_INCREMENT, @@ -213,6 +222,13 @@ CREATE TABLE mg_maintenance ( PRIMARY KEY (id) ); +CREATE TABLE mg_tip ( + id INT NOT NULL AUTO_INCREMENT, + height BIGINT UNSIGNED NOT NULL, + bid BINARY(32) NOT NULL, + PRIMARY KEY (id) +); + /* hostdb */ DROP TABLE IF EXISTS hdb_scanhistory; diff --git a/modules/manager/contractor/contractor.go b/modules/manager/contractor/contractor.go index ac20495..62104c5 100644 --- a/modules/manager/contractor/contractor.go +++ b/modules/manager/contractor/contractor.go @@ -251,7 +251,6 @@ func (c *Contractor) Close() error { // New returns a new Contractor. func New(db *sql.DB, cm *chain.Manager, s modules.Syncer, m modules.Manager, wallet modules.Wallet, hdb modules.HostDB, dir string) (*Contractor, <-chan error) { errChan := make(chan error, 1) - defer close(errChan) // Create the logger. logger, closeFn, err := persist.NewFileLogger(filepath.Join(dir, "contractor.log")) @@ -280,16 +279,44 @@ func New(db *sql.DB, cm *chain.Manager, s modules.Syncer, m modules.Manager, wal }) // Non-blocking startup. + reorgChan := make(chan struct{}, 1) + reorgChan <- struct{}{} + unsubscribe := cm.OnReorg(func(_ types.ChainIndex) { + select { + case reorgChan <- struct{}{}: + default: + } + }) + go func() { - // Subscribe to the consensus set in a separate goroutine. + defer unsubscribe() + defer close(errChan) + if err := c.tg.Add(); err != nil { c.log.Error("couldn't start a thread", zap.Error(err)) return } defer c.tg.Done() - err := contractorAsyncStartup(c) - if err != nil { - c.log.Error("couldn't start contractor", zap.Error(err)) + + for { + select { + case <-c.tg.StopChan(): + return + case <-reorgChan: + } + + err := c.sync(c.tip) + if err != nil { + // Reset the contractor consensus variables and try rescanning. + c.mu.Lock() + c.tip = types.ChainIndex{} + c.mu.Unlock() + err = c.sync(c.tip) + } + if err != nil { + c.log.Error("couldn't sync contractor", zap.Error(err)) + errChan <- err + } } }() @@ -377,17 +404,6 @@ func (c *Contractor) sync(index types.ChainIndex) error { return nil } -// contractorAsyncStartup handles the async portion of New. -func contractorAsyncStartup(c *Contractor) error { - err := c.sync(c.tip) - if err != nil { - // Reset the contractor consensus variables and try rescanning. - c.tip = types.ChainIndex{} - err = c.sync(c.tip) - } - return err -} - // managedSynced returns true if the contractor is synced with the consensusset. func (c *Contractor) managedSynced() bool { c.mu.RLock() diff --git a/modules/manager/database.go b/modules/manager/database.go index 1b89e58..1bef251 100644 --- a/modules/manager/database.go +++ b/modules/manager/database.go @@ -91,7 +91,7 @@ func dbGetAverages(tx *sql.Tx) (avg modules.HostAverages, err error) { return } - d := types.NewDecoder(io.LimitedReader{R: bytes.NewBuffer(avgBytes), N: int64(len(avgBytes))}) + d := types.NewBufDecoder(avgBytes) avg.DecodeFrom(d) err = d.Err() @@ -110,6 +110,23 @@ func dbPutAverages(tx *sql.Tx, avg modules.HostAverages) error { return err } +// dbGetTip retrieves the last saved chain index. +func dbGetTip(tx *sql.Tx) (tip types.ChainIndex, err error) { + bid := make([]byte, 32) + err = tx.QueryRow("SELECT height, bid FROM mg_tip WHERE id = 1").Scan(&tip.Height, &bid) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return + } + copy(tip.ID[:], bid) + return tip, nil +} + +// dbPutTip saves the provided chain index. +func dbPutTip(tx *sql.Tx, tip types.ChainIndex) error { + _, err := tx.Exec("REPLACE INTO mg_tip (id, height, bid) VALUES (1, ?, ?)", tip.Height, tip.ID[:]) + return err +} + // GetBalance retrieves the balance information on the account. // An empty struct is returned when there is no data. func (m *Manager) GetBalance(email string) (modules.UserBalance, error) { @@ -340,8 +357,7 @@ func (m *Manager) getEmailPreferences() error { return err } - buf := bytes.NewBuffer(b) - d := types.NewDecoder(io.LimitedReader{R: buf, N: 24}) + d := types.NewBufDecoder(b) var threshold types.Currency (*types.V1Currency)(&threshold).DecodeFrom(d) m.email = email diff --git a/modules/manager/hostdb/hostdb.go b/modules/manager/hostdb/hostdb.go index e672a73..f9e1601 100644 --- a/modules/manager/hostdb/hostdb.go +++ b/modules/manager/hostdb/hostdb.go @@ -372,24 +372,6 @@ func hostdbBlockingStartup(db *sql.DB, cm *chain.Manager, s modules.Syncer, dir return hdb, nil } -// hostdbAsyncStartup handles the async portion of New. -func hostdbAsyncStartup(hdb *HostDB) error { - err := hdb.sync(hdb.tip) - if err != nil { - // Subscribe again using the new ID. This will cause a triggered scan - // on all of the hosts, but that should be acceptable. - hdb.mu.Lock() - hdb.tip = types.ChainIndex{} - err = hdb.reset() - hdb.mu.Unlock() - if err != nil { - return err - } - err = hdb.sync(hdb.tip) - } - return err -} - func (hdb *HostDB) sync(index types.ChainIndex) error { for index != hdb.cm.Tip() { select { @@ -424,17 +406,51 @@ func New(db *sql.DB, cm *chain.Manager, s modules.Syncer, dir string) (*HostDB, } // Non-blocking startup. + reorgChan := make(chan struct{}, 1) + reorgChan <- struct{}{} + unsubscribe := cm.OnReorg(func(_ types.ChainIndex) { + select { + case reorgChan <- struct{}{}: + default: + } + }) + go func() { + defer unsubscribe() defer close(errChan) if err := hdb.tg.Add(); err != nil { errChan <- err return } defer hdb.tg.Done() - // Subscribe to the consensus set in a separate goroutine. - err := hostdbAsyncStartup(hdb) - if err != nil { - errChan <- err + + for { + select { + case <-hdb.tg.StopChan(): + return + case <-reorgChan: + } + + err := hdb.sync(hdb.tip) + if err != nil { + // Subscribe again using the new ID. This will cause a triggered scan + // on all of the hosts, but that should be acceptable. + hdb.mu.Lock() + hdb.tip = types.ChainIndex{} + err = hdb.reset() + hdb.mu.Unlock() + if err != nil { + hdb.log.Error("failed to reset HostDB", zap.Error(err)) + errChan <- err + return + } + err = hdb.sync(hdb.tip) + if err != nil { + hdb.log.Error("failed to sync HostDB", zap.Error(err)) + errChan <- err + return + } + } } }() diff --git a/modules/manager/manager.go b/modules/manager/manager.go index 60aac9f..32f810c 100644 --- a/modules/manager/manager.go +++ b/modules/manager/manager.go @@ -177,6 +177,7 @@ type Manager struct { maintenance bool bufferSize uint64 multipartUploads map[types.Hash256]struct{} + tip types.ChainIndex // Block heights at the start of the current and the previous months. currentMonth blockTimestamp diff --git a/modules/manager/persist.go b/modules/manager/persist.go index b14eb07..d901186 100644 --- a/modules/manager/persist.go +++ b/modules/manager/persist.go @@ -114,6 +114,10 @@ func (m *Manager) initPersist(dir string) error { }) // Load the persisted data. + m.tip, err = dbGetTip(m.dbTx) + if err != nil { + return modules.AddContext(err, "couldn't load tip") + } m.currentMonth, m.prevMonth, err = dbGetBlockTimestamps(m.dbTx) if err != nil { return modules.AddContext(err, "couldn't load block timestamps") @@ -139,12 +143,36 @@ func (m *Manager) initPersist(dir string) error { go m.threadedCheckOutOfSync() // Subscribe to the consensus set using the most recent consensus change. + reorgChan := make(chan struct{}, 1) + reorgChan <- struct{}{} + unsubscribe := m.cm.OnReorg(func(_ types.ChainIndex) { + select { + case reorgChan <- struct{}{}: + default: + } + }) + go func() { - err := m.sync(m.cm.Tip()) - if err != nil { - m.log.Error("failed to subscribe", zap.Error(err)) + defer unsubscribe() + + if err := m.tg.Add(); err != nil { + m.log.Error("couldn't start a thread", zap.Error(err)) return } + defer m.tg.Done() + + for { + select { + case <-m.tg.StopChan(): + return + case <-reorgChan: + } + + err := m.sync(m.tip) + if err != nil { + m.log.Error("failed to sync manager", zap.Error(err)) + } + } }() return nil @@ -167,6 +195,11 @@ func (m *Manager) sync(index types.ChainIndex) error { } if len(caus) > 0 { index = caus[len(caus)-1].State.Index + m.tip = index + if err := dbPutTip(m.dbTx, index); err != nil { + m.log.Error("failed to save tip", zap.Error(err)) + return err + } } } return nil diff --git a/modules/portal/database.go b/modules/portal/database.go index 30517c8..82e22f9 100644 --- a/modules/portal/database.go +++ b/modules/portal/database.go @@ -315,6 +315,16 @@ func (p *Portal) addSiacoinPayment(email string, amount types.Currency, txid typ return errors.New("zero payment amount provided") } + // Check if the txid is unique. + var count int + err := p.db.QueryRow("SELECT COUNT(*) FROM pt_payments WHERE txid = ?", txid[:]).Scan(&count) + if err != nil { + return err + } + if count > 0 { + return nil + } + // Update the payments table. amt := modules.Float64(amount) / modules.Float64(types.HastingsPerSiacoin) if err := p.putPayment(email, amt, "SC", txid); err != nil { @@ -980,3 +990,20 @@ func (p *Portal) managedCheckAnnouncement() { p.log.Error("unable to expire announcement", zap.Error(err)) } } + +// loadTip retrieves the saved chain index. +func (p *Portal) loadTip() error { + bid := make([]byte, 32) + err := p.db.QueryRow("SELECT height, bid FROM pt_tip WHERE id = 1").Scan(&p.tip.Height, &bid) + if err != nil && !errors.Is(err, sql.ErrNoRows) { + return err + } + copy(p.tip.ID[:], bid) + return nil +} + +// saveTip saves the current chain index. +func (p *Portal) saveTip() error { + _, err := p.db.Exec("REPLACE INTO pt_tip (id, height, bid) VALUES (1, ?, ?)", p.tip.Height, p.tip.ID[:]) + return err +} diff --git a/modules/portal/persist.go b/modules/portal/persist.go index 8bfdc06..b036632 100644 --- a/modules/portal/persist.go +++ b/modules/portal/persist.go @@ -14,13 +14,15 @@ const ( // load loads the Portal's persistent data from disk. func (p *Portal) load() error { - err := p.loadStats() - if err != nil { + if err := p.loadTip(); err != nil { return err } - err = p.loadCredits() - if err != nil { + if err := p.loadStats(); err != nil { + return err + } + + if err := p.loadCredits(); err != nil { return err } diff --git a/modules/portal/portal.go b/modules/portal/portal.go index 51476d9..72427cb 100644 --- a/modules/portal/portal.go +++ b/modules/portal/portal.go @@ -32,6 +32,7 @@ type Portal struct { // Atomic stats. authStats map[string]authenticationStats credits modules.CreditData + tip types.ChainIndex // Watch list of SC payment transactions. transactions map[types.TransactionID]types.Address @@ -131,13 +132,39 @@ func New(config *persist.SatdConfig, db *sql.DB, ms mail.MailSender, cm *chain.M } // Subscribe to the consensus set using the most recent consensus change. + reorgChan := make(chan struct{}, 1) + reorgChan <- struct{}{} + unsubscribe := cm.OnReorg(func(_ types.ChainIndex) { + select { + case reorgChan <- struct{}{}: + default: + } + }) + go func() { - err := pt.sync(pt.cm.Tip()) - if err != nil { - pt.log.Error("couldn't subscribe to consensus updates", zap.Error(err)) + defer unsubscribe() + + if err := pt.tg.Add(); err != nil { + pt.log.Error("couldn't start a thread", zap.Error(err)) return } + defer pt.tg.Done() + + for { + select { + case <-pt.tg.StopChan(): + return + case <-reorgChan: + } + + err := pt.sync(pt.tip) + if err != nil { + pt.log.Error("couldn't sync portal", zap.Error(err)) + return + } + } }() + pt.tg.OnStop(func() { // We don't want any recently made payments to go unnoticed. pt.managedCheckWallet() @@ -147,6 +174,12 @@ func New(config *persist.SatdConfig, db *sql.DB, ms mail.MailSender, cm *chain.M } func (p *Portal) sync(index types.ChainIndex) error { + addrs, err := p.getSiacoinAddresses() + if err != nil { + p.log.Error("couldn't get account addresses", zap.Error(err)) + return err + } + for index != p.cm.Tip() { select { case <-p.tg.StopChan(): @@ -157,12 +190,17 @@ func (p *Portal) sync(index types.ChainIndex) error { if err != nil { p.log.Error("failed to subscribe to chain manager", zap.Error(err)) return err - } else if err := p.UpdateChainState(crus, caus); err != nil { + } else if err := p.UpdateChainState(crus, caus, addrs); err != nil { p.log.Error("failed to update chain state", zap.Error(err)) return err } if len(caus) > 0 { index = caus[len(caus)-1].State.Index + p.tip = index + if err := p.saveTip(); err != nil { + p.log.Error("failed to save tip", zap.Error(err)) + return err + } } } return nil diff --git a/modules/portal/update.go b/modules/portal/update.go index 3269df4..3b76b43 100644 --- a/modules/portal/update.go +++ b/modules/portal/update.go @@ -3,6 +3,7 @@ package portal import ( "time" + "go.sia.tech/core/types" "go.sia.tech/coreutils/chain" "go.uber.org/zap" ) @@ -68,13 +69,7 @@ func (p *Portal) managedCheckWallet() { } // UpdateChainState applies or reverts the updates from the ChainManager. -func (p *Portal) UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate) error { - addrs, err := p.getSiacoinAddresses() - if err != nil { - p.log.Error("couldn't get account addresses", zap.Error(err)) - return err - } - +func (p *Portal) UpdateChainState(reverted []chain.RevertUpdate, applied []chain.ApplyUpdate, addrs map[types.Address]string) error { p.mu.Lock() defer p.mu.Unlock()