diff --git a/autopaho/net.go b/autopaho/net.go index c4a8d9e..1c9e8b5 100644 --- a/autopaho/net.go +++ b/autopaho/net.go @@ -42,7 +42,6 @@ import ( // context is cancelled (in which case nil will be returned). func establishServerConnection(ctx context.Context, cfg ClientConfig, firstConnection bool) (*paho.Client, *paho.Connack) { // Note: We do not touch b.cli in order to avoid adding thread safety issues. - var err error var attempt int = 0 for { @@ -53,51 +52,51 @@ func establishServerConnection(ctx context.Context, cfg ClientConfig, firstConne return nil, nil } for _, u := range cfg.ServerUrls { - connectionCtx, cancelConnCtx := context.WithTimeout(ctx, cfg.ConnectTimeout) - - if cfg.AttemptConnection != nil { // Use custom function if it is provided - cfg.Conn, err = cfg.AttemptConnection(ctx, cfg, u) - } else { - switch strings.ToLower(u.Scheme) { - case "mqtt", "tcp", "": - cfg.Conn, err = attemptTCPConnection(connectionCtx, u.Host) - case "ssl", "tls", "mqtts", "mqtt+ssl", "tcps": - cfg.Conn, err = attemptTLSConnection(connectionCtx, cfg.TlsCfg, u.Host) - case "ws": - cfg.Conn, err = attemptWebsocketConnection(connectionCtx, nil, cfg.WebSocketCfg, u) - case "wss": - cfg.Conn, err = attemptWebsocketConnection(connectionCtx, cfg.TlsCfg, cfg.WebSocketCfg, u) - default: - if cfg.OnConnectError != nil { - cfg.OnConnectError(fmt.Errorf("unsupported scheme (%s) user in url %s", u.Scheme, u.String())) - } - cancelConnCtx() - continue - } - } - var connack *paho.Connack - if err == nil { - cli := paho.NewClient(cfg.ClientConfig) - if cfg.PahoDebug != nil { - cli.SetDebugLogger(cfg.PahoDebug) - } - if cfg.PahoErrors != nil { - cli.SetErrorLogger(cfg.PahoErrors) + cp, err := cfg.buildConnectPacket(firstConnection, u) + if err == nil { + connectionCtx, cancelConnCtx := context.WithTimeout(ctx, cfg.ConnectTimeout) + + if cfg.AttemptConnection != nil { // Use custom function if it is provided + cfg.Conn, err = cfg.AttemptConnection(ctx, cfg, u) + } else { + switch strings.ToLower(u.Scheme) { + case "mqtt", "tcp", "": + cfg.Conn, err = attemptTCPConnection(connectionCtx, u.Host) + case "ssl", "tls", "mqtts", "mqtt+ssl", "tcps": + cfg.Conn, err = attemptTLSConnection(connectionCtx, cfg.TlsCfg, u.Host) + case "ws": + cfg.Conn, err = attemptWebsocketConnection(connectionCtx, nil, cfg.WebSocketCfg, u) + case "wss": + cfg.Conn, err = attemptWebsocketConnection(connectionCtx, cfg.TlsCfg, cfg.WebSocketCfg, u) + default: + if cfg.OnConnectError != nil { + cfg.OnConnectError(fmt.Errorf("unsupported scheme (%s) user in url %s", u.Scheme, u.String())) + } + cancelConnCtx() + continue + } } - var cp *paho.Connect - cp, err = cfg.buildConnectPacket(firstConnection, u) if err == nil { + cli := paho.NewClient(cfg.ClientConfig) + if cfg.PahoDebug != nil { + cli.SetDebugLogger(cfg.PahoDebug) + } + + if cfg.PahoErrors != nil { + cli.SetErrorLogger(cfg.PahoErrors) + } + connack, err = cli.Connect(connectionCtx, cp) // will return an error if the connection is unsuccessful (checks the reason code) if err == nil { // Successfully connected cancelConnCtx() return cli, connack } } + cancelConnCtx() } - cancelConnCtx() // Possible failure was due to outer context being cancelled if ctx.Err() != nil {