本文分析的state-threads的版本是1.9
。
srs源码分析1-搭建环境
srs源码分析2-浅析state_threads
srs源码分析3-srs的启动
srs源码分析4-客户端的连接
srs源码分析5-handshake
srs源码分析6-connect
以下正在写作中。。。
srs源码分析7-create stream
srs源码分析8-推流-publish
srs源码分析9-推流-unpublish
srs源码分析10-拉流-play
srs源码分析11-拉流-pause
srs源码分析12-转发-forward
srs是基于协程开发的,底层使用了state_threads协程库。为了更好的理解srs,所以需要先熟悉state_threads。这里并不会介绍协程的相关概念,只是简单的介绍一下state_threads的核心逻辑。
以下state_thread会被简称为st。
使用st实现了一个简单的echo服务器,以下代码写的很简单,重点是理解st的使用。
#include <arpa/inet.h> #include <errno.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/socket.h> #include <sys/types.h> #include <st.h> #define LISTEN_PORT 9000 #define ERR_EXIT(m) \ do { \ perror(m); \ exit(-1); \ } while (0) void *client_thread(void *arg) { st_netfd_t client_st_fd = (st_netfd_t)arg; int client_fd = st_netfd_fileno(client_st_fd); sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); int ret = getpeername(client_fd, (sockaddr *)&client_addr, &client_addr_len); if (ret == -1) { printf("[WARN] Failed to get client ip: %s\n", strerror(ret)); } char ip_buf[INET_ADDRSTRLEN]; bzero(ip_buf, sizeof(ip_buf)); inet_ntop(client_addr.sin_family, &client_addr.sin_addr, ip_buf, sizeof(ip_buf)); while (1) { char buf[1024] = {0}; ssize_t ret = st_read(client_st_fd, buf, sizeof(buf), ST_UTIME_NO_TIMEOUT); if (ret == -1) { printf("client st_read error\n"); break; } else if (ret == 0) { printf("client quit, ip = %s\n", ip_buf); break; } printf("recv from %s, data = %s", ip_buf, buf); ret = st_write(client_st_fd, buf, ret, ST_UTIME_NO_TIMEOUT); if (ret == -1) { printf("client st_write error\n"); } } } void *listen_thread(void *arg) { while (1) { st_netfd_t client_st_fd = st_accept((st_netfd_t)arg, NULL, NULL, ST_UTIME_NO_TIMEOUT); if (client_st_fd == NULL) { continue; } printf("get a new client, fd = %d\n", st_netfd_fileno(client_st_fd)); st_thread_t client_tid = st_thread_create(client_thread, (void *)client_st_fd, 0, 0); if (client_tid == NULL) { printf("Failed to st create client thread\n"); } } } int main() { int ret = st_set_eventsys(ST_EVENTSYS_ALT); if (ret == -1) { printf("st_set_eventsys use linux epoll failed\n"); } ret = st_init(); if (ret != 0) { printf("st_init failed. ret = %d\n", ret); return -1; } int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd == -1) { ERR_EXIT("socket"); } int reuse_socket = 1; ret = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse_socket, sizeof(int)); if (ret == -1) { ERR_EXIT("setsockopt"); } struct sockaddr_in server_addr; server_addr.sin_family = AF_INET; server_addr.sin_port = htons(LISTEN_PORT); server_addr.sin_addr.s_addr = INADDR_ANY; ret = bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)); if (ret == -1) { ERR_EXIT("bind"); } ret = listen(listen_fd, 128); if (ret == -1) { ERR_EXIT("listen"); } st_netfd_t st_listen_fd = st_netfd_open_socket(listen_fd); if (!st_listen_fd) { printf("st_netfd_open_socket open socket failed.\n"); return -1; } st_thread_t listen_tid = st_thread_create(listen_thread, (void *)st_listen_fd, 1, 0); if (listen_tid == NULL) { printf("Failed to st create listen thread\n"); } while (1) { st_sleep(1); /*用于让出CPU执行权,重新调度就绪的协程。*/ } return 0; }
root@learner:~/tmp/st# gcc main.cpp -lst root@learner-Lenovo:~/tmp/st# ./a.out get a new client, fd = 4 recv from 192.168.30.17, data = hello world client quit, ip = 192.168.30.17 ^C
root@learner:~# nc 192.168.30.17 9000 hello world hello world ^C
创建一个listen协程,用于监听客户端的连接,当客户端连接服务后,会为此客户端创建一个client协程,用于处理此客户端的所有请求。
st中协程的切换提供了两种方式:一种是使用系统提供的setjmp
和longjmp
接口,另一种是使用汇编实现的_st_md_cxt_save
和_st_md_cxt_restore
接口,这两个函数从用法上同setjmp和longjmp。
这两种方式的切换本质上都是栈帧的切换。
C语言中的goto语句只能在当前函数内跳转,而不能在函数间跳转。setjmp()和longjmp()可以执行非局部跳转,即跳转的目标为当前执行函数之外的某个位置。
setjmp()函数为后续由longjmp()调用执行的跳转确立了跳转目标,该目标正是程序发起setjmp()调用的位置。从编程角度看来,调用longjmp()函数后,看起来就和从第二次调用setjmp()返回时完全一样。通过setjmp()的返回值,可以区分setjmp()调用是初始返回还是第二次返回。初始调用返回值为0,后续“伪返回”的返回值为longjmp()调用中val参数所指定的任意值。通过对val参数使用不同值,能够区分程序中跳转至同一目标的不同起跳位置。更多相关setjmp()、longjmp()的介绍,可以参考《Linux/UNIX系统编程手册》上册第106页。
以下是从《Linux/UNIX系统编程手册》摘抄的示例:
#include <stdio.h> #include <stdlib.h> #include <setjmp.h> jmp_buf env; void f2(int num) { longjmp(env, num); } void f1(int num) { if(num == 1){ longjmp(env, num); } f2(num); } int main(int argc, char** argv) { if(argc != 2){ printf("Usage: %s [1|2]\n", argv[0]); return -1; } switch(setjmp(env)){ case 0: printf("Calling f1() after initial setjmp()\n"); f1(atoi(argv[1])); break; case 1: printf("We jumped back from f1()\n"); break; case 2: printf("We jumped back from f2()\n"); break; } return 0; }
这个示例我稍微做了一些修改,运行结果及分析如下:
root@learner:~/tmp# ./a.out 1 Calling f1() after initial setjmp() We jumped back from f1()
root@learner:~/tmp# ./a.out 2 Calling f1() after initial setjmp() We jumped back from f2()
这两个函数是通过汇编实现的,代码如下:
#define JB_BX 0 #define JB_SI 1 #define JB_DI 2 #define JB_BP 3 #define JB_SP 4 #define JB_PC 5 .file "md.S" .text /* _st_md_cxt_save(__jmp_buf env) 存储函数栈帧 */ .globl _st_md_cxt_save .type _st_md_cxt_save, @function .align 16 _st_md_cxt_save: movl 4(%esp), %eax /*取得参数env的地址,保存到eax中。*/ movl %ebx, (JB_BX*4)(%eax) /*保存ebx*/ movl %esi, (JB_SI*4)(%eax) /*保存esi*/ movl %edi, (JB_DI*4)(%eax) /*保存edi*/ /*保存esp,即栈顶,保存的栈顶是没有调用_st_md_cxt_save()函数之前的栈顶*/ leal 4(%esp), %ecx / movl %ecx, (JB_SP*4)(%eax) /*保存ecx*/ movl 0(%esp), %ecx movl %ecx, (JB_PC*4)(%eax) /*保存引用计数器pc*/ movl %ebp, (JB_BP*4)(%eax) /*保存ebp 即调用_st_md_cxt_save()的函数的ebp*/ xorl %eax, %eax /*清空eax 作为_st_md_cxt_save()的返回值*/ ret .size _st_md_cxt_save, .-_st_md_cxt_save /* _st_md_cxt_restore(__jmp_buf env, int val) 恢复函数栈帧 */ .globl _st_md_cxt_restore .type _st_md_cxt_restore, @function .align 16 _st_md_cxt_restore: movl 4(%esp), %ecx /*获取第一个参数的地址,即env的地址。*/ movl 8(%esp), %eax /*获取第二个参数的地址,即val的地址。*/ movl (JB_PC*4)(%ecx), %edx /*将原pc寄存器的值保存到edx中*/ movl (JB_BX*4)(%ecx), %ebx /*恢复ebx*/ movl (JB_SI*4)(%ecx), %esi /*恢复esi*/ movl (JB_DI*4)(%ecx), %edi /*恢复edi*/ movl (JB_BP*4)(%ecx), %ebp /*恢复ebp*/ movl (JB_SP*4)(%ecx), %esp /*恢复esp*/ testl %eax, %eax /*测试eax的值是否为0,也就是第二个参数是否为0。*/ jnz 1f /*如果第二个参数不为0,则直接跳转到1:执行。*/ incl %eax /*将返回值置为1*/ 1: jmp *%edx /*跳转到之前pc处*/ .size _st_md_cxt_restore, .-_st_md_cxt_restore
_st_md_cxt_save(__jmp_buf env)用于保存栈帧,_st_md_cxt_restore(__jmp_buf env, int val)用于恢复栈帧。
#if defined(MD_USE_BUILTIN_SETJMP) && !defined(USE_LIBC_SETJMP) #define MD_SETJMP(env) _st_md_cxt_save(env) #define MD_LONGJMP(env, val) _st_md_cxt_restore(env, val) extern int _st_md_cxt_save(jmp_buf env); extern void _st_md_cxt_restore(jmp_buf env, int val); #else #define MD_SETJMP(env) setjmp(env) #define MD_LONGJMP(env, val) longjmp(env, val) #endif
如果定义了MD_USE_BUILTIN_SETJMP
宏,且没有定义USE_LIBC_SETJMP
宏,则使用自定义的栈帧存取函数。否则使用系统提供的setjmp和longjmp切换栈帧。
#define _ST_SWITCH_CONTEXT(_thread) \ ST_BEGIN_MACRO \ ST_SWITCH_OUT_CB(_thread); \ if (!MD_SETJMP((_thread)->context)) { \ /*调出协程返回0,调入协程返回1。*/ _st_vp_schedule(); \ /*选择下一个需要调度的协程*/ } \ ST_DEBUG_ITERATE_THREADS(); \ ST_SWITCH_IN_CB(_thread); \ ST_END_MACRO
_ST_SWITCH_CONTEXT
用于将协程的CPU执行权让出去,重新调度一个新的协程。
当协程调用_ST_SWITCH_CONTEXT
时,此时MD_SETJMP会返回0,则进入协程调度函数_st_vp_schedule(),CPU的执行权转移到其他协程。此时相当于在本协程中打上了一个切换点。当本协程将再次获得CPU执行权时,在_st_vp_schedule()中调用_ST_RESTORE_CONTEXT宏函数,会通过MD_SETJMP再次返回,此时返回值为1,跳过if语句返回到本协程调用_ST_SWITCH_CONTEXT的位置,继续往下执行。
#define _ST_RESTORE_CONTEXT(_thread) \ ST_BEGIN_MACRO \ _ST_SET_CURRENT_THREAD(_thread); \ /*标记此协程为当前运行的协程*/ MD_LONGJMP((_thread)->context, 1); \ /*执行协程切换 恢复之前挂起的协程*/ ST_END_MACRO
_ST_RESTORE_CONTEXT
用于恢复指定的协程,通过MD_LONGJMP宏,返回到MD_SETJMP打的断点处,从MD_SETJMP再次返回,从而再次获取到CPU的执行权。
void _st_vp_schedule(void) { _st_thread_t *thread; /*从就绪的协程队列中取出一个协程*/ if (_ST_RUNQ.next != &_ST_RUNQ) { thread = _ST_THREAD_PTR(_ST_RUNQ.next); _ST_DEL_RUNQ(thread); /*从就绪协程队列删除*/ } else { /*如果就绪的协程队列为空,说明所有的就绪协程都处理完毕了。*/ thread = _st_this_vp.idle_thread; /*现在切换至idle协程*/ } ST_ASSERT(thread->state == _ST_ST_RUNNABLE); /*该协程必须处于可运行状态*/ thread->state = _ST_ST_RUNNING; /*将即将运行协程的状态标记为正在运行*/ _ST_RESTORE_CONTEXT(thread); /*切换至新的协程*/ }
在切换协程时,会从就绪的协程队列中取出一个协程,然后切换至该协程。如果就绪队列中没有可切换的协程,则说明没有协程需要处理,此时会切换至idle协程。返回idle协程后,会重新进入epoll_wait,重新开始监听待发生的事件和处理定时事件。
所有的协程都是在一个单线程中执行的,所以需要有一个调度器来调度所有的协程,以便需要执行权限的协程能够获取到CPU。通常协程在发生读事件
、写事件
、定时器事件
时才需要执行权限,也就是发生这些事件后,需要将协程调度到CPU上,让其获得CPU的执行权,处理对应的事情。
st中对读写事件的监控是通过epoll实现的,而定时器事件通过最小堆配合epoll的超时实现的。
typedef struct _st_eventsys_ops { const char *name; /* Name of this event system */ int val; /* Type of this event system */ int (*init)(void); /* Initialization */ void (*dispatch)(void); /* Dispatch function */ int (*pollset_add)(struct pollfd *, int); /* Add descriptor set */ void (*pollset_del)(struct pollfd *, int); /* Delete descriptor set */ int (*fd_new)(int); /* New descriptor allocated */ int (*fd_close)(int); /* Descriptor closed */ int (*fd_getlimit)(void); /* Descriptor hard limit */ } _st_eventsys_t;
这是调度器的接口,可以使用epoll实现,也可以使用select和poll实现。
static _st_eventsys_t _st_epoll_eventsys = { "epoll", ST_EVENTSYS_ALT, _st_epoll_init, _st_epoll_dispatch, _st_epoll_pollset_add, _st_epoll_pollset_del, _st_epoll_fd_new, _st_epoll_fd_close, _st_epoll_fd_getlimit };
st中通过epoll实现了调度器,实现的这些函数作为回调函数封装到了结构体中。
ST_HIDDEN void _st_epoll_dispatch(void) { ... if (_ST_SLEEPQ == NULL) { /* 定时队列为空,说明没有定时器事件,则epoll_wait的超时时间为-1, 即没有事件触发时,epoll_wait一直阻塞。*/ timeout = -1; } else { /*从定时队列获取最小定时器*/ min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK); /*换算epoll_wait的超时时间 单位:us */ timeout = (int) (min_timeout / 1000); ... } /*进入epoll等待事件的触发,也可能因为超时而退出。*/ nfd = epoll_wait(..., ..., ..., timeout); ... pq->thread->state = _ST_ST_RUNNABLE; /*把协程的状态设置为可运行状态*/ _ST_ADD_RUNQ(pq->thread); /*将协程添加到运行队列,等待新一轮的调度。*/ ... }
在进入epoll_wait之前,先从最小堆中获取最近一个定时器触发的时间,将此时间作为epoll_wait的超时时间,如果在这个超时时间之内发生了读写事件,则epoll_wait返回处理读写事件;如果段超时时间之内没有发生读写事件,epoll_wait会因为超时而退出,此时返回正好处理定时事件。
若不是因为超时而从epoll_wait返回,说明有的协程读写事件触发了,此时需要将触发事件的协程保存到可运行队列中,等待新一轮的调度。
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size) { _st_thread_t *thread; _st_stack_t *stack; void **ptds; char *sp; /* Adjust stack size 调整栈大小*/ if (stk_size == 0) stk_size = ST_DEFAULT_STACK_SIZE; /*默认栈大小是128KB*/ /*页大小对齐*/ stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE; /*申请栈空间*/ stack = _st_stack_new(stk_size); if (!stack) return NULL; sp = stack->stk_top; /*栈顶*/ sp = sp - (ST_KEYS_MAX * sizeof(void *));/*栈顶空出一块区域,用于存放私有的数据。*/ ptds = (void **) sp; sp = sp - sizeof(_st_thread_t); /*再空出一个_st_thread_t大小*/ thread = (_st_thread_t *) sp; if ((unsigned long)sp & 0x3f) sp = sp - ((unsigned long)sp & 0x3f); stack->sp = sp - _ST_STACK_PAD_SIZE; /*栈顶再空出128字节的填充区域*/ memset(thread, 0, sizeof(_st_thread_t)); memset(ptds, 0, ST_KEYS_MAX * sizeof(void *)); thread->private_data = ptds; /*指向协程私有数据*/ thread->stack = stack; /*指向协程栈*/ thread->start = start; /*协程入口函数*/ thread->arg = arg; /*入口函数参数*/ /*保存切换上下文,打上还原点,当本协程下次获取到执行权限时,从这个还原点接着执行。*/ _ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main); /*如果需要主动回收协程,则需要协程创建一个条件变量,用于阻塞等待回收协程。*/ if (joinable) { thread->term = st_cond_new(); if (thread->term == NULL) { _st_stack_free(thread->stack); return NULL; } } thread->state = _ST_ST_RUNNABLE; /*标记协程为可运行状态*/ _st_active_count++; /*增加活跃协程的个数*/ _ST_ADD_RUNQ(thread); /*将协程插入到运行队列*/ return thread; }
创建一个新的协程,在创建的过程中,会将这个协程放到可运行队列,等待着调度。在调度到这个新的协程时,就可以获得CPU的执行权。
除了主协程外,其他协程的栈都是在堆上申请的空间,默认大小时128KB。
#define _ST_INIT_CONTEXT MD_INIT_CONTEXT #define MD_INIT_CONTEXT(_thread, _sp, _main) \ ST_BEGIN_MACRO \ if (MD_SETJMP((_thread)->context)) \ /*设置还原点,或从还原点返回。*/ _main(); \ MD_GET_SP(_thread) = (long) (_sp); \ /*设置ctx中sp寄存器的值,设置新的栈帧*/ ST_END_MACRO
在创建新协程时,会通过上面的宏函数设置还原点,当执行到MD_SETJMP时,会返回0,此时_main()函数不会被执行。当协程再次获取执行权时,会再次从MD_SETJMP返回,此时返回值为1,则进入_main()函数,也就是_st_thread_main()
函数。
void _st_thread_main(void) { _st_thread_t *thread = _ST_CURRENT_THREAD(); /*获取当前协程的句柄*/ MD_CAP_STACK(&thread); thread->retval = (*thread->start)(thread->arg); /*执行协程入口函数*/ st_thread_exit(thread->retval); /*协程退出*/ }
新的协程创建后,并不会立即被执行,需要先打上还原点,然后放入可执行队列中。当调度器调度到这个新线程后才会真正获取到CPU的执行权,在MD_SETJMP返回后,进入这个函数,在此函数中才会进入协程的入口函数。协程入口函数处理完毕后,会进入协程退出函数,这个稍后分析。
int st_init(void) { _st_thread_t *thread; if (_st_active_count) { /*如果已经初始化,则直接返回。*/ return 0; } st_set_eventsys(ST_EVENTSYS_DEFAULT); /*设置epoll封装的接口 */ if (_st_io_init() < 0) return -1; memset(&_st_this_vp, 0, sizeof(_st_vp_t)); /*三个队列的初始化*/ ST_INIT_CLIST(&_ST_RUNQ); /*可运行队列*/ ST_INIT_CLIST(&_ST_IOQ); /*io队列*/ ST_INIT_CLIST(&_ST_ZOMBIEQ); /*僵尸态队列*/ if ((*_st_eventsys->init)() < 0) /*epoll的初始化*/ return -1; _st_this_vp.pagesize = getpagesize(); /*页大小*/ _st_this_vp.last_clock = st_utime(); /*时钟时间*/ /* 创建一个idle协程 */ _st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start, NULL, 0, 0); if (!_st_this_vp.idle_thread) return -1; _st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD; /*标识为idle协程*/ _st_active_count--; _ST_DEL_RUNQ(_st_this_vp.idle_thread); /*从可运行队列中删除idle协程*/ /*为主协程封装一个_st_thread_t */ thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) + (ST_KEYS_MAX * sizeof(void *))); if (!thread) return -1; thread->private_data = (void **) (thread + 1); /*指向协程私有数据*/ thread->state = _ST_ST_RUNNING; /*设置协程为可运行状态*/ thread->flags = _ST_FL_PRIMORDIAL; /*标识为主协程*/ _ST_SET_CURRENT_THREAD(thread); /*设置当前工作的协程*/ _st_active_count++; /*增加活跃协程个数*/ return 0; }
在使用st时,首先需要调用st_init()函数对st进行初始化。这个函数有三个作用:1、做一些初始化工作 2、创建idle协程 3、将主线程封装为主协程
主线程也是一条可执行流,需要将主线程封装成主协程,以便能够在调度器中进行调度。
idle协程是非常核心的,当就绪队列中没有可运行的协程时,会将CPU的执行权限调度到idle协程。在idle协程中重新开始监听读、写、定时器事件。
void *_st_idle_thread_start(void *arg) { _st_thread_t *me = _ST_CURRENT_THREAD(); while (_st_active_count > 0) { _ST_VP_IDLE(); /*进入epoll_wait,监听读写事件*/ _st_vp_check_clock(); /*处理定时器事件*/ me->state = _ST_ST_RUNNABLE; /*将idle线程标记为可运行状态*/ _ST_SWITCH_CONTEXT(me); /*让出CPU执行权,重新开始调度。*/ } exit(0); return NULL; }
当就绪队列为空时,调度会进入idle线程,在idle线程中,会进入epoll_wait监听读写事件,有读写事件触发时,会将协程保存到就绪队列中;从epoll_wait返回后,查看是否有定时器触发,若有定时器触发,则将协程保存到就绪队列中。处理完读写事件和定时器事件后,idle协程让出CPU执行权,开始依次调度所有的就绪协程,所有的就绪协程处理完毕后,会再次进入idle协程,之后都是这样循环往复。
#define _ST_VP_IDLE() (*_st_eventsys->dispatch)()
_st_eventsys->dispatch是回调函数,这个函数指针实际指向_st_epoll_dispatch。
void _st_vp_check_clock(void) { _st_thread_t *thread; st_utime_t now; now = st_utime(); /*获取当前时间*/ _ST_LAST_CLOCK = now; if (_st_curr_time && now - _st_last_tset > 999000) { _st_curr_time = time(NULL); _st_last_tset = now; } while (_ST_SLEEPQ != NULL) { /*睡眠队列不为空*/ thread = _ST_SLEEPQ; /*获取最小堆上的最小的定时器*/ ST_ASSERT(thread->flags & _ST_FL_ON_SLEEPQ); if (thread->due > now) break; /*协程的定时器还没有到,立即返回。*/ /*协程的定时器触发了*/ _ST_DEL_SLEEPQ(thread); /*从睡眠队列中删除*/ /*协程是因为条件变量而睡眠的,现在条件变量超时了。*/ if (thread->state == _ST_ST_COND_WAIT) thread->flags |= _ST_FL_TIMEDOUT; ST_ASSERT(!(thread->flags & _ST_FL_IDLE_THREAD)); thread->state = _ST_ST_RUNNABLE; /*标记协程为可运行状态*/ _ST_INSERT_RUNQ(thread); /*将协程送至就绪队列,等待调度。*/ } }
从epoll_wait返回后,检查睡眠队列中的协程,当其定时器到了,则将协程送至就绪队列,等待新一轮的调度。
所有的定时器都放在最小堆中,从最小堆中获取到的是所有定时器的最小值。如果当前时间超过了最小堆中的定时器,说明定时器触发了。通过while循环将最小堆中的所有该触发的定时器全部都保存到就绪队列中。
_st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg, int joinable, int stk_size) { ... if (joinable) { /*如果协程需要主动回收,则为协程创建一个条件变量。*/ thread->term = st_cond_new(); /*创建条件变量*/ if (thread->term == NULL) { _st_stack_free(thread->stack); return NULL; } } ... }
在创建协程的时候,需要指明是否会主动回收协程。如果需要主动回收协程,则需要为协程创建一个条件变量,以便其他协程阻塞的回收该协程。
void _st_thread_main(void) { _st_thread_t *thread = _ST_CURRENT_THREAD(); /*获取当前协程的句柄*/ MD_CAP_STACK(&thread); thread->retval = (*thread->start)(thread->arg); /*执行协程入口函数*/ st_thread_exit(thread->retval); /*退出协程*/ }
当协程的主体函数执行完毕后,会进入st_thread_exit函数,用于退出协程。
void st_thread_exit(void *retval) { _st_thread_t *thread = _ST_CURRENT_THREAD(); /*获取当前协程句柄*/ thread->retval = retval; /*保存返回值*/ _st_thread_cleanup(thread); /*释放协程的私有数据*/ _st_active_count--; /*活跃协程数减一*/ if (thread->term) { /*如果需要主动回收此协程*/ thread->state = _ST_ST_ZOMBIE; /*设置协程为僵尸态*/ _ST_ADD_ZOMBIEQ(thread); /*添加到僵尸态队列*/ st_cond_signal(thread->term); /*通知阻塞等待回收的协程*/ _ST_SWITCH_CONTEXT(thread); /*让出执行权*/ st_cond_destroy(thread->term); /*销毁条件变量*/ thread->term = NULL; } /*如果是主协程,则无需释放其对应的栈,否则释放在堆上申请的栈空间。*/ if (!(thread->flags & _ST_FL_PRIMORDIAL)) _st_stack_free(thread->stack); _ST_SWITCH_CONTEXT(thread); /*销毁完毕让出CPU执行权*/ }
若协程是主协程,则无需释放堆空间,否则需要释放在堆上申请的用于栈的空间。thread->term不为NULL,说明这个协程需要主动的回收,此时需要将协程设置为僵尸态,并加入僵尸态队列。同时通知阻塞等待回收的协程。
int st_thread_join(_st_thread_t *thread, void **retvalp) { _st_cond_t *term = thread->term; /*获取协程的条件变量*/ if (term == NULL) { errno = EINVAL; return -1; } if (_ST_CURRENT_THREAD() == thread) { /*不能是当前协程*/ errno = EDEADLK; return -1; } /*不能多个线程回收同时回收同一个协程*/ if (term->wait_q.next != &term->wait_q) { errno = EINVAL; return -1; } /*如果协程的状态不是僵尸态,则用于回收的线程将进入条件变量等待。*/ while (thread->state != _ST_ST_ZOMBIE) { if (st_cond_timedwait(term, ST_UTIME_NO_TIMEOUT) != 0) return -1; } if (retvalp) *retvalp = thread->retval; /*获取待回收协程的返回值*/ thread->state = _ST_ST_RUNNABLE; /*将待回收的协程设置为可运行状态*/ _ST_DEL_ZOMBIEQ(thread); /*从僵尸态队列删除*/ _ST_ADD_RUNQ(thread); /*加入就绪运行队列*/ return 0; }
协程在回收其他协程,此时待回收的协程还没有退出,主动回收的协程将进入条件变量等待。当待回收的协程退出时,会激活条件变量上的协程。
主动回收的协程从条件变量返回后,此时待回收的协程处于僵尸态,获取返回值后,此时需要再次将待回收的协程置为可运行状态,并加入就绪运行队列。待回收协程会再次进入st_thread_exit()
函数,从_ST_SWITCH_CONTEXT返回,主动销毁条件变量和栈空间,最后通过_ST_SWITCH_CONTEXT让出执行权,这时协程才算退出。
void st_thread_yield() { _st_thread_t *me = _ST_CURRENT_THREAD(); /*获取当前协程句柄*/ /*检查是否有定时器事件触发*/ _st_vp_check_clock(); /*就绪队列为空,则直接返回。*/ if (_ST_RUNQ.next == &_ST_RUNQ) { return; } me->state = _ST_ST_RUNNABLE; /*将本协程标记为可运行状态*/ _ST_ADD_RUNQ(me); /*把本协程添加到就绪队列中*/ /*将执行权切换给就绪队列中的其他协程*/ _ST_SWITCH_CONTEXT(me); }
协程在运行的过程中,可以主动的让出执行权。在让出执行权的时候,需要将自己主动加入到就绪队列中,等待再次被调度。
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout) { struct pollfd *pd; struct pollfd *epd = pds + npds; /*指向数组末尾*/ _st_pollq_t pq; _st_thread_t *me = _ST_CURRENT_THREAD(); int n; if (me->flags & _ST_FL_INTERRUPT) { me->flags &= ~_ST_FL_INTERRUPT; errno = EINTR; return -1; } if ((*_st_eventsys->pollset_add)(pds, npds) < 0) return -1; pq.pds = pds; pq.npds = npds; pq.thread = me; pq.on_ioq = 1; _ST_ADD_IOQ(pq); if (timeout != ST_UTIME_NO_TIMEOUT) _ST_ADD_SLEEPQ(me, timeout); me->state = _ST_ST_IO_WAIT; /*主动切出协程,交出执行权。*/ _ST_SWITCH_CONTEXT(me); n = 0; if (pq.on_ioq) { _ST_DEL_IOQ(pq); (*_st_eventsys->pollset_del)(pds, npds); } else { for (pd = pds; pd < epd; pd++) { if (pd->revents) n++; } } if (me->flags & _ST_FL_INTERRUPT) { me->flags &= ~_ST_FL_INTERRUPT; errno = EINTR; return -1; } return n; }
注册需要监听的事件,然后让出CPU执行权,当事件触发后再次从_ST_SWITCH_CONTEXT返回继续处理。
int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout) { struct pollfd pd; int n; pd.fd = fd->osfd; pd.events = (short) how; pd.revents = 0; if ((n = st_poll(&pd, 1, timeout)) < 0) /*单一fd*/ return -1; if (n == 0) { errno = ETIME; return -1; } if (pd.revents & POLLNVAL) { errno = EBADF; return -1; } return 0; }
对监听一个文件描述符的封装
_st_netfd_t *st_accept(_st_netfd_t *fd, struct sockaddr *addr, int *addrlen, st_utime_t timeout) { int osfd, err; _st_netfd_t *newfd; /*执行accept函数,如果没有client连接,则accept立即返回。*/ while ((osfd = accept(fd->osfd, addr, (socklen_t *)addrlen)) < 0) { if (errno == EINTR) continue; if (!_IO_NOT_READY_ERROR) return NULL; /*进入poll函数,注册读事件,同时让出CPU的执行权,等待读事件触发。*/ if (st_netfd_poll(fd, POLLIN, timeout) < 0) return NULL; } /*accept返回的client socket fd,将其进行封装。*/ newfd = _st_netfd_new(osfd, 1, 1); if (!newfd) { err = errno; close(osfd); errno = err; } return newfd; }
fd
被设置为了非阻塞,调用accept()函数后,若没有客户端请求连接,则立即从accept返回,若errno为EAGAIN或EWOULDBLOCK,说明没有客户端连接,然后执行st_netfd_poll()函数,在此函数内会为fd
注册读事件,同时会让出CPU的执行权。当fd
的读事件触发后,本协程会再次被调度从而获得CPU执行权,接着往下执行。
ssize_t st_read(_st_netfd_t *fd, void *buf, size_t nbyte, st_utime_t timeout) { ssize_t n; while ((n = read(fd->osfd, buf, nbyte)) < 0) { /*非阻塞的读取*/ if (errno == EINTR) /*被信号中断了*/ continue; if (!_IO_NOT_READY_ERROR) /*不是EAGAIN或EWOULDBLOCK错误*/ return -1; /*执行到这里说明发生了EAGAIN或EWOULDBLOCK错误,此时没有数据可读,让出执行权。*/ if (st_netfd_poll(fd, POLLIN, timeout) < 0) return -1; } return n; }
read的原理同accept,不再赘述。