Skip to content

Commit

Permalink
Add sync lock for some map or array structure of node
Browse files Browse the repository at this point in the history
The Map structure of node should be protected with lock to avoid
concurrency access, here we protects some fields like the neighbor
nodes, transaction pool etc.

Rename the memorypool to txn(transaction)pool which is
more precisly

Signed-off-by: Yanbo Li <[email protected]>
  • Loading branch information
dreamfly281 committed Mar 23, 2017
1 parent f754a83 commit ed1a487
Show file tree
Hide file tree
Showing 15 changed files with 127 additions and 106 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ GOFMT=gofmt
GC=go build
VERSION := $(shell git describe --abbrev=4 --dirty --always --tags)
Minversion := $(shell date)
BUILD_PAR = -ldflags "-X main.Version=$(VERSION)"
BUILD_PAR = -ldflags "-X main.Version=$(VERSION)" #-race

all:
$(GC) $(BUILD_PAR) main.go
Expand Down
6 changes: 3 additions & 3 deletions consensus/dbft/dbftService.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (ds *DbftService) PrepareRequestReceived(payload *msg.ConsensusPayload, mes
ds.context.Signatures = make([][]byte, len(ds.context.Miners))
ds.context.Signatures[payload.MinerIndex] = message.Signature

mempool := ds.localNet.GetMemoryPool()
mempool := ds.localNet.GetTxnPool(true)
for _, hash := range ds.context.TransactionHashes[1:] {
if transaction, ok := mempool[hash]; ok {
if err := ds.AddTransaction(transaction, false); err != nil {
Expand All @@ -471,7 +471,7 @@ func (ds *DbftService) PrepareRequestReceived(payload *msg.ConsensusPayload, mes
//AllowHashes(ds.context.TransactionHashes)
log.Info("Prepare Requst finished")
if len(ds.context.Transactions) < len(ds.context.TransactionHashes) {
ds.localNet.SynchronizeMemoryPool()
ds.localNet.SynchronizeTxnPool()
}
}

Expand Down Expand Up @@ -595,7 +595,7 @@ func (ds *DbftService) Timeout() {
}

ds.context.Nonce = GetNonce()
transactionsPool := ds.localNet.GetMemoryPool() //TODO: add policy
transactionsPool := ds.localNet.GetTxnPool(true) //TODO: add policy

//TODO: add max TX limitation

Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func main() {

for {
log.Debug("ledger.DefaultLedger.Blockchain.BlockHeight= ", ledger.DefaultLedger.Blockchain.BlockHeight)
time.Sleep(15 * time.Second)
time.Sleep(2 * time.Second)
}
}
func InitBlockChain() ledger.Blockchain {
Expand Down
3 changes: 2 additions & 1 deletion net/httpjsonrpc/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func getConnectionCount(req *http.Request, cmd map[string]interface{}) map[strin

func getRawMemPool(req *http.Request, cmd map[string]interface{}) map[string]interface{} {
id := cmd["id"]
mempoollist := node.GetTxnPool()
mempoollist := node.GetTxnPool(false)
return responsePacking(mempoollist, id)
}

Expand All @@ -92,6 +92,7 @@ func getRawTransaction(req *http.Request, cmd map[string]interface{}) map[string
txidArr.Deserialize(bytes.NewReader(txidSlice[0:32]))
verbose := params.([]interface{})[1].(bool)
tx := node.GetTransaction(txidArr)
// FIXME Get transaction from ledger
txBuffer := bytes.NewBuffer([]byte{})
tx.Serialize(txBuffer)
if verbose == true {
Expand Down
18 changes: 0 additions & 18 deletions net/message/memorypool.go

This file was deleted.

4 changes: 2 additions & 2 deletions net/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func AllocMsg(t string, length int) Messager {
var msg blockReq
copy(msg.msgHdr.CMD[0:len(t)], t)
return &msg
case "mempool":
var msg memPool
case "txnpool":
var msg txnPool
copy(msg.msgHdr.CMD[0:len(t)], t)
return &msg
case "alert":
Expand Down
14 changes: 14 additions & 0 deletions net/message/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,17 @@ func (msg trn) DeSerialization(p []byte) error {

return nil
}


type txnPool struct {
msgHdr
//TBD
}

func ReqTxnPool(node Noder) error {
msg := AllocMsg("txnpool", 0)
buf, _ := msg.Serialization()
go node.Tx(buf)

return nil
}
4 changes: 2 additions & 2 deletions net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
)

type Neter interface {
GetMemoryPool() map[common.Uint256]*transaction.Transaction
SynchronizeMemoryPool()
GetTxnPool(cleanPool bool) map[common.Uint256]*transaction.Transaction
SynchronizeTxnPool()
Xmit(common.Inventory) error // The transmit interface
GetEvent(eventName string) *events.Event
}
Expand Down
5 changes: 0 additions & 5 deletions net/node/eventNotice.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package node

import (
"GoOnchain/common"
"GoOnchain/events"
"fmt"
)
Expand All @@ -16,10 +15,6 @@ func (eq *eventQueue) init() {
eq.Block = events.NewEvent()
}

func (eq eventQueue) SubscribeMsgQueue(common.InventoryType) {
//TODO
}

func (eq *eventQueue) GetEvent(eventName string) *events.Event {
switch eventName {
case "consensus":
Expand Down
6 changes: 5 additions & 1 deletion net/node/idCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package node

import (
"GoOnchain/common"
"sync"
)

type idCache map[common.Uint256]bool
type idCache struct {
sync.RWMutex
list map[common.Uint256]bool
}

func (c *idCache) init() {
}
Expand Down
6 changes: 2 additions & 4 deletions net/node/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
)

type link struct {
//Todo Add lock here
addr string // The address of the node
conn net.Conn // Connect socket with the peer node
port uint16 // The server port of the node
Expand Down Expand Up @@ -240,6 +241,7 @@ func (node *node) Connect(nodeAddr string) {
conn.RemoteAddr().Network()))
go n.rx()

// FIXME too long waiting time
time.Sleep(2 * time.Second)
// FIXME is there any timing race with rx
buf, _ := NewVersion(node)
Expand Down Expand Up @@ -290,9 +292,7 @@ func TLSDial(nodeAddr string) (net.Conn, error) {
return conn, nil
}

// TODO construct a TX channel and other application just drop the message to the channel
func (node node) Tx(buf []byte) {
//node.chF <- func() error {
common.Trace()
str := hex.EncodeToString(buf)
log.Debug(fmt.Sprintf("TX buf length: %d\n%s", len(buf), str))
Expand All @@ -301,8 +301,6 @@ func (node node) Tx(buf []byte) {
if err != nil {
log.Error("Error sending messge to peer node ", err.Error())
}
//return err
//}
}

// func (net net) Xmit(inv Inventory) error {
Expand Down
63 changes: 15 additions & 48 deletions net/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
)

type node struct {
//sync.RWMutex //The Lock not be used as expected to use function channel instead of lock
state uint // node status
id uint64 // The nodes's id
cap uint32 // The node capability set
Expand Down Expand Up @@ -60,18 +61,20 @@ func (node node) DumpInfo() {

func (node *node) UpdateInfo(t time.Time, version uint32, services uint64,
port uint16, nonce uint64, relay uint8, height uint64) {
// TODO need lock
node.UpdateTime(t)
node.id = nonce
node.version = version
node.services = services
node.port = port
if relay == 0 {
node.relay = false
} else {
node.relay = true
}
node.height = uint64(height)
// node.chF <- func() error {
node.UpdateTime(t)
node.id = nonce
node.version = version
node.services = services
node.port = port
if relay == 0 {
node.relay = false
} else {
node.relay = true
}
node.height = uint64(height)
// return nil
// }
}

func NewNode() *node {
Expand Down Expand Up @@ -171,20 +174,6 @@ func (node *node) UpdateTime(t time.Time) {
node.time = t
}

func (node node) GetMemoryPool() map[common.Uint256]*transaction.Transaction {
return node.GetTxnPool()
// TODO refresh the pending transaction pool
}

func (node node) SynchronizeMemoryPool() {
// Fixme need lock
for _, n := range node.nbrNodes.List {
if n.state == ESTABLISH {
ReqMemoryPool(n)
}
}
}

func (node node) Xmit(inv common.Inventory) error {
common.Trace()
var buffer []byte
Expand Down Expand Up @@ -261,25 +250,3 @@ func (node node) GetTime() int64 {
t := time.Now()
return t.UnixNano()
}

func (node node) GetNeighborAddrs() ([]NodeAddr, uint64) {
var i uint64
var addrs []NodeAddr
// TODO read lock
for _, n := range node.nbrNodes.List {
if n.GetState() != ESTABLISH {
continue
}
var addr NodeAddr
addr.IpAddr, _ = n.GetAddr16()
addr.Time = n.GetTime()
addr.Services = n.Services()
addr.Port = n.GetPort()
addr.ID = n.GetID()
addrs = append(addrs, addr)

i++
}

return addrs, i
}
52 changes: 43 additions & 9 deletions net/node/nodeMap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,17 @@ import (

// The neigbor node list
type nbrNodes struct {
Lock sync.RWMutex
sync.RWMutex
// Todo using the Pool structure
List map[uint64]*node
}

func (nm *nbrNodes) Broadcast(buf []byte) {
// TODO lock the map
// TODO Check whether the node existed or not
nm.RLock()
defer nm.RUnlock()
for _, node := range nm.List {
if node.state == ESTABLISH && node.relay == true {
// The routie need lock too
go node.Tx(buf)
}
}
Expand All @@ -28,8 +30,9 @@ func (nm *nbrNodes) NodeExisted(uid uint64) bool {
}

func (nm *nbrNodes) AddNbrNode(n Noder) {
//TODO lock the node Map
// TODO multi client from the same IP address issue
nm.Lock()
defer nm.Unlock()

if (nm.NodeExisted(n.GetID())) {
fmt.Printf("Insert a existed node\n")
} else {
Expand All @@ -43,7 +46,9 @@ func (nm *nbrNodes) AddNbrNode(n Noder) {
}

func (nm *nbrNodes) DelNbrNode(id uint64) (Noder, bool) {
//TODO lock the node Map
nm.Lock()
defer nm.Unlock()

n, ok := nm.List[id]
if (ok == false) {
return nil, false
Expand All @@ -52,8 +57,10 @@ func (nm *nbrNodes) DelNbrNode(id uint64) (Noder, bool) {
return n, true
}

func (nm nbrNodes) GetConnectionCnt() uint {
//TODO lock the node Map
func (nm *nbrNodes) GetConnectionCnt() uint {
nm.RLock()
defer nm.RUnlock()

var cnt uint
for _, node := range nm.List {
if node.state == ESTABLISH {
Expand All @@ -67,7 +74,10 @@ func (nm *nbrNodes) init() {
nm.List = make(map[uint64]*node)
}

func (nm nbrNodes) NodeEstablished(id uint64) bool {
func (nm *nbrNodes) NodeEstablished(id uint64) bool {
nm.RLock()
defer nm.RUnlock()

n, ok := nm.List[id]
if (ok == false) {
return false
Expand All @@ -79,3 +89,27 @@ func (nm nbrNodes) NodeEstablished(id uint64) bool {

return true
}

func (node *node) GetNeighborAddrs() ([]NodeAddr, uint64) {
node.nbrNodes.RLock()
defer node.nbrNodes.RUnlock()

var i uint64
var addrs []NodeAddr
for _, n := range node.nbrNodes.List {
if n.GetState() != ESTABLISH {
continue
}
var addr NodeAddr
addr.IpAddr, _ = n.GetAddr16()
addr.Time = n.GetTime()
addr.Services = n.Services()
addr.Port = n.GetPort()
addr.ID = n.GetID()
addrs = append(addrs, addr)

i++
}

return addrs, i
}
Loading

0 comments on commit ed1a487

Please sign in to comment.