Skip to content

Commit

Permalink
Merge pull request #104 from SiaFoundation/nate/remove-orphans
Browse files Browse the repository at this point in the history
Remove orphaned blocks during rescan
  • Loading branch information
n8maninger authored Apr 18, 2024
2 parents 1418f6e + fbc4b9e commit dd70621
Show file tree
Hide file tree
Showing 15 changed files with 1,358 additions and 872 deletions.
9 changes: 9 additions & 0 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func TestWalletAdd(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer wm.Close()

c, shutdown := runServer(cm, nil, wm)
defer shutdown()
Expand Down Expand Up @@ -279,6 +280,7 @@ func TestWallet(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer wm.Close()

// create seed address vault
sav := wallet.NewSeedAddressVault(wallet.NewSeed(), 0, 20)
Expand Down Expand Up @@ -500,6 +502,7 @@ func TestAddresses(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer wm.Close()

sav := wallet.NewSeedAddressVault(wallet.NewSeed(), 0, 20)
c, shutdown := runServer(cm, nil, wm)
Expand Down Expand Up @@ -696,6 +699,8 @@ func TestV2(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer wm.Close()

c, shutdown := runServer(cm, nil, wm)
defer shutdown()
primaryWallet, err := c.AddWallet(api.WalletUpdateRequest{Name: "primary"})
Expand Down Expand Up @@ -920,6 +925,8 @@ func TestP2P(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer wm1.Close()

l1, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -961,6 +968,8 @@ func TestP2P(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer wm2.Close()

l2, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatal(err)
Expand Down
4 changes: 2 additions & 2 deletions api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type (
// A WalletManager manages wallets, keyed by name.
WalletManager interface {
Tip() (types.ChainIndex, error)
Scan(index types.ChainIndex) error
Scan(_ context.Context, index types.ChainIndex) error

AddWallet(wallet.Wallet) (wallet.Wallet, error)
UpdateWallet(wallet.Wallet) (wallet.Wallet, error)
Expand Down Expand Up @@ -308,7 +308,7 @@ func (s *server) rescanHandlerPOST(jc jape.Context) {
}

go func() {
err := s.wm.Scan(index)
err := s.wm.Scan(context.Background(), index)

// update the scan state
s.scanMu.Lock()
Expand Down
1 change: 0 additions & 1 deletion cmd/walletd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ func main() {
<-signalCh
log.Println("Shutting down...")
stop()

case versionCmd:
if len(cmd.Args()) != 0 {
cmd.Usage()
Expand Down
1 change: 1 addition & 0 deletions cmd/walletd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type node struct {

// Close shuts down the node and closes its database.
func (n *node) Close() error {
n.wm.Close()
n.chainStore.Close()
return n.store.Close()
}
Expand Down
110 changes: 110 additions & 0 deletions internal/threadgroup/threadgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Package threadgroup exposes a ThreadGroup object which can be used to
// facilitate clean shutdown. A ThreadGroup is similar to a sync.WaitGroup,
// but with two important additions: The ability to detect when shutdown has
// been initiated, and protections against adding more threads after shutdown
// has completed.
//
// ThreadGroup was designed with the following shutdown sequence in mind:
//
// 1. Call Stop, signaling that shutdown has begun. After Stop is called, no
// new goroutines should be created.
//
// 2. Wait for Stop to return. When Stop returns, all goroutines should have
// returned.
//
// 3. Free any resources used by the goroutines.
package threadgroup

import (
"context"
"errors"
"sync"
)

type (
// A ThreadGroup is a sync.WaitGroup with additional functionality for
// facilitating clean shutdown.
ThreadGroup struct {
mu sync.Mutex
wg sync.WaitGroup
closed chan struct{}
}
)

// ErrClosed is returned when the threadgroup has already been stopped
var ErrClosed = errors.New("threadgroup closed")

// Done returns a channel that will be closed when the threadgroup is stopped
func (tg *ThreadGroup) Done() <-chan struct{} {
return tg.closed
}

// Add adds a new thread to the group, done must be called to signal that the
// thread is done. Returns ErrClosed if the threadgroup is already closed.
func (tg *ThreadGroup) Add() (func(), error) {
tg.mu.Lock()
defer tg.mu.Unlock()
select {
case <-tg.closed:
return nil, ErrClosed
default:
}
tg.wg.Add(1)
return func() { tg.wg.Done() }, nil
}

// WithContext returns a copy of the parent context. The returned context will
// be cancelled if the parent context is cancelled or if the threadgroup is
// stopped.
func (tg *ThreadGroup) WithContext(parent context.Context) (context.Context, context.CancelFunc) {
// wrap the parent context in a cancellable context
ctx, cancel := context.WithCancel(parent)
// start a goroutine to wait for either the parent context being cancelled
// or the threagroup being stopped
go func() {
select {
case <-ctx.Done():
case <-tg.closed:
}
cancel() // threadgroup is stopping or context cancelled, cancel the context
}()
return ctx, cancel
}

// AddWithContext adds a new thread to the group and returns a copy of the parent
// context. It is a convenience function combining Add and WithContext.
func (tg *ThreadGroup) AddWithContext(parent context.Context) (context.Context, context.CancelFunc, error) {
// try to add to the group
done, err := tg.Add()
if err != nil {
return nil, nil, err
}

ctx, cancel := tg.WithContext(parent)
var once sync.Once
return ctx, func() {
cancel()
// it must be safe to call cancel multiple times, but it is not safe to
// call done multiple times since it's decrementing the waitgroup
once.Do(done)
}, nil
}

// Stop stops accepting new threads and waits for all existing threads to close
func (tg *ThreadGroup) Stop() {
tg.mu.Lock()
select {
case <-tg.closed:
default:
close(tg.closed)
}
tg.mu.Unlock()
tg.wg.Wait()
}

// New creates a new threadgroup
func New() *ThreadGroup {
return &ThreadGroup{
closed: make(chan struct{}),
}
}
89 changes: 89 additions & 0 deletions internal/threadgroup/threadgroup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package threadgroup

import (
"context"
"errors"
"testing"
"time"
)

func TestThreadgroup(t *testing.T) {
tg := New()

for i := 0; i < 10; i++ {
done, err := tg.Add()
if err != nil {
t.Fatal(err)
}
time.AfterFunc(100*time.Millisecond, done)
}
start := time.Now()
tg.Stop()
if time.Since(start) < 100*time.Millisecond {
t.Fatal("expected stop to wait for all threads to complete")
}

_, err := tg.Add()
if !errors.Is(err, ErrClosed) {
t.Fatalf("expected ErrClosed, got %v", err)
}
}

func TestThreadgroupContext(t *testing.T) {
tg := New()

t.Run("context cancel", func(t *testing.T) {
ctx, cancel, err := tg.AddWithContext(context.Background())
if err != nil {
t.Fatal(err)
}
defer cancel()

time.AfterFunc(100*time.Millisecond, cancel)

select {
case <-ctx.Done():
if !errors.Is(ctx.Err(), context.Canceled) {
t.Fatalf("expected Canceled, got %v", ctx.Err())
}
case <-time.After(time.Second):
t.Fatal("expected context to be cancelled")
}
})

t.Run("parent cancel", func(t *testing.T) {
parentCtx, parentCancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer parentCancel()

ctx, cancel, err := tg.AddWithContext(parentCtx)
if err != nil {
t.Fatal(err)
}
defer cancel()

select {
case <-ctx.Done():
if !errors.Is(ctx.Err(), context.DeadlineExceeded) {
t.Fatalf("expected DeadlineExceeded, got %v", ctx.Err())
}
case <-time.After(time.Second):
t.Fatal("expected context to be cancelled")
}
})

t.Run("stop", func(t *testing.T) {
for i := 0; i < 10; i++ {
_, cancel, err := tg.AddWithContext(context.Background())
if err != nil {
t.Fatal(err)
}
time.AfterFunc(100*time.Millisecond, cancel)
}

start := time.Now()
tg.Stop()
if time.Since(start) < 100*time.Millisecond {
t.Fatal("expected threadgroup to wait until all threads complete")
}
})
}
3 changes: 1 addition & 2 deletions persist/sqlite/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ func (s *Store) AddressSiafundOutputs(address types.Address, offset, limit int)
defer rows.Close()

for rows.Next() {
var siafund types.SiafundElement
err := rows.Scan(decode(&siafund.ID), &siafund.LeafIndex, decodeSlice(&siafund.MerkleProof), &siafund.SiafundOutput.Value, decode(&siafund.ClaimStart), decode(&siafund.SiafundOutput.Address))
siafund, err := scanSiafundElement(rows)
if err != nil {
return fmt.Errorf("failed to scan siafund element: %w", err)
}
Expand Down
Loading

0 comments on commit dd70621

Please sign in to comment.