I have recently been writing up a dating platform project I built earlier. While optimizing the server, I replaced the original synchronous blocking + multi-thread model with the current epoll + thread pool model, which improved concurrency and makes the C10K target reachable.
So I am writing this post to document the epoll + thread pool model and to offer an optimization approach to anyone with the same need.
int TcpNet::InitNetWork()
{
    pool_t *pool = NULL;
    m_pool = new thread_pool;

    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    if (inet_pton(AF_INET, _DEF_SERVERIP, &serveraddr.sin_addr.s_addr) == -1)
    {
        perror("Init Ip Error:");
        return FALSE;
    }
    serveraddr.sin_port = htons(_DEF_PORT);

    // Create the listening socket
    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
    {
        perror("Create Socket Error:");
        return FALSE;
    }

    int mw_optval = 1;   // enable SO_REUSEADDR so the port can be rebound right after a restart
    setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char*)&mw_optval, sizeof(mw_optval));

    // Bind the port
    if (bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(serveraddr)) == -1)
    {
        perror("Bind Socket Error:");
        return FALSE;
    }

    // Listen on the socket
    if (listen(sockfd, _DEF_LISTEN) == -1)
    {
        perror("Listen Error:");
        return FALSE;
    }

    // Create the epoll instance; it can monitor up to _DEF_EPOLLSIZE file descriptors
    epfd = epoll_create(_DEF_EPOLLSIZE);
    Addfd(sockfd, TRUE);

    // Create a thread pool with 10 initial threads, a maximum of 200 threads,
    // and a ring task queue holding at most 50 tasks
    if ((pool = (m_pool->Pool_create(200, 10, 50))) == NULL)
        err_str("Create Thread_Pool Failed:", -1);

    m_pool->Producer_add(pool, EPOLL_Jobs, pool);
    return TRUE;
}
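The code above relies on several macros and class members (_DEF_SERVERIP, _DEF_PORT, _DEF_LISTEN, _DEF_EPOLLSIZE, sockfd, epfd, epollarr, m_pThis, m_pool, alock) that are declared elsewhere in the project. The following is only a sketch of what they are assumed to look like, reconstructed from how they are used in this post; the real names, values, and declarations may differ:

// Hypothetical reconstruction -- the real header is not shown in this post.
#include <netinet/in.h>
#include <sys/epoll.h>
#include <pthread.h>

#define _DEF_SERVERIP  "127.0.0.1"   // assumed listen address
#define _DEF_PORT      12345         // assumed listen port
#define _DEF_LISTEN    128           // assumed backlog passed to listen()
#define _DEF_EPOLLSIZE 10000         // assumed epoll capacity / event-array size
#define _DEF_IPSIZE    64            // assumed buffer size for inet_ntop()

class thread_pool;                   // pool wrapper, shown later in this post
typedef struct pool pool_t;          // pool state, also shown later
class CKernel;                       // business-logic layer providing DealData()

class TcpNet
{
public:
    int  InitNetWork();
    void Addfd(int fd, int enable_et);
    void Deletefd(int fd);
    void Epoll_Deal(int ready, pool_t *pool);
    static void *EPOLL_Jobs(void *arg);
    static void *Accept_Deal(void *arg);
    static void *Info_Recv(void *arg);

private:
    int    sockfd;                                // listening socket
    int    epfd;                                  // epoll instance
    struct sockaddr_in serveraddr;                // server address
    struct epoll_event epollarr[_DEF_EPOLLSIZE];  // ready-event array for epoll_wait
    pthread_mutex_t alock;                        // serializes accept() across workers
    thread_pool *m_pool;                          // thread-pool wrapper object
    CKernel     *m_kernel;                        // business layer (DealData)
    static TcpNet *m_pThis;                       // lets static thread entries reach the instance
};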
// Register the socket to be monitored on the epoll instance (attach the fd to epoll's red-black tree)
void TcpNet::Addfd(int fd, int enable_et /* use edge-triggered mode or not */)
{
    struct epoll_event eptemp;
    eptemp.events = EPOLLIN;   // interested in "readable" events on this fd
    eptemp.data.fd = fd;       // the file descriptor itself
    if (enable_et)
        eptemp.events |= EPOLLET;
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &eptemp);
}
// Enqueue the EPOLL_Jobs function into the thread pool's task queue
m_pool->Producer_add(pool, EPOLL_Jobs, pool);
// The epoll job: loop on epoll_wait and dispatch ready events
void *TcpNet::EPOLL_Jobs(void *arg)
{
    pool_t *pool = (pool_t*)arg;
    int ready;
    int i = 0;
    while (1)
    {
        printf("%d\n", i++);
        // Block (timeout -1) until at least one monitored fd is ready;
        // epoll_wait returns the number of ready events
        if ((ready = epoll_wait(m_pThis->epfd, m_pThis->epollarr, _DEF_EPOLLSIZE, -1)) == -1)
            err_str("Epoll Call Failed:", -1);   // fatal error
        // Hand the ready fds to Epoll_Deal
        m_pThis->Epoll_Deal(ready, pool);
        bzero(m_pThis->epollarr, sizeof(m_pThis->epollarr));
    }
}
// Process the ready events, iterating over the number of ready fds.
// If the ready fd is the listening socket, a new client is connecting,
// so an Accept_Deal task is handed to the thread pool to establish the connection.
// If a read event is ready, the fd is temporarily removed from epoll and a worker
// thread is dispatched to drain the kernel receive buffer (Info_Recv).
void TcpNet::Epoll_Deal(int ready, pool_t *pool)
{
    int i = 0;
    for (i = 0; i < ready; i++)
    {
        int fd = epollarr[i].data.fd;   // read the ready fd out of epollarr
        if (sockfd == fd)               // new client connection
            m_pool->Producer_add(pool, Accept_Deal, NULL);
        else if (epollarr[i].events & EPOLLIN)   // readable: remove the fd, enqueue a read task
        {
            Deletefd(fd);
            m_pool->Producer_add(pool, Info_Recv, (void*)(long)fd);
        }
    }
}
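Epoll_Deal calls Deletefd() to take the fd off the epoll instance before handing it to a worker, so the same socket is not reported again while a thread is still reading it. The function itself is not shown in this post; a minimal sketch, assuming it simply mirrors Addfd with EPOLL_CTL_DEL:

// Sketch only -- the original Deletefd is not shown in the post.
// Unregister a socket from the epoll instance so no further events
// are reported for it until Addfd() registers it again.
void TcpNet::Deletefd(int fd)
{
    struct epoll_event eptemp;
    eptemp.events = EPOLLIN;
    eptemp.data.fd = fd;
    // For EPOLL_CTL_DEL the event argument is ignored on modern kernels,
    // but passing a valid pointer keeps pre-2.6.9 kernels happy.
    epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &eptemp);
}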
// Establish a client connection on a worker thread
void *TcpNet::Accept_Deal(void *arg)
{
    struct sockaddr_in clientaddr;
    int clientsize = sizeof(clientaddr);
    int clientfd;
    char ipstr[_DEF_IPSIZE];

    // accept() is serialized with a mutex because several workers may run this task
    pthread_mutex_lock(&m_pThis->alock);
    if ((clientfd = accept(m_pThis->sockfd, (struct sockaddr*)&clientaddr, (socklen_t*)&clientsize)) == -1)
    {
        err_str("Custom Thread Accept Error", -1);
    }
    pthread_mutex_unlock(&m_pThis->alock);

    // Register the new connection with epoll in edge-triggered mode
    m_pThis->Addfd(clientfd, TRUE);

    printf("Custom Thread TID:0x%x\tClient IP:%s\tClient PORT:%d\t\n",
           (unsigned int)pthread_self(),
           inet_ntop(AF_INET, &clientaddr.sin_addr.s_addr, ipstr, sizeof(ipstr)),
           ntohs(clientaddr.sin_port));
    return 0;
}
void *TcpNet::Info_Recv(void *arg)
{
    int clientfd = (long)arg;
    int nRelReadNum = 0;
    int nPackSize = 0;
    char *pSzBuf = NULL;

    // The first field of every packet is its payload size
    nRelReadNum = recv(clientfd, &nPackSize, sizeof(nPackSize), 0);
    if (nRelReadNum <= 0)
    {
        // Peer closed or error: just close the fd (it was already removed from epoll)
        close(clientfd);
        return NULL;
    }

    pSzBuf = (char*)malloc(sizeof(char) * nPackSize);
    int nOffSet = 0;
    nRelReadNum = 0;

    // Keep reading until the whole payload has arrived
    while (nPackSize > 0)
    {
        nRelReadNum = recv(clientfd, pSzBuf + nOffSet, nPackSize, 0);
        if (nRelReadNum > 0)
        {
            nOffSet += nRelReadNum;
            nPackSize -= nRelReadNum;
        }
        else   // peer closed or recv error: stop reading instead of spinning forever
        {
            break;
        }
    }

    // Hand the complete packet to the business layer, then re-register the fd with epoll
    m_pThis->m_kernel->DealData(clientfd, pSzBuf, nOffSet);
    m_pThis->Addfd(clientfd, TRUE);

    printf("pszbuf = %p \n", pSzBuf);
    if (pSzBuf != NULL)
    {
        free(pSzBuf);
        pSzBuf = NULL;
    }
    return 0;
}
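Info_Recv assumes a simple length-prefixed protocol: the peer first sends a 4-byte integer holding the payload size, then the payload itself. As an illustration only (not part of the original project), a client written against that assumption might send a packet like this:

// Hypothetical client-side send, matching the length-prefixed framing
// that Info_Recv expects: a 4-byte size field followed by the payload.
#include <sys/socket.h>

// Returns 0 on success, -1 on error.
int send_packet(int sockfd, const char *payload, int len)
{
    // 1. Send the payload size first (raw int, the same representation the server reads)
    if (send(sockfd, &len, sizeof(len), 0) != sizeof(len))
        return -1;

    // 2. Send the payload, looping until every byte has been written
    int sent = 0;
    while (sent < len)
    {
        int n = send(sockfd, payload + sent, len - sent, 0);
        if (n <= 0)
            return -1;
        sent += n;
    }
    return 0;
}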
The thread pool consists of three parts: a manager thread (responsible for growing and shrinking the number of worker threads), a task queue (into which tasks are pushed), and worker threads (which take tasks from the queue and execute them).
The task queue and worker threads follow the producer-consumer model.
A mutex plus condition variables provide the thread synchronization.
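The pool code below relies on two structures, task_t and pool_t, whose definitions are not reproduced in this post. The following is an assumed layout, reconstructed from the fields the code touches:

#include <pthread.h>

// A single queued task: a function pointer plus its argument.
typedef struct task
{
    void *(*task)(void *arg);
    void  *arg;
} task_t;

// Shared state of the pool (assumed layout, inferred from usage).
typedef struct pool
{
    pthread_mutex_t lock;        // protects every field below
    pthread_cond_t  not_full;    // signaled when the queue has free slots
    pthread_cond_t  not_empty;   // signaled when the queue holds tasks

    task_t   *queue_task;        // ring buffer of pending tasks
    int       queue_max;         // capacity of the ring buffer
    int       queue_cur;         // number of tasks currently queued
    int       queue_front;       // producer index (next slot to fill)
    int       queue_rear;        // consumer index (next task to take)

    pthread_t *tids;             // worker thread ids (0 means "free slot")
    pthread_t  manager_tid;      // the manager thread
    int        thread_max;       // upper bound on worker threads
    int        thread_min;       // lower bound on worker threads
    int        thread_alive;     // workers currently alive
    int        thread_busy;      // workers currently executing a task
    int        thread_wait;      // workers asked to exit (shrink request)
    int        thread_shutdown;  // TRUE while the pool is running
} pool_t;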
/*
 * Pool_create: build the pool and start the initial worker threads
 *   max     : maximum number of worker threads
 *   min     : minimum (initial) number of worker threads
 *   que_max : maximum length of the task queue
 */
pool_t *thread_pool::Pool_create(int max, int min, int que_max)
{
    pool_t *p;
    if ((p = (pool_t*)malloc(sizeof(pool_t))) == NULL)
    {
        err_str("malloc pool error:", -1);
    }
    p->thread_max = max;
    p->thread_min = min;
    p->thread_alive = 0;
    p->thread_busy = 0;
    p->thread_shutdown = TRUE;   // TRUE means "the pool is running"
    p->thread_wait = 0;
    p->queue_max = que_max;
    p->queue_cur = 0;
    p->queue_front = 0;
    p->queue_rear = 0;

    if (pthread_cond_init(&p->not_full, NULL) != 0 ||
        pthread_cond_init(&p->not_empty, NULL) != 0 ||
        pthread_mutex_init(&p->lock, NULL) != 0)
    {
        err_str("init cond or mutex error:", -1);
    }

    if ((p->tids = (pthread_t*)malloc(sizeof(pthread_t) * max)) == NULL)
    {
        err_str("malloc tids error:", -1);
    }
    bzero(p->tids, sizeof(pthread_t) * max);

    if ((p->queue_task = (task_t*)malloc(sizeof(task_t) * que_max)) == NULL)
    {
        err_str("malloc task queue error:", -1);
    }

    // Start the minimum number of worker threads
    int err;
    for (int i = 0; i < min; i++)
    {
        if ((err = pthread_create(&p->tids[i], NULL, Custom, (void*)p)) > 0)
        {
            printf("create custom error:%s\n", strerror(err));
            return NULL;
        }
        ++(p->thread_alive);
    }

    // Start the manager thread
    if ((err = pthread_create(&(p->manager_tid), NULL, Manager, (void*)p)) > 0)
    {
        printf("create Manager error:%s\n", strerror(err));
        return NULL;
    }
    return p;
}

/*
 * Producer_add: push a task (function pointer + argument) into the queue.
 * This is the producer side of the producer-consumer model.
 */
int thread_pool::Producer_add(pool_t *p, void *(task)(void *arg), void *arg)
{
    pthread_mutex_lock(&p->lock);
    // While the queue is full and the pool is still running, wait on not_full
    // (pthread_cond_wait releases the lock while blocked, re-acquires on wakeup)
    while (p->queue_cur == p->queue_max && p->thread_shutdown)
    {
        pthread_cond_wait(&p->not_full, &p->lock);
    }
    if (!p->thread_shutdown)   // pool is shutting down, refuse the task
    {
        pthread_mutex_unlock(&p->lock);
        return -1;
    }

    p->queue_task[p->queue_front].task = task;              // store the task function
    p->queue_task[p->queue_front].arg = arg;                // and its argument
    p->queue_front = (p->queue_front + 1) % p->queue_max;   // advance the producer index
    ++(p->queue_cur);                                       // one more pending task

    pthread_cond_signal(&p->not_empty);   // wake a worker
    pthread_mutex_unlock(&p->lock);
    return 0;
}

/*
 * Custom: worker thread, the consumer side.
 * Takes tasks off the queue and executes them.
 */
void *thread_pool::Custom(void *arg)
{
    pool_t *p = (pool_t*)arg;
    task_t task;
    while (p->thread_shutdown)
    {
        pthread_mutex_lock(&p->lock);
        // Wait until there is at least one task (or the pool stops)
        while (p->queue_cur == 0 && p->thread_shutdown)
        {
            pthread_cond_wait(&p->not_empty, &p->lock);
        }
        if (!p->thread_shutdown)
        {
            pthread_mutex_unlock(&p->lock);
            pthread_exit(NULL);
        }
        // The manager asked some idle workers to exit (pool shrink)
        if (p->thread_wait > 0 && p->thread_alive > p->thread_min)
        {
            --(p->thread_wait);
            --(p->thread_alive);
            pthread_mutex_unlock(&p->lock);
            pthread_exit(NULL);
        }

        // Take one task off the ring buffer
        task.task = p->queue_task[p->queue_rear].task;
        task.arg = p->queue_task[p->queue_rear].arg;
        p->queue_rear = (p->queue_rear + 1) % p->queue_max;
        --(p->queue_cur);
        pthread_cond_signal(&p->not_full);   // a slot is free, wake the producer
        ++(p->thread_busy);
        pthread_mutex_unlock(&p->lock);

        // Run the task outside the lock
        (*task.task)(task.arg);

        pthread_mutex_lock(&p->lock);
        --(p->thread_busy);
        pthread_mutex_unlock(&p->lock);
    }
    return 0;
}

/*
 * Manager: grows and shrinks the number of worker threads.
 */
void *thread_pool::Manager(void *arg)
{
    pool_t *p = (pool_t *)arg;
    int alive;
    int cur;
    int busy;
    while (p->thread_shutdown)
    {
        pthread_mutex_lock(&p->lock);
        alive = p->thread_alive;
        busy = p->thread_busy;
        cur = p->queue_cur;
        pthread_mutex_unlock(&p->lock);

        // Expand: pending tasks outnumber idle workers, or more than 80% of the
        // workers are busy, and the pool has not yet reached thread_max.
        // At most _DEF_COUNT threads are added per pass.
        if ((cur > alive - busy || (float)busy / alive * 100 >= (float)80) && alive < p->thread_max)
        {
            int add = 0;
            for (int i = 0; i < p->thread_max && add < _DEF_COUNT; i++, add++)
            {
                pthread_mutex_lock(&p->lock);
                if (p->tids[i] == 0 || !if_thread_alive(p->tids[i]))   // reuse a free slot
                {
                    pthread_create(&p->tids[i], NULL, Custom, (void*)p);
                    ++(p->thread_alive);
                }
                pthread_mutex_unlock(&p->lock);
            }
        }

        // Shrink: if busy workers are fewer than half of the idle ones,
        // ask _DEF_COUNT idle workers to exit by waking them with no work queued.
        if (busy * 2 < alive - busy && alive > p->thread_min)
        {
            pthread_mutex_lock(&p->lock);
            p->thread_wait = _DEF_COUNT;
            pthread_mutex_unlock(&p->lock);
            for (int i = 0; i < _DEF_COUNT; i++)
            {
                pthread_cond_signal(&p->not_empty);
            }
        }
        sleep(_DEF_TIMEOUT);
    }
    return 0;
}
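The manager calls if_thread_alive() to test whether a tid slot still refers to a live thread; the helper is not shown in this post. One common way to implement it (an assumption on my part, not necessarily the original code) is to send signal 0 with pthread_kill, which performs only the error check:

#include <pthread.h>
#include <signal.h>
#include <errno.h>

// Sketch: report whether the thread identified by tid is still running.
// pthread_kill with signal 0 delivers nothing; it only checks validity,
// returning ESRCH when no such thread exists.
static int if_thread_alive(pthread_t tid)
{
    int ret = pthread_kill(tid, 0);
    return ret != ESRCH;   // TRUE while the thread is alive
}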