This repository has been archived by the owner on Nov 2, 2018. It is now read-only.

Merge pull request #1364 from NebulousLabs/consensus-threadgroup
Consensus threadgroup
lukechampine authored Jul 25, 2016
2 parents 25fb653 + b058896 commit bd4b222
Showing 9 changed files with 111 additions and 48 deletions.
1 change: 0 additions & 1 deletion .travis.yml
@@ -2,7 +2,6 @@ language: go

 os:
 - linux
-- osx

 go:
 - 1.6
20 changes: 11 additions & 9 deletions api/ecosystem_test.go
@@ -15,7 +15,7 @@ import (
 // synchronizationCheck takes a bunch of server testers as input and checks
 // that they all have the same current block as the first server tester. The
 // first server tester needs to have the most recent block in order for the
-// test to work.
+// check to work.
 func synchronizationCheck(sts []*serverTester) (types.BlockID, error) {
     // Prefer returning an error in the event of a zero-length server tester -
     // an error should be returned if the developer accidentally uses a nil
@@ -34,7 +34,7 @@ func synchronizationCheck(sts []*serverTester) (types.BlockID, error) {
     for i := range sts {
         // Spin until the current block matches the leader block.
         success := false
-        for j := 0; j < 50; j++ {
+        for j := 0; j < 100; j++ {
             err = sts[i].getAPI("/consensus", &cg)
             if err != nil {
                 return types.BlockID{}, err
@@ -43,7 +43,7 @@ func synchronizationCheck(sts []*serverTester) (types.BlockID, error) {
                 success = true
                 break
             }
-            time.Sleep(time.Millisecond * 50)
+            time.Sleep(time.Millisecond * 100)
         }
         if !success {
             return types.BlockID{}, errors.New("synchronization check failed - nodes do not seem to be synchronized")
@@ -60,11 +60,9 @@ func synchronizationCheck(sts []*serverTester) (types.BlockID, error) {
 // host must get a file contract to the blockchain despite not getting any of
 // the dependencies into the transaction pool from the flood network.
 func TestHostPoorConnectivity(t *testing.T) {
-    t.Skip("Necessary consensus functions unavailable")
     if testing.Short() {
         t.SkipNow()
     }
     t.Parallel()

     // Create the various nodes that will be forming the simulated ecosystem of
     // this test.
@@ -175,7 +173,7 @@ func TestHostPoorConnectivity(t *testing.T) {
         // instead of creating orphans.
         var cg ConsensusGET
         success := false
-        for j := 0; j < 50; j++ {
+        for j := 0; j < 100; j++ {
             err = allTesters[i].getAPI("/consensus", &cg)
             if err != nil {
                 t.Fatal(err)
@@ -184,11 +182,15 @@ func TestHostPoorConnectivity(t *testing.T) {
                 success = true
                 break
             }
-            time.Sleep(time.Millisecond * 50)
+            time.Sleep(time.Millisecond * 100)
         }
         if !success {
             t.Fatal("nodes do not seem to be synchronizing")
         }
+        err := allTesters[i].cs.Flush()
+        if err != nil {
+            t.Fatal(err)
+        }

         // Mine a block for this node. The next iteration will wait for
         // synchronization before mining the block for the next node.
@@ -202,7 +204,7 @@ func TestHostPoorConnectivity(t *testing.T) {
     // Wait until the leader has the most recent block.
     var cg ConsensusGET
     success := false
-    for i := 0; i < 50; i++ {
+    for i := 0; i < 100; i++ {
         err = allTesters[0].getAPI("/consensus", &cg)
         if err != nil {
             t.Fatal(err)
@@ -211,7 +213,7 @@ func TestHostPoorConnectivity(t *testing.T) {
             success = true
             break
         }
-        time.Sleep(time.Millisecond * 50)
+        time.Sleep(time.Millisecond * 100)
     }
     if !success {
         t.Fatal("nodes do not seem to be synchronizing")
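The test changes above double the polling budget (100 attempts at 100 ms, up from 50 at 50 ms) and, once a node reports the leader's block, call cs.Flush() so in-flight consensus routines settle before the next block is mined. A condensed, runnable sketch of that pattern follows; the node interface and waitForSyncAndFlush helper are illustrative stand-ins for the real serverTester harness, not part of the patch:

package main

import (
    "errors"
    "fmt"
    "time"
)

// node stands in for the pieces of a serverTester that the check uses; the
// interface and names here are illustrative assumptions, not the real harness.
type node interface {
    CurrentBlockID() (string, error) // stand-in for GET /consensus
    Flush() error                    // stand-in for cs.Flush()
}

// waitForSyncAndFlush polls each node until it reports the leader's block ID
// (100 attempts, 100 ms apart, matching the doubled budget in the patch),
// then flushes so in-flight consensus routines finish before more mining.
func waitForSyncAndFlush(leader string, nodes []node) error {
    for _, n := range nodes {
        synced := false
        for j := 0; j < 100; j++ {
            id, err := n.CurrentBlockID()
            if err != nil {
                return err
            }
            if id == leader {
                synced = true
                break
            }
            time.Sleep(100 * time.Millisecond)
        }
        if !synced {
            return errors.New("nodes do not seem to be synchronized")
        }
        if err := n.Flush(); err != nil {
            return err
        }
    }
    return nil
}

// fakeNode syncs immediately; it exists only so the example runs.
type fakeNode struct{ id string }

func (f fakeNode) CurrentBlockID() (string, error) { return f.id, nil }
func (f fakeNode) Flush() error                    { return nil }

func main() {
    err := waitForSyncAndFlush("blk1", []node{fakeNode{"blk1"}, fakeNode{"blk1"}})
    fmt.Println("sync check:", err)
}

The Flush between iterations appears to be the point of the patch here: it keeps a still-in-flight block relay from racing the next mined block.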
4 changes: 4 additions & 0 deletions modules/consensus.go
@@ -194,6 +194,10 @@ type (
         // blockchain.
         CurrentBlock() types.Block

+        // Flush will cause the consensus set to finish all in-progress
+        // routines.
+        Flush() error
+
         // Height returns the current height of consensus.
         Height() types.BlockHeight
36 changes: 24 additions & 12 deletions modules/consensus/accept.go
@@ -19,6 +19,21 @@ var (
     errOrphan = errors.New("block has no known parent")
 )

+// managedBroadcastBlock will broadcast a block to the consensus set's peers.
+func (cs *ConsensusSet) managedBroadcastBlock(b types.Block) {
+    // COMPATv0.5.1 - broadcast the block to all peers <= v0.5.1 and block header to all peers > v0.5.1.
+    var relayBlockPeers, relayHeaderPeers []modules.Peer
+    for _, p := range cs.gateway.Peers() {
+        if build.VersionCmp(p.Version, "0.5.1") <= 0 {
+            relayBlockPeers = append(relayBlockPeers, p)
+        } else {
+            relayHeaderPeers = append(relayHeaderPeers, p)
+        }
+    }
+    go cs.gateway.Broadcast("RelayBlock", b, relayBlockPeers)
+    go cs.gateway.Broadcast("RelayHeader", b.Header(), relayHeaderPeers)
+}
+
 // validateHeaderAndBlock does some early, low computation verification on the
 // block. Callers should not assume that validation will happen in a particular
 // order.
@@ -237,10 +252,11 @@ func (cs *ConsensusSet) managedAcceptBlock(b types.Block) error {
     if err == errFutureTimestamp {
         go func() {
             time.Sleep(time.Duration(b.Timestamp-(types.CurrentTimestamp()+types.FutureThreshold)) * time.Second)
-            err := cs.AcceptBlock(b)
+            err := cs.managedAcceptBlock(b)
             if err != nil {
                 cs.log.Debugln("WARN: failed to accept a future block:", err)
             }
+            cs.managedBroadcastBlock(b)
         }()
     }
     return err
@@ -283,20 +299,16 @@ func (cs *ConsensusSet) managedAcceptBlock(b types.Block) error {
 // without error, it will be relayed to all connected peers. This function
 // should only be called for new blocks.
 func (cs *ConsensusSet) AcceptBlock(b types.Block) error {
-    err := cs.managedAcceptBlock(b)
+    err := cs.tg.Add()
     if err != nil {
         return err
     }
-    // COMPATv0.5.1 - broadcast the block to all peers <= v0.5.1 and block header to all peers > v0.5.1.
-    var relayBlockPeers, relayHeaderPeers []modules.Peer
-    for _, p := range cs.gateway.Peers() {
-        if build.VersionCmp(p.Version, "0.5.1") <= 0 {
-            relayBlockPeers = append(relayBlockPeers, p)
-        } else {
-            relayHeaderPeers = append(relayHeaderPeers, p)
-        }
-    }
-    go cs.gateway.Broadcast("RelayBlock", b, relayBlockPeers)
-    go cs.gateway.Broadcast("RelayHeader", b.Header(), relayHeaderPeers)
+    defer cs.tg.Done()
+
+    err = cs.managedAcceptBlock(b)
+    if err != nil {
+        return err
+    }
+    cs.managedBroadcastBlock(b)
     return nil
 }
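The shape of the change in accept.go: broadcasting moves into its own managedBroadcastBlock helper, and the exported AcceptBlock becomes a thin wrapper that takes a ThreadGroup slot before delegating to managedAcceptBlock. The runnable toy below mirrors that split; every type and name in it (peer, threadGroup, consensusSet) is a hypothetical stand-in, and the lexicographic version check is a deliberately crude substitute for build.VersionCmp:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// peer is a stand-in for modules.Peer; only the version matters here.
type peer struct{ version string }

// threadGroup is a toy admission counter, not Sia's real ThreadGroup.
type threadGroup struct {
    mu      sync.Mutex
    stopped bool
    wg      sync.WaitGroup
}

func (tg *threadGroup) Add() error {
    tg.mu.Lock()
    defer tg.mu.Unlock()
    if tg.stopped {
        return errors.New("consensus set is shutting down")
    }
    tg.wg.Add(1)
    return nil
}

func (tg *threadGroup) Done() { tg.wg.Done() }

type consensusSet struct {
    tg    threadGroup
    peers []peer
}

// managedAcceptBlock does the work and holds no ThreadGroup slot itself, so
// internal callers such as RPC handlers can take their own slot once and
// then call it directly.
func (cs *consensusSet) managedAcceptBlock(b string) error {
    fmt.Println("accepted", b)
    return nil
}

// managedBroadcastBlock mirrors the peer partition above: peers at or below
// v0.5.1 get the full block, newer peers get only the header. Lexicographic
// comparison is a crude stand-in for build.VersionCmp and would misorder
// multi-digit components like "0.10.0".
func (cs *consensusSet) managedBroadcastBlock(b string) {
    for _, p := range cs.peers {
        if p.version <= "0.5.1" {
            fmt.Println("RelayBlock of", b, "to peer at", p.version)
        } else {
            fmt.Println("RelayHeader of", b, "to peer at", p.version)
        }
    }
}

// AcceptBlock is the exported entry point: take the ThreadGroup slot, refuse
// work during shutdown, then delegate, matching the new shape of the method.
func (cs *consensusSet) AcceptBlock(b string) error {
    if err := cs.tg.Add(); err != nil {
        return err
    }
    defer cs.tg.Done()
    if err := cs.managedAcceptBlock(b); err != nil {
        return err
    }
    cs.managedBroadcastBlock(b)
    return nil
}

func main() {
    cs := &consensusSet{peers: []peer{{"0.5.1"}, {"0.6.0"}}}
    _ = cs.AcceptBlock("blk1")
}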
33 changes: 24 additions & 9 deletions modules/consensus/consensusset.go
@@ -14,6 +14,7 @@ import (
     "github.com/NebulousLabs/Sia/encoding"
     "github.com/NebulousLabs/Sia/modules"
     "github.com/NebulousLabs/Sia/persist"
+    "github.com/NebulousLabs/Sia/sync"
     "github.com/NebulousLabs/Sia/types"

     "github.com/NebulousLabs/bolt"
@@ -81,6 +82,7 @@ type ConsensusSet struct {
     log        *persist.Logger
     mu         demotemutex.DemoteMutex
     persistDir string
+    tg         sync.ThreadGroup
 }

 // New returns a new ConsensusSet, containing at least the genesis block. If
@@ -141,9 +143,16 @@ func New(gateway modules.Gateway, persistDir string) (*ConsensusSet, error) {
     // Register RPCs
     gateway.RegisterRPC("SendBlocks", cs.rpcSendBlocks)
     gateway.RegisterRPC("RelayBlock", cs.rpcRelayBlock) // COMPATv0.5.1
-    gateway.RegisterRPC("RelayHeader", cs.rpcRelayHeader)
+    gateway.RegisterRPC("RelayHeader", cs.threadedRPCRelayHeader)
     gateway.RegisterRPC("SendBlk", cs.rpcSendBlk)
     gateway.RegisterConnectCall("SendBlocks", cs.threadedReceiveBlocks)
+    cs.tg.OnStop(func() {
+        cs.gateway.UnregisterRPC("SendBlocks")
+        cs.gateway.UnregisterRPC("RelayBlock")
+        cs.gateway.UnregisterRPC("RelayHeader")
+        cs.gateway.UnregisterRPC("SendBlk")
+        cs.gateway.UnregisterConnectCall("SendBlocks")
+    })

     // Mark that we are synced with the network.
     cs.mu.Lock()
@@ -188,17 +197,17 @@ func (cs *ConsensusSet) ChildTarget(id types.BlockID) (target types.Target, exis

 // Close safely closes the block database.
 func (cs *ConsensusSet) Close() error {
+    err := cs.tg.Stop()
+    if err != nil {
+        return err
+    }
+
+    // Shouldn't be necessary when `Stop` call is complete, but currently
+    // `Stop` will not pause all ongoing processes, meaning a lock is needed
+    // during the rest of shutdown.
     cs.mu.Lock()
     defer cs.mu.Unlock()

-    if cs.synced {
-        cs.gateway.UnregisterRPC("SendBlocks")
-        cs.gateway.UnregisterRPC("RelayBlock") // COMPATv0.5.1
-        cs.gateway.UnregisterRPC("RelayHeader")
-        cs.gateway.UnregisterRPC("SendBlk")
-        cs.gateway.UnregisterConnectCall("SendBlocks")
-    }
-
     var errs []error
     if err := cs.db.Close(); err != nil {
         errs = append(errs, fmt.Errorf("db.Close failed: %v", err))
@@ -222,6 +231,12 @@ func (cs *ConsensusSet) CurrentBlock() (block types.Block) {
     return block
 }

+// Flush will block until the consensus set has finished all in-progress
+// routines.
+func (cs *ConsensusSet) Flush() error {
+    return cs.tg.Flush()
+}
+
 // Height returns the height of the consensus set.
 func (cs *ConsensusSet) Height() (height types.BlockHeight) {
     _ = cs.db.View(func(tx *bolt.Tx) error {
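consensusset.go is where the ThreadGroup gets wired into the module's lifecycle: New registers an OnStop hook that unregisters the RPCs, Close becomes tg.Stop() plus database teardown, and the new Flush method simply forwards to tg.Flush(). The toy below models the semantics those calls appear to rely on from the NebulousLabs/Sia/sync package; it is a sketch under stated assumptions, not the real ThreadGroup implementation:

package main

import (
    "errors"
    "fmt"
    "sync"
)

// threadGroup is a toy model of the assumed semantics: Add/Done track
// in-flight routines, Flush waits for them to drain, and Stop drains them
// for good and then runs the hooks registered via OnStop.
type threadGroup struct {
    mu      sync.Mutex
    wg      sync.WaitGroup
    stopped bool
    onStop  []func()
}

func (tg *threadGroup) Add() error {
    tg.mu.Lock()
    defer tg.mu.Unlock()
    if tg.stopped {
        return errors.New("thread group has been stopped")
    }
    tg.wg.Add(1)
    return nil
}

func (tg *threadGroup) Done() { tg.wg.Done() }

// Flush blocks until every routine admitted so far has called Done. This is
// what lets tests serialize against in-flight block processing.
func (tg *threadGroup) Flush() error {
    tg.wg.Wait()
    return nil
}

func (tg *threadGroup) OnStop(fn func()) {
    tg.mu.Lock()
    defer tg.mu.Unlock()
    tg.onStop = append(tg.onStop, fn)
}

// Stop refuses further Add calls, waits for in-flight routines, then runs
// the OnStop hooks - here, the RPC unregistration that Close used to do by
// hand behind an `if cs.synced` check.
func (tg *threadGroup) Stop() error {
    tg.mu.Lock()
    if tg.stopped {
        tg.mu.Unlock()
        return errors.New("thread group has already been stopped")
    }
    tg.stopped = true
    tg.mu.Unlock()
    tg.wg.Wait()
    for _, fn := range tg.onStop {
        fn()
    }
    return nil
}

func main() {
    var tg threadGroup
    tg.OnStop(func() { fmt.Println("unregister RPCs and connect calls") })
    if tg.Add() == nil {
        go func() { defer tg.Done(); fmt.Println("relaying a block") }()
    }
    _ = tg.Flush()                           // wait for the relay to finish
    _ = tg.Stop()                            // drain and run the unregister hook
    fmt.Println("Add after Stop:", tg.Add()) // now refused
}

The practical win over the old `if cs.synced` block in Close is that teardown is declared right next to the registration it undoes, so Close cannot forget a step.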
13 changes: 11 additions & 2 deletions modules/consensus/subscribe.go
@@ -163,12 +163,17 @@ func (cs *ConsensusSet) initializeSubscribe(subscriber modules.ConsensusSetSubsc
 // As a special case, using an empty id as the start will have all the changes
 // sent to the modules starting with the genesis block.
 func (cs *ConsensusSet) ConsensusSetSubscribe(subscriber modules.ConsensusSetSubscriber, start modules.ConsensusChangeID) error {
+    err := cs.tg.Add()
+    if err != nil {
+        return err
+    }
+    defer cs.tg.Done()
     cs.mu.Lock()
     defer cs.mu.Unlock()
-    cs.subscribers = append(cs.subscribers, subscriber)

     // Get the input module caught up to the currenct consnesus set.
-    err := cs.initializeSubscribe(subscriber, start)
+    cs.subscribers = append(cs.subscribers, subscriber)
+    err = cs.initializeSubscribe(subscriber, start)
     if err != nil {
         // Remove the subscriber from the set of subscribers.
         cs.subscribers = cs.subscribers[:len(cs.subscribers)-1]
@@ -182,6 +187,10 @@ func (cs *ConsensusSet) Unsubscribe(subscriber modules.ConsensusSetSub
 // garbage collection and rescanning. If the subscriber is not found in the
 // subscriber database, no action is taken.
 func (cs *ConsensusSet) Unsubscribe(subscriber modules.ConsensusSetSubscriber) {
+    if cs.tg.Add() != nil {
+        return
+    }
+    defer cs.tg.Done()
     cs.mu.Lock()
     defer cs.mu.Unlock()
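In subscribe.go the subscriber is now appended before initializeSubscribe runs, with an explicit rollback if the catch-up fails, and both entry points refuse work once the ThreadGroup has stopped. A minimal sketch of the register-then-rollback flow, with hypothetical subscriber types standing in for modules.ConsensusSetSubscriber (the ThreadGroup guard is elided here since the previous sketches show it):

package main

import (
    "errors"
    "fmt"
    "sync"
)

// Minimal stand-ins; all names here are assumptions for illustration only.
type subscriber interface{ Name() string }

type consensusSet struct {
    mu          sync.Mutex
    subscribers []subscriber
}

// initializeSubscribe stands in for replaying consensus history to the new
// subscriber; it fails on demand so the rollback path below is exercised.
func (cs *consensusSet) initializeSubscribe(s subscriber) error {
    if s.Name() == "bad" {
        return errors.New("unrecognized start id")
    }
    return nil
}

// ConsensusSetSubscribe mirrors the patched flow: register first, replay
// second, and roll the registration back if the replay fails.
func (cs *consensusSet) ConsensusSetSubscribe(s subscriber) error {
    cs.mu.Lock()
    defer cs.mu.Unlock()
    cs.subscribers = append(cs.subscribers, s)
    if err := cs.initializeSubscribe(s); err != nil {
        cs.subscribers = cs.subscribers[:len(cs.subscribers)-1] // roll back
        return err
    }
    return nil
}

type namedSub string

func (n namedSub) Name() string { return string(n) }

func main() {
    cs := new(consensusSet)
    fmt.Println(cs.ConsensusSetSubscribe(namedSub("good")), len(cs.subscribers))
    fmt.Println(cs.ConsensusSetSubscribe(namedSub("bad")), len(cs.subscribers))
}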
47 changes: 34 additions & 13 deletions modules/consensus/synchronize.go
@@ -3,6 +3,7 @@ package consensus
 import (
     "errors"
     "net"
+    "sync"
     "time"

     "github.com/NebulousLabs/Sia/build"
@@ -343,7 +344,7 @@ func (cs *ConsensusSet) rpcRelayBlock(conn modules.PeerConn) error {
     }

     // Submit the block to the consensus set and broadcast it.
-    err = cs.AcceptBlock(b)
+    err = cs.managedAcceptBlock(b)
     if err == errOrphan {
         // If the block is an orphan, try to find the parents. The block
         // received from the peer is discarded and will be downloaded again if
@@ -358,14 +359,27 @@
     if err != nil {
         return err
     }
+    cs.managedBroadcastBlock(b)
     return nil
 }

-// rpcRelayHeader is an RPC that accepts a block header from a peer.
-func (cs *ConsensusSet) rpcRelayHeader(conn modules.PeerConn) error {
+// threadedRPCRelayHeader is an RPC that accepts a block header from a peer.
+func (cs *ConsensusSet) threadedRPCRelayHeader(conn modules.PeerConn) error {
+    err := cs.tg.Add()
+    if err != nil {
+        return err
+    }
+    wg := new(sync.WaitGroup)
+    defer func() {
+        go func() {
+            wg.Wait()
+            cs.tg.Done()
+        }()
+    }()

     // Decode the block header from the connection.
     var h types.BlockHeader
-    err := encoding.ReadObject(conn, &h, types.BlockHeaderSize)
+    err = encoding.ReadObject(conn, &h, types.BlockHeaderSize)
     if err != nil {
         return err
     }
@@ -379,23 +393,30 @@ func (cs *ConsensusSet) rpcRelayHeader(conn modules.PeerConn) error {
     cs.mu.RUnlock()
     if err == errOrphan {
         // If the header is an orphan, try to find the parents.
+        wg.Add(1)
         go func() {
             err := cs.gateway.RPC(conn.RPCAddr(), "SendBlocks", cs.threadedReceiveBlocks)
             if err != nil {
                 cs.log.Debugln("WARN: failed to get parents of orphan header:", err)
             }
+            wg.Done()
         }()
         return nil
     } else if err != nil {
         return err
     }
-    // If the header is valid and extends the heaviest chain, fetch, accept it,
-    // and broadcast it.
+
+    // If the header is valid and extends the heaviest chain, fetch the
+    // corresponding block. Call needs to be made in a separate goroutine
+    // because an exported call to the gateway is used, which is a deadlock
+    // risk given that rpcRelayHeader is called from the gateway.
+    wg.Add(1)
     go func() {
-        err := cs.gateway.RPC(conn.RPCAddr(), "SendBlk", cs.threadedReceiveBlock(h.ID()))
+        err = cs.gateway.RPC(conn.RPCAddr(), "SendBlk", cs.threadedReceiveBlock(h.ID()))
        if err != nil {
             cs.log.Debugln("WARN: failed to get header's corresponding block:", err)
         }
+        wg.Done()
     }()
     return nil
 }
@@ -431,26 +452,26 @@ func (cs *ConsensusSet) rpcSendBlk(conn modules.PeerConn) error {
     return nil
 }

-// threadedReceiveBlock takes a block id and returns an RPCFunc that requests
-// that block and then calls AcceptBlock on it. The returned function should be
-// used as the calling end of the SendBlk RPC. Note that although the function
+// threadedReceiveBlock takes a block id and returns an RPCFunc that requests that
+// block and then calls AcceptBlock on it. The returned function should be used
+// as the calling end of the SendBlk RPC. Note that although the function
 // itself does not do any locking, it is still prefixed with "threaded" because
 // the function it returns calls the exported method AcceptBlock.
 func (cs *ConsensusSet) threadedReceiveBlock(id types.BlockID) modules.RPCFunc {
-    managedFN := func(conn modules.PeerConn) error {
+    return func(conn modules.PeerConn) error {
         if err := encoding.WriteObject(conn, id); err != nil {
             return err
         }
         var block types.Block
         if err := encoding.ReadObject(conn, &block, types.BlockSizeLimit); err != nil {
             return err
         }
-        if err := cs.AcceptBlock(block); err != nil {
+        if err := cs.managedAcceptBlock(block); err != nil {
            return err
         }
+        cs.managedBroadcastBlock(block)
         return nil
     }
-    return managedFN
 }

 // threadedInitialBlockchainDownload performs the IBD on outbound peers. Blocks
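threadedRPCRelayHeader contains the subtlest pattern in the patch: the handler must return the connection to the gateway promptly, yet the goroutines it spawns (the SendBlocks and SendBlk fetches) should still count as in-progress work for Flush and Stop. It does this by parking its ThreadGroup slot in a deferred goroutine that waits on a local WaitGroup. A runnable toy of that handoff, with an assumed minimal threadGroup, follows:

package main

import (
    "fmt"
    "sync"
    "time"
)

// threadGroup is again a toy stand-in for Sia's ThreadGroup, reduced to the
// calls this pattern needs.
type threadGroup struct{ wg sync.WaitGroup }

func (tg *threadGroup) Add() error { tg.wg.Add(1); return nil }
func (tg *threadGroup) Done()      { tg.wg.Done() }
func (tg *threadGroup) Flush()     { tg.wg.Wait() }

// relayHeader sketches the handoff in threadedRPCRelayHeader: the handler
// returns to the gateway immediately, but its ThreadGroup slot is released
// only after every goroutine it spawned has finished, so Flush and Stop
// still wait on the spawned work. Details are assumptions for illustration.
func relayHeader(tg *threadGroup) error {
    if err := tg.Add(); err != nil {
        return err
    }
    wg := new(sync.WaitGroup)
    defer func() {
        go func() {
            wg.Wait() // wait for the spawned fetches...
            tg.Done() // ...then give the slot back
        }()
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        time.Sleep(50 * time.Millisecond) // stand-in for gateway.RPC("SendBlk", ...)
        fmt.Println("fetched block for relayed header")
    }()
    return nil // the gateway gets its connection back right away
}

func main() {
    tg := new(threadGroup)
    _ = relayHeader(tg)
    tg.Flush() // blocks until the header's block fetch completes
    fmt.Println("flushed: all relay work finished")
}

Calling tg.Done() directly in the defer would release the slot while the fetches were still running; routing it through wg.Wait() keeps the slot held exactly as long as any spawned goroutine is alive.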
2 changes: 1 addition & 1 deletion modules/consensus/synchronize_test.go
@@ -1070,7 +1070,7 @@ func TestRelayHeader(t *testing.T) {
     go func() {
         errChan <- encoding.WriteObject(p1, tt.header)
     }()
-    err = cst.cs.rpcRelayHeader(mockP2)
+    err = cst.cs.threadedRPCRelayHeader(mockP2)
     if err != tt.errWant {
         t.Errorf("%s: expected '%v', got '%v'", tt.errMSG, tt.errWant, err)
     }