From bdb3b2feaadb1935e5b5b8291cd16a7a055be5ed Mon Sep 17 00:00:00 2001 From: Joel Bodenmann Date: Mon, 25 Nov 2024 03:07:08 +0100 Subject: [PATCH] client/http: More coro conversion --- lib/malloy/client/controller.hpp | 14 +-- lib/malloy/client/http/connection.hpp | 105 ++++++++-------------- lib/malloy/client/http/connection_tls.hpp | 25 +----- 3 files changed, 48 insertions(+), 96 deletions(-) diff --git a/lib/malloy/client/controller.hpp b/lib/malloy/client/controller.hpp index 169ec5b..e48d788 100644 --- a/lib/malloy/client/controller.hpp +++ b/lib/malloy/client/controller.hpp @@ -288,12 +288,14 @@ namespace malloy::client req.set(malloy::http::field::user_agent, m_cfg.user_agent); // Run - conn->run( - std::to_string(req.port()).c_str(), - req, - std::move(prom), - std::forward(cb), - std::forward(filter)); + boost::asio::co_spawn( + *m_ioc, + conn->run(std::move(req), std::move(prom), std::forward(cb), std::forward(filter)), + [conn](std::exception_ptr e) { // ToDo: Do we need to capture conn to keep it alive here?! + if (e) + std::rethrow_exception(e); + } + ); }); return err_channel; diff --git a/lib/malloy/client/http/connection.hpp b/lib/malloy/client/http/connection.hpp index d610172..a272208 100644 --- a/lib/malloy/client/http/connection.hpp +++ b/lib/malloy/client/http/connection.hpp @@ -15,10 +15,6 @@ #include #include - - -#include - namespace malloy::client::http { @@ -31,9 +27,6 @@ namespace malloy::client::http class connection { public: - using resp_t = typename Filter::response_type; - using callback_t = Callback; - connection(std::shared_ptr logger, boost::asio::io_context& io_ctx, const std::uint64_t body_limit) : m_logger(std::move(logger)), m_io_ctx(io_ctx) @@ -46,92 +39,57 @@ namespace malloy::client::http m_parser.body_limit(body_limit); } - // Start the asynchronous operation - void - run( - char const* port, - malloy::http::request req, - std::promise err_channel, - callback_t&& cb, - Filter&& filter - ) - { - boost::asio::co_spawn( - m_io_ctx, - run_coro(std::string(port), std::move(req), std::move(err_channel), std::move(cb), std::move(filter)), - [me = derived().shared_from_this()](std::exception_ptr e) { - if (e) - std::rethrow_exception(e); - } - ); - } - + // ToDo: Remove port parameter, we can get that from the req + // ToDo: Return something like std::expected instead boost::asio::awaitable - run_coro( - std::string port, // ToDo + run( malloy::http::request req, std::promise err_channel, - callback_t&& cb, + Callback&& cb, Filter&& filter ) { - namespace http = boost::beast::http; - auto executor = co_await boost::asio::this_coro::executor; - auto resolver = boost::asio::ip::tcp::resolver{ executor }; - //auto stream = boost::beast::tcp_stream{ executor }; // Look up the domain name - auto const results = co_await resolver.async_resolve(req.base()[malloy::http::field::host], port); - - // Set the timeout. - derived().stream().expires_after(std::chrono::seconds(30)); + auto resolver = boost::asio::ip::tcp::resolver{ executor }; + auto const results = co_await resolver.async_resolve(req.base()[malloy::http::field::host], std::to_string(req.port())); // Make the connection on the IP address we get from a lookup - co_await derived().stream().async_connect(results); + set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code + co_await boost::beast::get_lowest_layer(derived().stream()).async_connect(results); // Call "connected" hook co_await derived().hook_connected(); - // Set the timeout. - derived().stream().expires_after(std::chrono::seconds(30)); - // Send the HTTP request to the remote host - co_await http::async_write(derived().stream(), req); - - // This buffer is used for reading and must be persisted - //boost::beast::flat_buffer buffer; - - // Declare a container to hold the response - //http::response res; - - // Receive the HTTP response - //co_await http::async_read(derived().stream(), buffer, res); - -#if 0 - (void)cb; - (void)filter; + set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code + co_await boost::beast::http::async_write(derived().stream(), req); - // Write the message to standard out - std::cout << res << std::endl; -#else // ToDo - m_req_filter = std::move(filter); - //m_req = std::move(req); m_cb.emplace(std::move(cb)); // Pick a body and parse it from the stream // ToDo: Have a look at using boost::beast::http::response instead! + // This would probably involve something like: + // + // boost::beast::flat_buffer buffer; + // http::response res; + // co_await http::async_read(derived().stream(), buffer, res); + // auto bodies = filter.body_for(m_parser.get().base()); - co_await std::visit([this](auto&& body) -> boost::asio::awaitable { + co_await std::visit( + [filter = std::move(filter), this](auto&& body) -> boost::asio::awaitable { using body_t = std::decay_t; auto parser = std::make_shared>(std::move(m_parser)); - m_req_filter.setup_body(parser->get().base(), parser->get().body()); + filter.setup_body(parser->get().base(), parser->get().body()); + + boost::beast::flat_buffer buffer; co_await boost::beast::http::async_read( derived().stream(), - m_buffer, + buffer, *parser, boost::asio::use_awaitable); @@ -139,11 +97,12 @@ namespace malloy::client::http }, std::move(bodies) ); -#endif // Gracefully close the socket boost::beast::error_code ec; - derived().stream().socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + set_stream_timeout(std::chrono::seconds(30)); // ToDo: Don't hard-code + // ToDo: This should be co_await too! + boost::beast::get_lowest_layer(derived().stream()).socket().shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); // not_connected happens sometimes // so don't bother reporting it. @@ -160,12 +119,20 @@ namespace malloy::client::http protected: std::shared_ptr m_logger; + template + void + set_stream_timeout(const std::chrono::duration duration) + { + // ToDo: Which one is correct?!?! + + //derived().stream().expires_after(duration); + boost::beast::get_lowest_layer(derived().stream()).expires_after(duration); + } + private: boost::asio::io_context& m_io_ctx; boost::beast::http::response_parser m_parser; - std::optional m_cb; // ToDo: Get rid of this, no longer required - Filter m_req_filter; // ToDo: Get rid of this, no longer required - boost::beast::flat_buffer m_buffer; // ToDo: Get rid of this, no longer required + std::optional m_cb; // ToDo: Get rid of this, no longer required [[nodiscard]] constexpr diff --git a/lib/malloy/client/http/connection_tls.hpp b/lib/malloy/client/http/connection_tls.hpp index 8fcd26c..a1540e4 100644 --- a/lib/malloy/client/http/connection_tls.hpp +++ b/lib/malloy/client/http/connection_tls.hpp @@ -38,33 +38,16 @@ namespace malloy::client::http return m_stream; } - void + // ToDo: Return error code! + boost::asio::awaitable hook_connected() { // Perform the TLS handshake - m_stream.async_handshake( - boost::asio::ssl::stream_base::client, - boost::beast::bind_front_handler( - &connection_tls::on_handshake, - this->shared_from_this() - ) - ); + parent_t::set_stream_timeout(std::chrono::seconds(3)); // ToDo: Do not hard-code! + co_await m_stream.async_handshake(boost::asio::ssl::stream_base::client); } private: boost::beast::ssl_stream> m_stream; - - void - on_handshake(const boost::beast::error_code ec) - { - if (ec) - return parent_t::m_logger->error("on_handshake(): {}", ec.message()); - - // Set a timeout on the operation - boost::beast::get_lowest_layer(m_stream).expires_after(std::chrono::seconds(30)); - - // Send the HTTP request to the remote host - parent_t::send_request(); - } }; }