diff --git a/Makefile b/Makefile index 44f25191..92d3dbd0 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ GOFMT=gofmt GC=go build VERSION := $(shell git describe --abbrev=4 --dirty --always --tags) Minversion := $(shell date) -BUILD_PAR = -ldflags "-X main.Version=$(VERSION)" +BUILD_PAR = -ldflags "-X main.Version=$(VERSION)" #-race all: $(GC) $(BUILD_PAR) main.go diff --git a/consensus/dbft/dbftService.go b/consensus/dbft/dbftService.go index b8a112b7..f5fedae3 100644 --- a/consensus/dbft/dbftService.go +++ b/consensus/dbft/dbftService.go @@ -452,7 +452,7 @@ func (ds *DbftService) PrepareRequestReceived(payload *msg.ConsensusPayload, mes ds.context.Signatures = make([][]byte, len(ds.context.Miners)) ds.context.Signatures[payload.MinerIndex] = message.Signature - mempool := ds.localNet.GetMemoryPool() + mempool := ds.localNet.GetTxnPool(true) for _, hash := range ds.context.TransactionHashes[1:] { if transaction, ok := mempool[hash]; ok { if err := ds.AddTransaction(transaction, false); err != nil { @@ -471,7 +471,7 @@ func (ds *DbftService) PrepareRequestReceived(payload *msg.ConsensusPayload, mes //AllowHashes(ds.context.TransactionHashes) log.Info("Prepare Requst finished") if len(ds.context.Transactions) < len(ds.context.TransactionHashes) { - ds.localNet.SynchronizeMemoryPool() + ds.localNet.SynchronizeTxnPool() } } @@ -595,7 +595,7 @@ func (ds *DbftService) Timeout() { } ds.context.Nonce = GetNonce() - transactionsPool := ds.localNet.GetMemoryPool() //TODO: add policy + transactionsPool := ds.localNet.GetTxnPool(true) //TODO: add policy //TODO: add max TX limitation diff --git a/main.go b/main.go index 9b32f917..15113ed7 100644 --- a/main.go +++ b/main.go @@ -125,7 +125,7 @@ func main() { for { log.Debug("ledger.DefaultLedger.Blockchain.BlockHeight= ", ledger.DefaultLedger.Blockchain.BlockHeight) - time.Sleep(15 * time.Second) + time.Sleep(2 * time.Second) } } func InitBlockChain() ledger.Blockchain { diff --git a/net/httpjsonrpc/interfaces.go b/net/httpjsonrpc/interfaces.go index be0e4411..e84c7c32 100644 --- a/net/httpjsonrpc/interfaces.go +++ b/net/httpjsonrpc/interfaces.go @@ -79,7 +79,7 @@ func getConnectionCount(req *http.Request, cmd map[string]interface{}) map[strin func getRawMemPool(req *http.Request, cmd map[string]interface{}) map[string]interface{} { id := cmd["id"] - mempoollist := node.GetTxnPool() + mempoollist := node.GetTxnPool(false) return responsePacking(mempoollist, id) } @@ -92,6 +92,7 @@ func getRawTransaction(req *http.Request, cmd map[string]interface{}) map[string txidArr.Deserialize(bytes.NewReader(txidSlice[0:32])) verbose := params.([]interface{})[1].(bool) tx := node.GetTransaction(txidArr) + // FIXME Get transaction from ledger txBuffer := bytes.NewBuffer([]byte{}) tx.Serialize(txBuffer) if verbose == true { diff --git a/net/message/memorypool.go b/net/message/memorypool.go deleted file mode 100644 index 683b71df..00000000 --- a/net/message/memorypool.go +++ /dev/null @@ -1,18 +0,0 @@ -package message - -import ( - . "GoOnchain/net/protocol" -) - -type memPool struct { - msgHdr - //TBD -} - -func ReqMemoryPool(node Noder) error { - msg := AllocMsg("mempool", 0) - buf, _ := msg.Serialization() - go node.Tx(buf) - - return nil -} diff --git a/net/message/message.go b/net/message/message.go index f6bd74de..213621e0 100644 --- a/net/message/message.go +++ b/net/message/message.go @@ -133,8 +133,8 @@ func AllocMsg(t string, length int) Messager { var msg blockReq copy(msg.msgHdr.CMD[0:len(t)], t) return &msg - case "mempool": - var msg memPool + case "txnpool": + var msg txnPool copy(msg.msgHdr.CMD[0:len(t)], t) return &msg case "alert": diff --git a/net/message/transaction.go b/net/message/transaction.go index 26a65d2f..807f3944 100644 --- a/net/message/transaction.go +++ b/net/message/transaction.go @@ -147,3 +147,17 @@ func (msg trn) DeSerialization(p []byte) error { return nil } + + +type txnPool struct { + msgHdr + //TBD +} + +func ReqTxnPool(node Noder) error { + msg := AllocMsg("txnpool", 0) + buf, _ := msg.Serialization() + go node.Tx(buf) + + return nil +} diff --git a/net/net.go b/net/net.go index bc6258a0..b63ec084 100644 --- a/net/net.go +++ b/net/net.go @@ -10,8 +10,8 @@ import ( ) type Neter interface { - GetMemoryPool() map[common.Uint256]*transaction.Transaction - SynchronizeMemoryPool() + GetTxnPool(cleanPool bool) map[common.Uint256]*transaction.Transaction + SynchronizeTxnPool() Xmit(common.Inventory) error // The transmit interface GetEvent(eventName string) *events.Event } diff --git a/net/node/eventNotice.go b/net/node/eventNotice.go index 0ba8dd6b..1897c93f 100644 --- a/net/node/eventNotice.go +++ b/net/node/eventNotice.go @@ -1,7 +1,6 @@ package node import ( - "GoOnchain/common" "GoOnchain/events" "fmt" ) @@ -16,10 +15,6 @@ func (eq *eventQueue) init() { eq.Block = events.NewEvent() } -func (eq eventQueue) SubscribeMsgQueue(common.InventoryType) { - //TODO -} - func (eq *eventQueue) GetEvent(eventName string) *events.Event { switch eventName { case "consensus": diff --git a/net/node/idCache.go b/net/node/idCache.go index 93c47bec..f5478807 100644 --- a/net/node/idCache.go +++ b/net/node/idCache.go @@ -2,9 +2,13 @@ package node import ( "GoOnchain/common" + "sync" ) -type idCache map[common.Uint256]bool +type idCache struct { + sync.RWMutex + list map[common.Uint256]bool +} func (c *idCache) init() { } diff --git a/net/node/link.go b/net/node/link.go index 8c262adf..c77778d3 100644 --- a/net/node/link.go +++ b/net/node/link.go @@ -21,6 +21,7 @@ import ( ) type link struct { + //Todo Add lock here addr string // The address of the node conn net.Conn // Connect socket with the peer node port uint16 // The server port of the node @@ -240,6 +241,7 @@ func (node *node) Connect(nodeAddr string) { conn.RemoteAddr().Network())) go n.rx() + // FIXME too long waiting time time.Sleep(2 * time.Second) // FIXME is there any timing race with rx buf, _ := NewVersion(node) @@ -290,9 +292,7 @@ func TLSDial(nodeAddr string) (net.Conn, error) { return conn, nil } -// TODO construct a TX channel and other application just drop the message to the channel func (node node) Tx(buf []byte) { - //node.chF <- func() error { common.Trace() str := hex.EncodeToString(buf) log.Debug(fmt.Sprintf("TX buf length: %d\n%s", len(buf), str)) @@ -301,8 +301,6 @@ func (node node) Tx(buf []byte) { if err != nil { log.Error("Error sending messge to peer node ", err.Error()) } - //return err - //} } // func (net net) Xmit(inv Inventory) error { diff --git a/net/node/node.go b/net/node/node.go index f61bb162..5eefa473 100644 --- a/net/node/node.go +++ b/net/node/node.go @@ -24,6 +24,7 @@ const ( ) type node struct { + //sync.RWMutex //The Lock not be used as expected to use function channel instead of lock state uint // node status id uint64 // The nodes's id cap uint32 // The node capability set @@ -60,18 +61,20 @@ func (node node) DumpInfo() { func (node *node) UpdateInfo(t time.Time, version uint32, services uint64, port uint16, nonce uint64, relay uint8, height uint64) { - // TODO need lock - node.UpdateTime(t) - node.id = nonce - node.version = version - node.services = services - node.port = port - if relay == 0 { - node.relay = false - } else { - node.relay = true - } - node.height = uint64(height) +// node.chF <- func() error { + node.UpdateTime(t) + node.id = nonce + node.version = version + node.services = services + node.port = port + if relay == 0 { + node.relay = false + } else { + node.relay = true + } + node.height = uint64(height) +// return nil +// } } func NewNode() *node { @@ -171,20 +174,6 @@ func (node *node) UpdateTime(t time.Time) { node.time = t } -func (node node) GetMemoryPool() map[common.Uint256]*transaction.Transaction { - return node.GetTxnPool() - // TODO refresh the pending transaction pool -} - -func (node node) SynchronizeMemoryPool() { - // Fixme need lock - for _, n := range node.nbrNodes.List { - if n.state == ESTABLISH { - ReqMemoryPool(n) - } - } -} - func (node node) Xmit(inv common.Inventory) error { common.Trace() var buffer []byte @@ -261,25 +250,3 @@ func (node node) GetTime() int64 { t := time.Now() return t.UnixNano() } - -func (node node) GetNeighborAddrs() ([]NodeAddr, uint64) { - var i uint64 - var addrs []NodeAddr - // TODO read lock - for _, n := range node.nbrNodes.List { - if n.GetState() != ESTABLISH { - continue - } - var addr NodeAddr - addr.IpAddr, _ = n.GetAddr16() - addr.Time = n.GetTime() - addr.Services = n.Services() - addr.Port = n.GetPort() - addr.ID = n.GetID() - addrs = append(addrs, addr) - - i++ - } - - return addrs, i -} diff --git a/net/node/nodeMap.go b/net/node/nodeMap.go index 71ceaab8..8dbce6a8 100644 --- a/net/node/nodeMap.go +++ b/net/node/nodeMap.go @@ -8,15 +8,17 @@ import ( // The neigbor node list type nbrNodes struct { - Lock sync.RWMutex + sync.RWMutex + // Todo using the Pool structure List map[uint64]*node } func (nm *nbrNodes) Broadcast(buf []byte) { - // TODO lock the map - // TODO Check whether the node existed or not + nm.RLock() + defer nm.RUnlock() for _, node := range nm.List { if node.state == ESTABLISH && node.relay == true { + // The routie need lock too go node.Tx(buf) } } @@ -28,8 +30,9 @@ func (nm *nbrNodes) NodeExisted(uid uint64) bool { } func (nm *nbrNodes) AddNbrNode(n Noder) { - //TODO lock the node Map - // TODO multi client from the same IP address issue + nm.Lock() + defer nm.Unlock() + if (nm.NodeExisted(n.GetID())) { fmt.Printf("Insert a existed node\n") } else { @@ -43,7 +46,9 @@ func (nm *nbrNodes) AddNbrNode(n Noder) { } func (nm *nbrNodes) DelNbrNode(id uint64) (Noder, bool) { - //TODO lock the node Map + nm.Lock() + defer nm.Unlock() + n, ok := nm.List[id] if (ok == false) { return nil, false @@ -52,8 +57,10 @@ func (nm *nbrNodes) DelNbrNode(id uint64) (Noder, bool) { return n, true } -func (nm nbrNodes) GetConnectionCnt() uint { - //TODO lock the node Map +func (nm *nbrNodes) GetConnectionCnt() uint { + nm.RLock() + defer nm.RUnlock() + var cnt uint for _, node := range nm.List { if node.state == ESTABLISH { @@ -67,7 +74,10 @@ func (nm *nbrNodes) init() { nm.List = make(map[uint64]*node) } -func (nm nbrNodes) NodeEstablished(id uint64) bool { +func (nm *nbrNodes) NodeEstablished(id uint64) bool { + nm.RLock() + defer nm.RUnlock() + n, ok := nm.List[id] if (ok == false) { return false @@ -79,3 +89,27 @@ func (nm nbrNodes) NodeEstablished(id uint64) bool { return true } + +func (node *node) GetNeighborAddrs() ([]NodeAddr, uint64) { + node.nbrNodes.RLock() + defer node.nbrNodes.RUnlock() + + var i uint64 + var addrs []NodeAddr + for _, n := range node.nbrNodes.List { + if n.GetState() != ESTABLISH { + continue + } + var addr NodeAddr + addr.IpAddr, _ = n.GetAddr16() + addr.Time = n.GetTime() + addr.Services = n.Services() + addr.Port = n.GetPort() + addr.ID = n.GetID() + addrs = append(addrs, addr) + + i++ + } + + return addrs, i +} diff --git a/net/node/transactionPool.go b/net/node/transactionPool.go index eb272ee9..f5ca8f81 100644 --- a/net/node/transactionPool.go +++ b/net/node/transactionPool.go @@ -3,11 +3,13 @@ package node import ( "GoOnchain/common" "GoOnchain/core/transaction" + msg "GoOnchain/net/message" + . "GoOnchain/net/protocol" "sync" ) type TXNPool struct { - lock sync.RWMutex + sync.RWMutex list map[common.Uint256]*transaction.Transaction } @@ -34,22 +36,47 @@ type TXNPool struct { // } -func (txnPool TXNPool) GetTransaction(hash common.Uint256) *transaction.Transaction { +func (txnPool *TXNPool) GetTransaction(hash common.Uint256) *transaction.Transaction { + txnPool.RLock() + defer txnPool.RUnlock() + txn := txnPool.list[hash] // Fixme need lock - return txnPool.list[hash] + return txn } func (txnPool *TXNPool) AppendTxnPool(txn *transaction.Transaction) bool { - // TODO add the lock - txnPool.list[txn.Hash()] = txn + txnPool.Lock() + defer txnPool.Unlock() + // TODO Check the TXN already existed case + txnPool.list[txn.Hash()] = txn + return true } -func (txnPool TXNPool) GetTxnPool() map[common.Uint256]*transaction.Transaction { - return txnPool.list +// Attention: clean the trasaction Pool after the consensus confirmed all of the transcation +func (txnPool *TXNPool) GetTxnPool(cleanPool bool) map[common.Uint256]*transaction.Transaction { + txnPool.Lock() + defer txnPool.Unlock() + + list := txnPool.list + if (cleanPool == true) { + txnPool.list = make(map[common.Uint256]*transaction.Transaction) + } + return list } func (txnPool *TXNPool) init() { txnPool.list = make(map[common.Uint256]*transaction.Transaction) } + +func (node *node) SynchronizeTxnPool() { + node.nbrNodes.RLock() + defer node.nbrNodes.RUnlock() + + for _, n := range node.nbrNodes.List { + if n.state == ESTABLISH { + msg.ReqTxnPool(n) + } + } +} diff --git a/net/protocol/protocol.go b/net/protocol/protocol.go index 8baadace..a249ea82 100644 --- a/net/protocol/protocol.go +++ b/net/protocol/protocol.go @@ -61,7 +61,7 @@ type Noder interface { GetHeight() uint64 GetConnectionCnt() uint GetLedger() *ledger.Ledger - GetTxnPool() map[common.Uint256]*transaction.Transaction + GetTxnPool(bool) map[common.Uint256]*transaction.Transaction AppendTxnPool(*transaction.Transaction) bool ExistedID(id common.Uint256) bool ReqNeighborList() @@ -76,13 +76,12 @@ type Noder interface { GetNeighborAddrs() ([]NodeAddr, uint64) GetTransaction(hash common.Uint256) *transaction.Transaction Xmit(common.Inventory) error - GetMemoryPool() map[common.Uint256]*transaction.Transaction - SynchronizeMemoryPool() + SynchronizeTxnPool() } type JsonNoder interface { GetConnectionCnt() uint - GetTxnPool() map[common.Uint256]*transaction.Transaction + GetTxnPool(bool) map[common.Uint256]*transaction.Transaction Xmit(common.Inventory) error GetTransaction(hash common.Uint256) *transaction.Transaction }