From ed1a487c7b7d6bed126e2bee51f82867c96dc302 Mon Sep 17 00:00:00 2001 From: Yanbo Li Date: Tue, 21 Mar 2017 10:51:08 +0800 Subject: [PATCH] Add sync lock for some map or array structure of node The Map structure of node should be protected with lock to avoid concurrency access, here we protects some fields like the neighbor nodes, transaction pool etc. Rename the memorypool to txn(transaction)pool which is more precisly Signed-off-by: Yanbo Li --- Makefile | 2 +- consensus/dbft/dbftService.go | 6 ++-- main.go | 2 +- net/httpjsonrpc/interfaces.go | 3 +- net/message/memorypool.go | 18 ---------- net/message/message.go | 4 +-- net/message/transaction.go | 14 ++++++++ net/net.go | 4 +-- net/node/eventNotice.go | 5 --- net/node/idCache.go | 6 +++- net/node/link.go | 6 ++-- net/node/node.go | 63 +++++++++-------------------------- net/node/nodeMap.go | 52 ++++++++++++++++++++++++----- net/node/transactionPool.go | 41 +++++++++++++++++++---- net/protocol/protocol.go | 7 ++-- 15 files changed, 127 insertions(+), 106 deletions(-) delete mode 100644 net/message/memorypool.go diff --git a/Makefile b/Makefile index 44f25191..92d3dbd0 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ GOFMT=gofmt GC=go build VERSION := $(shell git describe --abbrev=4 --dirty --always --tags) Minversion := $(shell date) -BUILD_PAR = -ldflags "-X main.Version=$(VERSION)" +BUILD_PAR = -ldflags "-X main.Version=$(VERSION)" #-race all: $(GC) $(BUILD_PAR) main.go diff --git a/consensus/dbft/dbftService.go b/consensus/dbft/dbftService.go index b8a112b7..f5fedae3 100644 --- a/consensus/dbft/dbftService.go +++ b/consensus/dbft/dbftService.go @@ -452,7 +452,7 @@ func (ds *DbftService) PrepareRequestReceived(payload *msg.ConsensusPayload, mes ds.context.Signatures = make([][]byte, len(ds.context.Miners)) ds.context.Signatures[payload.MinerIndex] = message.Signature - mempool := ds.localNet.GetMemoryPool() + mempool := ds.localNet.GetTxnPool(true) for _, hash := range ds.context.TransactionHashes[1:] { if transaction, ok := mempool[hash]; ok { if err := ds.AddTransaction(transaction, false); err != nil { @@ -471,7 +471,7 @@ func (ds *DbftService) PrepareRequestReceived(payload *msg.ConsensusPayload, mes //AllowHashes(ds.context.TransactionHashes) log.Info("Prepare Requst finished") if len(ds.context.Transactions) < len(ds.context.TransactionHashes) { - ds.localNet.SynchronizeMemoryPool() + ds.localNet.SynchronizeTxnPool() } } @@ -595,7 +595,7 @@ func (ds *DbftService) Timeout() { } ds.context.Nonce = GetNonce() - transactionsPool := ds.localNet.GetMemoryPool() //TODO: add policy + transactionsPool := ds.localNet.GetTxnPool(true) //TODO: add policy //TODO: add max TX limitation diff --git a/main.go b/main.go index 9b32f917..15113ed7 100644 --- a/main.go +++ b/main.go @@ -125,7 +125,7 @@ func main() { for { log.Debug("ledger.DefaultLedger.Blockchain.BlockHeight= ", ledger.DefaultLedger.Blockchain.BlockHeight) - time.Sleep(15 * time.Second) + time.Sleep(2 * time.Second) } } func InitBlockChain() ledger.Blockchain { diff --git a/net/httpjsonrpc/interfaces.go b/net/httpjsonrpc/interfaces.go index be0e4411..e84c7c32 100644 --- a/net/httpjsonrpc/interfaces.go +++ b/net/httpjsonrpc/interfaces.go @@ -79,7 +79,7 @@ func getConnectionCount(req *http.Request, cmd map[string]interface{}) map[strin func getRawMemPool(req *http.Request, cmd map[string]interface{}) map[string]interface{} { id := cmd["id"] - mempoollist := node.GetTxnPool() + mempoollist := node.GetTxnPool(false) return responsePacking(mempoollist, id) } @@ -92,6 +92,7 @@ func getRawTransaction(req *http.Request, cmd map[string]interface{}) map[string txidArr.Deserialize(bytes.NewReader(txidSlice[0:32])) verbose := params.([]interface{})[1].(bool) tx := node.GetTransaction(txidArr) + // FIXME Get transaction from ledger txBuffer := bytes.NewBuffer([]byte{}) tx.Serialize(txBuffer) if verbose == true { diff --git a/net/message/memorypool.go b/net/message/memorypool.go deleted file mode 100644 index 683b71df..00000000 --- a/net/message/memorypool.go +++ /dev/null @@ -1,18 +0,0 @@ -package message - -import ( - . "GoOnchain/net/protocol" -) - -type memPool struct { - msgHdr - //TBD -} - -func ReqMemoryPool(node Noder) error { - msg := AllocMsg("mempool", 0) - buf, _ := msg.Serialization() - go node.Tx(buf) - - return nil -} diff --git a/net/message/message.go b/net/message/message.go index f6bd74de..213621e0 100644 --- a/net/message/message.go +++ b/net/message/message.go @@ -133,8 +133,8 @@ func AllocMsg(t string, length int) Messager { var msg blockReq copy(msg.msgHdr.CMD[0:len(t)], t) return &msg - case "mempool": - var msg memPool + case "txnpool": + var msg txnPool copy(msg.msgHdr.CMD[0:len(t)], t) return &msg case "alert": diff --git a/net/message/transaction.go b/net/message/transaction.go index 26a65d2f..807f3944 100644 --- a/net/message/transaction.go +++ b/net/message/transaction.go @@ -147,3 +147,17 @@ func (msg trn) DeSerialization(p []byte) error { return nil } + + +type txnPool struct { + msgHdr + //TBD +} + +func ReqTxnPool(node Noder) error { + msg := AllocMsg("txnpool", 0) + buf, _ := msg.Serialization() + go node.Tx(buf) + + return nil +} diff --git a/net/net.go b/net/net.go index bc6258a0..b63ec084 100644 --- a/net/net.go +++ b/net/net.go @@ -10,8 +10,8 @@ import ( ) type Neter interface { - GetMemoryPool() map[common.Uint256]*transaction.Transaction - SynchronizeMemoryPool() + GetTxnPool(cleanPool bool) map[common.Uint256]*transaction.Transaction + SynchronizeTxnPool() Xmit(common.Inventory) error // The transmit interface GetEvent(eventName string) *events.Event } diff --git a/net/node/eventNotice.go b/net/node/eventNotice.go index 0ba8dd6b..1897c93f 100644 --- a/net/node/eventNotice.go +++ b/net/node/eventNotice.go @@ -1,7 +1,6 @@ package node import ( - "GoOnchain/common" "GoOnchain/events" "fmt" ) @@ -16,10 +15,6 @@ func (eq *eventQueue) init() { eq.Block = events.NewEvent() } -func (eq eventQueue) SubscribeMsgQueue(common.InventoryType) { - //TODO -} - func (eq *eventQueue) GetEvent(eventName string) *events.Event { switch eventName { case "consensus": diff --git a/net/node/idCache.go b/net/node/idCache.go index 93c47bec..f5478807 100644 --- a/net/node/idCache.go +++ b/net/node/idCache.go @@ -2,9 +2,13 @@ package node import ( "GoOnchain/common" + "sync" ) -type idCache map[common.Uint256]bool +type idCache struct { + sync.RWMutex + list map[common.Uint256]bool +} func (c *idCache) init() { } diff --git a/net/node/link.go b/net/node/link.go index 8c262adf..c77778d3 100644 --- a/net/node/link.go +++ b/net/node/link.go @@ -21,6 +21,7 @@ import ( ) type link struct { + //Todo Add lock here addr string // The address of the node conn net.Conn // Connect socket with the peer node port uint16 // The server port of the node @@ -240,6 +241,7 @@ func (node *node) Connect(nodeAddr string) { conn.RemoteAddr().Network())) go n.rx() + // FIXME too long waiting time time.Sleep(2 * time.Second) // FIXME is there any timing race with rx buf, _ := NewVersion(node) @@ -290,9 +292,7 @@ func TLSDial(nodeAddr string) (net.Conn, error) { return conn, nil } -// TODO construct a TX channel and other application just drop the message to the channel func (node node) Tx(buf []byte) { - //node.chF <- func() error { common.Trace() str := hex.EncodeToString(buf) log.Debug(fmt.Sprintf("TX buf length: %d\n%s", len(buf), str)) @@ -301,8 +301,6 @@ func (node node) Tx(buf []byte) { if err != nil { log.Error("Error sending messge to peer node ", err.Error()) } - //return err - //} } // func (net net) Xmit(inv Inventory) error { diff --git a/net/node/node.go b/net/node/node.go index f61bb162..5eefa473 100644 --- a/net/node/node.go +++ b/net/node/node.go @@ -24,6 +24,7 @@ const ( ) type node struct { + //sync.RWMutex //The Lock not be used as expected to use function channel instead of lock state uint // node status id uint64 // The nodes's id cap uint32 // The node capability set @@ -60,18 +61,20 @@ func (node node) DumpInfo() { func (node *node) UpdateInfo(t time.Time, version uint32, services uint64, port uint16, nonce uint64, relay uint8, height uint64) { - // TODO need lock - node.UpdateTime(t) - node.id = nonce - node.version = version - node.services = services - node.port = port - if relay == 0 { - node.relay = false - } else { - node.relay = true - } - node.height = uint64(height) +// node.chF <- func() error { + node.UpdateTime(t) + node.id = nonce + node.version = version + node.services = services + node.port = port + if relay == 0 { + node.relay = false + } else { + node.relay = true + } + node.height = uint64(height) +// return nil +// } } func NewNode() *node { @@ -171,20 +174,6 @@ func (node *node) UpdateTime(t time.Time) { node.time = t } -func (node node) GetMemoryPool() map[common.Uint256]*transaction.Transaction { - return node.GetTxnPool() - // TODO refresh the pending transaction pool -} - -func (node node) SynchronizeMemoryPool() { - // Fixme need lock - for _, n := range node.nbrNodes.List { - if n.state == ESTABLISH { - ReqMemoryPool(n) - } - } -} - func (node node) Xmit(inv common.Inventory) error { common.Trace() var buffer []byte @@ -261,25 +250,3 @@ func (node node) GetTime() int64 { t := time.Now() return t.UnixNano() } - -func (node node) GetNeighborAddrs() ([]NodeAddr, uint64) { - var i uint64 - var addrs []NodeAddr - // TODO read lock - for _, n := range node.nbrNodes.List { - if n.GetState() != ESTABLISH { - continue - } - var addr NodeAddr - addr.IpAddr, _ = n.GetAddr16() - addr.Time = n.GetTime() - addr.Services = n.Services() - addr.Port = n.GetPort() - addr.ID = n.GetID() - addrs = append(addrs, addr) - - i++ - } - - return addrs, i -} diff --git a/net/node/nodeMap.go b/net/node/nodeMap.go index 71ceaab8..8dbce6a8 100644 --- a/net/node/nodeMap.go +++ b/net/node/nodeMap.go @@ -8,15 +8,17 @@ import ( // The neigbor node list type nbrNodes struct { - Lock sync.RWMutex + sync.RWMutex + // Todo using the Pool structure List map[uint64]*node } func (nm *nbrNodes) Broadcast(buf []byte) { - // TODO lock the map - // TODO Check whether the node existed or not + nm.RLock() + defer nm.RUnlock() for _, node := range nm.List { if node.state == ESTABLISH && node.relay == true { + // The routie need lock too go node.Tx(buf) } } @@ -28,8 +30,9 @@ func (nm *nbrNodes) NodeExisted(uid uint64) bool { } func (nm *nbrNodes) AddNbrNode(n Noder) { - //TODO lock the node Map - // TODO multi client from the same IP address issue + nm.Lock() + defer nm.Unlock() + if (nm.NodeExisted(n.GetID())) { fmt.Printf("Insert a existed node\n") } else { @@ -43,7 +46,9 @@ func (nm *nbrNodes) AddNbrNode(n Noder) { } func (nm *nbrNodes) DelNbrNode(id uint64) (Noder, bool) { - //TODO lock the node Map + nm.Lock() + defer nm.Unlock() + n, ok := nm.List[id] if (ok == false) { return nil, false @@ -52,8 +57,10 @@ func (nm *nbrNodes) DelNbrNode(id uint64) (Noder, bool) { return n, true } -func (nm nbrNodes) GetConnectionCnt() uint { - //TODO lock the node Map +func (nm *nbrNodes) GetConnectionCnt() uint { + nm.RLock() + defer nm.RUnlock() + var cnt uint for _, node := range nm.List { if node.state == ESTABLISH { @@ -67,7 +74,10 @@ func (nm *nbrNodes) init() { nm.List = make(map[uint64]*node) } -func (nm nbrNodes) NodeEstablished(id uint64) bool { +func (nm *nbrNodes) NodeEstablished(id uint64) bool { + nm.RLock() + defer nm.RUnlock() + n, ok := nm.List[id] if (ok == false) { return false @@ -79,3 +89,27 @@ func (nm nbrNodes) NodeEstablished(id uint64) bool { return true } + +func (node *node) GetNeighborAddrs() ([]NodeAddr, uint64) { + node.nbrNodes.RLock() + defer node.nbrNodes.RUnlock() + + var i uint64 + var addrs []NodeAddr + for _, n := range node.nbrNodes.List { + if n.GetState() != ESTABLISH { + continue + } + var addr NodeAddr + addr.IpAddr, _ = n.GetAddr16() + addr.Time = n.GetTime() + addr.Services = n.Services() + addr.Port = n.GetPort() + addr.ID = n.GetID() + addrs = append(addrs, addr) + + i++ + } + + return addrs, i +} diff --git a/net/node/transactionPool.go b/net/node/transactionPool.go index eb272ee9..f5ca8f81 100644 --- a/net/node/transactionPool.go +++ b/net/node/transactionPool.go @@ -3,11 +3,13 @@ package node import ( "GoOnchain/common" "GoOnchain/core/transaction" + msg "GoOnchain/net/message" + . "GoOnchain/net/protocol" "sync" ) type TXNPool struct { - lock sync.RWMutex + sync.RWMutex list map[common.Uint256]*transaction.Transaction } @@ -34,22 +36,47 @@ type TXNPool struct { // } -func (txnPool TXNPool) GetTransaction(hash common.Uint256) *transaction.Transaction { +func (txnPool *TXNPool) GetTransaction(hash common.Uint256) *transaction.Transaction { + txnPool.RLock() + defer txnPool.RUnlock() + txn := txnPool.list[hash] // Fixme need lock - return txnPool.list[hash] + return txn } func (txnPool *TXNPool) AppendTxnPool(txn *transaction.Transaction) bool { - // TODO add the lock - txnPool.list[txn.Hash()] = txn + txnPool.Lock() + defer txnPool.Unlock() + // TODO Check the TXN already existed case + txnPool.list[txn.Hash()] = txn + return true } -func (txnPool TXNPool) GetTxnPool() map[common.Uint256]*transaction.Transaction { - return txnPool.list +// Attention: clean the trasaction Pool after the consensus confirmed all of the transcation +func (txnPool *TXNPool) GetTxnPool(cleanPool bool) map[common.Uint256]*transaction.Transaction { + txnPool.Lock() + defer txnPool.Unlock() + + list := txnPool.list + if (cleanPool == true) { + txnPool.list = make(map[common.Uint256]*transaction.Transaction) + } + return list } func (txnPool *TXNPool) init() { txnPool.list = make(map[common.Uint256]*transaction.Transaction) } + +func (node *node) SynchronizeTxnPool() { + node.nbrNodes.RLock() + defer node.nbrNodes.RUnlock() + + for _, n := range node.nbrNodes.List { + if n.state == ESTABLISH { + msg.ReqTxnPool(n) + } + } +} diff --git a/net/protocol/protocol.go b/net/protocol/protocol.go index 8baadace..a249ea82 100644 --- a/net/protocol/protocol.go +++ b/net/protocol/protocol.go @@ -61,7 +61,7 @@ type Noder interface { GetHeight() uint64 GetConnectionCnt() uint GetLedger() *ledger.Ledger - GetTxnPool() map[common.Uint256]*transaction.Transaction + GetTxnPool(bool) map[common.Uint256]*transaction.Transaction AppendTxnPool(*transaction.Transaction) bool ExistedID(id common.Uint256) bool ReqNeighborList() @@ -76,13 +76,12 @@ type Noder interface { GetNeighborAddrs() ([]NodeAddr, uint64) GetTransaction(hash common.Uint256) *transaction.Transaction Xmit(common.Inventory) error - GetMemoryPool() map[common.Uint256]*transaction.Transaction - SynchronizeMemoryPool() + SynchronizeTxnPool() } type JsonNoder interface { GetConnectionCnt() uint - GetTxnPool() map[common.Uint256]*transaction.Transaction + GetTxnPool(bool) map[common.Uint256]*transaction.Transaction Xmit(common.Inventory) error GetTransaction(hash common.Uint256) *transaction.Transaction }