diff --git a/examples/monitor/monitor.go b/examples/monitor/monitor.go index ef0f39ca..0bbbed96 100644 --- a/examples/monitor/monitor.go +++ b/examples/monitor/monitor.go @@ -11,6 +11,7 @@ import ( "github.com/gopcua/opcua" "github.com/gopcua/opcua/debug" + "github.com/gopcua/opcua/id" "github.com/gopcua/opcua/monitor" "github.com/gopcua/opcua/ua" ) @@ -24,6 +25,7 @@ func main() { keyFile = flag.String("key", "", "Path to private key.pem. Required for security mode/policy != None") nodeID = flag.String("node", "", "node id to subscribe to") interval = flag.Duration("interval", opcua.DefaultSubscriptionInterval, "subscription interval") + event = flag.Bool("event", false, "are you subscribing to events") ) flag.BoolVar(&debug.Enable, "debug", false, "enable debug logging") flag.Parse() @@ -83,46 +85,127 @@ func main() { }) wg := &sync.WaitGroup{} - // start callback-based subscription - wg.Add(1) - go startCallbackSub(ctx, m, *interval, 0, wg, *nodeID) + fieldNames := []string{"EventId", "EventType", "Severity", "Time", "Message"} + selects := make([]*ua.SimpleAttributeOperand, len(fieldNames)) + for i, name := range fieldNames { + selects[i] = &ua.SimpleAttributeOperand{ + TypeDefinitionID: ua.NewNumericNodeID(0, id.BaseEventType), + BrowsePath: []*ua.QualifiedName{{NamespaceIndex: 0, Name: name}}, + AttributeID: ua.AttributeIDValue, + } + } - // start channel-based subscription - wg.Add(1) - go startChanSub(ctx, m, *interval, 0, wg, *nodeID) + wheres := &ua.ContentFilter{ + Elements: []*ua.ContentFilterElement{ + { + FilterOperator: ua.FilterOperatorGreaterThanOrEqual, + FilterOperands: []*ua.ExtensionObject{ + { + EncodingMask: 1, + TypeID: &ua.ExpandedNodeID{ + NodeID: ua.NewNumericNodeID(0, id.SimpleAttributeOperand_Encoding_DefaultBinary), + }, + Value: ua.SimpleAttributeOperand{ + TypeDefinitionID: ua.NewNumericNodeID(0, id.BaseEventType), + BrowsePath: []*ua.QualifiedName{{NamespaceIndex: 0, Name: "Severity"}}, + AttributeID: ua.AttributeIDValue, + }, + }, + { + EncodingMask: 1, + TypeID: &ua.ExpandedNodeID{ + NodeID: ua.NewNumericNodeID(0, id.LiteralOperand_Encoding_DefaultBinary), + }, + Value: ua.LiteralOperand{ + Value: ua.MustVariant(uint16(0)), + }, + }, + }, + }, + }, + } + filter := ua.EventFilter{ + SelectClauses: selects, + WhereClause: wheres, + } + + filterExtObj := ua.ExtensionObject{ + EncodingMask: ua.ExtensionObjectBinary, + TypeID: &ua.ExpandedNodeID{ + NodeID: ua.NewNumericNodeID(0, id.EventFilter_Encoding_DefaultBinary), + }, + Value: filter, + } + + if *event { + // start callback-based subscription + wg.Add(1) + go startCallbackSub(ctx, m, *interval, 0, wg, *event, &filterExtObj, *nodeID) + + // start channel-based subscription + wg.Add(1) + go startChanSub(ctx, m, *interval, 0, wg, *event, &filterExtObj, *nodeID) + } else { + // start callback-based subscription + wg.Add(1) + go startCallbackSub(ctx, m, *interval, 0, wg, *event, nil, *nodeID) + + // start channel-based subscription + wg.Add(1) + go startChanSub(ctx, m, *interval, 0, wg, *event, nil, *nodeID) + } <-ctx.Done() wg.Wait() } -func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) { +func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, isEvent bool, filter *ua.ExtensionObject, nodes ...string) { + fieldNames := []string{"EventId", "EventType", "Severity", "Time", "Message"} sub, err := m.Subscribe( ctx, &opcua.SubscriptionParameters{ Interval: interval, }, - func(s *monitor.Subscription, msg *monitor.DataChangeMessage) { - if msg.Error != nil { - log.Printf("[callback] sub=%d error=%s", s.SubscriptionID(), msg.Error) - } else { - log.Printf("[callback] sub=%d ts=%s node=%s value=%v", s.SubscriptionID(), msg.SourceTimestamp.UTC().Format(time.RFC3339), msg.NodeID, msg.Value.Value()) + func(s *monitor.Subscription, msg monitor.Message) { + switch v := msg.(type) { + case *monitor.DataChangeMessage: + if v.Error != nil { + log.Printf("[callback] sub=%d error=%s", s.SubscriptionID(), v.Error) + } else { + log.Printf("[callback] sub=%d ts=%s node=%s value=%v", + s.SubscriptionID(), + v.SourceTimestamp.UTC().Format(time.RFC3339), + v.NodeID, + v.Value.Value()) + } + case *monitor.EventMessage: + if v.Error != nil { + log.Printf("[callback] sub=%d error=%s", s.SubscriptionID(), v.Error) + } else { + log.Printf("[callback] sub=%d event details:", s.SubscriptionID()) + for i, field := range v.EventFields { + if i < len(fieldNames) { + fieldName := fieldNames[i] + log.Printf(" %s: %v", fieldName, field.Value.Value()) + } + } + } + default: + log.Printf("[callback] sub=%d unknown message type=%T", s.SubscriptionID(), msg) } time.Sleep(lag) }, - nodes...) - + isEvent, filter, nodes...) if err != nil { log.Fatal(err) } - defer cleanup(ctx, sub, wg) - <-ctx.Done() } -func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) { - ch := make(chan *monitor.DataChangeMessage, 16) - sub, err := m.ChanSubscribe(ctx, &opcua.SubscriptionParameters{Interval: interval}, ch, nodes...) +func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, isEvent bool, filter *ua.ExtensionObject, nodes ...string) { + ch := make(chan monitor.Message, 16) + sub, err := m.ChanSubscribe(ctx, &opcua.SubscriptionParameters{Interval: interval}, ch, isEvent, filter, nodes...) if err != nil { log.Fatal(err) @@ -135,10 +218,27 @@ func startChanSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag tim case <-ctx.Done(): return case msg := <-ch: - if msg.Error != nil { - log.Printf("[channel ] sub=%d error=%s", sub.SubscriptionID(), msg.Error) - } else { - log.Printf("[channel ] sub=%d ts=%s node=%s value=%v", sub.SubscriptionID(), msg.SourceTimestamp.UTC().Format(time.RFC3339), msg.NodeID, msg.Value.Value()) + switch v := msg.(type) { + case *monitor.DataChangeMessage: + if v.Error != nil { + log.Printf("[channel] sub=%d error=%s", sub.SubscriptionID(), v.Error) + } else { + log.Printf("[channel] sub=%d ts=%s node=%s value=%v", + sub.SubscriptionID(), + v.SourceTimestamp.UTC().Format(time.RFC3339), + v.NodeID, + v.Value.Value()) + } + case *monitor.EventMessage: + if v.Error != nil { + log.Printf("[channel] sub=%d error=%s", sub.SubscriptionID(), v.Error) + } else { + out := v.EventFields[0].Value.Value() + log.Printf("[channel] sub=%d event fields=%d", + sub.SubscriptionID(), out) + } + default: + log.Printf("[channel] sub=%d unknown message type: %T", sub.SubscriptionID(), msg) } time.Sleep(lag) } diff --git a/examples/subscribe/subscribe.go b/examples/subscribe/subscribe.go index 8127cd9a..12f026a6 100644 --- a/examples/subscribe/subscribe.go +++ b/examples/subscribe/subscribe.go @@ -133,7 +133,7 @@ func main() { func valueRequest(nodeID *ua.NodeID) *ua.MonitoredItemCreateRequest { handle := uint32(42) - return opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, handle) + return opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, nil, handle) } func eventRequest(nodeID *ua.NodeID) (*ua.MonitoredItemCreateRequest, []string) { diff --git a/examples/trigger/trigger.go b/examples/trigger/trigger.go index beafb3b3..47ea8093 100644 --- a/examples/trigger/trigger.go +++ b/examples/trigger/trigger.go @@ -90,7 +90,7 @@ func main() { } miCreateRequests := []*ua.MonitoredItemCreateRequest{ - opcua.NewMonitoredItemCreateRequestWithDefaults(triggeringNode, ua.AttributeIDValue, 42), + opcua.NewMonitoredItemCreateRequestWithDefaults(triggeringNode, ua.AttributeIDValue, nil, 42), { ItemToMonitor: &ua.ReadValueID{ NodeID: triggeredNode, diff --git a/monitor/subscription.go b/monitor/subscription.go index 22f7ef54..9e1a9681 100644 --- a/monitor/subscription.go +++ b/monitor/subscription.go @@ -2,8 +2,10 @@ package monitor import ( "context" + "fmt" "sync" "sync/atomic" + "time" "github.com/gopcua/opcua" "github.com/gopcua/opcua/errors" @@ -22,16 +24,28 @@ var ( type ErrHandler func(*opcua.Client, *Subscription, error) // MsgHandler is a function that is called for each new DataValue -type MsgHandler func(*Subscription, *DataChangeMessage) +type MsgHandler func(*Subscription, Message) + +// Message is an interface that can represent either a DataChangeMessage or an EventMessage +type Message interface { + isMessage() +} -// DataChangeMessage represents the changed DataValue from the server. It also includes a reference -// to the sending NodeID and error (if any) type DataChangeMessage struct { *ua.DataValue Error error NodeID *ua.NodeID } +func (DataChangeMessage) isMessage() {} + +type EventMessage struct { + EventFields []*ua.DataValue + Error error +} + +func (EventMessage) isMessage() {} + // NodeMonitor creates new subscriptions type NodeMonitor struct { client *opcua.Client @@ -88,7 +102,7 @@ func NewNodeMonitor(client *opcua.Client) (*NodeMonitor, error) { return m, nil } -func newSubscription(ctx context.Context, m *NodeMonitor, params *opcua.SubscriptionParameters, notifyChanLength int, nodes ...string) (*Subscription, error) { +func newSubscription(ctx context.Context, m *NodeMonitor, params *opcua.SubscriptionParameters, notifyChanLength int, eventSub bool, filter *ua.ExtensionObject, nodes ...string) (*Subscription, error) { if params == nil { params = &opcua.SubscriptionParameters{} } @@ -106,10 +120,10 @@ func newSubscription(ctx context.Context, m *NodeMonitor, params *opcua.Subscrip return nil, err } - if err = s.AddNodes(ctx, nodes...); err != nil { + if err = s.AddNodes(ctx, eventSub, filter, nodes...); err != nil { + fmt.Printf("Nodes: %v\n", nodes) return nil, err } - return s, nil } @@ -121,13 +135,13 @@ func (m *NodeMonitor) SetErrorHandler(cb ErrHandler) { // Subscribe creates a new callback-based subscription and an optional list of nodes. // The caller must call `Unsubscribe` to stop and clean up resources. Canceling the context // will also cause the subscription to stop, but `Unsubscribe` must still be called. -func (m *NodeMonitor) Subscribe(ctx context.Context, params *opcua.SubscriptionParameters, cb MsgHandler, nodes ...string) (*Subscription, error) { - sub, err := newSubscription(ctx, m, params, DefaultCallbackBufferLen, nodes...) +func (m *NodeMonitor) Subscribe(ctx context.Context, params *opcua.SubscriptionParameters, cb MsgHandler, eventSub bool, filter *ua.ExtensionObject, nodes ...string) (*Subscription, error) { + sub, err := newSubscription(ctx, m, params, DefaultCallbackBufferLen, eventSub, filter, nodes...) if err != nil { return nil, err } - go sub.pump(ctx, nil, cb) + go sub.pump(ctx, nil, cb, true) return sub, nil } @@ -137,13 +151,12 @@ func (m *NodeMonitor) Subscribe(ctx context.Context, params *opcua.SubscriptionP // via the monitor's `ErrHandler`. // The caller must call `Unsubscribe` to stop and clean up resources. Canceling the context // will also cause the subscription to stop, but `Unsubscribe` must still be called. -func (m *NodeMonitor) ChanSubscribe(ctx context.Context, params *opcua.SubscriptionParameters, ch chan<- *DataChangeMessage, nodes ...string) (*Subscription, error) { - sub, err := newSubscription(ctx, m, params, 16, nodes...) +func (m *NodeMonitor) ChanSubscribe(ctx context.Context, params *opcua.SubscriptionParameters, ch chan<- Message, eventSub bool, filter *ua.ExtensionObject, nodes ...string) (*Subscription, error) { + sub, err := newSubscription(ctx, m, params, 16, eventSub, filter, nodes...) if err != nil { return nil, err } - - go sub.pump(ctx, ch, nil) + go sub.pump(ctx, ch, nil, true) return sub, nil } @@ -155,7 +168,7 @@ func (s *Subscription) sendError(err error) { } // internal func to read from internal channel and write to client provided channel -func (s *Subscription) pump(ctx context.Context, notifyCh chan<- *DataChangeMessage, cb MsgHandler) { +func (s *Subscription) pump(ctx context.Context, notifyCh chan<- Message, cb MsgHandler, sub bool) { for { select { case <-ctx.Done(): @@ -183,7 +196,6 @@ func (s *Subscription) pump(ctx context.Context, notifyCh chan<- *DataChangeMess continue } } - switch v := msg.Value.(type) { case *ua.DataChangeNotification: for _, item := range v.MonitoredItems { @@ -216,6 +228,49 @@ func (s *Subscription) pump(ctx context.Context, notifyCh chan<- *DataChangeMess panic("notifyCh or cb must be set") } } + + case *ua.EventNotificationList: + for _, item := range v.Events { + s.mu.RLock() + _, ok := s.handles[item.ClientHandle] + s.mu.RUnlock() + + out := &EventMessage{} + if !ok { + out.Error = errors.Errorf("handle %d not found", item.ClientHandle) + // TODO: should the error also propagate via the monitor callback? + } else { + // Initialize the EventFields slice with the correct size + out.EventFields = make([]*ua.DataValue, len(item.EventFields)) + + for i, field := range item.EventFields { + // Create a new DataValue + dataValue := &ua.DataValue{ + Value: field, + Status: ua.StatusOK, + SourceTimestamp: time.Now(), + ServerTimestamp: time.Now(), + } + out.EventFields[i] = dataValue + } + } + + if notifyCh != nil { + select { + case notifyCh <- out: + atomic.AddUint64(&s.delivered, 1) + default: + atomic.AddUint64(&s.dropped, 1) + s.sendError(ErrSlowConsumer) + } + } else if cb != nil { + cb(s, out) + atomic.AddUint64(&s.delivered, 1) + } else { + panic("notifyCh or cb must be set") + } + + } default: s.sendError(errors.Errorf("unknown message type: %T", msg.Value)) } @@ -254,16 +309,16 @@ func (s *Subscription) Dropped() uint64 { } // AddNodes adds nodes defined by their string representation -func (s *Subscription) AddNodes(ctx context.Context, nodes ...string) error { +func (s *Subscription) AddNodes(ctx context.Context, eventSub bool, filter *ua.ExtensionObject, nodes ...string) error { nodeIDs, err := parseNodeSlice(nodes...) if err != nil { return err } - return s.AddNodeIDs(ctx, nodeIDs...) + return s.AddNodeIDs(ctx, eventSub, filter, nodeIDs...) } // AddNodeIDs adds nodes -func (s *Subscription) AddNodeIDs(ctx context.Context, nodes ...*ua.NodeID) error { +func (s *Subscription) AddNodeIDs(ctx context.Context, eventSub bool, filter *ua.ExtensionObject, nodes ...*ua.NodeID) error { requests := make([]Request, len(nodes)) for i, node := range nodes { @@ -272,12 +327,73 @@ func (s *Subscription) AddNodeIDs(ctx context.Context, nodes ...*ua.NodeID) erro MonitoringMode: ua.MonitoringModeReporting, } } - _, err := s.AddMonitorItems(ctx, requests...) + var err error + if eventSub { + _, err = s.AddMonitorEvents(ctx, filter, requests...) + } else { + _, err = s.AddMonitorItems(ctx, filter, requests...) + } return err } // AddMonitorItems adds nodes with monitoring parameters to the subscription -func (s *Subscription) AddMonitorItems(ctx context.Context, nodes ...Request) ([]Item, error) { +func (s *Subscription) AddMonitorItems(ctx context.Context, filter *ua.ExtensionObject, nodes ...Request) ([]Item, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if len(nodes) == 0 { + // some server implementations allow an empty monitoreditemrequest, some don't. + // better to just return + return nil, nil + } + + toAdd := make([]*ua.MonitoredItemCreateRequest, 0) + + // Add handles and make requests + for i, node := range nodes { + handle := atomic.AddUint32(&s.monitor.nextClientHandle, 1) + s.handles[handle] = nodes[i].NodeID + nodes[i].handle = handle + + request := opcua.NewMonitoredItemCreateRequestWithDefaults(node.NodeID, ua.AttributeIDValue, filter, handle) + request.MonitoringMode = node.MonitoringMode + + if node.MonitoringParameters != nil { + request.RequestedParameters = node.MonitoringParameters + request.RequestedParameters.ClientHandle = handle + } + toAdd = append(toAdd, request) + } + resp, err := s.sub.Monitor(ctx, ua.TimestampsToReturnBoth, toAdd...) + if err != nil { + return nil, err + } + + if resp.ResponseHeader.ServiceResult != ua.StatusOK { + return nil, resp.ResponseHeader.ServiceResult + } + + if len(resp.Results) != len(toAdd) { + return nil, errors.Errorf("monitor items response length mismatch") + } + var monitoredItems []Item + for i, res := range resp.Results { + if res.StatusCode != ua.StatusOK { + return nil, res.StatusCode + } + mn := Item{ + id: res.MonitoredItemID, + handle: nodes[i].handle, + nodeID: toAdd[i].ItemToMonitor.NodeID, + } + s.itemLookup[res.MonitoredItemID] = mn + monitoredItems = append(monitoredItems, mn) + } + return monitoredItems, nil +} + +// AddMonitorItems adds nodes with monitoring parameters to the subscription +func (s *Subscription) AddMonitorEvents(ctx context.Context, filter *ua.ExtensionObject, nodes ...Request) ([]Item, error) { s.mu.Lock() defer s.mu.Unlock() @@ -295,7 +411,7 @@ func (s *Subscription) AddMonitorItems(ctx context.Context, nodes ...Request) ([ s.handles[handle] = nodes[i].NodeID nodes[i].handle = handle - request := opcua.NewMonitoredItemCreateRequestWithDefaults(node.NodeID, ua.AttributeIDValue, handle) + request := opcua.NewMonitoredItemCreateRequestForEvents(node.NodeID, filter, handle) request.MonitoringMode = node.MonitoringMode if node.MonitoringParameters != nil { @@ -305,6 +421,7 @@ func (s *Subscription) AddMonitorItems(ctx context.Context, nodes ...Request) ([ toAdd = append(toAdd, request) } resp, err := s.sub.Monitor(ctx, ua.TimestampsToReturnBoth, toAdd...) + if err != nil { return nil, err } @@ -318,6 +435,7 @@ func (s *Subscription) AddMonitorItems(ctx context.Context, nodes ...Request) ([ } var monitoredItems []Item for i, res := range resp.Results { + if res.StatusCode != ua.StatusOK { return nil, res.StatusCode } @@ -329,7 +447,6 @@ func (s *Subscription) AddMonitorItems(ctx context.Context, nodes ...Request) ([ s.itemLookup[res.MonitoredItemID] = mn monitoredItems = append(monitoredItems, mn) } - return monitoredItems, nil } @@ -412,12 +529,10 @@ func parseNodeSlice(nodes ...string) ([]*ua.NodeID, error) { var err error nodeIDs := make([]*ua.NodeID, len(nodes)) - for i, node := range nodes { if nodeIDs[i], err = ua.ParseNodeID(node); err != nil { return nil, err } } - return nodeIDs, nil } diff --git a/subscription.go b/subscription.go index f08a81f6..2fbed622 100644 --- a/subscription.go +++ b/subscription.go @@ -52,7 +52,7 @@ type monitoredItem struct { ts ua.TimestampsToReturn } -func NewMonitoredItemCreateRequestWithDefaults(nodeID *ua.NodeID, attributeID ua.AttributeID, clientHandle uint32) *ua.MonitoredItemCreateRequest { +func NewMonitoredItemCreateRequestWithDefaults(nodeID *ua.NodeID, attributeID ua.AttributeID, filter *ua.ExtensionObject, clientHandle uint32) *ua.MonitoredItemCreateRequest { if attributeID == 0 { attributeID = ua.AttributeIDValue } @@ -66,12 +66,31 @@ func NewMonitoredItemCreateRequestWithDefaults(nodeID *ua.NodeID, attributeID ua RequestedParameters: &ua.MonitoringParameters{ ClientHandle: clientHandle, DiscardOldest: true, - Filter: nil, + Filter: filter, QueueSize: 10, SamplingInterval: 0.0, }, } } +func NewMonitoredItemCreateRequestForEvents(nodeID *ua.NodeID, filter *ua.ExtensionObject, clientHandle uint32) *ua.MonitoredItemCreateRequest { + req := &ua.MonitoredItemCreateRequest{ + ItemToMonitor: &ua.ReadValueID{ + NodeID: nodeID, + AttributeID: ua.AttributeIDEventNotifier, + DataEncoding: &ua.QualifiedName{}, + }, + MonitoringMode: ua.MonitoringModeReporting, + RequestedParameters: &ua.MonitoringParameters{ + ClientHandle: clientHandle, + DiscardOldest: true, + Filter: filter, + QueueSize: 10, + SamplingInterval: 1.0, + }, + } + + return req +} type PublishNotificationData struct { SubscriptionID uint32 @@ -141,6 +160,7 @@ func (s *Subscription) Monitor(ctx context.Context, ts ua.TimestampsToReturn, it ts: ts, } } + s.itemsMu.Unlock() return res, err diff --git a/uatest/reconnection_test.go b/uatest/reconnection_test.go index e919c53c..97d71062 100644 --- a/uatest/reconnection_test.go +++ b/uatest/reconnection_test.go @@ -82,7 +82,7 @@ func TestAutoReconnection(t *testing.T) { }, } - ch := make(chan *monitor.DataChangeMessage, 5) + ch := make(chan monitor.Message, 5) sctx, cancel := context.WithCancel(ctx) defer cancel() @@ -90,6 +90,8 @@ func TestAutoReconnection(t *testing.T) { sctx, &opcua.SubscriptionParameters{Interval: opcua.DefaultSubscriptionInterval}, ch, + false, + nil, currentTimeNodeID, ) if err != nil { @@ -99,9 +101,19 @@ func TestAutoReconnection(t *testing.T) { for _, tt := range tests { ok := t.Run(tt.name, func(t *testing.T) { - - if msg := <-ch; msg.Error != nil { - t.Fatalf("No error expected for first value: %s", msg.Error) + // Wait for the first message + select { + case msg := <-ch: + switch v := msg.(type) { + case *monitor.DataChangeMessage: + if v.Error != nil { + t.Fatalf("No error expected for first value: %s", v.Error) + } + default: + t.Fatalf("Unexpected message type: %T", msg) + } + case <-time.After(5 * time.Second): + t.Fatal("Timeout waiting for first message") } downC := make(chan struct{}, 1) @@ -146,8 +158,13 @@ func TestAutoReconnection(t *testing.T) { case <-rTimeout.C: t.Fatal("Timeout reached, reconnection failed") case msg := <-ch: - if err := msg.Error; err != nil { - t.Fatal(err) + switch v := msg.(type) { + case *monitor.DataChangeMessage: + if v.Error != nil { + t.Fatal(v.Error) + } + default: + t.Fatalf("Unexpected message type: %T", msg) } } })