diff --git a/CMakeLists.txt b/CMakeLists.txt index fcb8103ec..2e432baa2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -50,6 +50,7 @@ find_path(MHD_INCLUDE_DIR microhttpd.h) find_library(MHD_LIBRARY NAMES microhttpd) find_library(BOOSTTEST_LIBRARY NAMES boost_unit_test_framework) find_library(BOOSTPO_LIBRARY NAMES boost_program_options) +find_library(BOOSTSYSTEM_LIBRARY NAMES boost_system) find_path(TCMALLOC_INCLUDE_DIR gperftools/heap-profiler.h) find_library(TCMALLOC_LIBRARY NAMES tcmalloc_and_profiler) find_path(HIREDIS_INCLUDE_DIR hiredis/hiredis.h) diff --git a/examples/ace/ttcp/CMakeLists.txt b/examples/ace/ttcp/CMakeLists.txt index 1b4834bbb..028c967de 100644 --- a/examples/ace/ttcp/CMakeLists.txt +++ b/examples/ace/ttcp/CMakeLists.txt @@ -1,4 +1,3 @@ - if(BOOSTPO_LIBRARY) add_executable(ttcp_blocking ttcp_blocking.cc common.cc main.cc) target_link_libraries(ttcp_blocking muduo_base boost_program_options) @@ -6,6 +5,13 @@ if(BOOSTPO_LIBRARY) add_executable(ttcp_muduo ttcp.cc common.cc main.cc) target_link_libraries(ttcp_muduo muduo_net boost_program_options) - set_target_properties(ttcp_muduo PROPERTIES COMPILE_FLAGS "-Wno-error=conversion") + + if(BOOSTSYSTEM_LIBRARY) + add_executable(ttcp_asio_sync ttcp_asio_sync.cc common.cc main.cc) + target_link_libraries(ttcp_asio_sync muduo_base boost_program_options boost_system) + + add_executable(ttcp_asio_async ttcp_asio_async.cc common.cc main.cc) + target_link_libraries(ttcp_asio_async muduo_base boost_program_options boost_system) + endif() endif() diff --git a/examples/ace/ttcp/ttcp_asio_async.cc b/examples/ace/ttcp/ttcp_asio_async.cc new file mode 100644 index 000000000..ab1168191 --- /dev/null +++ b/examples/ace/ttcp/ttcp_asio_async.cc @@ -0,0 +1,173 @@ +#include + +#include +#include +#include +#include +#include + +using boost::asio::ip::tcp; + +void transmit(const Options& opt) +{ + try + { + } + catch (std::exception& e) + { + LOG_ERROR << e.what(); + } +} + +class TtcpServerConnection : boost::noncopyable, + public boost::enable_shared_from_this +{ + public: + TtcpServerConnection(boost::asio::io_service& io_service) + : socket_(io_service), count_(0), payload_(NULL), ack_(0) + { + sessionMessage_.number = 0; + sessionMessage_.length = 0; + } + + ~TtcpServerConnection() + { + ::free(payload_); + } + + tcp::socket& socket() { return socket_; } + + void start() + { + std::ostringstream oss; + oss << socket_.remote_endpoint(); + LOG_INFO << "Got connection from " << oss.str(); + boost::asio::async_read( + socket_, boost::asio::buffer(&sessionMessage_, sizeof(sessionMessage_)), + boost::bind(&TtcpServerConnection::handleReadSession, shared_from_this(), + boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + } + + private: + void handleReadSession(const boost::system::error_code& error, size_t len) + { + if (!error && len == sizeof sessionMessage_) + { + sessionMessage_.number = ntohl(sessionMessage_.number); + sessionMessage_.length = ntohl(sessionMessage_.length); + printf("receive number = %d\nreceive length = %d\n", + sessionMessage_.number, sessionMessage_.length); + const int total_len = static_cast(sizeof(int32_t) + sessionMessage_.length); + payload_ = static_cast(::malloc(total_len)); + payload_->length = 0; + boost::asio::async_read( + socket_, boost::asio::buffer(&payload_->length, sizeof payload_->length), + boost::bind(&TtcpServerConnection::handleReadLength, shared_from_this(), + boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + } + else + { + LOG_ERROR << "read session message: " << error.message(); + } + } + + void handleReadLength(const boost::system::error_code& error, size_t len) + { + if (!error && len == sizeof payload_->length) + { + payload_->length = ntohl(payload_->length); + assert(payload_->length == sessionMessage_.length); + boost::asio::async_read( + socket_, boost::asio::buffer(&payload_->data, payload_->length), + boost::bind(&TtcpServerConnection::handleReadPayload, shared_from_this(), + boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + } + else + { + LOG_ERROR << "read length: " << error.message(); + } + } + + void handleReadPayload(const boost::system::error_code& error, size_t len) + { + if (!error && len == static_cast(payload_->length)) + { + ack_ = htonl(payload_->length); + boost::asio::async_write( + socket_, boost::asio::buffer(&ack_, sizeof ack_), + boost::bind(&TtcpServerConnection::handleWriteAck, shared_from_this(), + boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + } + else + { + LOG_ERROR << "read payload data: " << error.message(); + } + } + + void handleWriteAck(const boost::system::error_code& error, size_t len) + { + if (!error && len == sizeof ack_) + { + if (++count_ < sessionMessage_.number) + { + payload_->length = 0; + boost::asio::async_read( + socket_, boost::asio::buffer(&payload_->length, sizeof payload_->length), + boost::bind(&TtcpServerConnection::handleReadLength, shared_from_this(), + boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); + } + else + { + LOG_INFO << "Done"; + } + } + else + { + LOG_ERROR << "write ack: " << error.message(); + } + } + + tcp::socket socket_; + int count_; + struct SessionMessage sessionMessage_; + struct PayloadMessage* payload_; + int32_t ack_; +}; +typedef boost::shared_ptr TtcpServerConnectionPtr; + +void startAccept(tcp::acceptor& acceptor); + +void handleAccept(tcp::acceptor& acceptor, TtcpServerConnectionPtr new_connection, + const boost::system::error_code& error) +{ + if (!error) + { + new_connection->start(); + } + + startAccept(acceptor); +} + +void startAccept(tcp::acceptor& acceptor) +{ + TtcpServerConnectionPtr new_connection(new TtcpServerConnection(acceptor.get_io_service())); + acceptor.async_accept(new_connection->socket(), + boost::bind(handleAccept, boost::ref(acceptor), new_connection, + boost::asio::placeholders::error)); +} + +void receive(const Options& opt) +{ + try + { + boost::asio::io_service io_service; + tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), opt.port)); + startAccept(acceptor); + io_service.run(); + } + catch (std::exception& e) + { + LOG_ERROR << e.what(); + } +} + diff --git a/examples/ace/ttcp/ttcp_asio_sync.cc b/examples/ace/ttcp/ttcp_asio_sync.cc new file mode 100644 index 000000000..1bad4b121 --- /dev/null +++ b/examples/ace/ttcp/ttcp_asio_sync.cc @@ -0,0 +1,75 @@ +#include + +#include +#include +#include + +using boost::asio::ip::tcp; + +void transmit(const Options& opt) +{ + try + { + } + catch (std::exception& e) + { + LOG_ERROR << e.what(); + } +} + +void receive(const Options& opt) +{ + try + { + boost::asio::io_service io_service; + tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), opt.port)); + tcp::socket socket(io_service); + acceptor.accept(socket); + + struct SessionMessage sessionMessage = { 0, 0 }; + boost::system::error_code error; + size_t nr = boost::asio::read(socket, boost::asio::buffer(&sessionMessage, sizeof sessionMessage), error); + if (nr != sizeof sessionMessage) + { + LOG_ERROR << "read session message: " << error.message(); + exit(1); + } + + sessionMessage.number = ntohl(sessionMessage.number); + sessionMessage.length = ntohl(sessionMessage.length); + printf("receive number = %d\nreceive length = %d\n", + sessionMessage.number, sessionMessage.length); + const int total_len = static_cast(sizeof(int32_t) + sessionMessage.length); + PayloadMessage* payload = static_cast(::malloc(total_len)); + // std::unique_ptr freeIt(payload, ::free); + boost::shared_ptr freeIt(payload, ::free); + assert(payload); + + for (int i = 0; i < sessionMessage.number; ++i) + { + payload->length = 0; + if (boost::asio::read(socket, boost::asio::buffer(&payload->length, sizeof(payload->length)), error) != sizeof(payload->length)) + { + LOG_ERROR << "read length: " << error.message(); + exit(1); + } + payload->length = ntohl(payload->length); + assert(payload->length == sessionMessage.length); + if (boost::asio::read(socket, boost::asio::buffer(payload->data, payload->length), error) != static_cast(payload->length)) + { + LOG_ERROR << "read payload data: " << error.message(); + exit(1); + } + int32_t ack = htonl(payload->length); + if (boost::asio::write(socket, boost::asio::buffer(&ack, sizeof(ack))) != sizeof(ack)) + { + LOG_ERROR << "write ack: " << error.message(); + exit(1); + } + } + } + catch (std::exception& e) + { + LOG_ERROR << e.what(); + } +} diff --git a/examples/procmon/procmon.cc b/examples/procmon/procmon.cc index e16b1eb3f..b930aa0a5 100644 --- a/examples/procmon/procmon.cc +++ b/examples/procmon/procmon.cc @@ -90,7 +90,7 @@ class Procmon : boost::noncopyable kBootTime_(getBootTime()), pid_(pid), server_(loop, InetAddress(port), getName()), - procname_(ProcessInfo::procname(readProcFile("stat")).as_string()), + procname_(procname ? procname : ProcessInfo::procname(readProcFile("stat")).as_string()), hostname_(ProcessInfo::hostname()), cmdline_(getCmdLine()), ticks_(0), @@ -462,7 +462,7 @@ int main(int argc, char* argv[]) EventLoop loop; uint16_t port = static_cast(atoi(argv[2])); - Procmon procmon(&loop, pid, port, argc > 3 ? argv[3] : ""); + Procmon procmon(&loop, pid, port, argc > 3 ? argv[3] : NULL); procmon.start(); loop.loop(); }