在v2版本的MyDisruptor实现多消费者、消费者组间依赖功能后。按照计划,v3版本的MyDisruptor需要支持多线程消费者的功能。
由于该文属于系列博客的一部分,需要先对之前的博客内容有所了解才能更好地理解本篇博客
/** * 多线程消费者(仿Disruptor.WorkerPool) * */ public class MyWorkerPool<T> { private final MySequence workSequence = new MySequence(-1); private final MyRingBuffer<T> myRingBuffer; private final List<MyWorkProcessor<T>> workEventProcessorList; public MyWorkerPool( MyRingBuffer<T> myRingBuffer, MySequenceBarrier mySequenceBarrier, MyWorkHandler<T>... myWorkHandlerList) { this.myRingBuffer = myRingBuffer; final int numWorkers = myWorkHandlerList.length; this.workEventProcessorList = new ArrayList<>(numWorkers); // 为每个自定义事件消费逻辑MyEventHandler,创建一个对应的MyWorkProcessor去处理 for (MyWorkHandler<T> myEventConsumer : myWorkHandlerList) { workEventProcessorList.add(new MyWorkProcessor<>( myRingBuffer, myEventConsumer, mySequenceBarrier, this.workSequence)); } } /** * 返回包括每个workerEventProcessor + workerPool自身的序列号集合 * */ public MySequence[] getCurrentWorkerSequences() { final MySequence[] sequences = new MySequence[this.workEventProcessorList.size() + 1]; for (int i = 0, size = workEventProcessorList.size(); i < size; i++) { sequences[i] = workEventProcessorList.get(i).getCurrentConsumeSequence(); } sequences[sequences.length - 1] = workSequence; return sequences; } public MyRingBuffer<T> start(final Executor executor) { final long cursor = myRingBuffer.getCurrentProducerSequence().get(); workSequence.set(cursor); for (MyWorkProcessor<?> processor : workEventProcessorList) { processor.getCurrentConsumeSequence().set(cursor); executor.execute(processor); } return this.myRingBuffer; } } /** * 多线程消费者-事件处理器接口 * */ public interface MyWorkHandler<T> { /** * 消费者消费事件 * @param event 事件对象本身 * */ void consume(T event); }
接下来是本篇博客的重点部分,MyWorkProcessor的实现。
/** * 多线程消费者工作线程 (仿Disruptor.WorkProcessor) * */ public class MyWorkProcessor<T> implements Runnable{ private final MySequence currentConsumeSequence = new MySequence(-1); private final MyRingBuffer<T> myRingBuffer; private final MyWorkHandler<T> myWorkHandler; private final MySequenceBarrier sequenceBarrier; private final MySequence workGroupSequence; public MyWorkProcessor(MyRingBuffer<T> myRingBuffer, MyWorkHandler<T> myWorkHandler, MySequenceBarrier sequenceBarrier, MySequence workGroupSequence) { this.myRingBuffer = myRingBuffer; this.myWorkHandler = myWorkHandler; this.sequenceBarrier = sequenceBarrier; this.workGroupSequence = workGroupSequence; } public MySequence getCurrentConsumeSequence() { return currentConsumeSequence; } @Override public void run() { long nextConsumerIndex = this.currentConsumeSequence.get() + 1; // 设置哨兵值,保证第一次循环时nextConsumerIndex <= cachedAvailableSequence一定为false,走else分支通过序列屏障获得最大的可用序列号 long cachedAvailableSequence = Long.MIN_VALUE; // 最近是否处理过了序列 boolean processedSequence = true; while (true) { try { if(processedSequence) { // 争抢到了一个新的待消费序列,但还未实际进行消费(标记为false) processedSequence = false; // 如果已经处理过序列,则重新cas的争抢一个新的待消费序列 do { nextConsumerIndex = this.workGroupSequence.get() + 1L; // 由于currentConsumeSequence会被注册到生产者侧,因此需要始终和workGroupSequence worker组的实际sequence保持协调 // 即当前worker的消费序列currentConsumeSequence = 当前消费者组的序列workGroupSequence this.currentConsumeSequence.lazySet(nextConsumerIndex - 1L); // 问题:只使用workGroupSequence,每个worker不维护currentConsumeSequence行不行? // 回答:这是不行的。因为和单线程消费者的行为一样,都是具体的消费者eventHandler/workHandler执行过之后才更新消费者的序列号,令其对外部可见(生产者、下游消费者) // 因为消费依赖关系中约定,对于序列i事件只有在上游的消费者消费过后(eventHandler/workHandler执行过),下游才能消费序列i的事件 // workGroupSequence主要是用于通过cas协调同一workerPool内消费者线程序列争抢的,对外的约束依然需要workProcessor本地的消费者序列currentConsumeSequence来控制 // cas更新,保证每个worker线程都会获取到唯一的一个sequence } while (!workGroupSequence.compareAndSet(nextConsumerIndex - 1L, nextConsumerIndex)); }else{ // processedSequence == false(手头上存在一个还未消费的序列) // 走到这里说明之前拿到了一个新的消费序列,但是由于nextConsumerIndex > cachedAvailableSequence,没有实际执行消费逻辑 // 而是被阻塞后返回获得了最新的cachedAvailableSequence,重新执行一次循环走到了这里 // 需要先把手头上的这个序列给消费掉,才能继续拿下一个消费序列 } // cachedAvailableSequence只会存在两种情况 // 1 第一次循环,初始化为Long.MIN_VALUE,则必定会走到下面的else分支中 // 2 非第一次循环,则cachedAvailableSequence为序列屏障所允许的最大可消费序列 if (nextConsumerIndex <= cachedAvailableSequence) { // 争抢到的消费序列是满足要求的(小于序列屏障值,被序列屏障允许的),则调用消费者进行实际的消费 // 取出可以消费的下标对应的事件,交给eventConsumer消费 T event = myRingBuffer.get(nextConsumerIndex); this.myWorkHandler.consume(event); // 实际调用消费者进行消费了,标记为true.这样一来就可以在下次循环中cas争抢下一个新的消费序列了 processedSequence = true; } else { // 1 第一次循环会获取当前序列屏障的最大可消费序列 // 2 非第一次循环,说明争抢到的序列超过了屏障序列的最大值,等待生产者推进到争抢到的sequence cachedAvailableSequence = sequenceBarrier.getAvailableConsumeSequence(nextConsumerIndex); } } catch (final Throwable ex) { // 消费者消费时发生了异常,也认为是成功消费了,避免阻塞消费序列 // 下次循环会cas争抢一个新的消费序列 processedSequence = true; } } } }
v3版本支持了多线程消费者功能,下面通过一个demo来展示如何使用该功能。
public class MyRingBufferV3Demo { /** * -> 多线程消费者B(依赖A) * 单线程消费者A -> 单线程消费者D(依赖B、C) * -> 单线程消费者C(依赖A) * */ public static void main(String[] args) throws InterruptedException { // 环形队列容量为16(2的4次方) int ringBufferSize = 16; // 创建环形队列 MyRingBuffer<OrderEventModel> myRingBuffer = MyRingBuffer.createSingleProducer( new OrderEventProducer(), ringBufferSize, new MyBlockingWaitStrategy()); // 获得ringBuffer的序列屏障(最上游的序列屏障内只维护生产者的序列) MySequenceBarrier mySequenceBarrier = myRingBuffer.newBarrier(); // ================================== 基于生产者序列屏障,创建消费者A MyBatchEventProcessor<OrderEventModel> eventProcessorA = new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerA"), mySequenceBarrier); MySequence consumeSequenceA = eventProcessorA.getCurrentConsumeSequence(); // RingBuffer监听消费者A的序列 myRingBuffer.addGatingConsumerSequenceList(consumeSequenceA); // ================================== 消费者组依赖上游的消费者A,通过消费者A的序列号创建序列屏障(构成消费的顺序依赖) MySequenceBarrier workerSequenceBarrier = myRingBuffer.newBarrier(consumeSequenceA); // 基于序列屏障,创建多线程消费者B MyWorkerPool<OrderEventModel> workerPoolProcessorB = new MyWorkerPool<>(myRingBuffer, workerSequenceBarrier, new OrderWorkHandlerDemo("workerHandler1"), new OrderWorkHandlerDemo("workerHandler2"), new OrderWorkHandlerDemo("workerHandler3")); MySequence[] workerSequences = workerPoolProcessorB.getCurrentWorkerSequences(); // RingBuffer监听消费者C的序列 myRingBuffer.addGatingConsumerSequenceList(workerSequences); // ================================== 通过消费者A的序列号创建序列屏障(构成消费的顺序依赖),创建消费者C MySequenceBarrier mySequenceBarrierC = myRingBuffer.newBarrier(consumeSequenceA); MyBatchEventProcessor<OrderEventModel> eventProcessorC = new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerC"), mySequenceBarrierC); MySequence consumeSequenceC = eventProcessorC.getCurrentConsumeSequence(); // RingBuffer监听消费者C的序列 myRingBuffer.addGatingConsumerSequenceList(consumeSequenceC); // ================================== 基于多线程消费者B,单线程消费者C的序列屏障,创建消费者D MySequence[] bAndCSequenceArr = new MySequence[workerSequences.length+1]; // 把多线程消费者B的序列复制到合并的序列数组中 System.arraycopy(workerSequences, 0, bAndCSequenceArr, 0, workerSequences.length); // 数组的最后一位是消费者C的序列 bAndCSequenceArr[bAndCSequenceArr.length-1] = consumeSequenceC; MySequenceBarrier mySequenceBarrierD = myRingBuffer.newBarrier(bAndCSequenceArr); MyBatchEventProcessor<OrderEventModel> eventProcessorD = new MyBatchEventProcessor<>(myRingBuffer, new OrderEventHandlerDemo("consumerD"), mySequenceBarrierD); MySequence consumeSequenceD = eventProcessorD.getCurrentConsumeSequence(); // RingBuffer监听消费者D的序列 myRingBuffer.addGatingConsumerSequenceList(consumeSequenceD); // 启动消费者线程A new Thread(eventProcessorA).start(); // 启动workerPool多线程消费者B workerPoolProcessorB.start(Executors.newFixedThreadPool(100, new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { return new Thread(r,"worker" + mCount.getAndIncrement()); } })); // 启动消费者线程C new Thread(eventProcessorC).start(); // 启动消费者线程D new Thread(eventProcessorD).start(); // 生产者发布100个事件 for(int i=0; i<100; i++) { long nextIndex = myRingBuffer.next(); OrderEventModel orderEvent = myRingBuffer.get(nextIndex); orderEvent.setMessage("message-"+i); orderEvent.setPrice(i * 10); System.out.println("生产者发布事件:" + orderEvent); myRingBuffer.publish(nextIndex); } } }
disruptor无论在整体设计还是最终代码实现上都有很多值得反复琢磨和学习的细节,希望能帮助到对disruptor感兴趣的小伙伴。
本篇博客的完整代码在我的github上:https://github.com/1399852153/MyDisruptor 分支:feature/lab3