Skip to content

Commit

Permalink
Merge pull request #572 from gopcua/clarify-submux-lock
Browse files Browse the repository at this point in the history
Clarify submux lock
  • Loading branch information
magiconair authored Apr 8, 2022
2 parents 543c263 + 52042d1 commit 6bb1cf2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 33 deletions.
73 changes: 44 additions & 29 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (c *Client) SubscribeWithContext(ctx context.Context, params *SubscriptionP
}

c.subs[sub.SubscriptionID] = sub
c.updatePublishTimeout()
c.updatePublishTimeout_NeedsSubMuxRLock()
return sub, nil
}

Expand All @@ -98,11 +98,14 @@ func (c *Client) SubscriptionIDs() []uint32 {
// recreateSubscriptions creates new subscriptions
// with the same parameters to replace the previous ones
func (c *Client) recreateSubscription(ctx context.Context, id uint32) error {
c.subMux.Lock()
defer c.subMux.Unlock()

sub, ok := c.subs[id]
if !ok {
return ua.StatusBadSubscriptionIDInvalid
}
return sub.recreate(ctx)
return sub.recreate_NeedsSubMuxLock(ctx)
}

// transferSubscriptions ask the server to transfer the given subscriptions
Expand All @@ -123,10 +126,10 @@ func (c *Client) transferSubscriptions(ctx context.Context, ids []uint32) (*ua.T
// republishSubscriptions sends republish requests for the given subscription id.
func (c *Client) republishSubscription(ctx context.Context, id uint32, availableSeq []uint32) error {
c.subMux.RLock()
defer c.subMux.RUnlock()
sub := c.subs[id]
c.subMux.RUnlock()

sub, ok := c.subs[id]
if !ok {
if sub == nil {
return errors.Errorf("invalid subscription id %d", id)
}

Expand Down Expand Up @@ -208,11 +211,8 @@ func (c *Client) sendRepublishRequests(ctx context.Context, sub *Subscription, a
}
}

// registerSubscription register a subscription
func (c *Client) registerSubscription(sub *Subscription) error {
c.subMux.Lock()
defer c.subMux.Unlock()

// registerSubscription_NeedsSubMuxLock registers a subscription
func (c *Client) registerSubscription_NeedsSubMuxLock(sub *Subscription) error {
if sub.SubscriptionID == 0 {
return ua.StatusBadSubscriptionIDInvalid
}
Expand All @@ -227,19 +227,23 @@ func (c *Client) registerSubscription(sub *Subscription) error {

func (c *Client) forgetSubscription(ctx context.Context, id uint32) {
c.subMux.Lock()
delete(c.subs, id)
c.updatePublishTimeout()
c.forgetSubscription_NeedsSubMuxLock(ctx, id)
c.subMux.Unlock()
}

func (c *Client) forgetSubscription_NeedsSubMuxLock(ctx context.Context, id uint32) {
delete(c.subs, id)
c.updatePublishTimeout_NeedsSubMuxRLock()
stats.Subscription().Add("Count", -1)

if len(c.subs) == 0 {
// todo(fs): are we holding the lock too long here?
// todo(fs): consider running this as a go routine
c.pauseSubscriptions(ctx)
}
}

func (c *Client) updatePublishTimeout() {
// we need to hold the subMux lock already

func (c *Client) updatePublishTimeout_NeedsSubMuxRLock() {
maxTimeout := uasc.MaxTimeout
for _, s := range c.subs {
if d := s.publishTimeout(); d < maxTimeout {
Expand All @@ -249,19 +253,25 @@ func (c *Client) updatePublishTimeout() {
c.setPublishTimeout(maxTimeout)
}

func (c *Client) notifySubscriptionsOfError(ctx context.Context, subID uint32, err error) {
// we need to hold the subMux lock already
func (c *Client) notifySubscriptionOfError(ctx context.Context, subID uint32, err error) {
c.subMux.RLock()
s := c.subs[subID]
c.subMux.RUnlock()

subsToNotify := c.subs
if subID != 0 {
subsToNotify = map[uint32]*Subscription{
subID: c.subs[subID],
}
if s == nil {
return
}
for _, sub := range subsToNotify {
go s.notify(ctx, &PublishNotificationData{Error: err})
}

func (c *Client) notifyAllSubscriptionsOfError(ctx context.Context, err error) {
c.subMux.RLock()
defer c.subMux.RUnlock()

for _, s := range c.subs {
go func(s *Subscription) {
s.notify(ctx, &PublishNotificationData{Error: err})
}(sub)
}(s)
}
}

Expand Down Expand Up @@ -430,7 +440,11 @@ func (c *Client) publish(ctx context.Context) error {
case err != nil && res != nil:
// irrecoverable error
// todo(fs): do we need to stop and forget the subscription?
c.notifySubscriptionsOfError(ctx, res.SubscriptionID, err)
if res.SubscriptionID == 0 {
c.notifyAllSubscriptionsOfError(ctx, err)
} else {
c.notifySubscriptionOfError(ctx, res.SubscriptionID, err)
}
dlog.Printf("error: %s", err)
return err

Expand All @@ -441,17 +455,18 @@ func (c *Client) publish(ctx context.Context) error {
default:
c.subMux.Lock()
// handle pending acks for all subscriptions
c.handleAcks(res.Results)
c.handleAcks_NeedsSubMuxLock(res.Results)

sub, ok := c.subs[res.SubscriptionID]
if !ok {
c.subMux.Unlock()
// todo(fs): should we return an error here?
dlog.Printf("error: unknown subscription %d", res.SubscriptionID)
return nil
}

// handle the publish response for a specific subscription
c.handleNotification(ctx, sub, res)
c.handleNotification_NeedsSubMuxLock(ctx, sub, res)
c.subMux.Unlock()

c.notifySubscription(ctx, sub, res.NotificationMessage)
Expand All @@ -461,7 +476,7 @@ func (c *Client) publish(ctx context.Context) error {
return nil
}

func (c *Client) handleAcks(res []ua.StatusCode) {
func (c *Client) handleAcks_NeedsSubMuxLock(res []ua.StatusCode) {
dlog := debug.NewPrefixLogger("publish: ")

// we assume that the number of results in the response match
Expand Down Expand Up @@ -494,7 +509,7 @@ func (c *Client) handleAcks(res []ua.StatusCode) {
dlog.Printf("notAcked=%v", notAcked)
}

func (c *Client) handleNotification(ctx context.Context, sub *Subscription, res *ua.PublishResponse) {
func (c *Client) handleNotification_NeedsSubMuxLock(ctx context.Context, sub *Subscription, res *ua.PublishResponse) {
dlog := debug.NewPrefixLogger("publish: sub %d: ", res.SubscriptionID)

// keep-alive message
Expand Down
8 changes: 4 additions & 4 deletions subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,9 @@ func (p *SubscriptionParameters) setDefaults() {
}
}

// recreate creates a new subscription based on the previous subscription
// recreate_NeedsSubMuxLock creates a new subscription based on the previous subscription
// parameters and monitored items.
func (s *Subscription) recreate(ctx context.Context) error {
func (s *Subscription) recreate_NeedsSubMuxLock(ctx context.Context) error {
dlog := debug.NewPrefixLogger("sub %d: recreate: ", s.SubscriptionID)

if s.SubscriptionID == terminatedSubscriptionID {
Expand All @@ -361,7 +361,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
})
dlog.Print("subscription deleted")
}
s.c.forgetSubscription(ctx, s.SubscriptionID)
s.c.forgetSubscription_NeedsSubMuxLock(ctx, s.SubscriptionID)
dlog.Printf("subscription forgotton")

req := &ua.CreateSubscriptionRequest{
Expand Down Expand Up @@ -394,7 +394,7 @@ func (s *Subscription) recreate(ctx context.Context) error {
s.lastSeq = 0
s.nextSeq = 1

if err := s.c.registerSubscription(s); err != nil {
if err := s.c.registerSubscription_NeedsSubMuxLock(s); err != nil {
return err
}
dlog.Printf("subscription registered")
Expand Down

0 comments on commit 6bb1cf2

Please sign in to comment.