diff --git a/client.go b/client.go index 8323a623..c7834e02 100644 --- a/client.go +++ b/client.go @@ -7,6 +7,7 @@ package opcua import ( "context" "crypto/rand" + "expvar" "fmt" "io" "log" @@ -20,6 +21,7 @@ import ( "github.com/gopcua/opcua/debug" "github.com/gopcua/opcua/errors" "github.com/gopcua/opcua/id" + "github.com/gopcua/opcua/stats" "github.com/gopcua/opcua/ua" "github.com/gopcua/opcua/uacp" "github.com/gopcua/opcua/uasc" @@ -186,17 +188,23 @@ func (c *Client) Connect(ctx context.Context) (err error) { c.setState(Connecting) if err := c.Dial(ctx); err != nil { + stats.RecordError(err) + return err } s, err := c.CreateSession(c.cfg.session) if err != nil { c.Close() + stats.RecordError(err) + return err } if err := c.ActivateSession(s); err != nil { c.Close() + stats.RecordError(err) + return err } c.setState(Connected) @@ -214,6 +222,8 @@ func (c *Client) Connect(ctx context.Context) (err error) { // todo(fs): and you should find a commit that implements this option. if err := c.UpdateNamespaces(); err != nil { c.Close() + stats.RecordError(err) + return err } @@ -237,6 +247,8 @@ func (c *Client) monitor(ctx context.Context) { return case err, ok := <-c.sechanErr: + stats.RecordError(err) + // return if channel or connection is closed if !ok || err == io.EOF && c.State() == Closed { dlog.Print("closed") @@ -490,6 +502,8 @@ func (c *Client) monitor(ctx context.Context) { // Dial establishes a secure channel. func (c *Client) Dial(ctx context.Context) error { + stats.Client().Add("Dial", 1) + if c.SecureChannel() != nil { return errors.Errorf("secure channel already connected") } @@ -518,6 +532,8 @@ func (c *Client) Dial(ctx context.Context) error { // Close closes the session and the secure channel. func (c *Client) Close() error { + stats.Client().Add("Close", 1) + // try to close the session but ignore any error // so that we close the underlying channel and connection. c.CloseSession() @@ -552,6 +568,9 @@ func (c *Client) State() ConnState { func (c *Client) setState(s ConnState) { c.atomicState.Store(s) + n := new(expvar.Int) + n.Set(int64(s)) + stats.Client().Set("State", n) } // Namespaces returns the currently cached list of namespaces. @@ -578,6 +597,7 @@ func (c *Client) SecureChannel() *uasc.SecureChannel { func (c *Client) setSecureChannel(sc *uasc.SecureChannel) { c.atomicSechan.Store(sc) + stats.Client().Add("SecureChannel", 1) } // Session returns the active session. @@ -587,6 +607,7 @@ func (c *Client) Session() *Session { func (c *Client) setSession(s *Session) { c.atomicSession.Store(s) + stats.Client().Add("Session", 1) } // sessionClosed returns true when there is no session. @@ -710,6 +731,7 @@ func (c *Client) ActivateSession(s *Session) error { if c.SecureChannel() == nil { return ua.StatusBadServerNotConnected } + stats.Client().Add("ActivateSession", 1) sig, sigAlg, err := c.SecureChannel().NewSessionSignature(s.serverCertificate, s.serverNonce) if err != nil { log.Printf("error creating session signature: %s", err) @@ -781,6 +803,7 @@ func (c *Client) ActivateSession(s *Session) error { // // See Part 4, 5.6.4 func (c *Client) CloseSession() error { + stats.Client().Add("CloseSession", 1) if err := c.closeSession(c.Session()); err != nil { return err } @@ -804,6 +827,7 @@ func (c *Client) closeSession(s *Session) error { // caller is responsible to close or re-activate the session. If the client // does not have an active session the function returns no error. func (c *Client) DetachSession() (*Session, error) { + stats.Client().Add("DetachSession", 1) s := c.Session() c.setSession(nil) return s, nil @@ -813,7 +837,12 @@ func (c *Client) DetachSession() (*Session, error) { // the response. If the client has an active session it injects the // authentication token. func (c *Client) Send(req ua.Request, h func(interface{}) error) error { - return c.sendWithTimeout(req, c.cfg.sechan.RequestTimeout, h) + stats.Client().Add("Send", 1) + + err := c.sendWithTimeout(req, c.cfg.sechan.RequestTimeout, h) + stats.RecordError(err) + + return err } // sendWithTimeout sends the request via the secure channel with a custom timeout and registers a handler for @@ -837,6 +866,8 @@ func (c *Client) Node(id *ua.NodeID) *Node { } func (c *Client) GetEndpoints() (*ua.GetEndpointsResponse, error) { + stats.Client().Add("GetEndpoints", 1) + req := &ua.GetEndpointsRequest{ EndpointURL: c.endpointURL, } @@ -872,6 +903,9 @@ func cloneReadRequest(req *ua.ReadRequest) *ua.ReadRequest { // By default, the function requests the value of the nodes // in the default encoding of the server. func (c *Client) Read(req *ua.ReadRequest) (*ua.ReadResponse, error) { + stats.Client().Add("Read", 1) + stats.Client().Add("NodesToRead", int64(len(req.NodesToRead))) + // clone the request and the ReadValueIDs to set defaults without // manipulating them in-place. req = cloneReadRequest(req) @@ -904,6 +938,9 @@ func (c *Client) Read(req *ua.ReadRequest) (*ua.ReadResponse, error) { // Write executes a synchronous write request. func (c *Client) Write(req *ua.WriteRequest) (*ua.WriteResponse, error) { + stats.Client().Add("Write", 1) + stats.Client().Add("NodesToWrite", int64(len(req.NodesToWrite))) + var res *ua.WriteResponse err := c.Send(req, func(v interface{}) error { return safeAssign(v, &res) @@ -937,6 +974,9 @@ func cloneBrowseRequest(req *ua.BrowseRequest) *ua.BrowseRequest { // Browse executes a synchronous browse request. func (c *Client) Browse(req *ua.BrowseRequest) (*ua.BrowseResponse, error) { + stats.Client().Add("Browse", 1) + stats.Client().Add("NodesToBrowse", int64(len(req.NodesToBrowse))) + // clone the request and the NodesToBrowse to set defaults without // manipulating them in-place. req = cloneBrowseRequest(req) @@ -950,6 +990,8 @@ func (c *Client) Browse(req *ua.BrowseRequest) (*ua.BrowseResponse, error) { // Call executes a synchronous call request for a single method. func (c *Client) Call(req *ua.CallMethodRequest) (*ua.CallMethodResult, error) { + stats.Client().Add("Call", 1) + creq := &ua.CallRequest{ MethodsToCall: []*ua.CallMethodRequest{req}, } @@ -968,6 +1010,8 @@ func (c *Client) Call(req *ua.CallMethodRequest) (*ua.CallMethodResult, error) { // BrowseNext executes a synchronous browse request. func (c *Client) BrowseNext(req *ua.BrowseNextRequest) (*ua.BrowseNextResponse, error) { + stats.Client().Add("BrowseNext", 1) + var res *ua.BrowseNextResponse err := c.Send(req, func(v interface{}) error { return safeAssign(v, &res) @@ -978,6 +1022,9 @@ func (c *Client) BrowseNext(req *ua.BrowseNextRequest) (*ua.BrowseNextResponse, // RegisterNodes registers node ids for more efficient reads. // Part 4, Section 5.8.5 func (c *Client) RegisterNodes(req *ua.RegisterNodesRequest) (*ua.RegisterNodesResponse, error) { + stats.Client().Add("RegisterNodes", 1) + stats.Client().Add("NodesToRegister", int64(len(req.NodesToRegister))) + var res *ua.RegisterNodesResponse err := c.Send(req, func(v interface{}) error { return safeAssign(v, &res) @@ -988,6 +1035,9 @@ func (c *Client) RegisterNodes(req *ua.RegisterNodesRequest) (*ua.RegisterNodesR // UnregisterNodes unregisters node ids previously registered with RegisterNodes. // Part 4, Section 5.8.6 func (c *Client) UnregisterNodes(req *ua.UnregisterNodesRequest) (*ua.UnregisterNodesResponse, error) { + stats.Client().Add("UnregisterNodes", 1) + stats.Client().Add("NodesToUnregister", int64(len(req.NodesToUnregister))) + var res *ua.UnregisterNodesResponse err := c.Send(req, func(v interface{}) error { return safeAssign(v, &res) @@ -996,6 +1046,9 @@ func (c *Client) UnregisterNodes(req *ua.UnregisterNodesRequest) (*ua.Unregister } func (c *Client) HistoryReadRawModified(nodes []*ua.HistoryReadValueID, details *ua.ReadRawModifiedDetails) (*ua.HistoryReadResponse, error) { + stats.Client().Add("HistoryReadRawModified", 1) + stats.Client().Add("HistoryReadValueID", int64(len(nodes))) + // Part 4, 5.10.3 HistoryRead req := &ua.HistoryReadRequest{ TimestampsToReturn: ua.TimestampsToReturnBoth, @@ -1017,6 +1070,7 @@ func (c *Client) HistoryReadRawModified(nodes []*ua.HistoryReadValueID, details // NamespaceArray returns the list of namespaces registered on the server. func (c *Client) NamespaceArray() ([]string, error) { + stats.Client().Add("NamespaceArray", 1) node := c.Node(ua.NewNumericNodeID(0, id.Server_NamespaceArray)) v, err := node.Value() if err != nil { @@ -1032,6 +1086,7 @@ func (c *Client) NamespaceArray() ([]string, error) { // UpdateNamespaces updates the list of cached namespaces from the server. func (c *Client) UpdateNamespaces() error { + stats.Client().Add("UpdateNamespaces", 1) ns, err := c.NamespaceArray() if err != nil { return err diff --git a/client_sub.go b/client_sub.go index ac8108be..48efd214 100644 --- a/client_sub.go +++ b/client_sub.go @@ -8,6 +8,7 @@ import ( "github.com/gopcua/opcua/debug" "github.com/gopcua/opcua/errors" + "github.com/gopcua/opcua/stats" "github.com/gopcua/opcua/ua" "github.com/gopcua/opcua/uasc" ) @@ -16,6 +17,8 @@ import ( // Parameters that have not been set are set to their default values. // See opcua.DefaultSubscription* constants func (c *Client) Subscribe(params *SubscriptionParameters, notifyCh chan *PublishNotificationData) (*Subscription, error) { + stats.Client().Add("Subscribe", 1) + if params == nil { params = &SubscriptionParameters{} } @@ -41,6 +44,8 @@ func (c *Client) Subscribe(params *SubscriptionParameters, notifyCh chan *Publis return nil, res.ResponseHeader.ServiceResult } + stats.Subscription().Add("Count", 1) + // start the publish loop if it isn't already running c.resumech <- struct{}{} @@ -217,6 +222,7 @@ func (c *Client) forgetSubscription(ctx context.Context, id uint32) { delete(c.subs, id) c.updatePublishTimeout() c.subMux.Unlock() + stats.Subscription().Add("Count", -1) if len(c.subs) == 0 { c.pauseSubscriptions(ctx) @@ -374,6 +380,7 @@ func (c *Client) publish(ctx context.Context) error { // send the next publish request // note that res contains data even if an error was returned res, err := c.sendPublishRequest() + stats.RecordError(err) switch { case err == io.EOF: dlog.Printf("eof: pausing publish loop") @@ -518,6 +525,7 @@ func (c *Client) sendPublishRequest() (*ua.PublishResponse, error) { err := c.sendWithTimeout(req, c.publishTimeout(), func(v interface{}) error { return safeAssign(v, &res) }) + stats.RecordError(err) dlog.Printf("PublishResponse: %s", debug.ToJSON(res)) return res, err } diff --git a/generate.sh b/generate.sh index fc9edde1..7e72fa42 100755 --- a/generate.sh +++ b/generate.sh @@ -4,6 +4,7 @@ rm -f */*_gen.go go run cmd/id/main.go go run cmd/status/main.go go run cmd/service/*.go +go run cmd/stats/*.go # install stringer if not installed already command -v stringer || go get -u golang.org/x/tools/cmd/stringer @@ -18,5 +19,8 @@ echo "Wrote ua/enums_strings_gen.go" stringer -type ConnState -output connstate_strings_gen.go echo "Wrote connstate_strings_gen.go" +(cd stats && stringer -type Metric -output metrics_strings_gen.go) +echo "Wrote stats/metrics_strings_gen.go" + # remove golang.org/x/tools/cmd/stringer from list of dependencies go mod tidy diff --git a/go.mod b/go.mod index 97e2f6f7..54f50a93 100644 --- a/go.mod +++ b/go.mod @@ -6,5 +6,5 @@ require ( github.com/pascaldekloe/goe v0.1.0 github.com/pkg/errors v0.8.1 golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 - golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e // indirect + golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect ) diff --git a/go.sum b/go.sum index 1ef2f07a..85b90854 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA= -golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0= +golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/stats/stats.go b/stats/stats.go new file mode 100644 index 00000000..549fad1c --- /dev/null +++ b/stats/stats.go @@ -0,0 +1,91 @@ +// Package stats provides instrumentation for the gopcua library via expvar. +// +// The API is experimental and might change. +package stats + +import ( + "expvar" + "io" + "reflect" + + "github.com/gopcua/opcua/ua" +) + +// stats is the global statistics counter. +var stats = NewStats() + +func init() { + expvar.Publish("gopcua", expvar.Func(func() interface{} { return stats })) +} + +// Stats collects gopcua statistics via expvar. +type Stats struct { + Client *expvar.Map + Error *expvar.Map + Subscription *expvar.Map +} + +func NewStats() *Stats { + return &Stats{ + Client: &expvar.Map{}, + Error: &expvar.Map{}, + Subscription: &expvar.Map{}, + } +} + +// Reset resets all counters to zero. +func (s *Stats) Reset() { + s.Client.Init() + s.Error.Init() + s.Subscription.Init() +} + +// RecordError updates the metric for an error by one. +func (s *Stats) RecordError(err error) { + if err == nil { + return + } + switch err { + case io.EOF: + s.Error.Add("io.EOF", 1) + case ua.StatusOK: + s.Error.Add("ua.StatusOK", 1) + case ua.StatusBad: + s.Error.Add("ua.StatusBad", 1) + case ua.StatusUncertain: + s.Error.Add("ua.StatusUncertain", 1) + default: + switch x := err.(type) { + case ua.StatusCode: + s.Error.Add("ua."+ua.StatusCodes[x].Name, 1) + default: + s.Error.Add(reflect.TypeOf(err).String(), 1) + } + } +} + +// convenience functions for the global statistics + +// Reset resets all counters to zero. +func Reset() { + stats.Reset() +} + +// Client is the global client statistics map. +func Client() *expvar.Map { + return stats.Client +} + +// Error is the global error statistics map. +func Error() *expvar.Map { + return stats.Error +} + +// Subscription is the global subscription statistics map. +func Subscription() *expvar.Map { + return stats.Subscription +} + +func RecordError(err error) { + stats.RecordError(err) +} diff --git a/stats/stats_test.go b/stats/stats_test.go new file mode 100644 index 00000000..651cdbc9 --- /dev/null +++ b/stats/stats_test.go @@ -0,0 +1,53 @@ +package stats + +import ( + "errors" + "expvar" + "io" + "testing" + + "github.com/gopcua/opcua/ua" + "github.com/pascaldekloe/goe/verify" +) + +func newExpVarInt(i int64) *expvar.Int { + v := &expvar.Int{} + v.Set(i) + return v +} + +func TestConvienienceFuncs(t *testing.T) { + Reset() + + Client().Add("a", 1) + verify.Values(t, "", Client().Get("a"), newExpVarInt(1)) + + Error().Add("b", 2) + verify.Values(t, "", Error().Get("b"), newExpVarInt(2)) + + Subscription().Add("c", 3) + verify.Values(t, "", Subscription().Get("c"), newExpVarInt(3)) +} + +func TestRecordError(t *testing.T) { + tests := []struct { + err error + key string + }{ + {io.EOF, "io.EOF"}, + {ua.StatusOK, "ua.StatusOK"}, + {ua.StatusBad, "ua.StatusBad"}, + {ua.StatusUncertain, "ua.StatusUncertain"}, + {ua.StatusBadAlreadyExists, "ua.StatusBadAlreadyExists"}, + {errors.New("hello"), "*errors.errorString"}, + } + + for _, tt := range tests { + t.Run(tt.key, func(t *testing.T) { + s := NewStats() + s.RecordError(tt.err) + got, want := s.Error.Get(tt.key), newExpVarInt(1) + verify.Values(t, "", got, want) + }) + } +} diff --git a/subscription.go b/subscription.go index dfee002a..5b0699b1 100644 --- a/subscription.go +++ b/subscription.go @@ -9,6 +9,7 @@ import ( "github.com/gopcua/opcua/debug" "github.com/gopcua/opcua/errors" "github.com/gopcua/opcua/id" + "github.com/gopcua/opcua/stats" "github.com/gopcua/opcua/ua" "github.com/gopcua/opcua/uasc" ) @@ -81,6 +82,7 @@ type PublishNotificationData struct { // Cancel stops the subscription and removes it // from the client and the server. func (s *Subscription) Cancel(ctx context.Context) error { + stats.Subscription().Add("Cancel", 1) s.c.forgetSubscription(ctx, s.SubscriptionID) return s.delete() } @@ -110,6 +112,9 @@ func (s *Subscription) delete() error { } func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredItemCreateRequest) (*ua.CreateMonitoredItemsResponse, error) { + stats.Subscription().Add("Monitor", 1) + stats.Subscription().Add("MonitoredItems", int64(len(items))) + // Part 4, 5.12.2.2 CreateMonitoredItems Service Parameters req := &ua.CreateMonitoredItemsRequest{ SubscriptionID: s.SubscriptionID, @@ -142,6 +147,9 @@ func (s *Subscription) Monitor(ts ua.TimestampsToReturn, items ...*ua.MonitoredI } func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitoredItemsResponse, error) { + stats.Subscription().Add("Unmonitor", 1) + stats.Subscription().Add("UnmonitoredItems", int64(len(monitoredItemIDs))) + req := &ua.DeleteMonitoredItemsRequest{ MonitoredItemIDs: monitoredItemIDs, SubscriptionID: s.SubscriptionID, @@ -165,6 +173,9 @@ func (s *Subscription) Unmonitor(monitoredItemIDs ...uint32) (*ua.DeleteMonitore } func (s *Subscription) ModifyMonitoredItems(ts ua.TimestampsToReturn, items ...*ua.MonitoredItemModifyRequest) (*ua.ModifyMonitoredItemsResponse, error) { + stats.Subscription().Add("ModifyMonitoredItems", 1) + stats.Subscription().Add("ModifiedMonitoredItems", int64(len(items))) + s.itemsMu.Lock() for _, item := range items { id := item.MonitoredItemID @@ -212,6 +223,8 @@ func (s *Subscription) ModifyMonitoredItems(ts ua.TimestampsToReturn, items ...* // To add links from a triggering item to an item to report provide the server assigned ID(s) in the `add` argument. // To remove links from a triggering item to an item to report provide the server assigned ID(s) in the `remove` argument. func (s *Subscription) SetTriggering(triggeringItemID uint32, add, remove []uint32) (*ua.SetTriggeringResponse, error) { + stats.Subscription().Add("SetTriggering", 1) + // Part 4, 5.12.5.2 SetTriggering Service Parameters req := &ua.SetTriggeringRequest{ SubscriptionID: s.SubscriptionID, diff --git a/uatest/stats_test.go b/uatest/stats_test.go new file mode 100644 index 00000000..7b114c3c --- /dev/null +++ b/uatest/stats_test.go @@ -0,0 +1,69 @@ +//go:build integration +// +build integration + +package uatest + +import ( + "context" + "expvar" + "testing" + + "github.com/gopcua/opcua" + "github.com/gopcua/opcua/stats" + "github.com/pascaldekloe/goe/verify" +) + +func newExpVarInt(i int64) *expvar.Int { + v := &expvar.Int{} + v.Set(i) + return v +} + +func TestStats(t *testing.T) { + stats.Reset() + + srv := NewServer("rw_server.py") + defer srv.Close() + + c := opcua.NewClient(srv.Endpoint, srv.Opts...) + if err := c.Connect(context.Background()); err != nil { + t.Fatal(err) + } + + c.Close() + + want := map[string]*expvar.Int{ + "Dial": newExpVarInt(1), + "ActivateSession": newExpVarInt(1), + "NamespaceArray": newExpVarInt(1), + "UpdateNamespaces": newExpVarInt(1), + "NodesToRead": newExpVarInt(1), + "Read": newExpVarInt(1), + "Send": newExpVarInt(1), + "Close": newExpVarInt(1), + "CloseSession": newExpVarInt(2), + "SecureChannel": newExpVarInt(2), + "Session": newExpVarInt(5), + "State": newExpVarInt(0), + } + + got := map[string]expvar.Var{} + stats.Client().Do(func(kv expvar.KeyValue) { got[kv.Key] = kv.Value }) + for k := range got { + if _, ok := want[k]; !ok { + t.Fatalf("got unexpected key %q", k) + } + } + for k := range want { + if _, ok := got[k]; !ok { + t.Fatalf("missing expected key %q", k) + } + } + + for k, ev := range want { + v := stats.Client().Get(k) + if !verify.Values(t, "", v, ev) { + t.Errorf("got %s for %q, want %s", v.String(), k, ev.String()) + } + } +}