Skip to content

Commit

Permalink
add ttcp asio examples
Browse files Browse the repository at this point in the history
  • Loading branch information
chenshuo committed Apr 17, 2018
1 parent 7c2474b commit 783748b
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 4 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ find_path(MHD_INCLUDE_DIR microhttpd.h)
find_library(MHD_LIBRARY NAMES microhttpd)
find_library(BOOSTTEST_LIBRARY NAMES boost_unit_test_framework)
find_library(BOOSTPO_LIBRARY NAMES boost_program_options)
find_library(BOOSTSYSTEM_LIBRARY NAMES boost_system)
find_path(TCMALLOC_INCLUDE_DIR gperftools/heap-profiler.h)
find_library(TCMALLOC_LIBRARY NAMES tcmalloc_and_profiler)
find_path(HIREDIS_INCLUDE_DIR hiredis/hiredis.h)
Expand Down
10 changes: 8 additions & 2 deletions examples/ace/ttcp/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@

if(BOOSTPO_LIBRARY)
add_executable(ttcp_blocking ttcp_blocking.cc common.cc main.cc)
target_link_libraries(ttcp_blocking muduo_base boost_program_options)
set_target_properties(ttcp_blocking PROPERTIES COMPILE_FLAGS "-Wno-error=old-style-cast -Wno-error=conversion")

add_executable(ttcp_muduo ttcp.cc common.cc main.cc)
target_link_libraries(ttcp_muduo muduo_net boost_program_options)
set_target_properties(ttcp_muduo PROPERTIES COMPILE_FLAGS "-Wno-error=conversion")

if(BOOSTSYSTEM_LIBRARY)
add_executable(ttcp_asio_sync ttcp_asio_sync.cc common.cc main.cc)
target_link_libraries(ttcp_asio_sync muduo_base boost_program_options boost_system)

add_executable(ttcp_asio_async ttcp_asio_async.cc common.cc main.cc)
target_link_libraries(ttcp_asio_async muduo_base boost_program_options boost_system)
endif()
endif()

173 changes: 173 additions & 0 deletions examples/ace/ttcp/ttcp_asio_async.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
#include <examples/ace/ttcp/common.h>

#include <muduo/base/Logging.h>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <stdio.h>

using boost::asio::ip::tcp;

void transmit(const Options& opt)
{
try
{
}
catch (std::exception& e)
{
LOG_ERROR << e.what();
}
}

class TtcpServerConnection : boost::noncopyable,
public boost::enable_shared_from_this<TtcpServerConnection>
{
public:
TtcpServerConnection(boost::asio::io_service& io_service)
: socket_(io_service), count_(0), payload_(NULL), ack_(0)
{
sessionMessage_.number = 0;
sessionMessage_.length = 0;
}

~TtcpServerConnection()
{
::free(payload_);
}

tcp::socket& socket() { return socket_; }

void start()
{
std::ostringstream oss;
oss << socket_.remote_endpoint();
LOG_INFO << "Got connection from " << oss.str();
boost::asio::async_read(
socket_, boost::asio::buffer(&sessionMessage_, sizeof(sessionMessage_)),
boost::bind(&TtcpServerConnection::handleReadSession, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}

private:
void handleReadSession(const boost::system::error_code& error, size_t len)
{
if (!error && len == sizeof sessionMessage_)
{
sessionMessage_.number = ntohl(sessionMessage_.number);
sessionMessage_.length = ntohl(sessionMessage_.length);
printf("receive number = %d\nreceive length = %d\n",
sessionMessage_.number, sessionMessage_.length);
const int total_len = static_cast<int>(sizeof(int32_t) + sessionMessage_.length);
payload_ = static_cast<PayloadMessage*>(::malloc(total_len));
payload_->length = 0;
boost::asio::async_read(
socket_, boost::asio::buffer(&payload_->length, sizeof payload_->length),
boost::bind(&TtcpServerConnection::handleReadLength, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
else
{
LOG_ERROR << "read session message: " << error.message();
}
}

void handleReadLength(const boost::system::error_code& error, size_t len)
{
if (!error && len == sizeof payload_->length)
{
payload_->length = ntohl(payload_->length);
assert(payload_->length == sessionMessage_.length);
boost::asio::async_read(
socket_, boost::asio::buffer(&payload_->data, payload_->length),
boost::bind(&TtcpServerConnection::handleReadPayload, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
else
{
LOG_ERROR << "read length: " << error.message();
}
}

void handleReadPayload(const boost::system::error_code& error, size_t len)
{
if (!error && len == static_cast<size_t>(payload_->length))
{
ack_ = htonl(payload_->length);
boost::asio::async_write(
socket_, boost::asio::buffer(&ack_, sizeof ack_),
boost::bind(&TtcpServerConnection::handleWriteAck, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
else
{
LOG_ERROR << "read payload data: " << error.message();
}
}

void handleWriteAck(const boost::system::error_code& error, size_t len)
{
if (!error && len == sizeof ack_)
{
if (++count_ < sessionMessage_.number)
{
payload_->length = 0;
boost::asio::async_read(
socket_, boost::asio::buffer(&payload_->length, sizeof payload_->length),
boost::bind(&TtcpServerConnection::handleReadLength, shared_from_this(),
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}
else
{
LOG_INFO << "Done";
}
}
else
{
LOG_ERROR << "write ack: " << error.message();
}
}

tcp::socket socket_;
int count_;
struct SessionMessage sessionMessage_;
struct PayloadMessage* payload_;
int32_t ack_;
};
typedef boost::shared_ptr<TtcpServerConnection> TtcpServerConnectionPtr;

void startAccept(tcp::acceptor& acceptor);

void handleAccept(tcp::acceptor& acceptor, TtcpServerConnectionPtr new_connection,
const boost::system::error_code& error)
{
if (!error)
{
new_connection->start();
}

startAccept(acceptor);
}

void startAccept(tcp::acceptor& acceptor)
{
TtcpServerConnectionPtr new_connection(new TtcpServerConnection(acceptor.get_io_service()));
acceptor.async_accept(new_connection->socket(),
boost::bind(handleAccept, boost::ref(acceptor), new_connection,
boost::asio::placeholders::error));
}

void receive(const Options& opt)
{
try
{
boost::asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), opt.port));
startAccept(acceptor);
io_service.run();
}
catch (std::exception& e)
{
LOG_ERROR << e.what();
}
}

75 changes: 75 additions & 0 deletions examples/ace/ttcp/ttcp_asio_sync.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#include <examples/ace/ttcp/common.h>

#include <muduo/base/Logging.h>
#include <boost/asio.hpp>
#include <stdio.h>

using boost::asio::ip::tcp;

void transmit(const Options& opt)
{
try
{
}
catch (std::exception& e)
{
LOG_ERROR << e.what();
}
}

void receive(const Options& opt)
{
try
{
boost::asio::io_service io_service;
tcp::acceptor acceptor(io_service, tcp::endpoint(tcp::v4(), opt.port));
tcp::socket socket(io_service);
acceptor.accept(socket);

struct SessionMessage sessionMessage = { 0, 0 };
boost::system::error_code error;
size_t nr = boost::asio::read(socket, boost::asio::buffer(&sessionMessage, sizeof sessionMessage), error);
if (nr != sizeof sessionMessage)
{
LOG_ERROR << "read session message: " << error.message();
exit(1);
}

sessionMessage.number = ntohl(sessionMessage.number);
sessionMessage.length = ntohl(sessionMessage.length);
printf("receive number = %d\nreceive length = %d\n",
sessionMessage.number, sessionMessage.length);
const int total_len = static_cast<int>(sizeof(int32_t) + sessionMessage.length);
PayloadMessage* payload = static_cast<PayloadMessage*>(::malloc(total_len));
// std::unique_ptr<PayloadMessage, void (*)(void*)> freeIt(payload, ::free);
boost::shared_ptr<PayloadMessage> freeIt(payload, ::free);
assert(payload);

for (int i = 0; i < sessionMessage.number; ++i)
{
payload->length = 0;
if (boost::asio::read(socket, boost::asio::buffer(&payload->length, sizeof(payload->length)), error) != sizeof(payload->length))
{
LOG_ERROR << "read length: " << error.message();
exit(1);
}
payload->length = ntohl(payload->length);
assert(payload->length == sessionMessage.length);
if (boost::asio::read(socket, boost::asio::buffer(payload->data, payload->length), error) != static_cast<size_t>(payload->length))
{
LOG_ERROR << "read payload data: " << error.message();
exit(1);
}
int32_t ack = htonl(payload->length);
if (boost::asio::write(socket, boost::asio::buffer(&ack, sizeof(ack))) != sizeof(ack))
{
LOG_ERROR << "write ack: " << error.message();
exit(1);
}
}
}
catch (std::exception& e)
{
LOG_ERROR << e.what();
}
}
4 changes: 2 additions & 2 deletions examples/procmon/procmon.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class Procmon : boost::noncopyable
kBootTime_(getBootTime()),
pid_(pid),
server_(loop, InetAddress(port), getName()),
procname_(ProcessInfo::procname(readProcFile("stat")).as_string()),
procname_(procname ? procname : ProcessInfo::procname(readProcFile("stat")).as_string()),
hostname_(ProcessInfo::hostname()),
cmdline_(getCmdLine()),
ticks_(0),
Expand Down Expand Up @@ -462,7 +462,7 @@ int main(int argc, char* argv[])

EventLoop loop;
uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
Procmon procmon(&loop, pid, port, argc > 3 ? argv[3] : "");
Procmon procmon(&loop, pid, port, argc > 3 ? argv[3] : NULL);
procmon.start();
loop.loop();
}

0 comments on commit 783748b

Please sign in to comment.