1. 向 Web 端发送数据时,注意二进制数据与文本数据要使用不同的写入类型(LWS_WRITE_BINARY / LWS_WRITE_TEXT):
//lws_write(wsi, data->buf+LWS_PRE, isz, LWS_WRITE_BINARY);
//lws_write(wsi, data->buf+LWS_PRE, isz, LWS_WRITE_TEXT);
2. 使用信号量实现单生产者、单消费者之间的同步。
3. 尚未尝试将 SendData 的数据直接拷贝到 data->buf + LWS_PRE 之后的区域(即不借助成员变量 _outputbuf 中转)。
// Accepted connections: key is the wsi pointer value cast to long, value is the
// header text captured when the connection was accepted (it is checked to
// contain the token).
// NOTE(review): long is 32 bits on 64-bit Windows, so pointer keys would be
// truncated there - confirm the target is 32-bit or switch to uintptr_t.
map<long, string> map_wsi_token;

// Non-zero asks the service loop in MyThread::Run to leave its while loop.
static volatile int exit_sig = 0;
// Capacity in bytes of the per-session send buffer (excluding the LWS_PRE
// headroom) and the rx buffer size given to libwebsockets.
// Parenthesized so the macro expands correctly inside larger expressions
// such as `x / MAX_PAYLOAD_SIZE` or `2 * MAX_PAYLOAD_SIZE`.
#define MAX_PAYLOAD_SIZE (13 * 1024)
/**
 * Signal handler: requests that the service loop stop.
 *
 * Only the exit flag is written here. The previous logging call was removed
 * because logging functions are not async-signal-safe and must not be called
 * from a signal handler.
 *
 * @param sig  signal number delivered (unused).
 */
void sighdl(int sig) {
    (void)sig;
    exit_sig = 1;
}
// Image transfer.
// Connection that outgoing data is written to; set when a client passes the
// token check and cleared when that client goes away.
struct lws* g_wsi = NULL;
// TCP port the server listens on (overridden by MyThread::ThreadInit).
long g_wsi_port = 9000;
// Writable backing storage for the default protocol name, so that a string
// literal is not bound to a non-const char* (ill-formed since C++11).
static char g_wsi_default_protocolname[] = "hello";
// Protocol name advertised to clients (overridden by MyThread::ThreadInit).
char* g_wsi_protocolname = g_wsi_default_protocolname;
// The MyThread instance the protocol callback talks to.
MyThread * g_pThread = NULL;
/**
 * Per-connection session context; the layout is application-defined.
 *
 * sizeof(struct session_data) is passed in the protocol table, and the
 * protocol callback receives the instance through its `user` argument.
 */
struct session_data {
// Not referenced in the visible code.
int msg_count;
// Staging buffer for outgoing frames: the payload is copied to buf + LWS_PRE,
// leaving the first LWS_PRE bytes for libwebsockets.
unsigned char buf[LWS_PRE + MAX_PAYLOAD_SIZE];
// Number of payload bytes placed in buf by the last write.
int len;
// Not referenced in the visible code (presumably a binary-frame flag - TODO confirm).
bool bin;
// Not referenced in the visible code (presumably a final-fragment flag - TODO confirm).
bool fin;
};
static int protocol_my_callback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in, size_t len) {
struct session_data* data = (struct session_data*)user;
switch (reason)
{
case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION: //这里的wsi中才有url内容
// 客户端连上来看看是否最新token
{
char content_length_str[200] = {0};
lws_hdr_copy(wsi,content_length_str,200,(lws_token_indexes)0);
char chartoken[200] = {0};
g_pThread->GetNewToken(chartoken);
if (chartoken[0]=='\0') // 没有最新的token
{
return -1;
}
else
{
if(strstr(content_length_str,chartoken) != NULL)
{
map_wsi_token.insert(pair<long,string>(long(wsi),string(content_length_str)));
g_wsi = wsi;
OutputDebugStringA("PROTOCOL_CONNECTION \r\n");
}
else
return -1;
}
}
break;
case LWS_CALLBACK_ESTABLISHED: // 当服务器和客户端完成握手后
OutputDebugStringA("LWS_CALLBACK_ESTABLISHED \r\n");
break;
case LWS_CALLBACK_RECEIVE: // 当接收到客户端发来的帧,首先验证token是否有效
{
map<long,string>::iterator iter = map_wsi_token.find(long(wsi));
if(iter != map_wsi_token.end())
{
char chartoken[200] = {0};
g_pThread->GetNewToken(chartoken);
string strtoken = iter->second;
if(strtoken.find(chartoken) == -1)
{
return -1;
}
}
else
{
return -1;
}
}
break;
case LWS_CALLBACK_SERVER_WRITEABLE: // 当此连接可写时
OutputDebugStringA("write WaitForSingleObject g_hFullBufferSemaphore\r\n");
WaitForSingleObject(g_pThread->g_hFullBufferSemaphore, INFINITE); // 等待是否已经生产了产品
if(g_pThread->_outputbuf != NULL)
{
// 前面LWS_PRE个字节必须留给LWS
int isz = g_pThread->_outsize;//sizeof(g_pThread->_outputbuf);
memset(data->buf+ LWS_PRE, 0, isz);
memcpy(data->buf+ LWS_PRE, g_pThread->_outputbuf, isz);
data->len = isz;
lws_write(wsi, data->buf+LWS_PRE, isz, (lws_write_protocol)g_pThread->_datatype);
// 下面的调用允许在此连接上接收数据;
lws_rx_flow_control(wsi, 1);
g_pThread->_outsize = 0;
}
ReleaseSemaphore(g_pThread->g_hEmptyBufferSemaphore, 1, NULL); // 我消费完了,通知生产者
break;
case LWS_CALLBACK_LOCK_POLL:
case LWS_CALLBACK_UNLOCK_POLL:
break;
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: // 客户端断了链接
{
map<long,string>::iterator iter = map_wsi_token.find(long(wsi));
if(iter != map_wsi_token.end())
{
map_wsi_token.erase(iter);
g_wsi = NULL;
}
}
break;
}
// 回调函数最终要返回0,否则无法创建服务器
return 0;
}
// Protocol table assigned to lws_context_creation_info::protocols in MyThread::Run.
struct lws_protocols protocols[] = {
    // protocol name, callback, per-session data size, rx buffer size
    { g_wsi_protocolname, protocol_my_callback, sizeof(struct session_data), MAX_PAYLOAD_SIZE },
    // The last element always has this form and terminates the list.
    { NULL, NULL, 0 }
};
/**
 * Puts every field that other methods read into a defined state.
 *
 * SendData compares isznote, the destructor closes the semaphore handles and
 * StartThread(false) uses `thread`; all of these may run before ThreadInit or
 * StartThread(true), so they must not be left uninitialized.
 */
MyThread::MyThread(void)
{
    irun = true;
    memset(newtoken,0,128);
    _outputbuf = NULL;
    _outsize = 0;
    isznote = 0;   // no buffer yet, so capacity is zero
    _datatype = 0;
    thread = NULL;
    g_event = NULL;
    g_hEmptyBufferSemaphore = NULL;
    g_hFullBufferSemaphore = NULL;
}
/**
 * Stops the service thread, then releases the semaphores and the hand-off buffer.
 *
 * Also clears g_pThread when it points at this object so the protocol
 * callback cannot reach a destroyed instance through the global.
 */
MyThread::~MyThread(void)
{
    StartThread(false);
    if (g_pThread == this)
    {
        g_pThread = NULL;
    }
    if (g_hEmptyBufferSemaphore != NULL)
    {
        CloseHandle(g_hEmptyBufferSemaphore);
        g_hEmptyBufferSemaphore = NULL;
    }
    if (g_hFullBufferSemaphore != NULL)
    {
        CloseHandle(g_hFullBufferSemaphore);
        g_hFullBufferSemaphore = NULL;
    }
    free(_outputbuf); // free(NULL) is a no-op
    _outputbuf = NULL;
}
void MyThread::Run()
{
// 信号处理函数
signal(SIGTERM, sighdl);
struct lws_context_creation_info ctx_info = { 0 };
ctx_info.port = g_wsi_port;
ctx_info.iface = NULL; // 在所有网络接口上监听
ctx_info.protocols = protocols;
ctx_info.gid = -1;
ctx_info.uid = -1;
ctx_info.options = LWS_SERVER_OPTION_VALIDATE_UTF8;
struct lws_context* context = lws_create_context(&ctx_info);
while (!exit_sig) {
lws_service(context, 50);
if(g_wsi != NULL)
lws_callback_on_writable(g_wsi);
OutputDebugStringA("lws_service 100\r\n");
}
OutputDebugStringA("lws_service exit_sig\r\n");
lws_context_destroy(context);
}
int MyThread::SendData(unsigned char *outbuffer,int size,int datatype)
{
OutputDebugStringA("WaitForSingleObject g_hEmptyBufferSemaphore \r\n");
// 等待消费线程,之后等来时计数-1, 初始该信号为1,
WaitForSingleObject(g_hEmptyBufferSemaphore, INFINITE);
if(isznote <size)
{
isznote = size;
_outputbuf = (unsigned char*)realloc(_outputbuf,isznote*sizeof(unsigned char));
memset(_outputbuf,0,isznote*sizeof(unsigned char));
char info[100] = {0};
sprintf(info,"**sz:%d \r\n",isznote*sizeof(unsigned char));
OutputDebugStringA(info);
}
_datatype = datatype;
memcpy(_outputbuf,outbuffer,size*sizeof(unsigned char));
_outsize = size;
if(g_wsi != NULL)
lws_callback_on_writable(g_wsi); //需要给客户端应答时,触发一次写回调
ReleaseSemaphore(g_hFullBufferSemaphore, 1, NULL); //通知消费线程,信号+1,
OutputDebugStringA("ReleaseSemaphore g_hFullBufferSemaphore \r\n");
return 0;
}
/**
 * Copies the current token, including its terminating NUL, into `info`.
 *
 * @param info  destination buffer; it must be able to hold the whole token
 *              (the token storage is cleared as 128 bytes elsewhere in this
 *              class). A NULL destination is ignored.
 */
void MyThread::GetNewToken(char* info)
{
    if (info == NULL)
    {
        return;
    }
    strcpy(info,newtoken);
}
/**
 * Replaces the current token.
 *
 * The 128-byte token storage is zeroed first and at most 127 characters are
 * copied, so the stored token is always NUL-terminated and a longer input can
 * no longer overflow the buffer (it is truncated instead).
 *
 * @param info  new token; NULL clears the token.
 */
void MyThread::SetNewToken(const char* info)
{
    memset(newtoken,0,128);
    if (info != NULL)
    {
        strncpy(newtoken, info, 128 - 1);
    }
}
/**
 * Prepares the server before the thread is started: publishes this instance
 * through g_pThread, records port and protocol name, allocates the hand-off
 * buffer and creates the two semaphores used by SendData and the writable
 * callback.
 *
 * @param port          TCP port to listen on.
 * @param protocolname  websocket protocol name. The pointer itself is stored,
 *                      so the string must outlive the server. NULL keeps the
 *                      current name, because a NULL name in protocols[0]
 *                      would terminate the protocol list.
 * @return 0 on success, -1 if the buffer or a semaphore could not be created.
 */
int MyThread::ThreadInit(long port,const char* protocolname)
{
    // Anonymous auto-reset event (bManualReset = false), initially non-signalled.
    g_event = CreateEvent(NULL, false, false, NULL);
    g_pThread = this;
    if (protocolname != NULL)
    {
        g_wsi_protocolname = (char*)protocolname;
    }
    protocols[0].name = g_wsi_protocolname;
    g_wsi_port = port;

    if(_outputbuf == NULL)
    {
        isznote = 4096;
        // Element size is sizeof(unsigned char), not the size of a pointer.
        _outputbuf = (unsigned char*)malloc(isznote*sizeof(unsigned char));
        if (_outputbuf == NULL)
        {
            isznote = 0;
            return -1;
        }
    }

    g_hEmptyBufferSemaphore = CreateSemaphore( NULL, 1, 1, NULL); // starts at 1: buffer is free
    g_hFullBufferSemaphore = CreateSemaphore( NULL, 0, 1, NULL);  // starts at 0: nothing to send
    if (g_hEmptyBufferSemaphore == NULL || g_hFullBufferSemaphore == NULL)
    {
        return -1;
    }
    return 0;
}
/**
 * Starts or stops the service thread.
 *
 * @param brun  true: clear the exit flag, create the thread object and start it.
 *              false: raise the exit flag, wait up to 5000 ms for the thread
 *              and terminate it if it is still running.
 *
 * The stop path used to write 0 to exit_sig, which never asked the loop in
 * Run to exit, so every stop ended in a forced termination.
 *
 * NOTE(review): the CThreadHandler object is never deleted here; confirm who
 * owns it.
 */
void MyThread::StartThread(bool brun)
{
    if(brun)
    {
        exit_sig = 0; // allow a restart after an earlier stop
        thread=new CThreadHandler(this); // thread class
        thread->Start(); // start the thread
        return;
    }

    exit_sig = 1; // set the exit signal
    if (thread == NULL)
    {
        return; // never started: nothing to join
    }

    int code = thread->Join(5000);
    if(code == STILL_ACTIVE)
    {
        bool bstop = thread->Terminate(code);
        if(bstop== false)
        {
            printf("thread is not terminate \r\n");
        }
    }
    else
    {
        printf("thread is not STILL_ACTIVE \r\n");
    }
}