先贴代码。这里利用了 Boost.Asio 的协程,所以代码基本上是按同步的方式编写的,但实际上是异步执行的。
#define BOOST_ASIO_HAS_CO_AWAIT #include <boost/asio/awaitable.hpp> #include <boost/asio/buffer.hpp> #include <boost/asio/co_spawn.hpp> #include <boost/asio/connect.hpp> #include <boost/asio/detached.hpp> #include <boost/asio/io_context.hpp> #include <boost/asio/ip/tcp.hpp> #include <boost/asio/read.hpp> #include <boost/asio/signal_set.hpp> #include <boost/asio/this_coro.hpp> #include <boost/asio/use_awaitable.hpp> #include <boost/asio/write.hpp> #include <chrono> #include <cstddef> #include <cstdint> #include <cstring> #include <iostream> #include <memory> #include <string> using boost::asio::awaitable; using boost::asio::co_spawn; using boost::asio::detached; using boost::asio::use_awaitable; using boost::asio::ip::tcp; namespace this_coro = boost::asio::this_coro; constexpr int32_t kHeadLen = 4; constexpr int32_t kMaxBodyLen = 1024 * 1024; const char *kPort = "55555"; const char *kAddress = "127.0.0.1"; class Message { public: Message() {} ~Message() {} bool DecodeHeader() { body_length_ = atoi(header_); if (body_length_ > kMaxBodyLen) { body_length_ = 0; return false; } return true; } void EncodeHeader() { std::sprintf(header_, "%4d", body_length_); } void InitBodyBuffer(const int32_t body_len) { std::shared_ptr<char> ptr(new char[body_len], std::default_delete<char[]>()); body_ = ptr; body_length_ = body_len; } char *GetHeaderBuffer() { return header_; } char *GetBodyBuffer() { return body_.get(); } int32_t HeaderLen() { return kHeadLen; } int32_t BodyLen() { return body_length_; } private: int32_t body_length_ = 0; char header_[kHeadLen + 1] = {'0'}; std::shared_ptr<char> body_; }; awaitable<void> client(tcp::socket s, const int32_t index) { Message message; std::string body = std::to_string(index); message.InitBodyBuffer(body.length()); std::strncpy(message.GetBodyBuffer(), body.c_str(), body.length()); message.EncodeHeader(); co_await async_write( s, boost::asio::buffer(message.GetHeaderBuffer(), message.HeaderLen()), use_awaitable); co_await async_write( 
s, boost::asio::buffer(message.GetBodyBuffer(), message.BodyLen()), use_awaitable); char data_recv[1024]; std::size_t n = co_await async_read( s, boost::asio::buffer(data_recv, message.BodyLen()), use_awaitable); std::cout << "Reply is: "; std::cout.write(data_recv, n); std::cout << "\n"; } awaitable<void> server_handler(tcp::socket socket) { Message message; std::size_t n = co_await async_read( socket, boost::asio::buffer(message.GetHeaderBuffer(), message.HeaderLen()), use_awaitable); if (!message.DecodeHeader()) { std::cout << "Decode header fail.\n"; co_return; } message.InitBodyBuffer(message.BodyLen()); n = co_await async_read( socket, boost::asio::buffer(message.GetBodyBuffer(), message.BodyLen()), use_awaitable); std::cout << "Recieve is: "; std::cout.write(message.GetBodyBuffer(), n); std::cout << "\n"; n = co_await async_write( socket, boost::asio::buffer(message.GetBodyBuffer(), message.BodyLen()), use_awaitable); } awaitable<void> start_all_client() { auto executor = co_await this_coro::executor; for (int32_t i = 0; i < 10000; ++i) { tcp::socket s(executor); tcp::resolver resolver(executor); co_await boost::asio::async_connect(s, resolver.resolve(kAddress, kPort), use_awaitable); co_spawn(executor, client(std::move(s), i), detached); } } awaitable<void> listener() { auto executor = co_await this_coro::executor; tcp::acceptor acceptor(executor, {tcp::v4(), 55555}); for (;;) { tcp::socket socket = co_await acceptor.async_accept(use_awaitable); co_spawn(executor, server_handler(std::move(socket)), detached); } } int main(int argc, char **argv) { if (argc != 2) { std::cout << "[Role] 1:server 0:client\n"; std::cout << "[Usage] ./test1 Role\n"; return -1; } int32_t role = std::stoi(argv[1]); boost::asio::io_context io_context(1); boost::asio::signal_set signals(io_context, SIGINT, SIGTERM); signals.async_wait([&](auto, auto) { io_context.stop(); }); if (role == 1) { co_spawn(io_context, listener(), detached); } else if (role == 0) { co_spawn(io_context, 
start_all_client(), detached); } io_context.run(); return 0; }
CMakeLists.txt 如下:
# Build script for the Boost.Asio coroutine echo example (echo.cpp).
# CMAKE_CXX_STANDARD 20 is only understood by CMake 3.12 and newer.
cmake_minimum_required(VERSION 3.12)

# The compiler has to be selected before project(), which is where CMake
# detects and configures the toolchain.
set(CMAKE_CXX_COMPILER "clang++")
project(echo CXX)

set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# Build and link against libc++.
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -stdlib=libc++ -lc++abi")
# Append instead of overwrite so flags supplied on the command line survive.
# NOTE(review): -fcoroutines-ts / -fconcepts-ts target older clang releases;
# confirm whether the clang in use still needs (or accepts) them.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines-ts --stdlib=libc++ -Xclang -fconcepts-ts")

# Fail at configure time if Boost is missing rather than at compile time.
find_package(Boost REQUIRED COMPONENTS system)
# FindBoost exports Boost_INCLUDE_DIRS (case-sensitive); BOOST_INCLUDE_DIRS
# is never set and would expand to nothing.
include_directories(${Boost_INCLUDE_DIRS})

add_executable(echo echo.cpp)
target_link_libraries(echo ${Boost_LIBRARIES} pthread)