Skip to content

Commit

Permalink
sqlite: better update proof logic
Browse files Browse the repository at this point in the history
  • Loading branch information
n8maninger committed Jan 11, 2024
1 parent 3b7f51d commit 25883f9
Showing 1 changed file with 39 additions and 38 deletions.
77 changes: 39 additions & 38 deletions persist/sqlite/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,38 @@ func updateLastIndexedTip(tx txn, tip types.ChainIndex) error {
return err
}

func getStateElementBatch(stmt *loggedStmt, offset, limit int) ([]types.StateElement, error) {
rows, err := stmt.Query(limit, offset)
if err != nil {
return nil, fmt.Errorf("failed to query siacoin elements: %w", err)
}
defer rows.Close()

var updated []types.StateElement
for rows.Next() {
var se types.StateElement
err := rows.Scan(decode(&se.ID), decodeSlice(&se.MerkleProof), &se.LeafIndex)
if err != nil {
return nil, fmt.Errorf("failed to scan state element: %w", err)
}
updated = append(updated, se)
}
return updated, nil
}

func updateStateElement(stmt *loggedStmt, se types.StateElement) error {
res, err := stmt.Exec(encodeSlice(se.MerkleProof), se.LeafIndex, encode(se.ID))
if err != nil {
return fmt.Errorf("failed to update siacoin element %q: %w", se.ID, err)
} else if n, err := res.RowsAffected(); err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
} else if n != 1 {
return fmt.Errorf("expected 1 row to be affected, got %d", n)
}
return nil
}

// how slow is this going to be 😬?
//
// todo: determine if it's feasible for exchange mode to keep everything in
// memory.
func updateElementProofs(tx txn, table string, updater proofUpdater) error {
stmt, err := tx.Prepare(`SELECT id, merkle_proof, leaf_index FROM ` + table + ` LIMIT $1 OFFSET $2`)
if err != nil {
Expand All @@ -267,47 +295,20 @@ func updateElementProofs(tx txn, table string, updater proofUpdater) error {
}
defer updateStmt.Close()

var updated []types.StateElement
for offset := 0; ; offset += updateProofBatchSize {
updated = updated[:0]

more, err := func(n int) (bool, error) {
rows, err := stmt.Query(updateProofBatchSize, n)
if err != nil {
return false, fmt.Errorf("failed to query siacoin elements: %w", err)
}
defer rows.Close()

var more bool
for rows.Next() {
// if we get here, there may be more rows to process
more = true

var se types.StateElement
err := rows.Scan(decode(&se.ID), decodeSlice(&se.MerkleProof), &se.LeafIndex)
if err != nil {
return false, fmt.Errorf("failed to scan state element: %w", err)
}
updater.UpdateElementProof(&se)
updated = append(updated, se)
}
return more, nil
}(offset)
elements, err := getStateElementBatch(stmt, offset, updateProofBatchSize)
if err != nil {
return err
return fmt.Errorf("failed to get state element batch: %w", err)
} else if len(elements) == 0 {
break
}

for _, se := range updated {
var dummy types.Hash256
err := updateStmt.QueryRow(encodeSlice(se.MerkleProof), se.LeafIndex, encode(se.ID)).Scan(decode(&dummy))
if err != nil {
return fmt.Errorf("failed to update siacoin element %q: %w", se.ID, err)
for _, se := range elements {
updater.UpdateElementProof(&se)
if err := updateStateElement(updateStmt, se); err != nil {
return fmt.Errorf("failed to update state element: %w", err)
}
}

if !more {
break
}
}
return nil
}
Expand Down

0 comments on commit 25883f9

Please sign in to comment.