Skip to content

Commit

Permalink
Merge pull request #533 from gopcua/issue-532-stats
Browse files Browse the repository at this point in the history
client: expose statistics
  • Loading branch information
magiconair authored Jan 3, 2022
2 parents afa4913 + 8cb7988 commit 8b599ae
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 4 deletions.
57 changes: 56 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package opcua
import (
"context"
"crypto/rand"
"expvar"
"fmt"
"io"
"log"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/gopcua/opcua/debug"
"github.com/gopcua/opcua/errors"
"github.com/gopcua/opcua/id"
"github.com/gopcua/opcua/stats"
"github.com/gopcua/opcua/ua"
"github.com/gopcua/opcua/uacp"
"github.com/gopcua/opcua/uasc"
Expand Down Expand Up @@ -186,17 +188,23 @@ func (c *Client) Connect(ctx context.Context) (err error) {

c.setState(Connecting)
if err := c.Dial(ctx); err != nil {
stats.RecordError(err)

return err
}

s, err := c.CreateSession(c.cfg.session)
if err != nil {
c.Close()
stats.RecordError(err)

return err
}

if err := c.ActivateSession(s); err != nil {
c.Close()
stats.RecordError(err)

return err
}
c.setState(Connected)
Expand All @@ -214,6 +222,8 @@ func (c *Client) Connect(ctx context.Context) (err error) {
// todo(fs): and you should find a commit that implements this option.
if err := c.UpdateNamespaces(); err != nil {
c.Close()
stats.RecordError(err)

return err
}

Expand All @@ -237,6 +247,8 @@ func (c *Client) monitor(ctx context.Context) {
return

case err, ok := <-c.sechanErr:
stats.RecordError(err)

// return if channel or connection is closed
if !ok || err == io.EOF && c.State() == Closed {
dlog.Print("closed")
Expand Down Expand Up @@ -490,6 +502,8 @@ func (c *Client) monitor(ctx context.Context) {

// Dial establishes a secure channel.
func (c *Client) Dial(ctx context.Context) error {
stats.Client().Add("Dial", 1)

if c.SecureChannel() != nil {
return errors.Errorf("secure channel already connected")
}
Expand Down Expand Up @@ -518,6 +532,8 @@ func (c *Client) Dial(ctx context.Context) error {

// Close closes the session and the secure channel.
func (c *Client) Close() error {
stats.Client().Add("Close", 1)

// try to close the session but ignore any error
// so that we close the underlying channel and connection.
c.CloseSession()
Expand Down Expand Up @@ -552,6 +568,9 @@ func (c *Client) State() ConnState {

func (c *Client) setState(s ConnState) {
c.atomicState.Store(s)
n := new(expvar.Int)
n.Set(int64(s))
stats.Client().Set("State", n)
}

// Namespaces returns the currently cached list of namespaces.
Expand All @@ -578,6 +597,7 @@ func (c *Client) SecureChannel() *uasc.SecureChannel {

func (c *Client) setSecureChannel(sc *uasc.SecureChannel) {
c.atomicSechan.Store(sc)
stats.Client().Add("SecureChannel", 1)
}

// Session returns the active session.
Expand All @@ -587,6 +607,7 @@ func (c *Client) Session() *Session {

func (c *Client) setSession(s *Session) {
c.atomicSession.Store(s)
stats.Client().Add("Session", 1)
}

// sessionClosed returns true when there is no session.
Expand Down Expand Up @@ -710,6 +731,7 @@ func (c *Client) ActivateSession(s *Session) error {
if c.SecureChannel() == nil {
return ua.StatusBadServerNotConnected
}
stats.Client().Add("ActivateSession", 1)
sig, sigAlg, err := c.SecureChannel().NewSessionSignature(s.serverCertificate, s.serverNonce)
if err != nil {
log.Printf("error creating session signature: %s", err)
Expand Down Expand Up @@ -781,6 +803,7 @@ func (c *Client) ActivateSession(s *Session) error {
//
// See Part 4, 5.6.4
func (c *Client) CloseSession() error {
stats.Client().Add("CloseSession", 1)
if err := c.closeSession(c.Session()); err != nil {
return err
}
Expand All @@ -804,6 +827,7 @@ func (c *Client) closeSession(s *Session) error {
// caller is responsible to close or re-activate the session. If the client
// does not have an active session the function returns no error.
func (c *Client) DetachSession() (*Session, error) {
stats.Client().Add("DetachSession", 1)
s := c.Session()
c.setSession(nil)
return s, nil
Expand All @@ -813,7 +837,12 @@ func (c *Client) DetachSession() (*Session, error) {
// the response. If the client has an active session it injects the
// authentication token.
func (c *Client) Send(req ua.Request, h func(interface{}) error) error {
return c.sendWithTimeout(req, c.cfg.sechan.RequestTimeout, h)
stats.Client().Add("Send", 1)

err := c.sendWithTimeout(req, c.cfg.sechan.RequestTimeout, h)
stats.RecordError(err)

return err
}

// sendWithTimeout sends the request via the secure channel with a custom timeout and registers a handler for
Expand All @@ -837,6 +866,8 @@ func (c *Client) Node(id *ua.NodeID) *Node {
}

func (c *Client) GetEndpoints() (*ua.GetEndpointsResponse, error) {
stats.Client().Add("GetEndpoints", 1)

req := &ua.GetEndpointsRequest{
EndpointURL: c.endpointURL,
}
Expand Down Expand Up @@ -872,6 +903,9 @@ func cloneReadRequest(req *ua.ReadRequest) *ua.ReadRequest {
// By default, the function requests the value of the nodes
// in the default encoding of the server.
func (c *Client) Read(req *ua.ReadRequest) (*ua.ReadResponse, error) {
stats.Client().Add("Read", 1)
stats.Client().Add("NodesToRead", int64(len(req.NodesToRead)))

// clone the request and the ReadValueIDs to set defaults without
// manipulating them in-place.
req = cloneReadRequest(req)
Expand Down Expand Up @@ -904,6 +938,9 @@ func (c *Client) Read(req *ua.ReadRequest) (*ua.ReadResponse, error) {

// Write executes a synchronous write request.
func (c *Client) Write(req *ua.WriteRequest) (*ua.WriteResponse, error) {
stats.Client().Add("Write", 1)
stats.Client().Add("NodesToWrite", int64(len(req.NodesToWrite)))

var res *ua.WriteResponse
err := c.Send(req, func(v interface{}) error {
return safeAssign(v, &res)
Expand Down Expand Up @@ -937,6 +974,9 @@ func cloneBrowseRequest(req *ua.BrowseRequest) *ua.BrowseRequest {

// Browse executes a synchronous browse request.
func (c *Client) Browse(req *ua.BrowseRequest) (*ua.BrowseResponse, error) {
stats.Client().Add("Browse", 1)
stats.Client().Add("NodesToBrowse", int64(len(req.NodesToBrowse)))

// clone the request and the NodesToBrowse to set defaults without
// manipulating them in-place.
req = cloneBrowseRequest(req)
Expand All @@ -950,6 +990,8 @@ func (c *Client) Browse(req *ua.BrowseRequest) (*ua.BrowseResponse, error) {

// Call executes a synchronous call request for a single method.
func (c *Client) Call(req *ua.CallMethodRequest) (*ua.CallMethodResult, error) {
stats.Client().Add("Call", 1)

creq := &ua.CallRequest{
MethodsToCall: []*ua.CallMethodRequest{req},
}
Expand All @@ -968,6 +1010,8 @@ func (c *Client) Call(req *ua.CallMethodRequest) (*ua.CallMethodResult, error) {

// BrowseNext executes a synchronous browse request.
func (c *Client) BrowseNext(req *ua.BrowseNextRequest) (*ua.BrowseNextResponse, error) {
stats.Client().Add("BrowseNext", 1)

var res *ua.BrowseNextResponse
err := c.Send(req, func(v interface{}) error {
return safeAssign(v, &res)
Expand All @@ -978,6 +1022,9 @@ func (c *Client) BrowseNext(req *ua.BrowseNextRequest) (*ua.BrowseNextResponse,
// RegisterNodes registers node ids for more efficient reads.
// Part 4, Section 5.8.5
func (c *Client) RegisterNodes(req *ua.RegisterNodesRequest) (*ua.RegisterNodesResponse, error) {
stats.Client().Add("RegisterNodes", 1)
stats.Client().Add("NodesToRegister", int64(len(req.NodesToRegister)))

var res *ua.RegisterNodesResponse
err := c.Send(req, func(v interface{}) error {
return safeAssign(v, &res)
Expand All @@ -988,6 +1035,9 @@ func (c *Client) RegisterNodes(req *ua.RegisterNodesRequest) (*ua.RegisterNodesR
// UnregisterNodes unregisters node ids previously registered with RegisterNodes.
// Part 4, Section 5.8.6
func (c *Client) UnregisterNodes(req *ua.UnregisterNodesRequest) (*ua.UnregisterNodesResponse, error) {
stats.Client().Add("UnregisterNodes", 1)
stats.Client().Add("NodesToUnregister", int64(len(req.NodesToUnregister)))

var res *ua.UnregisterNodesResponse
err := c.Send(req, func(v interface{}) error {
return safeAssign(v, &res)
Expand All @@ -996,6 +1046,9 @@ func (c *Client) UnregisterNodes(req *ua.UnregisterNodesRequest) (*ua.Unregister
}

func (c *Client) HistoryReadRawModified(nodes []*ua.HistoryReadValueID, details *ua.ReadRawModifiedDetails) (*ua.HistoryReadResponse, error) {
stats.Client().Add("HistoryReadRawModified", 1)
stats.Client().Add("HistoryReadValueID", int64(len(nodes)))

// Part 4, 5.10.3 HistoryRead
req := &ua.HistoryReadRequest{
TimestampsToReturn: ua.TimestampsToReturnBoth,
Expand All @@ -1017,6 +1070,7 @@ func (c *Client) HistoryReadRawModified(nodes []*ua.HistoryReadValueID, details

// NamespaceArray returns the list of namespaces registered on the server.
func (c *Client) NamespaceArray() ([]string, error) {
stats.Client().Add("NamespaceArray", 1)
node := c.Node(ua.NewNumericNodeID(0, id.Server_NamespaceArray))
v, err := node.Value()
if err != nil {
Expand All @@ -1032,6 +1086,7 @@ func (c *Client) NamespaceArray() ([]string, error) {

// UpdateNamespaces updates the list of cached namespaces from the server.
func (c *Client) UpdateNamespaces() error {
stats.Client().Add("UpdateNamespaces", 1)
ns, err := c.NamespaceArray()
if err != nil {
return err
Expand Down
8 changes: 8 additions & 0 deletions client_sub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/gopcua/opcua/debug"
"github.com/gopcua/opcua/errors"
"github.com/gopcua/opcua/stats"
"github.com/gopcua/opcua/ua"
"github.com/gopcua/opcua/uasc"
)
Expand All @@ -16,6 +17,8 @@ import (
// Parameters that have not been set are set to their default values.
// See opcua.DefaultSubscription* constants
func (c *Client) Subscribe(params *SubscriptionParameters, notifyCh chan *PublishNotificationData) (*Subscription, error) {
stats.Client().Add("Subscribe", 1)

if params == nil {
params = &SubscriptionParameters{}
}
Expand All @@ -41,6 +44,8 @@ func (c *Client) Subscribe(params *SubscriptionParameters, notifyCh chan *Publis
return nil, res.ResponseHeader.ServiceResult
}

stats.Subscription().Add("Count", 1)

// start the publish loop if it isn't already running
c.resumech <- struct{}{}

Expand Down Expand Up @@ -217,6 +222,7 @@ func (c *Client) forgetSubscription(ctx context.Context, id uint32) {
delete(c.subs, id)
c.updatePublishTimeout()
c.subMux.Unlock()
stats.Subscription().Add("Count", -1)

if len(c.subs) == 0 {
c.pauseSubscriptions(ctx)
Expand Down Expand Up @@ -374,6 +380,7 @@ func (c *Client) publish(ctx context.Context) error {
// send the next publish request
// note that res contains data even if an error was returned
res, err := c.sendPublishRequest()
stats.RecordError(err)
switch {
case err == io.EOF:
dlog.Printf("eof: pausing publish loop")
Expand Down Expand Up @@ -518,6 +525,7 @@ func (c *Client) sendPublishRequest() (*ua.PublishResponse, error) {
err := c.sendWithTimeout(req, c.publishTimeout(), func(v interface{}) error {
return safeAssign(v, &res)
})
stats.RecordError(err)
dlog.Printf("PublishResponse: %s", debug.ToJSON(res))
return res, err
}
4 changes: 4 additions & 0 deletions generate.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ rm -f */*_gen.go
go run cmd/id/main.go
go run cmd/status/main.go
go run cmd/service/*.go
go run cmd/stats/*.go

# install stringer if not installed already
command -v stringer || go get -u golang.org/x/tools/cmd/stringer
Expand All @@ -18,5 +19,8 @@ echo "Wrote ua/enums_strings_gen.go"
stringer -type ConnState -output connstate_strings_gen.go
echo "Wrote connstate_strings_gen.go"

(cd stats && stringer -type Metric -output metrics_strings_gen.go)
echo "Wrote stats/metrics_strings_gen.go"

# remove golang.org/x/tools/cmd/stringer from list of dependencies
go mod tidy
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ require (
github.com/pascaldekloe/goe v0.1.0
github.com/pkg/errors v0.8.1
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e // indirect
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e h1:WUoyKPm6nCo1BnNUvPGnFG3T5DUVem42yDJZZ4CNxMA=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 h1:id054HUawV2/6IGm2IV8KZQjqtwAOo2CYlOToYqa0d0=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Loading

0 comments on commit 8b599ae

Please sign in to comment.