Skip to content

Commit

Permalink
Merge pull request #211 from gopcua/uasc-server-prep
Browse files Browse the repository at this point in the history
move recv() loop from uasc into opcua package
  • Loading branch information
dwhutchison authored Jun 17, 2019
2 parents 1c1102b + 024329a commit 4c56f8a
Show file tree
Hide file tree
Showing 12 changed files with 437 additions and 187 deletions.
47 changes: 41 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"crypto/rand"
"fmt"
"io"
"log"
"reflect"
"sort"
Expand All @@ -25,7 +26,7 @@ import (
// GetEndpoints returns the available endpoint descriptions for the server.
func GetEndpoints(endpoint string) ([]*ua.EndpointDescription, error) {
c := NewClient(endpoint)
if err := c.Dial(); err != nil {
if err := c.Dial(context.Background()); err != nil {
return nil, err
}
defer c.Close()
Expand Down Expand Up @@ -127,11 +128,15 @@ func NewClient(endpoint string, opts ...Option) *Client {
}

// Connect establishes a secure channel and creates a new session.
func (c *Client) Connect() (err error) {
func (c *Client) Connect(ctx context.Context) (err error) {
if ctx == nil {
ctx = context.Background()
}

if c.sechan != nil {
return fmt.Errorf("already connected")
}
if err := c.Dial(); err != nil {
if err := c.Dial(ctx); err != nil {
return err
}
s, err := c.CreateSession(c.sessionCfg)
Expand All @@ -147,12 +152,16 @@ func (c *Client) Connect() (err error) {
}

// Dial establishes a secure channel.
func (c *Client) Dial() error {
func (c *Client) Dial(ctx context.Context) error {
if ctx == nil {
ctx = context.Background()
}

c.once.Do(func() { c.session.Store((*Session)(nil)) })
if c.sechan != nil {
return fmt.Errorf("secure channel already connected")
}
conn, err := uacp.Dial(context.Background(), c.endpointURL)
conn, err := uacp.Dial(ctx, c.endpointURL)
if err != nil {
return err
}
Expand All @@ -161,14 +170,40 @@ func (c *Client) Dial() error {
_ = conn.Close()
return err
}
c.sechan = sechan
go c.monitorChannel(ctx)

if err := sechan.Open(); err != nil {
_ = conn.Close()
c.sechan = nil
return err
}
c.sechan = sechan

return nil
}

func (c *Client) monitorChannel(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
msg := c.sechan.Receive(ctx)
if msg.Err != nil {
if msg.Err == io.EOF {
debug.Printf("Connection closed")
} else {
debug.Printf("Received error: %s", msg.Err)
}
// todo (dh): apart from the above message, we're ignoring this error because there is nothing watching it
// I'd prefer to have a way to return the error to the upper application.
return
}
debug.Printf("Received unsolicited message from server: %T", msg.V)
}
}
}

// Close closes the session and the secure channel.
func (c *Client) Close() error {
// try to close the session but ignore any error
Expand Down
5 changes: 4 additions & 1 deletion examples/accesslevel/accesslevel.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"flag"
"log"

Expand All @@ -22,8 +23,10 @@ func main() {
flag.Parse()
log.SetFlags(0)

ctx := context.Background()

c := opcua.NewClient(*endpoint)
if err := c.Connect(); err != nil {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
Expand Down
5 changes: 4 additions & 1 deletion examples/browse/browse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"flag"
"fmt"
"log"
Expand Down Expand Up @@ -52,8 +53,10 @@ func main() {
flag.Parse()
log.SetFlags(0)

ctx := context.Background()

c := opcua.NewClient(*endpoint)
if err := c.Connect(); err != nil {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
Expand Down
7 changes: 5 additions & 2 deletions examples/crypto/crypto.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package main

import (
"bufio"
"context"
"crypto/rsa"
"crypto/tls"
"flag"
Expand Down Expand Up @@ -41,6 +42,8 @@ func main() {
flag.Parse()
log.SetFlags(0)

ctx := context.Background()

// Get a list of the endpoints for our target server
endpoints, err := opcua.GetEndpoints(*endpoint)
if err != nil {
Expand All @@ -58,7 +61,7 @@ func main() {

// Create a Client with the selected options
c := opcua.NewClient(*endpoint, opts...)
if err := c.Connect(); err != nil {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
Expand All @@ -83,7 +86,7 @@ func main() {
d := opcua.NewClient(*endpoint, opts...)

// Create a channel only and do not activate it automatically
d.Dial()
d.Dial(ctx)
defer d.Close()

// Activate the previous session on the new channel
Expand Down
5 changes: 4 additions & 1 deletion examples/datetime/datetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"flag"
"fmt"
"log"
Expand All @@ -24,6 +25,8 @@ func main() {
flag.Parse()
log.SetFlags(0)

ctx := context.Background()

endpoints, err := opcua.GetEndpoints(*endpoint)
if err != nil {
log.Fatal(err)
Expand All @@ -45,7 +48,7 @@ func main() {
}

c := opcua.NewClient(ep.EndpointURL, opts...)
if err := c.Connect(); err != nil {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
Expand Down
5 changes: 4 additions & 1 deletion examples/history-read/history-read.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"flag"
"log"
"time"
Expand All @@ -21,8 +22,10 @@ func main() {
flag.Parse()
log.SetFlags(0)

ctx := context.Background()

c := opcua.NewClient(*endpoint)
if err := c.Connect(); err != nil {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
Expand Down
5 changes: 4 additions & 1 deletion examples/read/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"flag"
"log"

Expand All @@ -22,8 +23,10 @@ func main() {
flag.Parse()
log.SetFlags(0)

ctx := context.Background()

c := opcua.NewClient(*endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
if err := c.Connect(); err != nil {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
Expand Down
4 changes: 3 additions & 1 deletion examples/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ func main() {
)
flag.Parse()

ctx := context.Background()

log.Printf("Listening on %s", *endpoint)
l, err := uacp.Listen(*endpoint, nil)
if err != nil {
log.Fatal(err)
}
c, err := l.Accept(context.Background())
c, err := l.Accept(ctx)
if err != nil {
log.Fatal(err)
}
Expand Down
14 changes: 7 additions & 7 deletions examples/subscribe/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func main() {
flag.Parse()
log.SetFlags(0)

// add an arbitrary timeout to demonstrate how to stop a subscription
// with a context.
d := 30 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), d)
defer cancel()

endpoints, err := opcua.GetEndpoints(*endpoint)
if err != nil {
log.Fatal(err)
Expand All @@ -50,7 +56,7 @@ func main() {
}

c := opcua.NewClient(ep.EndpointURL, opts...)
if err := c.Connect(); err != nil {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
Expand All @@ -77,12 +83,6 @@ func main() {
log.Fatal(err)
}

// add an arbitrary timeout to demonstrate how to stop a subscription
// with a context.
d := 30 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), d)
defer cancel()

go sub.Run(ctx) // start Publish loop

// read from subscription's notification channel until ctx is cancelled
Expand Down
5 changes: 4 additions & 1 deletion examples/write/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"flag"
"log"

Expand All @@ -23,8 +24,10 @@ func main() {
flag.Parse()
log.SetFlags(0)

ctx := context.Background()

c := opcua.NewClient(*endpoint)
if err := c.Connect(); err != nil {
if err := c.Connect(ctx); err != nil {
log.Fatal(err)
}
defer c.Close()
Expand Down
16 changes: 15 additions & 1 deletion uapolicy/cert_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,25 @@

package uapolicy

import "crypto/sha1"
import (
"crypto/rsa"
"crypto/sha1"
"crypto/x509"
)

// Thumbprint returns the thumbprint of a DER-encoded certificate
func Thumbprint(c []byte) []byte {
thumbprint := sha1.Sum(c)

return thumbprint[:]
}

// PublicKey returns the RSA PublicKey from a DER-encoded certificate
func PublicKey(c []byte) (*rsa.PublicKey, error) {
cert, err := x509.ParseCertificate(c)
if err != nil {
return nil, err
}

return cert.PublicKey.(*rsa.PublicKey), nil
}
Loading

0 comments on commit 4c56f8a

Please sign in to comment.