至少每秒十几万读写才考虑使用,否则没有意义
这个队列结构简单,有大小上限,可以适用于多写多读的情况
接口
bool enqueue(const ELEM_T &a_data); // ⼊队列 bool dequeue(ELEM_T &a_data); // 出队列 bool try_dequeue(ELEM_T &a_data); // 尝试⼊队列
成员
ELEM_T m_thequeue[Q_SIZE]; volatile QUEUE_INT m_count; // 队列的元素格式 volatile QUEUE_INT m_writeIndex;//新元素⼊列时存放位置在数组中的下标 volatile QUEUE_INT m_readIndex;//第一个有效数据 volatile QUEUE_INT m_maximumReadIndex; //指向第一个不能读取的数据(可能无效,可能正在写入) //即[m_readIndex, m_maximumReadIndex)是可读的数据 inline QUEUE_INT countToIndex(QUEUE_INT a_count);
队列已满判断:(m_writeIndex + 1) %Q_SIZE == m_readIndex
队列为空判断:m_readIndex == m_maximumReadIndex
bool enqueue(&data) { do { curWriteIndex = m_writeIndex; curReadIndex = m_readIndex; if(ModQ(curWriteIndex + 1) == ModQ(curReadIndex)) { reutrn false; //队列已满 } } while (!CAS(&m_writeIndex, curWriteIndex, curWriteIndex+1); //尝试获取一个curWriteIndex写入数据 //写入数据 m_thequeue[ModQ(curWriteIndex)] = data; while(!CAS(&m_maximumReadIndex, curWriteIndex, (curWriteIndex + 1))) { //所有写入数据的线程更新m_maximumReadIndex的顺序要和他们获取curWriteIndex的顺序一致 sched_yield(); } AtomicAdd(&m_count, 1); return true; }
while(1) { if (queue empty) return false; data = m_thequeue[ModQ(curReadIndex)];//先读数据,再增加index if (CAS(&m_readIndex, curReadIndex, curReadIndex+1)) { AtomicSub(&m_count, 1); return true; } }
如果一个数据一个node,则需要频繁分配内存,分配内存时库里面要加锁,所以不能实现真正的无锁并发。所以我们需要把多个数据从到一个node中,减少内存分配与释放,
并且,node中所有数据被读取后,我们不要急着释放,可以先把node暂存一下,下一次需要新node的时候再循环利用,减少不必要的开销。(队列中数据的总数短时间内大致是不变的)
以下这个队列只支持一写一读的场景,效率较高。
这个队列的实现出自ZMQ,
数据结构分两层,一层只是一个不具备线程安全的队列(yqueue),具有如下结构
begin, back, end的意义和stl相同
(chunk, pos)可以看成一个二维坐标,
(begin_chunk, begin_pos) 是第一个有效元素的位置,(back_chunk, back_pos)是最后一个有效元素位置,(end_chunk, end_pos)是back的后一个位置,指向第一个无效元素。
//init queue.push(); //enqueue queue.back() = value_; queue.push(); //dequeue data = queue.front(); queue.pop();
数据结构的第二层是ypipe,为yqueue提供了线程安全的功能,使用CAS实现
ypipe内部有四个指针
// Points to the first un-flushed item. This variable is used // exclusively by writer thread. T *w; //指向第一个未刷新的元素,只被写线程使用 // Points to the first un-prefetched item. This variable is used // exclusively by reader thread. T *r; //指向第一个还没预提取的元素,只被读线程使用 // Points to the first item to be flushed in the future. T *f; //指向下一轮要被刷新的一批元素中的第一个 // The single point of contention between writer and reader thread. // Points past the last flushed item. If it is NULL, // reader is asleep. This pointer should be always accessed using // atomic operations. atomic_ptr_t<T> c; //读写线程共享的指针,指向每一轮刷新的起点(看代码的时候会详细说)。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)
front() <= r <= c == w <= f <= back() (注意,<=只是逻辑上的,因为有mod Q的存在,实际会不同)
其中c是atomic变量,而且可能是NULL,当c是NULL时表示读端正在睡眠,需要写端唤醒
[front(), r)是可读取的元素(已经预读取的元素,只有读线程才能访问)
[r, c) 是读端可以读取,但是还未预读取的元素,读端预读取(check_read)会导致r = c
c通常情况下等于w,除非读端在睡眠时,会把c设成NULL
[w, f) (已被写入,已经commit,但还未被flush的元素)
当调用flush时,[w, f)之间的元素会被刷新,刷新之后对读端可见
flush意味着 c = w = f
[f, back()] (已被写入但还未commit的元素,也没有被刷新,这些元素只有写线程能访问,而且可以被撤销)当write(data, complete=true)时会f = back()
重点基本就这些了,具体的CAS操作可以看源码,这里略过了
零声教育 3.2.2无锁队列