Skip to content

Commit

Permalink
Add support of heartbeat interval in "on change" subscription, so that
Browse files Browse the repository at this point in the history
full sync would happen periodically when hb is triggered.
  • Loading branch information
FengPan-Frank committed Oct 11, 2023
1 parent cbb7631 commit a46dd46
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 17 deletions.
31 changes: 20 additions & 11 deletions gnmi_server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ type subscriptionQuery struct {
Query []string
SubMode pb.SubscriptionMode
SampleInterval uint64
HeartbeatInterval uint64
}

func pathToString(q client.Path) string {
Expand Down Expand Up @@ -745,6 +746,7 @@ func createQuery(subListMode pb.SubscriptionList_Mode, target string, queries []
Path: pp,
Mode: qq.SubMode,
SampleInterval: qq.SampleInterval,
HeartbeatInterval: qq.HeartbeatInterval,
})
}

Expand Down Expand Up @@ -792,14 +794,15 @@ func createStateDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query
}

// createCountersDbQueryOnChangeMode creates a query with ON_CHANGE mode.
func createCountersDbQueryOnChangeMode(t *testing.T, paths ...string) client.Query {
func createCountersDbQueryOnChangeMode(t *testing.T, interval time.Duration, paths ...string) client.Query {
return createQueryOrFail(t,
pb.SubscriptionList_STREAM,
"COUNTERS_DB",
[]subscriptionQuery{
{
Query: paths,
SubMode: pb.SubscriptionMode_ON_CHANGE,
HeartbeatInterval: uint64(interval.Seconds()),
},
},
false)
Expand Down Expand Up @@ -1800,9 +1803,15 @@ func runTestSubscribe(t *testing.T, namespace string) {
generateIntervals bool
}
tests := []TestExec {
{
desc: "Testing invalid heartbeat interval",
q: createCountersDbQueryOnChangeMode(t, 10 * time.Second, "NEIGH_STATE_TABLE"),
wantSubErr: fmt.Errorf("rpc error: code = InvalidArgument desc = invalid heartbeat interval: 10s. It cannot be less than %v", sdc.MinHeartbeatInterval),
wantNoti: []client.Notification{},
},
{
desc: "stream query for table COUNTERS_PORT_NAME_MAP with new test_field field",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS_PORT_NAME_MAP"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS_PORT_NAME_MAP"),
updates: []tablePathValue{{
dbName: "COUNTERS_DB",
tableName: "COUNTERS_PORT_NAME_MAP",
Expand All @@ -1818,7 +1827,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for table key Ethernet68 with new test_field field",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1846,7 +1855,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "(use vendor alias) stream query for table key Ethernet68/1 with new test_field field",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1874,7 +1883,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for COUNTERS/Ethernet68/SAI_PORT_STAT_PFC_7_RX_PKTS with update of field value",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1902,7 +1911,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "(use vendor alias) stream query for COUNTERS/[Ethernet68/1]/SAI_PORT_STAT_PFC_7_RX_PKTS with update of field value",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1930,7 +1939,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for COUNTERS/Ethernet68/Pfcwd with update of field value",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68", "Pfcwd"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68", "Pfcwd"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1958,7 +1967,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "(use vendor alias) stream query for COUNTERS/[Ethernet68/1]/Pfcwd with update of field value",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet68/1", "Pfcwd"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet68/1", "Pfcwd"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -1986,7 +1995,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for table key Ethernet* with new test_field field on Ethernet68",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down Expand Up @@ -2014,7 +2023,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for table key Ethernet*/SAI_PORT_STAT_PFC_7_RX_PKTS with field value update",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*", "SAI_PORT_STAT_PFC_7_RX_PKTS"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand All @@ -2034,7 +2043,7 @@ func runTestSubscribe(t *testing.T, namespace string) {
},
{
desc: "stream query for table key Ethernet*/Pfcwd with field value update",
q: createCountersDbQueryOnChangeMode(t, "COUNTERS", "Ethernet*", "Pfcwd"),
q: createCountersDbQueryOnChangeMode(t, 0, "COUNTERS", "Ethernet*", "Pfcwd"),
updates: []tablePathValue{
{
dbName: "COUNTERS_DB",
Expand Down
47 changes: 41 additions & 6 deletions sonic_data_client/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ var Target2RedisDb = make(map[string]map[string]*redis.Client)
// Any non-zero value that less than this threshold is considered invalid argument.
var MinSampleInterval = time.Second

// MinHeartbeatInterval is the lowest HB interval for streaming subscriptions.
// This is reserved value, which should be adjusted per BGPL benchmark result.
var MinHeartbeatInterval = 1 * time.Minute

// IntervalTicker is a factory method to implement interval ticking.
// Exposed for UT purposes.
var IntervalTicker = func(interval time.Duration) <-chan time.Time {
Expand Down Expand Up @@ -212,7 +216,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
for gnmiPath := range c.pathG2S {
c.w.Add(1)
c.synced.Add(1)
go streamOnChangeSubscription(c, gnmiPath)
go streamOnChangeSubscription(c, gnmiPath, nil)
}
} else {
log.V(2).Infof("Stream subscription request received, mode: %v, subscription count: %v",
Expand All @@ -230,7 +234,7 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
} else if subMode == gnmipb.SubscriptionMode_ON_CHANGE {
c.w.Add(1)
c.synced.Add(1)
go streamOnChangeSubscription(c, sub.GetPath())
go streamOnChangeSubscription(c, nil, sub)
} else {
enqueueFatalMsg(c, fmt.Sprintf("unsupported subscription mode, %v", subMode))
return
Expand All @@ -255,19 +259,36 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync
}

// streamOnChangeSubscription implements Subscription "ON_CHANGE STREAM" mode
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path) {
func streamOnChangeSubscription(c *DbClient, gnmiPath *gnmipb.Path, sub *gnmipb.Subscription) {
if gnmiPath == nil {
gnmiPath = sub.GetPath()
}

// if heartbeatInterval is not assigned, use 0 to ignore periodical full sync
var heartbeatInterval time.Duration = 0
if sub != nil {
var err error
heartbeatInterval, err = validateHeartbeatInterval(sub)
if err != nil {
enqueueFatalMsg(c, err.Error())
c.synced.Done()
c.w.Done()
return
}
}

tblPaths := c.pathG2S[gnmiPath]
log.V(2).Infof("streamOnChangeSubscription gnmiPath: %v", gnmiPath)

if tblPaths[0].field != "" {
if len(tblPaths) > 1 {
go dbFieldMultiSubscribe(c, gnmiPath, true, time.Millisecond*200, false)
go dbFieldMultiSubscribe(c, gnmiPath, true, heartbeatInterval, false)
} else {
go dbFieldSubscribe(c, gnmiPath, true, time.Millisecond*200)
go dbFieldSubscribe(c, gnmiPath, true, heartbeatInterval)
}
} else {
// sample interval and update only parameters are not applicable
go dbTableKeySubscribe(c, gnmiPath, 0, true)
go dbTableKeySubscribe(c, gnmiPath, heartbeatInterval, true)
}
}

Expand Down Expand Up @@ -1340,3 +1361,17 @@ func validateSampleInterval(sub *gnmipb.Subscription) (time.Duration, error) {
return requestedInterval, nil
}
}

// validateHeartbeatInterval validates the heartbeat interval of the given subscription.
func validateHeartbeatInterval(sub *gnmipb.Subscription) (time.Duration, error) {
requestedInterval := time.Duration(sub.GetHeartbeatInterval())
if requestedInterval == 0 {
// If the heartbeat_interval is set to 0, the target MUST create the subscription
// and send the data with the MinHeartbeatInterval
return MinHeartbeatInterval, nil
} else if requestedInterval < MinHeartbeatInterval {
return 0, fmt.Errorf("invalid heartbeat interval: %v. It cannot be less than %v", requestedInterval, MinHeartbeatInterval)
} else {
return requestedInterval, nil
}
}

0 comments on commit a46dd46

Please sign in to comment.