Skip to content

Commit

Permalink
Merge pull request #188 from balajisundaravel/master
Browse files Browse the repository at this point in the history
Project import generated by Copybara
  • Loading branch information
dplore authored Oct 21, 2024
2 parents 776b4d0 + 56523db commit 91fd031
Show file tree
Hide file tree
Showing 31 changed files with 1,283 additions and 4,796 deletions.
15 changes: 14 additions & 1 deletion cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Target struct {
ts time.Time // latest timestamp for an update
excludedMeta stringset.Set // set of metadata not to generate update for
futureThreshold time.Duration // how far in the future an update can be accepted
eventDriven bool // whether to emulate event driven
}

// Name returns the name of the target.
Expand All @@ -83,6 +84,7 @@ type options struct {
serverName string
excludedUpdateMeta stringset.Set
futureThreshold time.Duration
DisableEventDriven bool
}

// Option defines the function prototype to set options for creating a Cache.
Expand Down Expand Up @@ -139,6 +141,16 @@ func WithFutureThreshold(futureThreshold time.Duration) Option {
}
}

// DisableEventDrivenEmulation returns an Option to disable event-driven
// emulation. Note that it only disables event-driven emulation. If the
// input data to Cache is already event-driven, this won't be able to
// disable the event-driven nature of the original data.
func DisableEventDrivenEmulation() Option {
return func(o *options) {
o.DisableEventDriven = true
}
}

// Cache is a structure holding state information for multiple targets.
type Cache struct {
opts options
Expand Down Expand Up @@ -287,6 +299,7 @@ func (c *Cache) Add(target string) *Target {
lat: latency.New(c.opts.latencyWindows, latOpts),
excludedMeta: c.opts.excludedUpdateMeta,
futureThreshold: c.opts.futureThreshold,
eventDriven: !c.opts.DisableEventDriven,
}
t.meta.SetStr(metadata.ServerName, c.opts.serverName)
c.targets[target] = t
Expand Down Expand Up @@ -556,7 +569,7 @@ func (t *Target) gnmiUpdate(n *pb.Notification) (*ctree.Leaf, error) {
}
oldval.Update(n)
// Simulate event-driven for all non-atomic updates.
if !n.Atomic && value.Equal(old.Update[0].Val, n.Update[0].Val) {
if !n.Atomic && value.Equal(old.Update[0].Val, n.Update[0].Val) && t.eventDriven {
t.meta.AddInt(metadata.SuppressedCount, 1)
return nil, nil
}
Expand Down
48 changes: 48 additions & 0 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,54 @@ func TestMetadataSuppressed(t *testing.T) {
}
}

func TestMetadataSuppressedWithEventDrivenDisabled(t *testing.T) {
c := New([]string{"dev1"}, DisableEventDrivenEmulation())
// Unique values not suppressed.
for i := 0; i < 10; i++ {
c.GnmiUpdate(gnmiNotification("dev1", []string{"prefix", "path"}, []string{"update", "path"}, int64(i), strconv.Itoa(i), false))
c.GetTarget("dev1").updateMeta(nil)
path := metadata.Path(metadata.SuppressedCount)
c.Query("dev1", path, func(_ []string, _ *ctree.Leaf, v any) error {
suppressedCount := v.(*pb.Notification).Update[0].Val.GetIntVal()
if suppressedCount != 0 {
t.Errorf("got suppressedCount = %d, want 0", suppressedCount)
}
return nil
})
path = metadata.Path(metadata.UpdateCount)
c.Query("dev1", path, func(_ []string, _ *ctree.Leaf, v any) error {
updates := v.(*pb.Notification).Update[0].Val.GetIntVal()
if updates != int64(i+1) {
t.Errorf("got updates %d, want %d", updates, i+1)
}
return nil
})
}
c.Reset("dev1")

// Duplicate values also not suppressed.
for i := 0; i < 10; i++ {
c.GnmiUpdate(gnmiNotification("dev1", []string{"prefix", "path"}, []string{"update", "path"}, int64(i), "same value", false))
c.GetTarget("dev1").updateMeta(nil)
path := metadata.Path(metadata.SuppressedCount)
c.Query("dev1", path, func(_ []string, _ *ctree.Leaf, v any) error {
suppressedCount := v.(*pb.Notification).Update[0].Val.GetIntVal()
if suppressedCount != 0 {
t.Errorf("got suppressedCount = %d, want 0", suppressedCount)
}
return nil
})
path = metadata.Path(metadata.UpdateCount)
c.Query("dev1", path, func(_ []string, _ *ctree.Leaf, v any) error {
updates := v.(*pb.Notification).Update[0].Val.GetIntVal()
if updates != int64(i+1) {
t.Errorf("got updates %d, want %d", updates, i+1)
}
return nil
})
}
}

func TestMetadataLatency(t *testing.T) {
window := 2 * time.Second
opt, _ := WithLatencyWindows([]string{"2s"}, 2*time.Second)
Expand Down
32 changes: 24 additions & 8 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ var (
"p": "PROTO", "proto": "PROTO", "PROTO": "PROTO",
"sp": "SHORTPROTO", "shortproto": "SHORTPROTO", "SHORTPROTO": "SHORTPROTO",
}
// Stub for testing.
since = time.Since
)

// Config is a type to hold parameters that affect how the cli sends and
Expand All @@ -66,11 +68,14 @@ type Config struct {
// on - human readable timestamp according to layout
// raw - int64 nanos since epoch
// <FORMAT> - human readable timestamp according to <FORMAT>
Timestamp string // Formatting of timestamp in result output.
DisplaySize bool
Latency bool // Show latency to client
ClientTypes []string // List of client types to try.
Location *time.Location // Location that time formatting uses in lieu of the local time zone.
Timestamp string // Formatting of timestamp in result output.
DisplaySize bool
Latency bool // Show latency to client. For single DisplayType only.
ClientTypes []string // List of client types to try.
Location *time.Location // Location that time formatting uses in lieu of the local time zone.
FilterDeletes bool // Filter out delete results. For single DisplayType only.
FilterUpdates bool // Filter out update results. For single DisplayType only.
FilterMinLatency time.Duration // Filter out results not meeting minimum latency. For single DisplayType only.
}

// QueryType returns a client query type for t after trying aliases for the
Expand Down Expand Up @@ -160,14 +165,21 @@ func genHandler(cfg *Config) client.NotificationHandler {
var buf bytes.Buffer // Reuse the same buffer in either case.
iDisplay := func(p client.Path, v interface{}, ts time.Time) {
buf.Reset()
var latency time.Duration
if cfg.Latency || cfg.FilterMinLatency > 0 {
latency = since(ts)
}
if cfg.FilterMinLatency > 0 && latency < cfg.FilterMinLatency {
return
}
buf.WriteString(strings.Join(p, cfg.Delimiter))
buf.WriteString(fmt.Sprintf(", %v", v))
t := formatTime(ts, cfg)
if t != nil {
buf.WriteString(fmt.Sprintf(", %v", t))
}
if cfg.Latency {
buf.WriteString(fmt.Sprintf(", %s", time.Since(ts)))
buf.WriteString(fmt.Sprintf(", %s", latency))
}
cfg.Display(buf.Bytes())
}
Expand All @@ -176,9 +188,13 @@ func genHandler(cfg *Config) client.NotificationHandler {
default:
return fmt.Errorf("invalid type: %#v", v)
case client.Update:
iDisplay(v.Path, v.Val, v.TS)
if !cfg.FilterUpdates {
iDisplay(v.Path, v.Val, v.TS)
}
case client.Delete:
iDisplay(v.Path, v.Val, v.TS)
if !cfg.FilterDeletes {
iDisplay(v.Path, v.Val, v.TS)
}
case client.Sync, client.Connected:
case client.Error:
return v
Expand Down
84 changes: 84 additions & 0 deletions cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ func TestSendQueryAndDisplay(t *testing.T) {
display := func(b []byte) {
displayOut += string(b) + "\n"
}
now := time.Unix(300, 0)
tests := []struct {
desc string
updates []*fpb.Value
query client.Query
cfg Config
want string
sort bool
since func(time.Time) time.Duration
}{{
desc: "single target single output with provided layout",
updates: []*fpb.Value{
Expand Down Expand Up @@ -271,6 +273,84 @@ func TestSendQueryAndDisplay(t *testing.T) {
want: `dev1/a/b, 5
dev1/a/c, 5
dev1/a/c, <nil>
`,
}, {
desc: "single target single output with FilterMinLatency",
updates: []*fpb.Value{
{Path: []string{"a", "b"}, Value: &fpb.Value_IntValue{IntValue: &fpb.IntValue{Value: 5}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: now.Add(-62 * time.Second).UnixNano()}},
{Path: []string{"a", "c"}, Value: &fpb.Value_IntValue{IntValue: &fpb.IntValue{Value: 5}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: now.Add(-59 * time.Second).UnixNano()}},
{Path: []string{"a", "d"}, Value: &fpb.Value_IntValue{IntValue: &fpb.IntValue{Value: 7}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: now.Add(-61 * time.Second).UnixNano()}},
{Path: []string{"a", "e"}, Value: &fpb.Value_Delete{Delete: &fpb.DeleteValue{}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: now.Add(-60 * time.Second).UnixNano()}},
{Path: []string{"a", "f"}, Value: &fpb.Value_Delete{Delete: &fpb.DeleteValue{}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: now.Add(-59 * time.Second).UnixNano()}},
{Value: &fpb.Value_Sync{Sync: 1}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: now.Add(-59 * time.Second).UnixNano()}},
},
query: client.Query{
Target: "dev1",
Queries: []client.Path{{"a"}},
Type: client.Once,
TLS: &tls.Config{InsecureSkipVerify: true},
},
cfg: Config{
Delimiter: "/",
Display: display,
DisplayPrefix: "",
DisplayIndent: " ",
DisplayType: "single",
FilterMinLatency: time.Minute,
},
since: func(ts time.Time) time.Duration { return now.Sub(ts) },
want: `dev1/a/b, 5
dev1/a/d, 7
dev1/a/e, <nil>
`,
}, {
desc: "single target single output with FilterDeletes",
updates: []*fpb.Value{
{Path: []string{"a", "b"}, Value: &fpb.Value_IntValue{IntValue: &fpb.IntValue{Value: 5}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: 100}},
{Path: []string{"a", "c"}, Value: &fpb.Value_Delete{Delete: &fpb.DeleteValue{}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: 101}},
{Path: []string{"a", "d"}, Value: &fpb.Value_IntValue{IntValue: &fpb.IntValue{Value: 7}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: 102}},
{Value: &fpb.Value_Sync{Sync: 1}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: 103}},
},
query: client.Query{
Target: "dev1",
Queries: []client.Path{{"a"}},
Type: client.Once,
TLS: &tls.Config{InsecureSkipVerify: true},
},
cfg: Config{
Delimiter: "/",
Display: display,
DisplayPrefix: "",
DisplayIndent: " ",
DisplayType: "single",
FilterDeletes: true,
},
want: `dev1/a/b, 5
dev1/a/d, 7
`,
}, {
desc: "single target single output with FilterUpdates",
updates: []*fpb.Value{
{Path: []string{"a", "b"}, Value: &fpb.Value_IntValue{IntValue: &fpb.IntValue{Value: 5}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: 100}},
{Path: []string{"a", "c"}, Value: &fpb.Value_Delete{Delete: &fpb.DeleteValue{}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: 101}},
{Path: []string{"a", "d"}, Value: &fpb.Value_IntValue{IntValue: &fpb.IntValue{Value: 7}}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: 102}},
{Value: &fpb.Value_Sync{Sync: 1}, Repeat: 1, Timestamp: &fpb.Timestamp{Timestamp: 103}},
},
query: client.Query{
Target: "dev1",
Queries: []client.Path{{"a"}},
Type: client.Once,
TLS: &tls.Config{InsecureSkipVerify: true},
},
cfg: Config{
Delimiter: "/",
Display: display,
DisplayPrefix: "",
DisplayIndent: " ",
DisplayType: "single",
FilterUpdates: true,
},
want: `dev1/a/c, <nil>
`,
}, {
desc: "single target multiple paths",
Expand Down Expand Up @@ -717,6 +797,10 @@ sync_response: true
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
if tt.since != nil {
since = tt.since
defer func() { since = time.Since }()
}
displayOut = ""
s, err := gnmi.New(
&fpb.Config{
Expand Down
6 changes: 6 additions & 0 deletions cmd/gnmi_cli/gnmi_cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func init() {
flag.StringVar(&cfg.Timestamp, "timestamp", "", "Specify timestamp formatting in output. One of (<empty string>, on, raw, <FORMAT>) where <empty string> is disabled, on is human readable, raw is int64 nanos since epoch, and <FORMAT> is according to golang time.Format(<FORMAT>)")
flag.BoolVar(&cfg.DisplaySize, "display_size", false, "Display the total size of query response.")
flag.BoolVar(&cfg.Latency, "latency", false, "Display the latency for receiving each update (Now - update timestamp).")
flag.DurationVar(&cfg.FilterMinLatency, "filter_min_latency", 0, "Filter out results with latency < the specified minium latency. No filtering if 0. Works with single display type only.")
flag.BoolVar(&cfg.FilterDeletes, "filter_deletes", false, "Filter out delete results. Works with single display type only.")
flag.BoolVar(&cfg.FilterUpdates, "filter_updates", false, "Filter out update results. Works with single display type only.")
flag.StringVar(&q.TLS.ServerName, "server_name", "", "When set, CLI will use this hostname to verify server certificate during TLS handshake.")
flag.BoolVar(&q.TLS.InsecureSkipVerify, "tls_skip_verify", false, "When set, CLI will not verify the server certificate during TLS handshake.")

Expand All @@ -118,6 +121,9 @@ func init() {
flag.DurationVar(&cfg.PollingInterval, "pi", cfg.PollingInterval, "Short for polling_interval.")
flag.BoolVar(&cfg.DisplaySize, "ds", cfg.DisplaySize, "Short for display_size.")
flag.BoolVar(&cfg.Latency, "l", cfg.Latency, "Short for latency.")
flag.DurationVar(&cfg.FilterMinLatency, "flml", 0, "Short for filter_min_latency.")
flag.BoolVar(&cfg.FilterDeletes, "fld", false, "Short for filter_deletes.")
flag.BoolVar(&cfg.FilterUpdates, "flu", false, "Short for filter_updates.")
flag.StringVar(reqProto, "p", *reqProto, "Short for request proto.")
}

Expand Down
34 changes: 16 additions & 18 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
module github.com/openconfig/gnmi

go 1.21

toolchain go1.22.5
go 1.22.0

require (
bitbucket.org/creachadair/stringset v0.0.14
github.com/cenkalti/backoff/v4 v4.1.1
github.com/golang/glog v1.2.1
github.com/cenkalti/backoff/v4 v4.3.0
github.com/golang/glog v1.2.2
github.com/google/go-cmp v0.6.0
github.com/kylelemons/godebug v1.1.0
github.com/openconfig/grpctunnel v0.0.0-20220819142823-6f5422b8ca70
github.com/openconfig/ygot v0.6.0
github.com/protocolbuffers/txtpbfmt v0.0.0-20220608084003-fc78c767cd6a
golang.org/x/crypto v0.26.0
golang.org/x/net v0.28.0
google.golang.org/grpc v1.66.0
google.golang.org/protobuf v1.34.2
github.com/openconfig/grpctunnel v0.1.0
github.com/openconfig/ygot v0.29.20
github.com/protocolbuffers/txtpbfmt v0.0.0-20240823084532-8e6b51fa9bef
golang.org/x/crypto v0.28.0
golang.org/x/net v0.30.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.1
)

require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/openconfig/goyang v0.0.0-20200115183954-d0a48929f0ea // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
github.com/openconfig/goyang v1.6.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
)
Loading

0 comments on commit 91fd031

Please sign in to comment.