协程 yield 和 resume 的第三种情况,也就是会发生协程切换的第三种情况,即调用 read(), write() 等 I/O 操作而陷入 “阻塞”和最后又恢复执行的过程,需要注意的是, 这里的“阻塞”依然是用户态实现的过程。我们知道,libco 的协程是在底层线程上串行 执行的。如果调用 read 或 write 等系统调用陷入真正的阻塞(让当前线程被内核挂起) 的话,那么不光当前协程被挂起了,其他协程也得不到执行的机会。因此,如果工作协 程陷入真正的内核态阻塞,那么 libco 程序就会完全停止运转,后果是很严重的。这需要我们在调用系统调用的时候加一些代码,伪装成“同步阻塞式”的调用,这也是使用hook的原因。
源码如下
1 typedef ssize_t (*read_pfn_t)(int fildes, void *buf, size_t nbyte); 2 static read_pfn_t g_sys_read_func = (read_pfn_t)dlsym(RTLD_NEXT,"read"); 3 4 ssize_t read( int fd, void *buf, size_t nbyte ) 5 { 6 HOOK_SYS_FUNC( read ); 7 8 // 如果目前线程没有一个协程, 则直接执行系统调用 9 if( !co_is_enable_sys_hook() ) 10 { // dlsym以后得到的原函数 11 return g_sys_read_func( fd,buf,nbyte ); 12 } 13 // 获取这个文件描述符的详细信息 14 rpchook_t *lp = get_by_fd( fd ); 15 16 // 套接字为非阻塞的,直接进行系统调用 17 if( !lp || ( O_NONBLOCK & lp->user_flag ) ) 18 { 19 ssize_t ret = g_sys_read_func( fd,buf,nbyte ); 20 return ret; 21 } 22 23 // 套接字阻塞 24 int timeout = ( lp->read_timeout.tv_sec * 1000 ) 25 + ( lp->read_timeout.tv_usec / 1000 ); 26 27 struct pollfd pf = { 0 }; 28 pf.fd = fd; 29 pf.events = ( POLLIN | POLLERR | POLLHUP ); 30 31 // 调用co_poll, co_poll中会切换协程, 32 // 协程被恢复时将会从co_poll中的挂起点继续运行 33 int pollret = poll( &pf,1,timeout ); 34 35 // 套接字准备就绪或者超时 执行hook前的系统调用 36 ssize_t readret = g_sys_read_func( fd,(char*)buf ,nbyte ); 37 38 if( readret < 0 ) // 超时 39 { 40 co_log_err("CO_ERR: read fd %d ret %ld errno %d poll ret %d timeout %d", 41 fd,readret,errno,pollret,timeout); 42 } 43 // 成功读取 44 return readret; 45 46 }
rpchook_t结构体
1 //还有一个很重要的作用就是在libco中套接字在hook后的fcntl中都设置为非阻塞,这里保存了套接字原有的阻塞属性 2 3 struct rpchook_t 4 { 5 int user_flag; // 记录套接字的状态 6 struct sockaddr_in dest; //maybe sockaddr_un; // 套机字目标地址 7 int domain; //AF_LOCAL->域套接字 , AF_INET->IP // 套接字类型 8 9 struct timeval read_timeout; // 读超时时间 10 struct timeval write_timeout; // 写超时时间 11 };
为了使用户无感的从同步切换成异步,我们需要把用户实际创建的阻塞套接字转化成非阻塞套接字,为了可以将用户传进来的套接字原封不动的返回给用户,就需要保存用户将套接字刚刚传进来的时候的所有属性,然后在用户需要的时候传递给用户。
可以看到当用户设置的套接字属性本来就是非阻塞的时候直接调用原read即可。否则就是把目标fd注册到epoll中,等待fd事件来临或者超时时切换回来即可,poll帮我们做了这一切。最后就是判断read的返回值啦。
源码如下:
1 ssize_t write( int fd, const void *buf, size_t nbyte ) 2 { 3 HOOK_SYS_FUNC( write ); 4 5 if( !co_is_enable_sys_hook() ) 6 { 7 return g_sys_write_func( fd,buf,nbyte ); 8 } 9 rpchook_t *lp = get_by_fd( fd ); 10 11 // 我觉得这里有必要再强调一遍,user_flag是用户设定的,但对于libco来说, 12 // 所有由hook函数创建的套接字对系统来说都是非阻塞的 13 if( !lp || ( O_NONBLOCK & lp->user_flag ) ) 14 { 15 ssize_t ret = g_sys_write_func( fd,buf,nbyte ); 16 return ret; 17 } 18 size_t wrotelen = 0; //已写的长度 19 int timeout = ( lp->write_timeout.tv_sec * 1000 ) // 计算超时时间 20 + ( lp->write_timeout.tv_usec / 1000 ); 21 22 // 因为TCP协议的原因,有时可能因为ask中接收方窗口小于write大小的原因无法发满 23 ssize_t writeret = g_sys_write_func( fd,(const char*)buf + wrotelen,nbyte - wrotelen ); 24 25 if (writeret == 0) 26 { 27 return writeret; 28 } 29 30 if( writeret > 0 ) 31 { 32 wrotelen += writeret; 33 } 34 // 一次可能无法写入完全,发生在TCP发送窗口小于要发送的数据大小的时候,通常是对端数据堆积 35 while( wrotelen < nbyte ) 36 { 37 38 struct pollfd pf = { 0 }; 39 pf.fd = fd; 40 pf.events = ( POLLOUT | POLLERR | POLLHUP ); 41 poll( &pf,1,timeout ); 42 43 writeret = g_sys_write_func( fd,(const char*)buf + wrotelen,nbyte - wrotelen ); 44 45 if( writeret <= 0 ) 46 { 47 break; 48 } 49 wrotelen += writeret ; 50 } 51 if (writeret <= 0 && wrotelen == 0) 52 { 53 return writeret; 54 } 55 return wrotelen; 56 }
write的过程也比较容易,需要注意的一点是这里判断了如果一次写入没有写满的情况,这种情况其实是非常必要的,但也通常是网络编程新手所容易忽视的,当TCP发送窗口小于要发送的数据大小的时候,就会出现一次发不完的情况。所以一般需要循环发送。
条件变量和其他的函数不太一样,它并不是简单的hook一下,而是根据libco的架构重新设计了一个协程版的条件变量,究其原因就是条件变量条件何时满足用epoll并不太方便,如果硬要那么写也可以,每一个条件变量分配一个fd就可以了,libco基于co_eventloop采用了更为高效的方法。
相关结构体:
1 struct stCoCondItem_t 2 { 3 stCoCondItem_t *pPrev; 4 stCoCondItem_t *pNext; 5 stCoCond_t *pLink; // 所属链表 6 7 stTimeoutItem_t timeout; 8 }; 9 struct stCoCond_t // 条件变量的实体 10 { 11 stCoCondItem_t *head; 12 stCoCondItem_t *tail; 13 };
除去链表相关,只剩下了一个stTimeoutItem_t
结构,这个结构代表一个单独的事件,在poll中时一个stTimeoutItem_t代表一个poll事件。
首先我们来看看和pthread_cond_wait
语义相同的co_cond_timedwait
到底干了什么:
1 // 条件变量的实体;超时时间 2 int co_cond_timedwait( stCoCond_t *link,int ms ) 3 { 4 stCoCondItem_t* psi = (stCoCondItem_t*)calloc(1, sizeof(stCoCondItem_t)); 5 psi->timeout.pArg = GetCurrThreadCo(); 6 // 实际还是执行resume,进行协程切换 7 psi->timeout.pfnProcess = OnSignalProcessEvent; 8 9 if( ms > 0 ) 10 { 11 unsigned long long now = GetTickMS(); 12 // 定义超时时间 13 psi->timeout.ullExpireTime = now + ms; 14 15 // 加入时间轮 16 int ret = AddTimeout( co_get_curr_thread_env()->pEpoll->pTimeout,&psi->timeout,now ); 17 if( ret != 0 ) 18 { 19 free(psi); 20 return ret; 21 } 22 } 23 // 相当于timeout为负的话超时时间无限,此时条件变量中有一个事件在等待,也就是一个协程待唤醒 24 AddTail( link, psi); 25 26 co_yield_ct(); // 切换CPU执行权,切换CPU执行权,在epoll中触发peocess回调以后回到这里 27 28 // 这个条件要么被触发,要么已经超时,从条件变量实体中删除 29 RemoveFromLink<stCoCondItem_t,stCoCond_t>( psi ); 30 free(psi); 31 32 return 0; 33 }
条件变量的实体就是一条链表,其中存着stCoCondItem_t结构,在wait时创建一个stCoCondItem_t结构把其插入到代表条件变量实体的链表中,然后就切换CPU执行权,当然这个如果注册了超时时间也会被放入到时间轮中。等到再次执行的时候要么超时要么被signal了,就从条件变量的链表中移除即可。
源码如下:
1 int co_cond_signal( stCoCond_t *si ) 2 { 3 stCoCondItem_t * sp = co_cond_pop( si ); 4 if( !sp ) 5 { 6 return 0; 7 } 8 // 从时间轮中移除 9 RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &sp->timeout ); 10 11 // 加到active队列中,回想co_eventloop中对于active链表是否应该是局部变量的讨论 12 AddTail( co_get_curr_thread_env()->pEpoll->pstActiveList,&sp->timeout ); 13 // 所以单线程运行生产者消费者我们在signal以后还需要调用阻塞类函数转移CPU控制权,例如poll 14 return 0; 15 }
查看链表是否有元素,有的话从链表中删除,然后加入到epoll的active链表,在下一次epoll_wait中遍历active时会触发回调,然后CPU执行权切换到执行co_cond_timedwait的地方。co_cond_broadcast和co_cond_signal的逻辑都是差不多的,就是多了一个循环而已啦。
转载:
https://blog.csdn.net/weixin_43705457/article/details/106891691