HP-Socket 是一套通用的高性能 TCP/UDP /HTTP 通信 框架 ,包含服务端组件、客户端组件和 Agent 组件,广泛适用于各种不同应用场景的 TCP/UDP /HTTP 通信系统,提供 C/C++ 、 C# 、 Delphi 、 E (易语言)、 Java 、 Python 等编程语言接口。
HP-Socket是一套国产的开源通讯库,使用C++语言实现,提供多种编程语言的接口,支持 Windows 和 Linux 平台:
HP-Socket包含30多个组件 ,可根据通信角色Client/Server)、通信协议TCP/UDP/HTTP)和接收模型PUSH/PULL/PACK)进行归类,这里只简单介绍一下:
HP-Socket的TCP组件支持PUSH、PULL和PACK三种接收模型:
注:PACK模型组件会对应用程序发送的每个数据包自动加上 4 字节(32位的包头),前10位为用于数据包校验的包头标识位,后22位为记录包体长度的长度位。
HP-Socket支持MBCS和Unicode字符集,支持32位和64位应用程序。可以通过源代码、 DLL或LIB方式使用HP-Socket。 HP-Socket发行包中已经提供了HPSocket DLL和HPSocket4C DLL。
HP-Socket提供了各种情况下的dll文件,不需要我们重新编译,dll文件按编程接口分为两大类:
使用HP-Socket的线程池组件可以在程序中实现一个简单的、公用的线程池,TCP通讯的断线重连、发送心跳都会用到线程池。线程池组件的主要函数如下:
先实现线程池的CHPThreadPoolListener接口,然后构造IHPThreadPool智能指针,后面线程池的操作都通过智能指针操作,代码如下:
class CHPThreadPoolListenerImpl : public CHPThreadPoolListener { private: void LogInfo(string logStr) { cout <<"ThreadPool " <<logStr << endl; } public: virtual void OnStartup(IHPThreadPool* pThreadPool) { LogInfo("线程池启动"); } virtual void OnShutdown(IHPThreadPool* pThreadPool) { LogInfo("线程池启动关闭"); } virtual void OnWorkerThreadStart(IHPThreadPool* pThreadPool, THR_ID dwThreadID) { LogInfo("[" + to_string(dwThreadID) + "] " + "工作线程启动"); } virtual void OnWorkerThreadEnd(IHPThreadPool* pThreadPool, THR_ID dwThreadID) { LogInfo("[" + to_string(dwThreadID) + "] " + "工作线程退出"); } }; CHPThreadPoolListenerImpl ThreadPoolListener; //全局共享变量使用extern关键字修饰 extern CHPThreadPoolPtr ThreadPool(&ThreadPoolListener);
先实现一个打印函数,显示客户端相关的信息,代码如下:
void PrintInfo(ITcpClient* pSender, CONNID dwConnID) { char buffer[20]; TCHAR* ipAddr = buffer; int ipLen; USHORT port; pSender->GetLocalAddress(ipAddr, ipLen, port); cout << string(ipAddr,0,ipLen) << ":" << port << " " << " [" << dwConnID << "] -> "; pSender->GetRemoteHost(ipAddr, ipLen, port); cout << string(ipAddr, 0, ipLen) << ":" << port << " "; }
实现CTcpClientListener监听接口,客户端断线后自动重连,以换行符分割接收到的字符串,代码如下:
bool SysExit = false; void ReConnect(ITcpClient* pSender) { while (pSender->GetState() != SS_STOPPED) { Sleep(10); } pSender->Start("127.0.0.1", 60000); } class CClientListenerImpl : public CTcpClientListener { public: virtual EnHandleResult OnConnect(ITcpClient* pSender, CONNID dwConnID) { PrintInfo(pSender, dwConnID); cout << "连接成功" << endl; return HR_OK; } string resStr = ""; string commStr=""; virtual EnHandleResult OnReceive(ITcpClient* pSender, CONNID dwConnID, const BYTE* pData, int iLength) { string str((char*)pData,0, iLength); resStr.append(str); int index; while (true) { index = resStr.find("\r\n"); if (index == -1)break; commStr = resStr.substr(0, index); resStr = resStr.substr(index +2, resStr.length() - (index +2)); if (commStr!="") { PrintInfo(pSender, dwConnID); cout << "收到分割字符串 " << commStr << endl; } } PrintInfo(pSender, dwConnID); cout << "数据接受 " << str << endl; return HR_OK; } virtual EnHandleResult OnClose(ITcpClient* pSender, CONNID dwConnID, EnSocketOperation enOperation, int iErrorCode) { resStr = ""; PrintInfo(pSender, dwConnID); cout << "连接断开,"<< enOperation <<"操作导致错误,错误码 " << iErrorCode<< endl; if (!SysExit) { ThreadPool->Submit((Fn_TaskProc)(&ReConnect), (PVOID)pSender); } return HR_OK; } };
循环输入字符串发送服务端,代码如下:
int main() { //启动线程池 ThreadPool->Start(); CClientListenerImpl listener; CTcpClientPtr client(&listener); if (!client->Start("127.0.0.1", 60000)) { cout << "连接错误:" << client->GetLastError() << "-" << client->GetLastErrorDesc(); } string sendMsg; while (!SysExit) { cin >> sendMsg; if (sendMsg == "esc") { SysExit = true; break; } if (client->GetState() == SS_STARTED) { const BYTE* data = (BYTE*)(sendMsg.c_str()); if (client->Send(data, sizeof(data))) { PrintInfo(client, client->GetConnectionID()); cout << "发送成功 "<<sendMsg<<endl; } else { PrintInfo(client, client->GetConnectionID()); cout << "发送失败,错误描述 " << client->GetLastError() << "-" << client->GetLastErrorDesc() << endl; } } else { PrintInfo(client, client->GetConnectionID()); cout << "无法发送,当前状态 " <<client->GetState()<< endl; } } client->Stop(); //关闭线程池 ThreadPool->Stop(); return 0; }
先实现一个打印函数,基本上和客户端的相同,只有获取本地IP的地方不同,代码如下:
void PrintInfo(ITcpServer* pSender, CONNID dwConnID) { char buffer[20]; TCHAR* ipAddr = buffer; int ipLen; USHORT port; pSender->GetListenAddress(ipAddr, ipLen, port); cout << string(ipAddr, 0, ipLen) << ":" << port << " " << "<- [" << dwConnID << "] "; pSender->GetRemoteAddress(dwConnID, ipAddr, ipLen, port); cout << string(ipAddr, 0, ipLen) << ":" << port << " "; }
为了演示客户端和应用数据的绑定,定义一个用户数据类型并创建一个队列,代码如下:
class UserData { public: UserData(string name="") { Name = name; } string Name; }; queue<UserData*> qName; //创建队列对象
实现CTcpServerListener监听接口,收到字符串后加上用户名再发送回去,代码如下:
class CTcpServerListenerImpl : public CTcpServerListener { public: virtual EnHandleResult OnAccept(ITcpServer* pSender, CONNID dwConnID, UINT_PTR soClient) { pSender->SetConnectionExtra(dwConnID,qName.front()); qName.pop(); PrintInfo(pSender, dwConnID); cout << "连接成功" << endl; return HR_OK; } virtual EnHandleResult OnReceive(ITcpServer* pSender, CONNID dwConnID, const BYTE* pData, int iLength) { string str((char*)pData, 0, iLength); PrintInfo(pSender, dwConnID); cout << "数据接受 " << str<<endl; PVOID pInfo = nullptr; pSender->GetConnectionExtra(dwConnID, &pInfo); str = "reply-" + ((UserData*)pInfo)->Name + str; const BYTE* data = (BYTE*)(str.c_str()); pSender->Send(dwConnID, data,str.size()); return HR_OK; } virtual EnHandleResult OnClose(ITcpServer* pSender, CONNID dwConnID, EnSocketOperation enOperation, int iErrorCode) { PVOID pInfo = nullptr; pSender->GetConnectionExtra(dwConnID, &pInfo); qName.push((UserData*)pInfo); PrintInfo(pSender, dwConnID); cout << "断开连接"<< endl; pSender->SetConnectionExtra(dwConnID, NULL); return HR_OK; } };
循环输入字符串发送到客户端,自动回复客户端发送的消息,代码如下:
bool SysExit = false; int main() { UserData user1("NO1-User"); UserData user2("NO2-User"); UserData user3("NO3-User"); UserData user4("NO4-User"); qName.push(&user1); qName.push(&user2); qName.push(&user3); qName.push(&user4); CTcpServerListenerImpl listener; CTcpServerPtr server(&listener); if (!server->Start("127.0.0.1", 60000)) { cout << "启动错误:" << server->GetLastError() << "-" << server->GetLastErrorDesc(); } string sendMsg; while (!SysExit) { cin >> sendMsg; if (sendMsg == "esc") { SysExit = true; break; } //如果数组长度小于当前连接数量,则获取失败 DWORD count= 1000; CONNID pIDs[1000]; ZeroMemory(pIDs, 1000);; if (server->GetAllConnectionIDs(pIDs, count)&& count >0) { for (size_t i = 0; i < count; i++) { const BYTE* data = (BYTE*)(sendMsg.c_str()); if (server->Send(*(pIDs+i),data, sendMsg.size())) { PrintInfo(server, pIDs[i]); cout << "发送成功 " << sendMsg << endl; } else { PrintInfo(server, pIDs[i]); cout << "发送失败,错误描述 " << server->GetLastError() << "-" << server->GetLastErrorDesc() << endl; } } } else { cout << "无法发送,当前连接数 " << count << endl; } } server->Stop(); }
注:获取连接时指针数组的长度一定要大于当前连接数量,否则会失败。
HP-Socket的Http客户端有同步、异步两种,同步客户端不需要绑定监听器,这里使用同步客户端演示。
Sync Client:同步HTTP客户端组件(CHttpSyncClient和CHttpsSyncClient)内部会处理所有事件,因此,它们不需要绑定监听器(构造方法的监听器参数传入null); 如果绑定了监听器则可以跟踪组件的通信过程。
测试客户端可以使用实时天气接口上面的测试示例,当前的测试示例为:
http://api.k780.com/?app=weather.today&weaId=1&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json
直接开始测试,代码如下:
int main() { CHttpSyncClientPtr SyncClient; THeader type; type.name = "Content-Type"; type.value = "text/html;charset=UTF-8"; if (SyncClient->OpenUrl("GET", "http://api.k780.com/?app=weather.today&weaId=1&appkey=10003&sign=b59bc3ef6191eb9f747dd4e83c99f2a4&format=json",&type)) { LPCBYTE pData = nullptr; int iLength = 0; SyncClient->GetResponseBody(&pData, &iLength); string body((char*)pData, iLength); //返回的有中文,需要转化编码格式 cout << body << endl; cout << endl; cout << StringToUtf(body) << endl; cout << endl; cout << UtfToString(StringToUtf(body)) << endl; } else { cout << "打开失败:"<<SyncClient->GetLastError()<<"-"<< SyncClient->GetLastErrorDesc()<<endl; } }
上面的StringToUtf和UtfToString函数是转载至C++ 中文乱码的问题,该函数实现UTF-8和ANSI编码格式的转化,代码如下:
string UtfToString(string strValue) { int nwLen = ::MultiByteToWideChar(CP_ACP, 0, strValue.c_str(), -1, NULL, 0); wchar_t* pwBuf = new wchar_t[nwLen + 1];//加上末尾'\0' ZeroMemory(pwBuf, nwLen * 2 + 2); ::MultiByteToWideChar(CP_ACP, 0, strValue.c_str(), strValue.length(), pwBuf, nwLen); int nLen = ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, -1, NULL, NULL, NULL, NULL); char* pBuf = new char[nLen + 1]; ZeroMemory(pBuf, nLen + 1); ::WideCharToMultiByte(CP_UTF8, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL); std::string retStr(pBuf); delete[]pwBuf; delete[]pBuf; pwBuf = NULL; pBuf = NULL; return retStr; } string StringToUtf(string strValue) { int nwLen = MultiByteToWideChar(CP_UTF8, 0, strValue.c_str(), -1, NULL, 0); wchar_t* pwBuf = new wchar_t[nwLen + 1];//加上末尾'\0' memset(pwBuf, 0, nwLen * 2 + 2); MultiByteToWideChar(CP_UTF8, 0, strValue.c_str(), strValue.length(), pwBuf, nwLen); int nLen = WideCharToMultiByte(CP_ACP, 0, pwBuf, -1, NULL, NULL, NULL, NULL); char* pBuf = new char[nLen + 1]; memset(pBuf, 0, nLen + 1); WideCharToMultiByte(CP_ACP, 0, pwBuf, nwLen, pBuf, nLen, NULL, NULL); std::string retStr = pBuf; delete[]pBuf; delete[]pwBuf; return retStr; }
注:函数实现需放在main函数之前。