流老化入口函数FlowManger调用函数FlowTimeoutHash完成流老化处理,这个函数完成了大部分老化工作,把一个老化的流从flow_hash上移走,移入了回收队列,由回收线程取出来放入空闲全局flow内存池。
1. FlowTimeoutHash 函数
FlowManager-》FlowTimeoutHash
函数有两个参数表示flow bucket的范围值,const uint32_t hash_min, const uint32_t hash_max,先把老化的flow移入参数td->aside_queue队列,再从aside_queue队列移入回收线程的队列flow_recycle_q,如此玩法,flow必晕。
/** * \brief time out flows from the hash * * \param ts timestamp * \param hash_min min hash index to consider * \param hash_max max hash index to consider * \param counters ptr to FlowTimeoutCounters structure * * \retval cnt number of timed out flow */ //新版本的老化策略比旧版本复杂了很多,流分配那块修改应该是有效率提升, //但是这个函数应该没有效率提升, //这个函数总思路就是先设置一个位图,每1位表示一个bucket是否有需要老化的flow, //还是和旧版一样判断每个bucket的next_ts与最新时间,先循环32或64次检查每个 //bucket是否有超时的flow,有超时的放入一个32或64位的整数, //循环完成,再循环这个整数,检查每一位同时检查next_ts,都符合超时条件则进行老化,好复杂,没毛用 static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, struct timeval *ts, const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters) { uint32_t cnt = 0; //获取紧急模式标志 const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)); //计算检查的bucket数量 const uint32_t rows_checked = hash_max - hash_min; uint32_t rows_skipped = 0; uint32_t rows_empty = 0; //那个一次取32还是64个bucket就是这个系统位数决定的,以下都认为64位吧 #if __WORDSIZE==64 #define BITS 64 #define TYPE uint64_t #else #define BITS 32 #define TYPE uint32_t #endif for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) { TYPE check_bits = 0; //64位整数 //取最小的数,万一bucket数量不够64个呢 const uint32_t check = MIN(BITS, (hash_max - idx)); //循环64次,检查64个bucket是否有超时的flow,有则设置一位到check_bits for (uint32_t i = 0; i < check; i++) { FlowBucket *fb = &flow_hash[idx+i]; check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= (int32_t)ts->tv_sec) << (TYPE)i; } if (check_bits == 0) //没有一个超时的则继续下一轮64个bucket的超时判断 continue; //多循环一次 for (uint32_t i = 0; i < check; i++) { //有超时的,则循环检测这个64位数,然后老化之 FlowBucket *fb = &flow_hash[idx+i]; //多比较一次,何苦呢 if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= (int32_t)ts->tv_sec) { FBLOCK_LOCK(fb); Flow *evicted = NULL; if (fb->evicted != NULL || fb->head != NULL) { if (fb->evicted != NULL) { //这个evicted队列不为空,说明在建立流获取流时检查到超时流,把那个超时 //放到有条件的放到这个队列了,也需要老化回收 /* transfer out of bucket so we can do additional work outside * of the bucket lock */ //这里把这个evicted队列放到临时变量,后边代码处理这个evicted队列 evicted = fb->evicted; fb->evicted = NULL; } if (fb->head != NULL) { //这个flow bucket上有超时flow,所以调用FlowManagerHashRowTimeout //进行超时处理,超时的流放到了td->aside_queue这个队列,这bucket老化 //就是把flow移入些队列,然后再将这些队列的flow以如回收线程的队列,繁琐 int32_t next_ts = 0; FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts); //设置bucket的最小超时时间,就是下一次超时时间 if (SC_ATOMIC_GET(fb->next_ts) != next_ts) SC_ATOMIC_SET(fb->next_ts, next_ts); } //如果bucket的evicted队列没有flow, 且head为空不包含flow了,说明这个 //bucket的flow都被老化了,则设置下次老化时间为INT_MAX,就是不需要老化了 if (fb->evicted == NULL && fb->head == NULL) { SC_ATOMIC_SET(fb->next_ts, INT_MAX); } } else { //如果bucket的evicted队列没有flow,且head为空不包含flow了,说明这个 //bucket的flow都被老化了,则设置下次老化时间为INT_MAX,就是不需要老化了 SC_ATOMIC_SET(fb->next_ts, INT_MAX); rows_empty++; } FBLOCK_UNLOCK(fb); /* processed evicted list */ if (evicted) { //这个bucket的超时处理完成,现在处理移入evicted队列的超时flow //其实,这个函数又把这些flow移入了td->aside_queue,尴尬 FlowManagerHashRowClearEvictedList(td, evicted, ts, counters); } } else { //没有超时的bucket,跳过计数增1 rows_skipped++; } } if (td->aside_queue.len) { //这个是最后真正处理超时的flow,把td->aside_queue的flow移入回收线程的回收队列, //何苦呢 cnt += ProcessAsideQueue(td, counters); } } counters->rows_checked += rows_checked; counters->rows_skipped += rows_skipped; counters->rows_empty += rows_empty; if (td->aside_queue.len) { //如果有未处理的flow,则继续把td->aside_queue的flow移入 //回收线程的回收队列flow_recycle_q cnt += ProcessAsideQueue(td, counters); } counters->flows_removed += cnt; /* coverity[missing_unlock : FALSE] */ return cnt; }
2. FlowManagerHashRowTimeout 函数
FlowManager-》FlowTimeoutHash-》FlowManagerHashRowTimeout
这函数实际执行了老化操作,其实就调用FlowManagerFlowTimeout检查是否超时,是就把老化的flow从flow_hash移除,然后移入参数td->aside_queue,别的什么没干。
/** * \internal * * \brief check all flows in a hash row for timing out * * \param f last flow in the hash row * \param ts timestamp * \param emergency bool indicating emergency mode * \param counters ptr to FlowTimeoutCounters structure */ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, struct timeval *ts, int emergency, FlowTimeoutCounters *counters, int32_t *next_ts) { uint32_t checked = 0; Flow *prev_f = NULL; //循环检查bucket上的每个flow do { checked++; /* check flow timeout based on lastts and state. Both can be * accessed w/o Flow lock as we do have the hash row lock (so flow * can't disappear) and flow_state is atomic. lastts can only * be modified when we have both the flow and hash row lock */ /* timeout logic goes here */ //检查flow是否超时 if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) { //未超时继续检查下个flow counters->flows_notimeout++; prev_f = f; f = f->next; continue; } FMFlowLock(f); //FLOWLOCK_WRLOCK(f); Flow *next_flow = f->next; /* never prune a flow that is used by a packet we * are currently processing in one of the threads */ //到这里说明flow超时了,引用计数如果大于0则不能老化 //FlowBypassedTimeout这个函数还需要再看看 if (f->use_cnt > 0 || !FlowBypassedTimeout(f, ts, counters)) { FLOWLOCK_UNLOCK(f); prev_f = f; if (f->use_cnt > 0) { counters->flows_timeout_inuse++; } f = f->next; continue; } //设置老化标志为超时老化 f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; counters->flows_timeout++; //从bucket上移除flow RemoveFromHash(f, prev_f); //将超时的flow移入td->aside_queue队列 //函数返回后,会将这个队列的flow移入回收线程的回收队列 FlowQueuePrivateAppendFlow(&td->aside_queue, f); /* flow is still locked in the queue */ f = next_flow; } while (f != NULL); counters->flows_checked += checked; if (checked > counters->rows_maxlen) counters->rows_maxlen = checked; }