Skip to content

Commit

Permalink
Bugfix: wait for processing all message before complete
Browse files Browse the repository at this point in the history
Signed-off-by: Weiqiang Tang <[email protected]>
  • Loading branch information
weiqiangt committed Feb 7, 2021
1 parent ddcf15f commit 6fa2463
Showing 1 changed file with 19 additions and 21 deletions.
40 changes: 19 additions & 21 deletions ofctrl/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package ofctrl

import (
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -118,28 +119,22 @@ func (tx *Transaction) createBundleAddFlowMessage(flowMod *openflow13.FlowMod) (
}

func (tx *Transaction) listenReply() {
for {
select {
case reply, ok := <-tx.controlReplyCh:
if !ok { // controlReplyCh closed.
return
for reply := range tx.controlReplyCh {
switch reply.msgType {
case BundleControlMessage:
select {
case tx.controlIntCh <- reply:
case <-time.After(messageTimeout):
log.Warningln("BundleControlMessage reply message accept timeout")
}
switch reply.msgType {
case BundleControlMessage:
select {
case tx.controlIntCh <- reply:
case <-time.After(messageTimeout):
log.Warningln("BundleControlMessage reply message accept timeout")
}
case BundleAddMessage:
if !reply.succeed {
func() {
tx.lock.Lock()
defer tx.lock.Unlock()
// Remove failed add message from successAdd.
delete(tx.successAdd, reply.xID)
}()
}
case BundleAddMessage:
if !reply.succeed {
func() {
tx.lock.Lock()
defer tx.lock.Unlock()
// Remove failed add message from successAdd.
delete(tx.successAdd, reply.xID)
}()
}
}
}
Expand Down Expand Up @@ -194,6 +189,9 @@ func (tx *Transaction) Complete() (int, error) {
}
tx.closed = true
}
for len(tx.controlReplyCh) > 0 {
runtime.Gosched()
}
tx.lock.Lock()
defer tx.lock.Unlock()
return len(tx.successAdd), nil
Expand Down

0 comments on commit 6fa2463

Please sign in to comment.