之前的文章,我们解决了在跨平台下收发1000k消息的粘包、少包问题
在测试的过程我们也发现了
windows下select的限制是64
Liunx下select的默认是1024
接下来我们来研究如何突破windows下select64的限制
通过WinSock2.h查看到windows下FD_SETSIZE是64
/* * Select uses arrays of SOCKETs. These macros manipulate such * arrays. FD_SETSIZE may be defined by the user before including * this file, but the default here should be >= 64. * * CAVEAT IMPLEMENTOR and USER: THESE MACROS AND TYPES MUST BE * INCLUDED IN WINSOCK2.H EXACTLY AS SHOWN HERE. */ #ifndef FD_SETSIZE #define FD_SETSIZE 64 #endif /* FD_SETSIZE */
因为MacOS和Linux一样都是使用的Unix系统
Linux下的select限制通过在MacOS系统中来查看源码
通过源码查看到Linux下的FD_SETSIZE是1024
#ifdef FD_SETSIZE #define __DARWIN_FD_SETSIZE FD_SETSIZE #else /* !FD_SETSIZE */ #define __DARWIN_FD_SETSIZE 1024 #endif /* FD_SETSIZE */ #define __DARWIN_NBBY 8 /* bits in a byte */ #define __DARWIN_NFDBITS (sizeof(__int32_t) * __DARWIN_NBBY) /* bits per mask */ #define __DARWIN_howmany(x, y) ((((x) % (y)) == 0) ? ((x) / (y)) : (((x) / (y)) + 1)) /* # y's == x bits? */ __BEGIN_DECLS typedef struct fd_set { __int32_t fds_bits[__DARWIN_howmany(__DARWIN_FD_SETSIZE, __DARWIN_NFDBITS)]; } fd_set; __END_DECLS
Windows下使用IOCP网络模型
Linux下使用epoll网络模型
在自己的头文件中重新定义这个宏(注意:必须在WindSock2.h头文件之前定义,因为其有ifndef的条件)
#ifdef _WIN32 #define FD_SETSIZE 1024 #define _CRT_SECURE_NO_WARNINGS #define WIN32_LEAN_AND_MEAN #define _WINSOCK_DEPRECATED_NO_WARNINGS #include<windows.h> #include<WinSock2.h> #pragma comment(lib,"ws2_32.lib") #else #include<unistd.h> //uni std #include<arpa/inet.h> #include<string.h> #define SOCKET int #define INVALID_SOCKET (SOCKET)(~0) #define SOCKET_ERROR (-1) #endif
server本机,client本机,client连接1000\
- 测试结论:
1、1000个client的连接效率还是比较慢的
2、收发消息带宽稳定在1Gbps
3、TCP延迟在4ms左右,丢包率0%
#ifndef _MessageHeader_hpp_ #define _MessageHeader_hpp_ enum CMD { CMD_LOGIN, CMD_LOGIN_RESULT, CMD_LOGOUT, CMD_LOGOUT_RESULT, CMD_NEW_USER_JOIN, CMD_ERROR }; struct DataHeader { DataHeader() { dataLength = sizeof(DataHeader); cmd = CMD_ERROR; } short dataLength; short cmd; }; //DataPackage struct Login : public DataHeader { Login() { dataLength = sizeof(Login); cmd = CMD_LOGIN; } char userName[32]; char PassWord[32]; char data[932]; }; struct LoginResult : public DataHeader { LoginResult() { dataLength = sizeof(LoginResult); cmd = CMD_LOGIN_RESULT; result = 0; } int result; char data[992]; }; struct Logout : public DataHeader { Logout() { dataLength = sizeof(Logout); cmd = CMD_LOGOUT; } char userName[32]; }; struct LogoutResult : public DataHeader { LogoutResult() { dataLength = sizeof(LogoutResult); cmd = CMD_LOGOUT_RESULT; result = 0; } int result; }; struct NewUserJoin : public DataHeader { NewUserJoin() { dataLength = sizeof(NewUserJoin); cmd = CMD_NEW_USER_JOIN; scok = 0; } int scok; }; #endif // !_MessageHeader_hpp_
#ifndef _EasyTcpServer_hpp_ #define _EasyTcpServer_hpp_ #ifdef _WIN32 #define FD_SETSIZE 1024 #define _CRT_SECURE_NO_WARNINGS #define WIN32_LEAN_AND_MEAN #define _WINSOCK_DEPRECATED_NO_WARNINGS #include<windows.h> #include<WinSock2.h> #pragma comment(lib,"ws2_32.lib") #else #include<unistd.h> //uni std #include<arpa/inet.h> #include<string.h> #define SOCKET int #define INVALID_SOCKET (SOCKET)(~0) #define SOCKET_ERROR (-1) #endif #include<stdio.h> #include<vector> #include"MessageHeader.hpp" //缓冲区最小单元大小 #ifndef RECV_BUFF_SZIE #define RECV_BUFF_SZIE 10240 #endif // !RECV_BUFF_SZIE class ClientSocket { public: ClientSocket(SOCKET sockfd = INVALID_SOCKET) { _sockfd = sockfd; memset(_szMsgBuf, 0, sizeof(_szMsgBuf)); _lastPos = 0; } SOCKET sockfd() { return _sockfd; } char* msgBuf() { return _szMsgBuf; } int getLastPos() { return _lastPos; } void setLastPos(int pos) { _lastPos = pos; } private: // socket fd_set file desc set SOCKET _sockfd; //第二缓冲区 消息缓冲区 char _szMsgBuf[RECV_BUFF_SZIE * 10]; //消息缓冲区的数据尾部位置 int _lastPos; }; class EasyTcpServer { private: SOCKET _sock; std::vector<ClientSocket*> _clients; public: EasyTcpServer() { _sock = INVALID_SOCKET; } virtual ~EasyTcpServer() { Close(); } //初始化Socket SOCKET InitSocket() { #ifdef _WIN32 //启动Windows socket 2.x环境 WORD ver = MAKEWORD(2, 2); WSADATA dat; WSAStartup(ver, &dat); #endif if (INVALID_SOCKET != _sock) { printf("<socket=%d>关闭旧连接...\n", (int)_sock); Close(); } _sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (INVALID_SOCKET == _sock) { printf("错误,建立socket失败...\n"); } else { printf("建立socket=<%d>成功...\n", (int)_sock); } return _sock; } //绑定IP和端口号 int Bind(const char* ip, unsigned short port) { //if (INVALID_SOCKET == _sock) //{ // InitSocket(); //} // 2 bind 绑定用于接受客户端连接的网络端口 sockaddr_in _sin = {}; _sin.sin_family = AF_INET; _sin.sin_port = htons(port);//host to net unsigned short #ifdef _WIN32 if (ip) { _sin.sin_addr.S_un.S_addr = inet_addr(ip); } else { _sin.sin_addr.S_un.S_addr = INADDR_ANY; } #else if (ip) { _sin.sin_addr.s_addr = inet_addr(ip); } else { _sin.sin_addr.s_addr = INADDR_ANY; } #endif int ret = bind(_sock, (sockaddr*)&_sin, sizeof(_sin)); if (SOCKET_ERROR == ret) { printf("错误,绑定网络端口<%d>失败...\n", port); } else { printf("绑定网络端口<%d>成功...\n", port); } return ret; } //监听端口号 int Listen(int n) { // 3 listen 监听网络端口 int ret = listen(_sock, n); if (SOCKET_ERROR == ret) { printf("socket=<%d>错误,监听网络端口失败...\n", _sock); } else { printf("socket=<%d>监听网络端口成功...\n", _sock); } return ret; } //接受客户端连接 SOCKET Accept() { // 4 accept 等待接受客户端连接 sockaddr_in clientAddr = {}; int nAddrLen = sizeof(sockaddr_in); SOCKET cSock = INVALID_SOCKET; #ifdef _WIN32 cSock = accept(_sock, (sockaddr*)&clientAddr, &nAddrLen); #else cSock = accept(_sock, (sockaddr*)&clientAddr, (socklen_t*)&nAddrLen); #endif if (INVALID_SOCKET == cSock) { printf("socket=<%d>错误,接受到无效客户端SOCKET...\n", (int)_sock); } else { NewUserJoin userJoin; SendDataToAll(&userJoin); _clients.push_back(new ClientSocket(cSock)); printf("socket=<%d>新客户端加入:socket = %d,IP = %s \n", (int)_sock, (int)cSock, inet_ntoa(clientAddr.sin_addr)); } return cSock; } //关闭Socket void Close() { if (_sock != INVALID_SOCKET) { #ifdef _WIN32 for (int n = (int)_clients.size() - 1; n >= 0; n--) { closesocket(_clients[n]->sockfd()); delete _clients[n]; } // 8 关闭套节字closesocket closesocket(_sock); //------------ //清除Windows socket环境 WSACleanup(); #else for (int n = (int)_clients.size() - 1; n >= 0; n--) { close(_clients[n]->sockfd()); delete _clients[n]; } // 8 关闭套节字closesocket close(_sock); #endif _clients.clear(); } } //处理网络消息 int _nCount = 0; bool OnRun() { if (isRun()) { //伯克利套接字 BSD socket fd_set fdRead;//描述符(socket) 集合 fd_set fdWrite; fd_set fdExp; //清理集合 FD_ZERO(&fdRead); FD_ZERO(&fdWrite); FD_ZERO(&fdExp); //将描述符(socket)加入集合 FD_SET(_sock, &fdRead); FD_SET(_sock, &fdWrite); FD_SET(_sock, &fdExp); SOCKET maxSock = _sock; for (int n = (int)_clients.size() - 1; n >= 0; n--) { FD_SET(_clients[n]->sockfd(), &fdRead); if (maxSock < _clients[n]->sockfd()) { maxSock = _clients[n]->sockfd(); } } ///nfds 是一个整数值 是指fd_set集合中所有描述符(socket)的范围,而不是数量 ///既是所有文件描述符最大值+1 在Windows中这个参数可以写0 timeval t = { 1,0 }; int ret = select(maxSock + 1, &fdRead, &fdWrite, &fdExp, &t); // //printf("select ret=%d count=%d\n", ret, _nCount++); if (ret < 0) { printf("select任务结束。\n"); Close(); return false; } //判断描述符(socket)是否在集合中 if (FD_ISSET(_sock, &fdRead)) { FD_CLR(_sock, &fdRead); Accept(); } for (int n = (int)_clients.size() - 1; n >= 0; n--) { if (FD_ISSET(_clients[n]->sockfd(), &fdRead)) { if (-1 == RecvData(_clients[n])) { auto iter = _clients.begin() + n;//std::vector<SOCKET>::iterator if (iter != _clients.end()) { delete _clients[n]; _clients.erase(iter); } } } } return true; } return false; } //是否工作中 bool isRun() { return _sock != INVALID_SOCKET; } //缓冲区 char _szRecv[RECV_BUFF_SZIE] = {}; //接收数据 处理粘包 拆分包 int RecvData(ClientSocket* pClient) { // 5 接收客户端数据 int nLen = (int)recv(pClient->sockfd(), _szRecv, RECV_BUFF_SZIE, 0); //printf("nLen=%d\n", nLen); if (nLen <= 0) { printf("客户端<Socket=%d>已退出,任务结束。\n", pClient->sockfd()); return -1; } //将收取到的数据拷贝到消息缓冲区 memcpy(pClient->msgBuf() + pClient->getLastPos(), _szRecv, nLen); //消息缓冲区的数据尾部位置后移 pClient->setLastPos(pClient->getLastPos() + nLen); //判断消息缓冲区的数据长度大于消息头DataHeader长度 while (pClient->getLastPos() >= sizeof(DataHeader)) { //这时就可以知道当前消息的长度 DataHeader* header = (DataHeader*)pClient->msgBuf(); //判断消息缓冲区的数据长度大于消息长度 if (pClient->getLastPos() >= header->dataLength) { //消息缓冲区剩余未处理数据的长度 int nSize = pClient->getLastPos() - header->dataLength; //处理网络消息 OnNetMsg(pClient->sockfd(), header); //将消息缓冲区剩余未处理数据前移 memcpy(pClient->msgBuf(), pClient->msgBuf() + header->dataLength, nSize); //消息缓冲区的数据尾部位置前移 pClient->setLastPos(nSize); } else { //消息缓冲区剩余数据不够一条完整消息 break; } } return 0; } //响应网络消息 virtual void OnNetMsg(SOCKET cSock, DataHeader* header) { switch (header->cmd) { case CMD_LOGIN: { Login* login = (Login*)header; //printf("收到客户端<Socket=%d>请求:CMD_LOGIN,数据长度:%d,userName=%s PassWord=%s\n", cSock, login->dataLength, login->userName, login->PassWord); //忽略判断用户密码是否正确的过程 LoginResult ret; SendData(cSock, &ret); } break; case CMD_LOGOUT: { Logout* logout = (Logout*)header; //printf("收到客户端<Socket=%d>请求:CMD_LOGOUT,数据长度:%d,userName=%s \n", cSock, logout->dataLength, logout->userName); //忽略判断用户密码是否正确的过程 LogoutResult ret; SendData(cSock, &ret); } break; default: { printf("<socket=%d>收到未定义消息,数据长度:%d\n", cSock, header->dataLength); //DataHeader ret; //SendData(cSock, &ret); } break; } } //发送指定Socket数据 int SendData(SOCKET cSock, DataHeader* header) { if (isRun() && header) { return send(cSock, (const char*)header, header->dataLength, 0); } return SOCKET_ERROR; } void SendDataToAll(DataHeader* header) { for (int n = (int)_clients.size() - 1; n >= 0; n--) { SendData(_clients[n]->sockfd(), header); } } }; #endif // !_EasyTcpServer_hpp_
#include "EasyTcpServer.hpp" #include<thread> bool g_bRun = true; void cmdThread() { while (true) { char cmdBuf[256] = {}; scanf("%s", cmdBuf); if (0 == strcmp(cmdBuf, "exit")) { g_bRun = false; printf("退出cmdThread线程\n"); break; } else { printf("不支持的命令。\n"); } } } int main() { EasyTcpServer server; server.InitSocket(); server.Bind(nullptr, 4567); server.Listen(5); //启动UI线程 std::thread t1(cmdThread); t1.detach(); while (g_bRun) { server.OnRun(); //printf("空闲时间处理其它业务..\n"); } server.Close(); printf("已退出。\n"); getchar(); return 0; }
#ifndef _EasyTcpClient_hpp_ #define _EasyTcpClient_hpp_ #ifdef _WIN32 #define _CRT_SECURE_NO_WARNINGS #define _WINSOCK_DEPRECATED_NO_WARNINGS #define WIN32_LEAN_AND_MEAN #include<windows.h> #include<WinSock2.h> #pragma comment(lib,"ws2_32.lib") #else #include<unistd.h> //uni std #include<arpa/inet.h> #include<string.h> #define SOCKET int #define INVALID_SOCKET (SOCKET)(~0) #define SOCKET_ERROR (-1) #endif #include <stdio.h> #include "MessageHeader.hpp" class EasyTcpClient { SOCKET _sock; public: EasyTcpClient() { _sock = INVALID_SOCKET; } virtual ~EasyTcpClient() { Close(); } //初始化socket void InitSocket() { #ifdef _WIN32 //启动Windows socket 2.x环境 WORD ver = MAKEWORD(2, 2); WSADATA dat; WSAStartup(ver, &dat); #endif if (INVALID_SOCKET != _sock) { printf("<socket=%d>关闭旧连接...\n", _sock); Close(); } _sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (INVALID_SOCKET == _sock) { printf("错误,建立Socket失败...\n"); } else { printf("建立Socket=<%d>成功...\n", _sock); } } //连接服务器 int Connect(const char* ip, unsigned short port) { if (INVALID_SOCKET == _sock) { InitSocket(); } // 2 连接服务器 connect sockaddr_in _sin = {}; _sin.sin_family = AF_INET; _sin.sin_port = htons(port); #ifdef _WIN32 _sin.sin_addr.S_un.S_addr = inet_addr(ip); #else _sin.sin_addr.s_addr = inet_addr(ip); #endif printf("<socket=%d>正在连接服务器<%s:%d>...\n", _sock, ip, port); int ret = connect(_sock, (sockaddr*)&_sin, sizeof(sockaddr_in)); if (SOCKET_ERROR == ret) { printf("<socket=%d>错误,连接服务器<%s:%d>失败...\n", _sock, ip, port); } else { printf("<socket=%d>连接服务器<%s:%d>成功...\n", _sock, ip, port); } return ret; } //关闭套节字closesocket void Close() { if (_sock != INVALID_SOCKET) { #ifdef _WIN32 closesocket(_sock); //清除Windows socket环境 WSACleanup(); #else close(_sock); #endif _sock = INVALID_SOCKET; } } //处理网络消息 int _nCount = 0; bool OnRun() { if (isRun()) { fd_set fdReads; FD_ZERO(&fdReads); FD_SET(_sock, &fdReads); timeval t = { 0,0 }; int ret = select(_sock + 1, &fdReads, 0, 0, &t); //printf("select ret=%d count=%d\n", ret, _nCount++); if (ret < 0) { printf("<socket=%d>select任务结束1\n", _sock); Close(); return false; } if (FD_ISSET(_sock, &fdReads)) { FD_CLR(_sock, &fdReads); if (-1 == RecvData(_sock)) { printf("<socket=%d>select任务结束2\n", _sock); Close(); return false; } } return true; } return false; } //是否工作中 bool isRun() { return _sock != INVALID_SOCKET; } //缓冲区最小单元大小 #ifndef RECV_BUFF_SZIE #define RECV_BUFF_SZIE 10240 #endif // !RECV_BUFF_SZIE //第二缓冲区 消息缓冲区 char _szMsgBuf[RECV_BUFF_SZIE * 10] = {}; //消息缓冲区的数据尾部位置 int _lastPos = 0; //接收缓冲区 char _szRecv[RECV_BUFF_SZIE] = {}; //接收数据 处理粘包 拆分包 int RecvData(SOCKET cSock) { // 5 接收数据 int nLen = (int)recv(cSock, _szRecv, RECV_BUFF_SZIE, 0); //printf("nLen=%d\n", nLen); if (nLen <= 0) { printf("<socket=%d>与服务器断开连接,任务结束。\n", cSock); return -1; } //将收取到的数据拷贝到消息缓冲区 memcpy(_szMsgBuf + _lastPos, _szRecv, nLen); //消息缓冲区的数据尾部位置后移 _lastPos += nLen; //判断消息缓冲区的数据长度大于消息头DataHeader长度 while (_lastPos >= sizeof(DataHeader)) { //这时就可以知道当前消息的长度 DataHeader* header = (DataHeader*)_szMsgBuf; //判断消息缓冲区的数据长度大于消息长度 if (_lastPos >= header->dataLength) { //消息缓冲区剩余未处理数据的长度 int nSize = _lastPos - header->dataLength; //处理网络消息 OnNetMsg(header); //将消息缓冲区剩余未处理数据前移 memcpy(_szMsgBuf, _szMsgBuf + header->dataLength, nSize); //消息缓冲区的数据尾部位置前移 _lastPos = nSize; } else { //消息缓冲区剩余数据不够一条完整消息 break; } } return 0; } //响应网络消息 virtual void OnNetMsg(DataHeader* header) { switch (header->cmd) { case CMD_LOGIN_RESULT: { LoginResult* login = (LoginResult*)header; //printf("<socket=%d>收到服务端消息:CMD_LOGIN_RESULT,数据长度:%d\n", _sock, login->dataLength); } break; case CMD_LOGOUT_RESULT: { LogoutResult* logout = (LogoutResult*)header; //printf("<socket=%d>收到服务端消息:CMD_LOGOUT_RESULT,数据长度:%d\n", _sock, logout->dataLength); } break; case CMD_NEW_USER_JOIN: { NewUserJoin* userJoin = (NewUserJoin*)header; //printf("<socket=%d>收到服务端消息:CMD_NEW_USER_JOIN,数据长度:%d\n", _sock, userJoin->dataLength); } break; case CMD_ERROR: { printf("<socket=%d>收到服务端消息:CMD_ERROR,数据长度:%d\n", _sock, header->dataLength); } break; default: { printf("<socket=%d>收到未定义消息,数据长度:%d\n", _sock, header->dataLength); } } } //发送数据 int SendData(DataHeader* header) { if (isRun() && header) { return send(_sock, (const char*)header, header->dataLength, 0); } return SOCKET_ERROR; } private: }; #endif
#include "EasyTcpClient.hpp" #include<thread> bool g_bRun = true; void cmdThread() { while (true) { char cmdBuf[256] = {}; scanf("%s", cmdBuf); if (0 == strcmp(cmdBuf, "exit")) { g_bRun = false; printf("退出cmdThread线程\n"); break; } else { printf("不支持的命令。\n"); } } } int main() { //const int cCount = FD_SETSIZE - 1; const int cCount = 1000; EasyTcpClient* client[cCount]; for (int n = 0; n < cCount; n++) { client[n] = new EasyTcpClient(); } for (int n = 0; n < cCount; n++) { client[n]->Connect("127.0.0.1", 4567); //client[n]->Connect("192.168.58.129", 4567); } //启动UI线程 std::thread t1(cmdThread); t1.detach(); Login login; strcpy(login.userName, "lyd"); strcpy(login.PassWord, "lydmm"); while (g_bRun) { for (int n = 0; n < cCount; n++) { client[n]->SendData(&login); client[n]->OnRun(); } //printf("空闲时间处理其它业务..\n"); //Sleep(1000); } for (int n = 0; n < cCount; n++) { client[n]->Close(); } printf("已退出。\n"); getchar(); return 0; }