Skip to content

Commit

Permalink
Add support of heartbeat interval in "on change" subscription, so that
Browse files Browse the repository at this point in the history
full sync would happen periodically when hb is triggered.
  • Loading branch information
FengPan-Frank committed Oct 11, 2023
1 parent cbb7631 commit d57f25b
Showing 1 changed file with 41 additions and 6 deletions.
47 changes: 41 additions & 6 deletions sonic_data_client/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ var Target2RedisDb = make(map[string]map[string]*redis.Client)
// Any non-zero value that less than this threshold is considered invalid argument.
var MinSampleInterval = time.Second

// MinHeartbeatInterval is the lowest HB interval for streaming subscriptions.
// This is reserved value, which should be adjusted per BGPL benchmark result.
var MinHeartbeatInterval = 3 * time.Minute

// IntervalTicker is a factory method to implement interval ticking.
// Exposed for UT purposes.
var IntervalTicker = func(interval time.Duration) <-chan time.Time {
Expand Down Expand Up @@ -212,7 +216,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
for gnmiPath := range c.pathG2S {
c.w.Add(1)
c.synced.Add(1)
go streamOnChangeSubscription(c, gnmiPath)
go streamOnChangeSubscription(c, gnmiPath, nil)
}
} else {
log.V(2).Infof("Stream subscription request received, mode: %v, subscription count: %v",
Expand All @@ -230,7 +234,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
} else if subMode == gnmipb.SubscriptionMode_ON_CHANGE {
c.w.Add(1)
c.synced.Add(1)
go streamOnChangeSubscription(c, sub.GetPath())
go streamOnChangeSubscription(c, nil, sub)
} else {
enqueueFatalMsg(c, fmt.Sprintf("unsupported subscription mode, %v", subMode))
return
Expand All @@ -255,19 +259,36 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
}

// streamOnChangeSubscription implements Subscription "ON_CHANGE STREAM" mode
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path) {
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path, sub *gnmipb.Subscription) {
if gnmiPath == nil {
gnmiPath = sub.GetPath()
}

// if heartbeatInterval is not assigned, use 0 to ignore periodical full sync
var heartbeatInterval time.Duration = 0
if sub != nil {
var err error
heartbeatInterval, err = validateHeartbeatInterval(sub)
if err != nil {
enqueueFatalMsg(c, err.Error())
c.synced.Done()
c.w.Done()
return
}
}

tblPaths := c.pathG2S[gnmiPath]
log.V(2).Infof("streamOnChangeSubscription gnmiPath: %v", gnmiPath)

if tblPaths[0].field != "" {
if len(tblPaths) > 1 {
go dbFieldMultiSubscribe(c, gnmiPath, true, time.Millisecond*200, false)
go dbFieldMultiSubscribe(c, gnmiPath, true, heartbeatInterval, false)
} else {
go dbFieldSubscribe(c, gnmiPath, true, time.Millisecond*200)
go dbFieldSubscribe(c, gnmiPath, true, heartbeatInterval)
}
} else {
// sample interval and update only parameters are not applicable
go dbTableKeySubscribe(c, gnmiPath, 0, true)
go dbTableKeySubscribe(c, gnmiPath, heartbeatInterval, true)
}
}

Expand Down Expand Up @@ -1340,3 +1361,17 @@ func validateSampleInterval(sub *gnmipb.Subscription) (time.Duration, error) {
return requestedInterval, nil
}
}

// validateHeartbeatInterval validates the heartbeat interval of the given subscription.
func validateHeartbeatInterval(sub *gnmipb.Subscription) (time.Duration, error) {
requestedInterval := time.Duration(sub.GetHeartbeatInterval())
if requestedInterval == 0 {
// If the heartbeat_interval is set to 0, the target MUST create the subscription
// and send the data with the MinHeartbeatInterval
return MinHeartbeatInterval, nil
} else if requestedInterval < MinHeartbeatInterval {
return 0, fmt.Errorf("invalid heartbeat interval: %v. It cannot be less than %v", requestedInterval, MinHeartbeatInterval)
} else {
return requestedInterval, nil
}
}

0 comments on commit d57f25b

Please sign in to comment.