From 6fa2463d79544612b193633e97e0eec1aee9715b Mon Sep 17 00:00:00 2001 From: Weiqiang Tang Date: Sun, 7 Feb 2021 17:41:04 +0800 Subject: [PATCH] Bugfix: wait for processing all message before complete Signed-off-by: Weiqiang Tang --- ofctrl/transaction.go | 40 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/ofctrl/transaction.go b/ofctrl/transaction.go index 5f01f0fc..5bda640f 100644 --- a/ofctrl/transaction.go +++ b/ofctrl/transaction.go @@ -6,6 +6,7 @@ package ofctrl import ( "fmt" + "runtime" "sync" "sync/atomic" "time" @@ -118,28 +119,22 @@ func (tx *Transaction) createBundleAddFlowMessage(flowMod *openflow13.FlowMod) ( } func (tx *Transaction) listenReply() { - for { - select { - case reply, ok := <-tx.controlReplyCh: - if !ok { // controlReplyCh closed. - return + for reply := range tx.controlReplyCh { + switch reply.msgType { + case BundleControlMessage: + select { + case tx.controlIntCh <- reply: + case <-time.After(messageTimeout): + log.Warningln("BundleControlMessage reply message accept timeout") } - switch reply.msgType { - case BundleControlMessage: - select { - case tx.controlIntCh <- reply: - case <-time.After(messageTimeout): - log.Warningln("BundleControlMessage reply message accept timeout") - } - case BundleAddMessage: - if !reply.succeed { - func() { - tx.lock.Lock() - defer tx.lock.Unlock() - // Remove failed add message from successAdd. - delete(tx.successAdd, reply.xID) - }() - } + case BundleAddMessage: + if !reply.succeed { + func() { + tx.lock.Lock() + defer tx.lock.Unlock() + // Remove failed add message from successAdd. + delete(tx.successAdd, reply.xID) + }() } } } @@ -194,6 +189,9 @@ func (tx *Transaction) Complete() (int, error) { } tx.closed = true } + for len(tx.controlReplyCh) > 0 { + runtime.Gosched() + } tx.lock.Lock() defer tx.lock.Unlock() return len(tx.successAdd), nil