这篇文章接着上篇继续解释流的建立,其中最重要的一个函数FlowGetNew,主要目的是获取一个flow,获取过程中也是非常曲折的,求爷爷告奶奶才能求来一个flow。
1. FlowGetNew 这个函数也分几个方面理解:
FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew
a。从线程自己的flow队列里获取flow,如果获取成功则返回flow,如果没有就从全局flow内存池获取一个flow队列,再从flow队列里获取flow。
b。如果线程自己的flow队列和全局flow内存池也没有可用的flow队列,且flow内存超过配置上限,则设置进入紧急模式。
c。接b,设置紧急模式之后,调用函数FlowGetUsedFlow获取flow,这个函数是从正在使用的全局flow_hash的bucket链表里获取一个,后续会注释这个函数。
d。如果线程自己的flow队列和全局flow内存池也没有可用的flow队列,且flow内存没有超过配置上限,则直接在内存上分配flow即调用函数FlowAlloc。
static Flow *FlowGetNew(ThreadVars *tv, FlowLookupStruct *fls, const Packet *p) { //获取紧急模式标志 const bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0); //检查是否可以生成flow,icmp错误包不生成flow //如果是紧急模式,tcp非sync的包不生成flow,可以看出sync包优先生成flow if (FlowCreateCheck(p, emerg) == 0) { return NULL; } //从线程自己的flow队列里获取flow,如果获取成功则返回flow /* get a flow from the spare queue */ Flow *f = FlowQueuePrivateGetFromTop(&fls->spare_queue); if (f == NULL) { //如果获取flow失败,就从全局flow内存池获取一个flow队列,再从flow队列里获取flow。 //FlowSpareSync这个函数主要是从空闲的全局flow内存池获取一个flow队列,给自己用 f = FlowSpareSync(tv, fls, p, emerg); } //全局flow内存池也没有可用的flow队列 if (f == NULL) { /* If we reached the max memcap, we get a used flow */ //判断如果flow内存超过配置上限,则进入紧急模式。 if (!(FLOW_CHECK_MEMCAP(sizeof(Flow) + FlowStorageSize()))) { /* declare state of emergency */ if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) { //没有设置紧急模式标志才会设置,设置过就没必要再设置了 SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY); //这个函数把当前的流老化的正常超时时间设置为紧急模式超时时间 FlowTimeoutsEmergency(); } //没有空闲flow,flow占用内存也超过配置上限,试着从正在使用的全局flow_hash //的bucket链表上获取一个,只有引用计数为0则拿出来,不管它是否满足超时条件 f = FlowGetUsedFlow(tv, fls->dtv, &p->ts); if (f == NULL) { return NULL; } #ifdef UNITTESTS if (tv != NULL && fls->dtv != NULL) { #endif StatsIncr(tv, fls->dtv->counter_flow_get_used); #ifdef UNITTESTS } #endif /* flow is still locked from FlowGetUsedFlow() */ FlowUpdateCounter(tv, fls->dtv, p->proto); return f; } //如果线程自己的flow队列和全局flow内存池也没有可用的flow队列, //且flow内存没有超过配置上限,则直接在内存上分配flow即调用函数FlowAlloc /* now see if we can alloc a new flow */ f = FlowAlloc(); if (f == NULL) { #ifdef UNITTESTS if (tv != NULL && fls->dtv != NULL) { #endif StatsIncr(tv, fls->dtv->counter_flow_memcap); #ifdef UNITTESTS } #endif return NULL; } /* flow is initialized but *unlocked* */ //FlowAlloc里会初始化flow,没人用它,也没人锁它,谁用谁上锁 } else { /* flow has been recycled before it went into the spare queue */ /* flow is initialized (recylced) but *unlocked* */ //这的意思就是,获取到flow了,这个flow回收时会被初始化FlowInit(f), //的确如此,回收时会这么做,也会释放锁 } FLOWLOCK_WRLOCK(f); FlowUpdateCounter(tv, fls->dtv, p->proto); return f; }
2. FlowTimeoutsEmergency 函数
FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew-》FlowTimeoutsEmergency
这个函数功能单一,把老化超时时间设置为紧急模式的老化时间。
void FlowTimeoutsEmergency(void) { //两个参数都是全局变量, //flow_timeouts 是正在使用的超时时间 //flow_timeouts_emerg 是初始化时设置的紧急超时时间 //把老化超时时间设置为紧急模式的老化时间,设置后在流老化线程中, //检查超时时获取到的时间就是这个紧急模式的时间 SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg); }
3. FlowGetUsedFlow 函数
FlowHandlePacket-》FlowGetFlowFromHash-》FlowGetNew-》FlowGetUsedFlow
这个函数是在FlowGetNew中获取flow时,既没有空闲flow可用,flow内存也到达了配置的内存上限的时候调用这个函数。
函数主要是从全局变量flow_hash已经使用的flow中获取一个flow,这个flow的引用计数必须为0,不检查是否超时,只要引用计数为0就拿出来。
获取flow时,选取bucket时,有个小算法,如果每次都从flow_hash前边的bucket中拿flow出来,那么前边的势必很快取完,取完后,每次就要从前往后遍历,每次遍历都是无用功,耗费时间,因为在前边的bucket中的符合条件的flow先被取走,就会依次遍历每个bucket。
所以,设置一个原子变量flow_prune_idx,用它控制每次从哪个bucket开始获取符合条件的flow,这里是每次加5,即每次取flow的bucket的索引间隔为5,如果这个bucket上没有符合条件的flow,则索引加1,检查下一个bucket,直到找到flow或者查找计数为5(全局变量固定值),如果遍历到达最后一个bucket,则再从第一个bucket开始搜索。
目的是为每个bucket保留符合条件的flow,每次取时可以较快的找到flow,不需要遍历所有bucket.
/** \internal * \brief Get a flow from the hash directly. * * Called in conditions where the spare queue is empty and memcap is reached. * * Walks the hash until a flow can be freed. Timeouts are disregarded, use_cnt * is adhered to. "flow_prune_idx" atomic int makes sure we don't start at the * top each time since that would clear the top of the hash leading to longer * and longer search times under high pressure (observed). * * \param tv thread vars * \param dtv decode thread vars (for flow log api thread data) * * \retval f flow or NULL */ static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv, const struct timeval *ts) { //这个函数对控制bucket索引的全局原子变量加上FLOW_GET_NEW_TRIES,值是5, //就是每次搜索的bucket索引是上次的值加5,不会每次都从开始搜索,这样可以最大概率的给每个bucket //保留符合条件的flow,以后再搜索这些bucket,成功概率较大,不需要遍历所有bucket. //这是个人理解,为什么是最大概率呢,因为不是所有bucket都有符合条件的flow,你不搜索它,它也没有, //搜索它,它也没有,也谈不上给那个bucket保留符合条件的flow, uint32_t idx = GetUsedAtomicUpdate(FLOW_GET_NEW_TRIES) % flow_config.hash_size; uint32_t tried = 0; while (1) { //查找次数其实也是查找的bucket个数,如果到达5次,则更新计数器,返回. if (tried++ > FLOW_GET_NEW_TRIES) { STATSADDUI64(counter_flow_get_used_eval, tried); break; } //bucket索引到最大时,从0开始 if (++idx >= flow_config.hash_size) idx = 0; FlowBucket *fb = &flow_hash[idx]; //next_ts超时变量设置INT_MAX时,说明bucket上没有flow在未来next_ts要超时 //就是bucket是空的,FlowManager老化线程设置的INT_MAX if (SC_ATOMIC_GET(fb->next_ts) == INT_MAX) continue; if (GetUsedTryLockBucket(fb) != 0) { STATSADDUI64(counter_flow_get_used_eval_busy, 1); continue; } Flow *f = fb->head; if (f == NULL) { FBLOCK_UNLOCK(fb); continue; } if (GetUsedTryLockFlow(f) != 0) { STATSADDUI64(counter_flow_get_used_eval_busy, 1); FBLOCK_UNLOCK(fb); continue; } /** never prune a flow that is used by a packet or stream msg * we are currently processing in one of the threads */ if (f->use_cnt > 0) { //计数器大于0,说明有packe引用这个flow,不能抢别人的flow STATSADDUI64(counter_flow_get_used_eval_busy, 1); FBLOCK_UNLOCK(fb); FLOWLOCK_UNLOCK(f); continue; } //这个函数判断这个flow,最近是否使用过, //0判断的依据就是最新时间和这个flow的时间差,小于一定秒数则最近使用过,不能拿走 if (StillAlive(f, ts)) { STATSADDUI64(counter_flow_get_used_eval_reject, 1); FBLOCK_UNLOCK(fb); FLOWLOCK_UNLOCK(f); continue; } //好了,可喜可贺,这个flow计数为0,最近没被更新,就它了 //把它从bucket上拿走 /* remove from the hash */ fb->head = f->next; f->next = NULL; f->fb = NULL; FBLOCK_UNLOCK(fb); //这flow的终止标志,被强行拿走标志FLOW_END_FLAG_FORCED, //flow终止时是在紧急模式,设置标志FLOW_END_FLAG_EMERGENCY /* rest of the flags is updated on-demand in output */ f->flow_end_flags |= FLOW_END_FLAG_FORCED; if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY; /* invoke flow log api */ #ifdef UNITTESTS if (dtv) { #endif //输出flow日志 if (dtv->output_flow_thread_data) { (void)OutputFlowLog(tv, dtv->output_flow_thread_data, f); } #ifdef UNITTESTS } #endif //清除flow中的一些资源,主要是是否flow关联的tcp会话的内存即f->protoctx //flow结构体后的storage空间,void *数组,重新初始化这个flow FlowClearMemory(f, f->protomap); /* leave locked */ STATSADDUI64(counter_flow_get_used_eval, tried); return f; } STATSADDUI64(counter_flow_get_used_failed, 1); return NULL; }