From 878eaaae51a8fdcdb559a18a68c2b7fcfe6942e4 Mon Sep 17 00:00:00 2001 From: Renuka Manavalan Date: Sat, 12 Feb 2022 14:17:40 +0000 Subject: [PATCH] First cut; Fixed leak in my repro; ./gnmi_cli --client_types=gnmi -a localhost:50051 -q COUNTERS -logtostderr -insecure -timestamp on -t COUNTERS_DB -qt s -streaming_type ON_CHANGE --- gnmi_server/client_subscribe.go | 6 ++++++ sonic_data_client/db_client.go | 24 +++++++++++++++++++++--- sonic_data_client/transl_data_client.go | 16 +++++++++++++--- telemetry/telemetry.go | 11 +++++++++++ 4 files changed, 51 insertions(+), 6 deletions(-) diff --git a/gnmi_server/client_subscribe.go b/gnmi_server/client_subscribe.go index c1bf22e0e..5e929ef49 100644 --- a/gnmi_server/client_subscribe.go +++ b/gnmi_server/client_subscribe.go @@ -5,6 +5,8 @@ import ( "io" "net" "sync" + "runtime" + "runtime/debug" "github.com/Workiva/go-datastructures/queue" log "github.com/golang/glog" @@ -83,6 +85,7 @@ func (c *Client) Run(stream gnmipb.GNMI_SubscribeServer) (err error) { if err != nil { c.errors++ } + log.V(2).Infof("Client %s exiting", c) }() query, err := stream.Recv() @@ -261,5 +264,8 @@ func (c *Client) send(stream gnmipb.GNMI_SubscribeServer) error { return err } log.V(5).Infof("Client %s done sending, msg count %d, msg %v", c, c.sendMsg, resp) + debug.FreeOSMemory() + n := runtime.NumGoroutine() + log.V(1).Infof("Force mem release; numRoutine=%v", n) } } diff --git a/sonic_data_client/db_client.go b/sonic_data_client/db_client.go index 68fab01e3..6d306d0f3 100644 --- a/sonic_data_client/db_client.go +++ b/sonic_data_client/db_client.go @@ -11,6 +11,8 @@ import ( "strings" "sync" "time" + "runtime" + "runtime/debug" log "github.com/golang/glog" @@ -21,6 +23,7 @@ import ( gnmipb "github.com/openconfig/gnmi/proto/gnmi" ) + const ( // indentString represents the default indentation string used for // JSON. Two spaces are used here. @@ -219,6 +222,9 @@ func (c *DbClient) StreamRun(q *queue.PriorityQueue, stop chan struct{}, w *sync <-c.channel log.V(1).Infof("Exiting StreamRun routine for Client %v", c) + debug.FreeOSMemory() + n := runtime.NumGoroutine() + log.V(1).Infof("Force mem release; numRoutine=%v", n) } // streamOnChangeSubscription implements Subscription "ON_CHANGE STREAM" mode @@ -300,6 +306,9 @@ func (c *DbClient) PollRun(q *queue.PriorityQueue, poll chan struct{}, w *sync.W }, }) log.V(4).Infof("Sync done, poll time taken: %v ms", int64(time.Since(t1)/time.Millisecond)) + debug.FreeOSMemory() + n := runtime.NumGoroutine() + log.V(1).Infof("Force mem release; numRoutine=%v", n) } } func (c *DbClient) OnceRun(q *queue.PriorityQueue, once chan struct{}, w *sync.WaitGroup, subscribe *gnmipb.SubscriptionList) { @@ -797,6 +806,9 @@ func dbFieldMultiSubscribe(c *DbClient, gnmiPath *gnmipb.Path, onChange bool, in log.V(1).Infof("Queue error: %v", err) return err } + debug.FreeOSMemory() + n := runtime.NumGoroutine() + log.V(1).Infof("Force mem release; numRoutine=%v", n) return nil } @@ -874,6 +886,9 @@ func dbFieldSubscribe(c *DbClient, gnmiPath *gnmipb.Path, onChange bool, interva log.V(1).Infof("Queue error: %v", err) return err } + debug.FreeOSMemory() + n := runtime.NumGoroutine() + log.V(1).Infof("Force mem release; numRoutine=%v", n) return nil } @@ -1043,6 +1058,9 @@ func dbTableKeySubscribe(c *DbClient, gnmiPath *gnmipb.Path, interval time.Durat if err = c.q.Put(Value{spbv}); err != nil { return fmt.Errorf("Queue error: %v", err) } + debug.FreeOSMemory() + n := runtime.NumGoroutine() + log.V(1).Infof("Force mem release; numRoutine=%v", n) return nil } @@ -1126,7 +1144,7 @@ func dbTableKeySubscribe(c *DbClient, gnmiPath *gnmipb.Path, interval time.Durat select { case updatedTable := <-updateChannel: - log.V(1).Infof("update received: %v", updatedTable) + // log.V(1).Infof("update received: %v", updatedTable) if interval == 0 { // on-change mode, send the updated data. if err := sendMsiData(updatedTable); err != nil { @@ -1140,7 +1158,7 @@ func dbTableKeySubscribe(c *DbClient, gnmiPath *gnmipb.Path, interval time.Durat } } case <-intervalTicker: - log.V(1).Infof("ticker received: %v", len(msiAll)) + // log.V(1).Infof("ticker received: %v", len(msiAll)) if err := sendMsiData(msiAll); err != nil { handleFatalMsg(err.Error()) @@ -1150,7 +1168,7 @@ func dbTableKeySubscribe(c *DbClient, gnmiPath *gnmipb.Path, interval time.Durat // Clear the payload so that next time it will send only updates if updateOnly { msiAll = make(map[string]interface{}) - log.V(1).Infof("msiAll cleared: %v", len(msiAll)) + // log.V(1).Infof("msiAll cleared: %v", len(msiAll)) } case <-c.channel: diff --git a/sonic_data_client/transl_data_client.go b/sonic_data_client/transl_data_client.go index 78cd663ee..c260c0b7a 100644 --- a/sonic_data_client/transl_data_client.go +++ b/sonic_data_client/transl_data_client.go @@ -335,11 +335,22 @@ func addTimer(c *TranslClient, ticker_map map[int][]*ticker_info, cases *[]refle } func TranslSubscribe(gnmiPaths []*gnmipb.Path, stringPaths []string, pathMap map[string]*gnmipb.Path, c *TranslClient, updates_only bool) { + sync_done := false defer c.w.Done() + + // Helper to signal sync + signalSync := func() { + if !sync_done { + c.synced.Done() + sync_done = true + } + } + + defer signalSync() + rc, ctx := common_utils.GetContext(c.ctx) c.ctx = ctx q := queue.NewPriorityQueue(1, false) - var sync_done bool req := translib.SubscribeRequest{Paths:stringPaths, Q:q, Stop:c.channel} if rc.BundleVersion != nil { nver, err := translib.NewVersion(*rc.BundleVersion) @@ -397,8 +408,7 @@ func TranslSubscribe(gnmiPaths []*gnmipb.Path, stringPaths []string, pathMap map if v.SyncComplete && !sync_done { fmt.Println("SENDING SYNC") - c.synced.Done() - sync_done = true + signalSync() } default: log.V(1).Infof("Unknown data type %v for %s in queue", items[0], c) diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 4ec580c6b..19655f60c 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -6,6 +6,8 @@ import ( "flag" "io/ioutil" "time" + "runtime" + "runtime/debug" log "github.com/golang/glog" "google.golang.org/grpc" @@ -131,6 +133,15 @@ func main() { gnmi.GenerateJwtSecretKey() } + go func() { + for { + debug.FreeOSMemory() + n := runtime.NumGoroutine() + log.V(1).Infof("Force mem release; numRoutine=%v", n) + time.Sleep(5 * time.Second) + } + }() + s, err := gnmi.NewServer(cfg, opts) if err != nil { log.Errorf("Failed to create gNMI server: %v", err)