github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;
为了完成本篇的实战,前文[《disruptor笔记之二:Disruptor类分析》]已做了充分的研究分析,建议观看,这里简单回顾以下Disruptor类的几个核心功能,这也是咱们编码时要实现的:
名称 | 链接 | 备注 |
---|---|---|
项目主页 | github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
plugins { id 'org.springframework.boot' } dependencies { implementation 'org.projectlombok:lombok' implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'com.lmax:disruptor' testImplementation('org.springframework.boot:spring-boot-starter-test') }
package com.bolingcavalry; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class LowLevelOperateApplication { public static void main(String[] args) { SpringApplication.run(LowLevelOperateApplication.class, args); } }
package com.bolingcavalry.service; import lombok.Data; import lombok.NoArgsConstructor; import lombok.ToString; @Data @ToString @NoArgsConstructor public class StringEvent { private String value; }
package com.bolingcavalry.service; import com.lmax.disruptor.EventFactory; public class StringEventFactory implements EventFactory<StringEvent> { @Override public StringEvent newInstance() { return new StringEvent(); } }
package com.bolingcavalry.service; import com.lmax.disruptor.RingBuffer; public class StringEventProducer { // 存储数据的环形队列 private final RingBuffer<StringEvent> ringBuffer; public StringEventProducer(RingBuffer<StringEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(String content) { // ringBuffer是个队列,其next方法返回的是下最后一条记录之后的位置,这是个可用位置 long sequence = ringBuffer.next(); try { // sequence位置取出的事件是空事件 StringEvent stringEvent = ringBuffer.get(sequence); // 空事件添加业务信息 stringEvent.setValue(content); } finally { // 发布 ringBuffer.publish(sequence); } } }
package com.bolingcavalry.service; import com.lmax.disruptor.EventHandler; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.util.function.Consumer; @Slf4j public class StringEventHandler implements EventHandler<StringEvent> { public StringEventHandler(Consumer<?> consumer) { this.consumer = consumer; } // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次 private Consumer<?> consumer; @Override public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception { log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event); // 这里延时100ms,模拟消费事件的逻辑的耗时 Thread.sleep(100); // 如果外部传入了consumer,就要执行一次accept方法 if (null!=consumer) { consumer.accept(null); } } }
package com.bolingcavalry.service; public interface LowLevelOperateService { /** * 消费者数量 */ int CONSUMER_NUM = 3; /** * 环形缓冲区大小 */ int BUFFER_SIZE = 16; /** * 发布一个事件 * @param value * @return */ void publish(String value); /** * 返回已经处理的任务总数 * @return */ long eventCount(); }
package com.bolingcavalry.service.impl; import com.bolingcavalry.service.*; import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @Service("oneConsumer") @Slf4j public class OneConsumerServiceImpl implements LowLevelOperateService { private RingBuffer<StringEvent> ringBuffer; private StringEventProducer producer; /** * 统计消息总数 */ private final AtomicLong eventCount = new AtomicLong(); private ExecutorService executors; @PostConstruct private void init() { // 准备一个匿名类,传给disruptor的事件处理类, // 这样每次处理事件时,都会将已经处理事件的总数打印出来 Consumer<?> eventCountPrinter = new Consumer<Object>() { @Override public void accept(Object o) { long count = eventCount.incrementAndGet(); log.info("receive [{}] event", count); } }; // 创建环形队列实例 ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE); // 准备线程池 executors = Executors.newFixedThreadPool(1); //创建SequenceBarrier SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); // 创建事件处理的工作类,里面执行StringEventHandler处理事件 BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>( ringBuffer, sequenceBarrier, new StringEventHandler(eventCountPrinter)); // 将消费者的sequence传给环形队列 ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); // 在一个独立线程中取事件并消费 executors.submit(batchEventProcessor); // 生产者 producer = new StringEventProducer(ringBuffer); } @Override public void publish(String value) { producer.onData(value); } @Override public long eventCount() { return eventCount.get(); } }
package com.bolingcavalry.service.impl; import com.bolingcavalry.service.LowLevelOperateService; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import static org.junit.Assert.assertEquals; @RunWith(SpringRunner.class) @SpringBootTest @Slf4j public class LowLeverOperateServiceImplTest { @Autowired @Qualifier("oneConsumer") LowLevelOperateService oneConsumer; private static final int EVENT_COUNT = 100; private void testLowLevelOperateService(LowLevelOperateService service, int eventCount, int expectEventCount) throws InterruptedException { for(int i=0;i<eventCount;i++) { log.info("publich {}", i); service.publish(String.valueOf(i)); } // 异步消费,因此需要延时等待 Thread.sleep(10000); // 消费的事件总数应该等于发布的事件数 assertEquals(expectEventCount, service.eventCount()); } @Test public void testOneConsumer() throws InterruptedException { log.info("start testOneConsumerService"); testLowLevelOperateService(oneConsumer, EVENT_COUNT, EVENT_COUNT); }
package com.bolingcavalry.service.impl; import com.bolingcavalry.service.*; import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @Service("multiConsumer") @Slf4j public class MultiConsumerServiceImpl implements LowLevelOperateService { private RingBuffer<StringEvent> ringBuffer; private StringEventProducer producer; /** * 统计消息总数 */ private final AtomicLong eventCount = new AtomicLong(); /** * 生产一个BatchEventProcessor实例,并且启动独立线程开始获取和消费消息 * @param executorService */ private void addProcessor(ExecutorService executorService) { // 准备一个匿名类,传给disruptor的事件处理类, // 这样每次处理事件时,都会将已经处理事件的总数打印出来 Consumer<?> eventCountPrinter = new Consumer<Object>() { @Override public void accept(Object o) { long count = eventCount.incrementAndGet(); log.info("receive [{}] event", count); } }; BatchEventProcessor<StringEvent> batchEventProcessor = new BatchEventProcessor<>( ringBuffer, ringBuffer.newBarrier(), new StringEventHandler(eventCountPrinter)); // 将当前消费者的sequence实例传给ringBuffer ringBuffer.addGatingSequences(batchEventProcessor.getSequence()); // 启动独立线程获取和消费事件 executorService.submit(batchEventProcessor); } @PostConstruct private void init() { ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE); ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM); // 创建多个消费者,并在独立线程中获取和消费事件 for (int i=0;i<CONSUMER_NUM;i++) { addProcessor(executorService); } // 生产者 producer = new StringEventProducer(ringBuffer); } @Override public void publish(String value) { producer.onData(value); } @Override public long eventCount() { return eventCount.get(); } }
上述代码和前面的OneConsumerServiceImpl相比差别不大,主要是创建了多个BatchEventProcessor实例,然后分别在线程池中提交;
验证方法依旧是单元测试,在刚才的LowLeverOperateServiceImplTest.java中增加代码即可,注意testLowLevelOperateService的第三个参数是EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM,表示预期的被消费消息数为300:
@Autowired @Qualifier("multiConsumer") LowLevelOperateService multiConsumer; @Test public void testMultiConsumer() throws InterruptedException { log.info("start testMultiConsumer"); testLowLevelOperateService(multiConsumer, EVENT_COUNT, EVENT_COUNT * LowLevelOperateService.CONSUMER_NUM); }
本篇的最后一个实战是发布100个事件,然后让三个消费者共同消费100个(例如A消费33个,B消费33个,C消费34个);
前面用到的BatchEventProcessor是用来独立消费的,不适合多个消费者共同消费,这种多个消费共同消费的场景需要借助WorkerPool来完成,这个名字还是很形象的:一个池子里面有很多个工作者,把任务放入这个池子,工作者们每人处理一部分,大家合力将任务完成;
传入WorkerPool的消费者需要实现WorkHandler接口,于是新增一个实现类:
package com.bolingcavalry.service; import com.lmax.disruptor.WorkHandler; import lombok.extern.slf4j.Slf4j; import java.util.function.Consumer; @Slf4j public class StringWorkHandler implements WorkHandler<StringEvent> { public StringWorkHandler(Consumer<?> consumer) { this.consumer = consumer; } // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次 private Consumer<?> consumer; @Override public void onEvent(StringEvent event) throws Exception { log.info("work handler event : {}", event); // 这里延时100ms,模拟消费事件的逻辑的耗时 Thread.sleep(100); // 如果外部传入了consumer,就要执行一次accept方法 if (null!=consumer) { consumer.accept(null); } } }
package com.bolingcavalry.service.impl; import com.bolingcavalry.service.*; import com.lmax.disruptor.*; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @Service("workerPoolConsumer") @Slf4j public class WorkerPoolConsumerServiceImpl implements LowLevelOperateService { private RingBuffer<StringEvent> ringBuffer; private StringEventProducer producer; /** * 统计消息总数 */ private final AtomicLong eventCount = new AtomicLong(); @PostConstruct private void init() { ringBuffer = RingBuffer.createSingleProducer(new StringEventFactory(), BUFFER_SIZE); ExecutorService executorService = Executors.newFixedThreadPool(CONSUMER_NUM); StringWorkHandler[] handlers = new StringWorkHandler[CONSUMER_NUM]; // 创建多个StringWorkHandler实例,放入一个数组中 for (int i=0;i < CONSUMER_NUM;i++) { handlers[i] = new StringWorkHandler(o -> { long count = eventCount.incrementAndGet(); log.info("receive [{}] event", count); }); } // 创建WorkerPool实例,将StringWorkHandler实例的数组传进去,代表共同消费者的数量 WorkerPool<StringEvent> workerPool = new WorkerPool<>(ringBuffer, ringBuffer.newBarrier(), new IgnoreExceptionHandler(), handlers); // 这一句很重要,去掉就会出现重复消费同一个事件的问题 ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(executorService); // 生产者 producer = new StringEventProducer(ringBuffer); } @Override public void publish(String value) { producer.onData(value); } @Override public long eventCount() { return eventCount.get(); } }
StringWorkHandler数组传入给WorkerPool后,每个StringWorkHandler实例都放入一个新的WorkProcessor实例,WorkProcessor实现了Runnable接口,在执行workerPool.start时,会将WorkProcessor提交到线程池中;
和前面的独立消费相比,共同消费最大的特点在于只调用了一次ringBuffer.addGatingSequences方法,也就是说三个消费者共用一个sequence实例;
@Autowired @Qualifier("workerPoolConsumer") LowLevelOperateService workerPoolConsumer; @Test public void testWorkerPoolConsumer() throws InterruptedException { log.info("start testWorkerPoolConsumer"); testLowLevelOperateService(workerPoolConsumer, EVENT_COUNT, EVENT_COUNT); }
我是欣宸,期待与您一同畅游Java世界…