Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Development/websocket #1794

Closed
wants to merge 14 commits into from
Closed

Development/websocket #1794

wants to merge 14 commits into from

Conversation

msieben
Copy link
Contributor

@msieben msieben commented Nov 21, 2024

No description provided.

msieben and others added 12 commits November 6, 2024 15:23
…m item

- Rename 'WEBSERVER' to 'WEBSERVICE' as both client and server utilize state information

- Unsollicited 'Pong' may act as a heart beat
…ructure are not yet contructed, and, improve SSL read and write use

- SSL read and write methods should only be used if the handshake is successfull

- Do not return error values for SSL read and write methods as number of read or written bytes
…ly) propagate to SSL structure after construction.

- Context should exist

- Options should be set
@@ -105,10 +105,14 @@ namespace Crypto {

// Signal a state change, Opened, Closed or Accepted
void StateChange() override {
if (IsOpen() && _context == nullptr) { // Initialize may not yet have happened
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By design the ASSERT(_context != nullptr) and lets add ASSERT(_ssl != nullptr) could be added here, cause if either of these are a nullptr, the Initialize should have signaled that by sending out an error != CORE_ERROR_NONE and than the statechange will never be called!
So I consider this unusefull additional code and think that it is a design flaw if a socket could be "open" but it was not Initialized(), see:

if ((m_Socket != INVALID_SOCKET) && (Initialize() == Core::ERROR_NONE)) {

if (IsOpen() == true) {
int result;

if (IsOpen() == true && _ssl != nullptr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the SecureSocketPort::Handler::Initialize() an error is returned if _ssl was not constructed. Please code defensively, just ASSERT here on an (_ssl != nullptr);

SSL_set_tlsext_host_name(static_cast<SSL*>(_ssl), RemoteNode().HostName().c_str());
result = SSL_connect(static_cast<SSL*>(_ssl));
if (result == 1) {
int result{0};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stick to coding as done every where int result = 0;

return (SSL_write(static_cast<SSL*>(_ssl), buffer, length));
int32_t result = 0;

if (_handShaking == CONNECTED) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was/is a design flaw, if someone would write already while there is no "OPEN"/"CONNECTED state, it might end up in a busy-loop. Suggest an ASSERT( _handShaking == CONNECTED); Programitically the user of the socket has to wait till the StateChange -> Open has ocured before the Write can be used!!


if (_handShaking != CONNECTED) {
const_cast<Handler&>(*this).Update();
if (_handShaking == CONNECTED) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be interesting to test if an ASSERT(handshaking == CONNECTED); fires. If so I guess the Update is usefull for the system to update its information if it was not CONNECTED (something that now is turned off).
If the ASSERT does not fire, lets clean up and do not call Update() and make the code more lean and mean!


if ((_context = SSL_CTX_new(TLS_method())) != nullptr) {
if ( // Returns bit-mask after adding options
((SSL_CTX_set_options(static_cast<SSL_CTX*>(_context), SSL_OP_ALL | SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3) & (SSL_OP_ALL | SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3)) == (SSL_OP_ALL | SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SSL_CTX_set_options contruction feels a bit like you are checking the openSSL implementation if it does what you are asking it to do. The manual does not give any restrictions on mutal exclusive options. So since we do not actively want to check third party libraries in Production or Release I suggest putting this check in an ASSERT if you want to check OpenSSL

@pwielders
Copy link
Contributor

@msieben you probably need this: #1795 to continue development of a SecureSocketPort server!

@pwielders
Copy link
Contributor

@msieben as I worked through your test code to reproduce the reason why you added the Initialize() in the StateChange() to find the bug mentioned in the PR above, I saw that the code you worte for the testserver/client is not thread safe! std::vector is not thread safe, you are using it (also in your test) in a multithreaded way, so be carefull!

@pwielders
Copy link
Contributor

pwielders commented Nov 22, 2024

Test code:

using namespace Thunder;

class SecureStream : public Crypto::SecureSocketPort {
private:
    // Validat client certificate
    class Validator : public Crypto::SecureSocketPort::IValidator {
    public:
        Validator(Validator&&) = delete;
        Validator(const Validator&) = delete;
        Validator& operator=(Validator&&) = delete;
        Validator& operator=(const Validator&) = delete;

        Validator() = default;
        ~Validator() override = default;

        bool Validate(const Certificate& certificate) const override {
            // Print certificate properties
            std::cout << std::dec << __LINE__ << " : " << __FUNCTION__ << "\n";
            std::cout << " |--> Issuer = " << certificate.Issuer() << "\n";
            std::cout << " |--> Subject = " << certificate.Subject() << "\n";
            std::cout << " |--> Valid from = " << certificate.ValidFrom().ToRFC1123() << "\n";
            std::cout << " |--> Valid until = " << certificate.ValidTill().ToRFC1123() << "\n";
            return true; // Always accept
        }
    };

public:
    SecureStream(SecureStream&&) = delete;
    SecureStream(const SecureStream&) = delete;
    SecureStream& operator=(SecureStream&&) = delete;
    SecureStream& operator=(const SecureStream&) = delete;

    // In essence, all parameters to SecureSocket are passed to a base class SocketPort
    SecureStream(
        const SOCKET& socket,
        const Core::NodeId& localNode,
        const uint16_t sendBufferSize,
        const uint16_t receiveBufferSize,
        const string& prefix)
        : Crypto::SecureSocketPort(Core::SocketPort::STREAM, socket, localNode, sendBufferSize, receiveBufferSize)
        , _prefix(prefix)
        , _validator() {
        // Validate custom (sefl signed) certificates
        uint32_t result = Callback(&_validator);
    }
    SecureStream(
        const bool,
        const Core::NodeId& localNode,
        const Core::NodeId& remoteNode,
        const uint16_t sendBufferSize,
        const uint16_t receiveBufferSize,
        const string& prefix)
        : Crypto::SecureSocketPort(Core::SocketPort::STREAM, localNode, remoteNode, sendBufferSize, receiveBufferSize, sendBufferSize, receiveBufferSize)
        , _prefix(prefix)
        , _validator() {
        // Validate custom (self signed) client certificates
        uint32_t result = Callback(&_validator);
    }
    ~SecureStream() override = default;

private:
    const string _prefix;
    Validator _validator;
};

template <typename STREAM, uint16_t SENDBUFFERSIZE, uint16_t RECEIVEBUFFERSIZE>
class WebSocketServer : public Web::WebSocketServerType<STREAM> {
private:
    using BaseClass = Web::WebSocketServerType<STREAM>;
    using Message = std::basic_string<uint8_t>;
    using Messages = std::vector<Message>;

public:
    WebSocketServer() = delete;
    WebSocketServer(WebSocketServer&&) = delete;
    WebSocketServer(const WebSocketServer&) = delete;
    WebSocketServer& operator=(WebSocketServer&&) = delete;
    WebSocketServer& operator=(const WebSocketServer&) = delete;

    // SocketServerType defines SocketHandler of type SocketListener. SocketListener triggers Accept on StateChange, effectively, calling SocketServerType::Accept(SOCKET, NodeId) which creates a WebSocketServer with these parameters
    WebSocketServer(const SOCKET& socket, const Core::NodeId remoteNode, Core::SocketServerType<WebSocketServer<STREAM, SENDBUFFERSIZE, RECEIVEBUFFERSIZE>>*)
        // Initially this should be defined as a regular TCP socket
        : BaseClass(false /* binary*/, false /*masking */, socket, remoteNode, SENDBUFFERSIZE /* send buffer size */, RECEIVEBUFFERSIZE /* receive buffer size */, "WebSocketServerType")
    {
    }
    ~WebSocketServer() override {
        ASSERT(BaseClass::IsClosed() == true);
        BaseClass::Close(Core::infinite);
    }

public:
    // Non-idle then data available to send
    bool IsIdle() const override { 
        Core::SafeSyncType<Core::CriticalSection> lock(_adminLock);

        return _post.size() == 0; 
    }
    // Put data in the queue to send (to the (connected) client)
    bool Submit(const Message& message)
    {
        _adminLock.Lock();

        _post.emplace_back(message);

        // Trigger a call to SendData
        bool trigger = (_post.size() == 1);

        _adminLock.Unlock();

        if (trigger == true) {
            BaseClass::Link().Trigger();
        }

        return true;
    }

private:
    // Allow for eventfull state updates in this class
    void StateChange() override
    {
        std::cout << std::dec << __LINE__ << " : " << __FUNCTION__ << "\n";

        // Socket port open AND upgraded to WebSocket 
        std::cout << " |--> IsOpen() = " << this->IsOpen() << "\n";
        // Link will not accept new messages as they arrive
        std::cout << " |--> IsSuspended() = " << this->IsSuspended() << "\n";
        // Socket has been closed (removed), link cannot receive new TCP data
        std::cout << " |--> IsClosed() = " << this->IsClosed() << "\n";
        // Regular HTTP connection, no upgraded connection
        std::cout << " |--> IsWebServer() = " << this->IsWebServer() << "\n";
        // Upgrade in progress
        std::cout << " |--> IsUpgrading() = " << this->IsUpgrading() << "\n";
        // Upgraded connection
        std::cout << " |--> IsWebSocket() = " << this->IsWebSocket() << "\n";
        // Finishing frame received
        std::cout << " |--> IsCompleted() = " << this->IsCompleted() << "\n";
    }

    // Reflects payload, effective after upgrade
    uint16_t SendData(uint8_t* dataFrame, const uint16_t maxSendSize) override
    {
        // Prerequisited for calling this, otherwise there is no use calling this !!
        ASSERT(dataFrame != nullptr);
        ASSERT(maxSendSize > 0);
        ASSERT(BaseClass::IsOpen());
        ASSERT(BaseClass::IsWebSocket());
        ASSERT(BaseClass::IsCompleted());

        std::cout << std::dec << __LINE__ << " : " << __FUNCTION__ << "\n";

        uint16_t count = 0;

        if (IsIdle() == false) {

            _adminLock.Lock();

            Message& message = _post.front();

            count = static_cast<uint16_t>(std::min(message.size(), static_cast<size_t>(maxSendSize)));

            /* void* */ memcpy(dataFrame, message.data(), count);

            if (count == message.size()) {
                // full message loaded, drop it.
                _post.erase(_post.begin());
            }
            else {
                // Partial message drop, remove what we dropped.
                message.erase(0, count);
            }

            _adminLock.Unlock();

            std::cout << " |--> dataFrame (" << count << " ) = ";
            for (uint16_t index = 0; index < count; index++) {
                std::cout << std::hex << static_cast<int>(dataFrame[index]) << " ";
            }
            std::cout << "\n";

        }

        return count;
    }

      // Reflects payload, effective after upgrade
    uint16_t ReceiveData(uint8_t* dataFrame, const uint16_t receivedSize) override {
        std::cout << std::dec << __LINE__ << " : " << __FUNCTION__ << "\n";

        if (receivedSize > 0) {
            std::cout << " |--> dataFrame ( " << receivedSize << " ) = ";
            for (int32_t index = 0; index < receivedSize; index++) {
                std::cout << std::hex << static_cast<int>(dataFrame[index]) << " ";
            }
            std::cout << "\n";
        }

        // Echo the data in reverse order

        Message message(dataFrame, receivedSize);

        std::reverse(message.begin(), message.end());

        Submit(message);

        return(receivedSize);
    }

private:
    mutable Core::CriticalSection _adminLock;
    Messages _post; // Send message queue
};

template <typename STREAM>
class WebSocketClient : public Web::WebSocketClientType<STREAM> {
public:
    using Message = std::basic_string<uint8_t>;

private:
    using BaseClass = Web::WebSocketClientType<STREAM>;
    using Messages = std::vector<Message>;

public:
    WebSocketClient() = delete;
    WebSocketClient(WebSocketClient&&) = delete;
    WebSocketClient(const WebSocketClient&) = delete;
    WebSocketClient& operator=(WebSocketClient&&) = delete;
    WebSocketClient& operator=(const WebSocketClient&) = delete;

    template <typename... Args>
    WebSocketClient(
        const string& path,
        const string& protocol,
        const string& query,
        const string& origin,
        const bool binary,
        const bool masking,
        Args&&... args)
        : BaseClass(path, protocol, query, origin, binary, masking, /* <Arguments SocketStream> */ std::forward<Args>(args)... /*</Arguments SocketStream>*/) {
    }
    ~WebSocketClient() override {
        ASSERT(BaseClass::IsClosed() == true);
        BaseClass::Close(Core::infinite);
    }

public:
    // Non-idle then data available to send
    bool IsIdle() const override { 
        Core::SafeSyncType<Core::CriticalSection> lock(_adminLock);

        return _post.size() == 0; 
    }
    bool Submit(const Message& message)
    {
        _adminLock.Lock();

        _post.emplace_back(message);

        // Trigger a call to SendData
        bool trigger = (_post.size() == 1);

        _adminLock.Unlock();

        if (trigger == true) {
            BaseClass::Link().Trigger();
        }

        return true;
    }
    Message Response()
    {
        std::cout << std::dec << __LINE__ << " : " << __FUNCTION__ << "\n";

        Message message;

        _adminLock.Lock();

        if (_response.size() == 0) {
            _adminLock.Unlock();
        }
        else {
            message = _response.front();
            _response.erase(_response.begin());
            _adminLock.Unlock();

            std::cout << " |--> message ( " << message.size() << " ) = ";
            for (int32_t index = 0; index < message.size(); index++) {
                std::cout << std::hex << static_cast<int>(message[index]) << " ";
            }
            std::cout << "\n";
        }

        return message;
    }

private:
    // Allow for eventfull state updates in this class
    void StateChange() override
    {
        std::cout << std::dec << __LINE__ << " : " << __FUNCTION__ << "\n";

        // Socket port open AND upgraded to WebSocket 
        std::cout << " |--> IsOpen() = " << this->IsOpen() << "\n";
        // Link will not accept new messages as they arrive
        std::cout << " |--> IsSuspended() = " << this->IsSuspended() << "\n";
        // Socket has been closed (removed), link cannot receive new TCP data
        std::cout << " |--> IsClosed() = " << this->IsClosed() << "\n";
        // Regular HTTP connection, no upgraded connection
        std::cout << " |--> IsWebServer() = " << this->IsWebServer() << "\n";
        // Upgrade in progress
        std::cout << " |--> IsUpgrading() = " << this->IsUpgrading() << "\n";
        // Upgraded connection
        std::cout << " |--> IsWebSocket() = " << this->IsWebSocket() << "\n";
        // Finishing frame received
        std::cout << " |--> IsCompleted() = " << this->IsCompleted() << "\n";
    }

    // Reflects payload, effective after upgrade
    uint16_t SendData(uint8_t* dataFrame, const uint16_t maxSendSize) override
    {
        // Prerequisited for calling this, otherwise there is no use calling this !!
        ASSERT(dataFrame != nullptr);
        ASSERT(maxSendSize > 0);
        ASSERT(BaseClass::IsOpen());
        ASSERT(BaseClass::IsWebSocket());
        ASSERT(BaseClass::IsCompleted());

        std::cout << std::dec << __LINE__ << " : " << __FUNCTION__ << "\n";

        uint16_t count = 0;

        if (IsIdle() == false) {

            _adminLock.Lock();

            Message& message = _post.front();

            count = static_cast<uint16_t>(std::min(message.size(), static_cast<size_t>(maxSendSize)));

            /* void* */ memcpy(dataFrame, message.data(), count);

            if (count == message.size()) {
                /* iterator */ _post.erase(_post.begin());
            }
            else {
                /* this */ message.erase(0, count);
            }

            _adminLock.Unlock();

            std::cout << " |--> dataFrame ( " << count << "/" << maxSendSize << " ) = ";
            for (int32_t index = 0; index < count; index++) {
                std::cout << std::hex << static_cast<int>(dataFrame[index]) << " ";
            }
            std::cout << "\n";
        }

        return count;
    }

    // Reflects payload, effective after upgrade
    uint16_t ReceiveData(uint8_t* dataFrame, const uint16_t receivedSize) override
    {
        std::cout << std::dec << __LINE__ << " : " << __FUNCTION__ << "\n";

        if (receivedSize > 0) {
            _adminLock.Lock();
            _response.emplace_back(Message(dataFrame, receivedSize));
            _adminLock.Unlock();

            std::cout << " |--> dataFrame ( " << receivedSize << " ) = ";
            for (int32_t index = 0; index < receivedSize; index++) {
                std::cout << std::hex << static_cast<int>(dataFrame[index]) << " ";
            }
            std::cout << "\n";
        }

        return receivedSize;
    }

private:
    mutable Core::CriticalSection _adminLock;
    Messages _post; // Send message queue
    Messages _response; // Send message queue
};

and the test:

const TCHAR localHostName[] = "127.0.0.1";
constexpr uint16_t tcpServerPort     = 12345;   // TCP, default 80 or 443 (SSL)

// The minimum size is determined by the HTTP upgrade process. The limit here is above that threshold.
constexpr uint16_t sendBufferSize    = 1024;
constexpr uint16_t receiveBufferSize = 1024;
constexpr uint32_t maxWaitTimeMs     = 4000;

const std::string webSocketURIPath;     // HTTP URI part, empty path allowed 
const std::string webSocketProtocol;    // Optional HTTP field, WebSocket SubProtocol, ie, Sec-WebSocket-Protocol
const std::string webSocketURIQuery;    // HTTP URI part, absent query allowe
const std::string webSocketOrigin;      // Optional, set by browser clients
constexpr bool binary    = false;       // Flag to indicate WebSocket opcode 0x1 (test frame) or 0x2 (binary frame)  
constexpr bool masking   = true;        // Flag set by client to enable masking
constexpr bool rawSocket = false;

const uint8_t request[] = {"Lets confuse the russians !!!!!"};

void OpeningSecuredServerPort()
{
    const Core::NodeId localNode(localHostName, tcpServerPort, Core::NodeId::TYPE_IPV4);

    // This is a listening socket as result of using SocketServerType which enables listening
    Core::SocketServerType<WebSocketServer<SecureStream, sendBufferSize, receiveBufferSize>> server(localNode /* listening node*/);

    if (server.Open(maxWaitTimeMs) == Core::ERROR_NONE) {

        // Time to open a Client, see if I can get some data :-)
        WebSocketClient<SecureStream> client(webSocketURIPath, webSocketProtocol, webSocketURIQuery, webSocketOrigin, false, true, rawSocket, localNode.AnyInterface(), localNode, sendBufferSize, receiveBufferSize, "WebSocketClient");

        if (client.Open(3000) == Core::ERROR_NONE) {
            std::basic_string<uint8_t> message(request, sizeof(request));
            // Seems we have connections, now exchange a message
            client.Submit(message);

            // Sleep for some time so we can send and receive it :-)
            SleepMs(1000);

            WebSocketClient<SecureStream>::Message response = client.Response();

            printf("%s\n\n", response.c_str());
        }
    }
}

@msieben
Copy link
Contributor Author

msieben commented Nov 22, 2024

@pwielders thanks for the feedback though it was originally not intended to be merged, hence, the checks instead of the ASSERTs to enable a continuing workflow. Nevertheless, I appreciate the refreshing hints and advices. The underlying issue as presented in #1795 should nullify al lot of the commits that comprise this PR and present a better base to continue efforts.

@msieben msieben closed this Nov 22, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants