diff --git a/gnmi_server/server_test.go b/gnmi_server/server_test.go index eb3a5d4f..94779a18 100644 --- a/gnmi_server/server_test.go +++ b/gnmi_server/server_test.go @@ -709,6 +709,7 @@ type subscriptionQuery struct { Query []string SubMode pb.SubscriptionMode SampleInterval uint64 + HeartbeatInterval uint64 } func pathToString(q client.Path) string { @@ -745,6 +746,7 @@ func createQuery(subListMode pb.SubscriptionList_Mode, target string, queries [] Path: pp, Mode: qq.SubMode, SampleInterval: qq.SampleInterval, + HeartbeatInterval: qq.HeartbeatInterval, }) } @@ -792,7 +794,7 @@ func createStateDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query } // createCountersDbQueryOnChangeMode creates a query with ON_CHANGE mode. -func createCountersDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query { +func createCountersDbQueryOnChangeMode(t *testing.T, interval time.Duration, paths ...string) client.Query { return createQueryOrFail(t, pb.SubscriptionList_STREAM, "COUNTERS_DB", @@ -800,6 +802,7 @@ func createCountersDbQueryOnChangeMode(t *testing.T, paths ...string) client.Que { Query: paths, SubMode: pb.SubscriptionMode_ON_CHANGE, + HeartbeatInterval: uint64(interval.Seconds()), }, }, false) @@ -1800,9 +1803,15 @@ func runTestSubscribe(t *testing.T, namespace string) { generateIntervals bool } tests := []TestExec { + { + desc: "Testing invalid heartbeat interval", + q: createCountersDbQueryOnChangeMode(t, 10 * time.Second, "NEIGH_STATE_TABLE"), + wantSubErr: fmt.Errorf("rpc error: code = InvalidArgument desc = invalid heartbeat interval: 10s. It cannot be less than %v", sdc.MinHeartbeatInterval), + wantNoti: []client.Notification{}, + }, { desc: "stream query for table COUNTERS_PORT_NAME_MAP with new test_field field", - q: createCountersDbQueryOnChangeMode(t, "COUNTERS_PORT_NAME_MAP"), + q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS_PORT_NAME_MAP"), updates: []tablePathValue{{ dbName: "COUNTERS_DB", tableName: "COUNTERS_PORT_NAME_MAP", @@ -1818,7 +1827,7 @@ func runTestSubscribe(t *testing.T, namespace string) { }, { desc: "stream query for table key Ethernet68 with new test_field field", - q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68"), + q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68"), updates: []tablePathValue{ { dbName: "COUNTERS_DB", @@ -1846,7 +1855,7 @@ func runTestSubscribe(t *testing.T, namespace string) { }, { desc: "(use vendor alias) stream query for table key Ethernet68/1 with new test_field field", - q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1"), + q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1"), updates: []tablePathValue{ { dbName: "COUNTERS_DB", @@ -1874,7 +1883,7 @@ func runTestSubscribe(t *testing.T, namespace string) { }, { desc: "stream query for COUNTERS/Ethernet68/SAI_PORT_STAT_PFC_7_RX_PKTS with update of field value", - q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68", "SAI_PORT_STAT_PFC_7_RX_PKTS"), + q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68", "SAI_PORT_STAT_PFC_7_RX_PKTS"), updates: []tablePathValue{ { dbName: "COUNTERS_DB", @@ -1902,7 +1911,7 @@ func runTestSubscribe(t *testing.T, namespace string) { }, { desc: "(use vendor alias) stream query for COUNTERS/[Ethernet68/1]/SAI_PORT_STAT_PFC_7_RX_PKTS with update of field value", - q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1", "SAI_PORT_STAT_PFC_7_RX_PKTS"), + q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1", "SAI_PORT_STAT_PFC_7_RX_PKTS"), updates: []tablePathValue{ { dbName: "COUNTERS_DB", @@ -1930,7 +1939,7 @@ func runTestSubscribe(t *testing.T, namespace string) { }, { desc: "stream query for COUNTERS/Ethernet68/Pfcwd with update of field value", - q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68", "Pfcwd"), + q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68", "Pfcwd"), updates: []tablePathValue{ { dbName: "COUNTERS_DB", @@ -1958,7 +1967,7 @@ func runTestSubscribe(t *testing.T, namespace string) { }, { desc: "(use vendor alias) stream query for COUNTERS/[Ethernet68/1]/Pfcwd with update of field value", - q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1", "Pfcwd"), + q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1", "Pfcwd"), updates: []tablePathValue{ { dbName: "COUNTERS_DB", @@ -1986,7 +1995,7 @@ func runTestSubscribe(t *testing.T, namespace string) { }, { desc: "stream query for table key Ethernet* with new test_field field on Ethernet68", - q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*"), + q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*"), updates: []tablePathValue{ { dbName: "COUNTERS_DB", @@ -2014,7 +2023,7 @@ func runTestSubscribe(t *testing.T, namespace string) { }, { desc: "stream query for table key Ethernet*/SAI_PORT_STAT_PFC_7_RX_PKTS with field value update", - q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*", "SAI_PORT_STAT_PFC_7_RX_PKTS"), + q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*", "SAI_PORT_STAT_PFC_7_RX_PKTS"), updates: []tablePathValue{ { dbName: "COUNTERS_DB", @@ -2034,7 +2043,7 @@ func runTestSubscribe(t *testing.T, namespace string) { }, { desc: "stream query for table key Ethernet*/Pfcwd with field value update", - q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*", "Pfcwd"), + q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*", "Pfcwd"), updates: []tablePathValue{ { dbName: "COUNTERS_DB", diff --git a/sonic_data_client/db_client.go b/sonic_data_client/db_client.go index 3b73b50f..e625d87f 100644 --- a/sonic_data_client/db_client.go +++ b/sonic_data_client/db_client.go @@ -73,6 +73,10 @@ var Target2RedisDb = make(map[string]map[string]*redis.Client) // Any non-zero value that less than this threshold is considered invalid argument. var MinSampleInterval = time.Second +// MinHeartbeatInterval is the lowest HB interval for streaming subscriptions. +// This is reserved value, which should be adjusted per BGPL benchmark result. +var MinHeartbeatInterval = 1 * time.Minute + // IntervalTicker is a factory method to implement interval ticking. // Exposed for UT purposes. var IntervalTicker = func(interval time.Duration) <-chan time.Time { @@ -212,7 +216,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync for gnmiPath := range c.pathG2S { c.w.Add(1) c.synced.Add(1) - go streamOnChangeSubscription(c, gnmiPath) + go streamOnChangeSubscription(c, gnmiPath, nil) } } else { log.V(2).Infof("Stream subscription request received, mode: %v, subscription count: %v", @@ -230,7 +234,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync } else if subMode == gnmipb.SubscriptionMode_ON_CHANGE { c.w.Add(1) c.synced.Add(1) - go streamOnChangeSubscription(c, sub.GetPath()) + go streamOnChangeSubscription(c, nil, sub) } else { enqueueFatalMsg(c, fmt.Sprintf("unsupported subscription mode, %v", subMode)) return @@ -255,19 +259,36 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync } // streamOnChangeSubscription implements Subscription "ON_CHANGE STREAM" mode -func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path) { +func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path, sub *gnmipb.Subscription) { + if gnmiPath == nil { + gnmiPath = sub.GetPath() + } + + // if heartbeatInterval is not assigned, use 0 to ignore periodical full sync + var heartbeatInterval time.Duration = 0 + if sub != nil { + var err error + heartbeatInterval, err = validateHeartbeatInterval(sub) + if err != nil { + enqueueFatalMsg(c, err.Error()) + c.synced.Done() + c.w.Done() + return + } + } + tblPaths := c.pathG2S[gnmiPath] log.V(2).Infof("streamOnChangeSubscription gnmiPath: %v", gnmiPath) if tblPaths[0].field != "" { if len(tblPaths) > 1 { - go dbFieldMultiSubscribe(c, gnmiPath, true, time.Millisecond*200, false) + go dbFieldMultiSubscribe(c, gnmiPath, true, heartbeatInterval, false) } else { - go dbFieldSubscribe(c, gnmiPath, true, time.Millisecond*200) + go dbFieldSubscribe(c, gnmiPath, true, heartbeatInterval) } } else { // sample interval and update only parameters are not applicable - go dbTableKeySubscribe(c, gnmiPath, 0, true) + go dbTableKeySubscribe(c, gnmiPath, heartbeatInterval, true) } } @@ -1340,3 +1361,17 @@ func validateSampleInterval(sub *gnmipb.Subscription) (time.Duration, error) { return requestedInterval, nil } } + +// validateHeartbeatInterval validates the heartbeat interval of the given subscription. +func validateHeartbeatInterval(sub *gnmipb.Subscription) (time.Duration, error) { + requestedInterval := time.Duration(sub.GetHeartbeatInterval()) + if requestedInterval == 0 { + // If the heartbeat_interval is set to 0, the target MUST create the subscription + // and send the data with the MinHeartbeatInterval + return MinHeartbeatInterval, nil + } else if requestedInterval < MinHeartbeatInterval { + return 0, fmt.Errorf("invalid heartbeat interval: %v. It cannot be less than %v", requestedInterval, MinHeartbeatInterval) + } else { + return requestedInterval, nil + } +}