diff --git a/sonic_data_client/db_client.go b/sonic_data_client/db_client.go index 3b73b50f..47eab06a 100644 --- a/sonic_data_client/db_client.go +++ b/sonic_data_client/db_client.go @@ -73,6 +73,10 @@ var Target2RedisDb = make(map[string]map[string]*redis.Client) // Any non-zero value that less than this threshold is considered invalid argument. var MinSampleInterval = time.Second +// MinHeartbeatInterval is the lowest HB interval for streaming subscriptions. +// This is reserved value, which should be adjusted per BGPL benchmark result. +var MinHeartbeatInterval = 3 * time.Minute + // IntervalTicker is a factory method to implement interval ticking. // Exposed for UT purposes. var IntervalTicker = func(interval time.Duration) <-chan time.Time { @@ -212,7 +216,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync for gnmiPath := range c.pathG2S { c.w.Add(1) c.synced.Add(1) - go streamOnChangeSubscription(c, gnmiPath) + go streamOnChangeSubscription(c, gnmiPath, nil) } } else { log.V(2).Infof("Stream subscription request received, mode: %v, subscription count: %v", @@ -230,7 +234,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync } else if subMode == gnmipb.SubscriptionMode_ON_CHANGE { c.w.Add(1) c.synced.Add(1) - go streamOnChangeSubscription(c, sub.GetPath()) + go streamOnChangeSubscription(c, nil, sub) } else { enqueueFatalMsg(c, fmt.Sprintf("unsupported subscription mode, %v", subMode)) return @@ -255,19 +259,36 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync } // streamOnChangeSubscription implements Subscription "ON_CHANGE STREAM" mode -func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path) { +func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path, sub *gnmipb.Subscription) { + if gnmiPath == nil { + gnmiPath = sub.GetPath() + } + + // if heartbeatInterval is not assigned, use 0 to ignore periodical full sync + var heartbeatInterval time.Duration = 0 + if sub != nil { + var err error + heartbeatInterval, err = validateHeartbeatInterval(sub) + if err != nil { + enqueueFatalMsg(c, err.Error()) + c.synced.Done() + c.w.Done() + return + } + } + tblPaths := c.pathG2S[gnmiPath] log.V(2).Infof("streamOnChangeSubscription gnmiPath: %v", gnmiPath) if tblPaths[0].field != "" { if len(tblPaths) > 1 { - go dbFieldMultiSubscribe(c, gnmiPath, true, time.Millisecond*200, false) + go dbFieldMultiSubscribe(c, gnmiPath, true, heartbeatInterval, false) } else { - go dbFieldSubscribe(c, gnmiPath, true, time.Millisecond*200) + go dbFieldSubscribe(c, gnmiPath, true, heartbeatInterval) } } else { // sample interval and update only parameters are not applicable - go dbTableKeySubscribe(c, gnmiPath, 0, true) + go dbTableKeySubscribe(c, gnmiPath, heartbeatInterval, true) } } @@ -1340,3 +1361,17 @@ func validateSampleInterval(sub *gnmipb.Subscription) (time.Duration, error) { return requestedInterval, nil } } + +// validateHeartbeatInterval validates the heartbeat interval of the given subscription. +func validateHeartbeatInterval(sub *gnmipb.Subscription) (time.Duration, error) { + requestedInterval := time.Duration(sub.GetHeartbeatInterval()) + if requestedInterval == 0 { + // If the heartbeat_interval is set to 0, the target MUST create the subscription + // and send the data with the MinHeartbeatInterval + return MinHeartbeatInterval, nil + } else if requestedInterval < MinHeartbeatInterval { + return 0, fmt.Errorf("invalid heartbeat interval: %v. It cannot be less than %v", requestedInterval, MinHeartbeatInterval) + } else { + return requestedInterval, nil + } +}