Skip to content

Commit

Permalink
Fix syncing
Browse files Browse the repository at this point in the history
  • Loading branch information
mike76-dev committed May 5, 2024
1 parent 5a0f763 commit b998b09
Show file tree
Hide file tree
Showing 10 changed files with 219 additions and 59 deletions.
16 changes: 16 additions & 0 deletions init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ DROP TABLE IF EXISTS pt_accounts;
DROP TABLE IF EXISTS pt_stats;
DROP TABLE IF EXISTS pt_credits;
DROP TABLE IF EXISTS pt_announcement;
DROP TABLE IF EXISTS pt_tip;

CREATE TABLE pt_accounts (
id INT NOT NULL AUTO_INCREMENT,
Expand Down Expand Up @@ -131,6 +132,13 @@ CREATE TABLE pt_announcement (
PRIMARY KEY (id)
);

CREATE TABLE pt_tip (
id INT NOT NULL AUTO_INCREMENT,
height BIGINT UNSIGNED NOT NULL,
bid BINARY(32) NOT NULL,
PRIMARY KEY (id)
);

/* manager */

DROP TABLE IF EXISTS mg_email;
Expand All @@ -140,6 +148,7 @@ DROP TABLE IF EXISTS mg_spendings;
DROP TABLE IF EXISTS mg_balances;
DROP TABLE IF EXISTS mg_prices;
DROP TABLE IF EXISTS mg_maintenance;
DROP TABLE IF EXISTS mg_tip;

CREATE TABLE mg_email (
id INT NOT NULL AUTO_INCREMENT,
Expand Down Expand Up @@ -213,6 +222,13 @@ CREATE TABLE mg_maintenance (
PRIMARY KEY (id)
);

CREATE TABLE mg_tip (
id INT NOT NULL AUTO_INCREMENT,
height BIGINT UNSIGNED NOT NULL,
bid BINARY(32) NOT NULL,
PRIMARY KEY (id)
);

/* hostdb */

DROP TABLE IF EXISTS hdb_scanhistory;
Expand Down
48 changes: 32 additions & 16 deletions modules/manager/contractor/contractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,6 @@ func (c *Contractor) Close() error {
// New returns a new Contractor.
func New(db *sql.DB, cm *chain.Manager, s modules.Syncer, m modules.Manager, wallet modules.Wallet, hdb modules.HostDB, dir string) (*Contractor, <-chan error) {
errChan := make(chan error, 1)
defer close(errChan)

// Create the logger.
logger, closeFn, err := persist.NewFileLogger(filepath.Join(dir, "contractor.log"))
Expand Down Expand Up @@ -280,16 +279,44 @@ func New(db *sql.DB, cm *chain.Manager, s modules.Syncer, m modules.Manager, wal
})

// Non-blocking startup.
reorgChan := make(chan struct{}, 1)
reorgChan <- struct{}{}
unsubscribe := cm.OnReorg(func(_ types.ChainIndex) {
select {
case reorgChan <- struct{}{}:
default:
}
})

go func() {
// Subscribe to the consensus set in a separate goroutine.
defer unsubscribe()
defer close(errChan)

if err := c.tg.Add(); err != nil {
c.log.Error("couldn't start a thread", zap.Error(err))
return
}
defer c.tg.Done()
err := contractorAsyncStartup(c)
if err != nil {
c.log.Error("couldn't start contractor", zap.Error(err))

for {
select {
case <-c.tg.StopChan():
return
case <-reorgChan:
}

err := c.sync(c.tip)
if err != nil {
// Reset the contractor consensus variables and try rescanning.
c.mu.Lock()
c.tip = types.ChainIndex{}
c.mu.Unlock()
err = c.sync(c.tip)
}
if err != nil {
c.log.Error("couldn't sync contractor", zap.Error(err))
errChan <- err
}
}
}()

Expand Down Expand Up @@ -377,17 +404,6 @@ func (c *Contractor) sync(index types.ChainIndex) error {
return nil
}

// contractorAsyncStartup handles the async portion of New.
func contractorAsyncStartup(c *Contractor) error {
err := c.sync(c.tip)
if err != nil {
// Reset the contractor consensus variables and try rescanning.
c.tip = types.ChainIndex{}
err = c.sync(c.tip)
}
return err
}

// managedSynced returns true if the contractor is synced with the consensusset.
func (c *Contractor) managedSynced() bool {
c.mu.RLock()
Expand Down
22 changes: 19 additions & 3 deletions modules/manager/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func dbGetAverages(tx *sql.Tx) (avg modules.HostAverages, err error) {
return
}

d := types.NewDecoder(io.LimitedReader{R: bytes.NewBuffer(avgBytes), N: int64(len(avgBytes))})
d := types.NewBufDecoder(avgBytes)
avg.DecodeFrom(d)
err = d.Err()

Expand All @@ -110,6 +110,23 @@ func dbPutAverages(tx *sql.Tx, avg modules.HostAverages) error {
return err
}

// dbGetTip retrieves the last saved chain index.
func dbGetTip(tx *sql.Tx) (tip types.ChainIndex, err error) {
bid := make([]byte, 32)
err = tx.QueryRow("SELECT height, bid FROM mg_tip WHERE id = 1").Scan(&tip.Height, &bid)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return
}
copy(tip.ID[:], bid)
return tip, nil
}

// dbPutTip saves the provided chain index.
func dbPutTip(tx *sql.Tx, tip types.ChainIndex) error {
_, err := tx.Exec("REPLACE INTO mg_tip (id, height, bid) VALUES (1, ?, ?)", tip.Height, tip.ID[:])
return err
}

// GetBalance retrieves the balance information on the account.
// An empty struct is returned when there is no data.
func (m *Manager) GetBalance(email string) (modules.UserBalance, error) {
Expand Down Expand Up @@ -340,8 +357,7 @@ func (m *Manager) getEmailPreferences() error {
return err
}

buf := bytes.NewBuffer(b)
d := types.NewDecoder(io.LimitedReader{R: buf, N: 24})
d := types.NewBufDecoder(b)
var threshold types.Currency
(*types.V1Currency)(&threshold).DecodeFrom(d)
m.email = email
Expand Down
60 changes: 38 additions & 22 deletions modules/manager/hostdb/hostdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,24 +372,6 @@ func hostdbBlockingStartup(db *sql.DB, cm *chain.Manager, s modules.Syncer, dir
return hdb, nil
}

// hostdbAsyncStartup handles the async portion of New.
func hostdbAsyncStartup(hdb *HostDB) error {
err := hdb.sync(hdb.tip)
if err != nil {
// Subscribe again using the new ID. This will cause a triggered scan
// on all of the hosts, but that should be acceptable.
hdb.mu.Lock()
hdb.tip = types.ChainIndex{}
err = hdb.reset()
hdb.mu.Unlock()
if err != nil {
return err
}
err = hdb.sync(hdb.tip)
}
return err
}

func (hdb *HostDB) sync(index types.ChainIndex) error {
for index != hdb.cm.Tip() {
select {
Expand Down Expand Up @@ -424,17 +406,51 @@ func New(db *sql.DB, cm *chain.Manager, s modules.Syncer, dir string) (*HostDB,
}

// Non-blocking startup.
reorgChan := make(chan struct{}, 1)
reorgChan <- struct{}{}
unsubscribe := cm.OnReorg(func(_ types.ChainIndex) {
select {
case reorgChan <- struct{}{}:
default:
}
})

go func() {
defer unsubscribe()
defer close(errChan)
if err := hdb.tg.Add(); err != nil {
errChan <- err
return
}
defer hdb.tg.Done()
// Subscribe to the consensus set in a separate goroutine.
err := hostdbAsyncStartup(hdb)
if err != nil {
errChan <- err

for {
select {
case <-hdb.tg.StopChan():
return
case <-reorgChan:
}

err := hdb.sync(hdb.tip)
if err != nil {
// Subscribe again using the new ID. This will cause a triggered scan
// on all of the hosts, but that should be acceptable.
hdb.mu.Lock()
hdb.tip = types.ChainIndex{}
err = hdb.reset()
hdb.mu.Unlock()
if err != nil {
hdb.log.Error("failed to reset HostDB", zap.Error(err))
errChan <- err
return
}
err = hdb.sync(hdb.tip)
if err != nil {
hdb.log.Error("failed to sync HostDB", zap.Error(err))
errChan <- err
return
}
}
}
}()

Expand Down
1 change: 1 addition & 0 deletions modules/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ type Manager struct {
maintenance bool
bufferSize uint64
multipartUploads map[types.Hash256]struct{}
tip types.ChainIndex

// Block heights at the start of the current and the previous months.
currentMonth blockTimestamp
Expand Down
39 changes: 36 additions & 3 deletions modules/manager/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (m *Manager) initPersist(dir string) error {
})

// Load the persisted data.
m.tip, err = dbGetTip(m.dbTx)
if err != nil {
return modules.AddContext(err, "couldn't load tip")
}
m.currentMonth, m.prevMonth, err = dbGetBlockTimestamps(m.dbTx)
if err != nil {
return modules.AddContext(err, "couldn't load block timestamps")
Expand All @@ -139,12 +143,36 @@ func (m *Manager) initPersist(dir string) error {
go m.threadedCheckOutOfSync()

// Subscribe to the consensus set using the most recent consensus change.
reorgChan := make(chan struct{}, 1)
reorgChan <- struct{}{}
unsubscribe := m.cm.OnReorg(func(_ types.ChainIndex) {
select {
case reorgChan <- struct{}{}:
default:
}
})

go func() {
err := m.sync(m.cm.Tip())
if err != nil {
m.log.Error("failed to subscribe", zap.Error(err))
defer unsubscribe()

if err := m.tg.Add(); err != nil {
m.log.Error("couldn't start a thread", zap.Error(err))
return
}
defer m.tg.Done()

for {
select {
case <-m.tg.StopChan():
return
case <-reorgChan:
}

err := m.sync(m.tip)
if err != nil {
m.log.Error("failed to sync manager", zap.Error(err))
}
}
}()

return nil
Expand All @@ -167,6 +195,11 @@ func (m *Manager) sync(index types.ChainIndex) error {
}
if len(caus) > 0 {
index = caus[len(caus)-1].State.Index
m.tip = index
if err := dbPutTip(m.dbTx, index); err != nil {
m.log.Error("failed to save tip", zap.Error(err))
return err
}
}
}
return nil
Expand Down
27 changes: 27 additions & 0 deletions modules/portal/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,16 @@ func (p *Portal) addSiacoinPayment(email string, amount types.Currency, txid typ
return errors.New("zero payment amount provided")
}

// Check if the txid is unique.
var count int
err := p.db.QueryRow("SELECT COUNT(*) FROM pt_payments WHERE txid = ?", txid[:]).Scan(&count)
if err != nil {
return err
}
if count > 0 {
return nil
}

// Update the payments table.
amt := modules.Float64(amount) / modules.Float64(types.HastingsPerSiacoin)
if err := p.putPayment(email, amt, "SC", txid); err != nil {
Expand Down Expand Up @@ -980,3 +990,20 @@ func (p *Portal) managedCheckAnnouncement() {
p.log.Error("unable to expire announcement", zap.Error(err))
}
}

// loadTip retrieves the saved chain index.
func (p *Portal) loadTip() error {
bid := make([]byte, 32)
err := p.db.QueryRow("SELECT height, bid FROM pt_tip WHERE id = 1").Scan(&p.tip.Height, &bid)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return err
}
copy(p.tip.ID[:], bid)
return nil
}

// saveTip saves the current chain index.
func (p *Portal) saveTip() error {
_, err := p.db.Exec("REPLACE INTO pt_tip (id, height, bid) VALUES (1, ?, ?)", p.tip.Height, p.tip.ID[:])
return err
}
10 changes: 6 additions & 4 deletions modules/portal/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ const (

// load loads the Portal's persistent data from disk.
func (p *Portal) load() error {
err := p.loadStats()
if err != nil {
if err := p.loadTip(); err != nil {
return err
}

err = p.loadCredits()
if err != nil {
if err := p.loadStats(); err != nil {
return err
}

if err := p.loadCredits(); err != nil {
return err
}

Expand Down
Loading

0 comments on commit b998b09

Please sign in to comment.