github.com/zq2599/blog_demos
内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOps等;
disruptor是LMAX公司开发的一个高性能队列,其作用和阻塞队列(BlockingQueue)类似,都是在相同进程内、不同线程间传递数据(例如消息、事件),另外disruptor也有自己的一些特色:
作为《disruptor笔记》系列的开篇,本篇有两个任务:
创建名为disruptor-tutorials的gradle工程,作为整个系列的父工程,该系列所有代码都是这个父工程下的module;
在disruptor-tutorials下面新建名为basic-event的module,这是个springboot应用,作用是使用disruptor的基本功能:一个线程发布事件,另一个线程消费事件,也就是对环形队列最基本的操作,如下图:
《disruptor笔记》系列涉及的环境信息如下:
名称 | 链接 | 备注 |
---|---|---|
项目主页 | github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | github.com/zq2599/blog_demos.git | 该项目源码的仓库地址,https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址,ssh协议 |
// Root build script of the disruptor-tutorials project.
// It pins plugin/dependency versions and applies shared Java, Lombok and
// dependency-management settings to every project in the build.
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter

buildscript {
    repositories {
        maven {
            url 'https://plugins.gradle.org/m2/'
        }
        // Private Nexus mirror: configure it here if you have one, otherwise comment it out.
        // NOTE(review): this is a plain-HTTP URL; Gradle 7+ rejects it unless
        // `allowInsecureProtocol = true` is set on the repository.
        maven {
            url 'http://192.168.50.43:8081/repository/aliyun-proxy/'
        }
        // Aliyun public mirror, fetched over HTTPS so artifacts cannot be tampered with in transit
        maven {
            url 'https://maven.aliyun.com/repository/public/'
        }
        mavenCentral()
    }
    ext {
        // Project version
        projectVersion = '1.0-SNAPSHOT'

        // Spring Boot version, see https://github.com/spring-projects/spring-boot/releases
        springBootVersion = '2.3.8.RELEASE'
    }
}

plugins {
    id 'java'
    id 'java-library'
    id 'org.springframework.boot' version "${springBootVersion}" apply false
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'net.nemerosa.versioning' version '2.14.0'
    id 'io.franzbecker.gradle-lombok' version '4.0.0' apply false
    id 'com.github.ben-manes.versions' version '0.36.0' // gradle dependencyUpdates
}

// If you attempt to build without the `--scan` parameter in `gradle 6.0+` it will cause a build error that it can't find
// a buildScan property to change. This avoids that problem.
if (hasProperty('buildScan')) {
    buildScan {
        termsOfServiceUrl = 'https://gradle.com/terms-of-service'
        termsOfServiceAgree = 'yes'
    }
}

wrapper {
    gradleVersion = '6.7.1'
}

def buildTimeAndDate = OffsetDateTime.now()

ext {
    // Capture the current date and time once, at build time
    buildDate = DateTimeFormatter.ISO_LOCAL_DATE.format(buildTimeAndDate)
    buildTime = DateTimeFormatter.ofPattern('HH:mm:ss.SSSZ').format(buildTimeAndDate)
    buildRevision = versioning.info.commit
}

allprojects {
    apply plugin: 'java'
    apply plugin: 'idea'
    apply plugin: 'eclipse'
    apply plugin: 'io.spring.dependency-management'
    apply plugin: 'io.franzbecker.gradle-lombok'

    compileJava {
        sourceCompatibility = JavaVersion.VERSION_1_8
        targetCompatibility = JavaVersion.VERSION_1_8
        options.encoding = 'UTF-8'
    }

    compileJava.options*.compilerArgs = [
            '-Xlint:all', '-Xlint:-processing'
    ]

    // Copy LICENSE
    tasks.withType(Jar) {
        from(project.rootDir) {
            include 'LICENSE'
            into 'META-INF'
        }
    }

    // Entries written into MANIFEST.MF
    jar {
        manifest {
            attributes(
                    'Created-By': "${System.properties['java.version']} (${System.properties['java.vendor']} ${System.properties['java.vm.version']})".toString(),
                    'Built-By': 'travis',
                    'Build-Date': buildDate,
                    'Build-Time': buildTime,
                    'Built-OS': "${System.properties['os.name']}",
                    'Build-Revision': buildRevision,
                    'Specification-Title': project.name,
                    'Specification-Version': projectVersion,
                    'Specification-Vendor': 'Will Zhao',
                    'Implementation-Title': project.name,
                    'Implementation-Version': projectVersion,
                    'Implementation-Vendor': 'Will Zhao'
            )
        }
    }

    repositories {
        mavenCentral()

        // Private Nexus mirror: configure it here if you have one, otherwise comment it out.
        // NOTE(review): plain-HTTP URL; Gradle 7+ requires `allowInsecureProtocol = true` for it.
        maven {
            url 'http://192.168.50.43:8081/repository/aliyun-proxy/'
        }

        // Aliyun public mirror, fetched over HTTPS so artifacts cannot be tampered with in transit
        maven {
            url 'https://maven.aliyun.com/repository/public/'
        }

        jcenter()
    }

    buildscript {
        repositories {
            maven { url 'https://plugins.gradle.org/m2/' }
        }
    }
}

allprojects { project ->
    buildscript {
        // Central version catalogue: modules declare these dependencies without a version
        dependencyManagement {
            imports {
                mavenBom "org.springframework.boot:spring-boot-starter-parent:${springBootVersion}"
                mavenBom "org.junit:junit-bom:5.7.0"
            }

            dependencies {
                dependency 'org.projectlombok:lombok:1.16.16'
                dependency 'org.apache.commons:commons-lang3:3.11'
                dependency 'commons-collections:commons-collections:3.2.2'
                dependency 'com.lmax:disruptor:3.4.4'
            }
        }

        ext {
            springFrameworkVersion = dependencyManagement.importedProperties['spring-framework.version']
        }
    }
}

group = 'bolingcavalry'
version = projectVersion
// Build script of the basic-event module.
// No versions are given here; the plugin and dependency versions are expected
// to come from the parent build's plugin declarations and dependency management.
plugins {
    id 'org.springframework.boot'
}

dependencies {
    // Main source set
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'com.lmax:disruptor'

    // Test source set
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
package com.bolingcavalry;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point of the basic-event application.
 */
@SpringBootApplication
public class BasicEventApplication {

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        final Class<?> primarySource = BasicEventApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
package com.bolingcavalry.service;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

/**
 * Event carrying a single string payload.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}, {@code toString} and the
 * no-argument constructor are generated by Lombok.
 */
@NoArgsConstructor
@ToString
@Data
public class StringEvent {

    /** The payload of this event. */
    private String value;
}
package com.bolingcavalry.service;

import com.lmax.disruptor.EventFactory;

/**
 * {@link EventFactory} that produces empty {@link StringEvent} instances.
 */
public class StringEventFactory implements EventFactory<StringEvent> {

    /**
     * Creates a new event with no payload set.
     *
     * @return a freshly constructed {@link StringEvent}
     */
    @Override
    public StringEvent newInstance() {
        final StringEvent blankEvent = new StringEvent();
        return blankEvent;
    }
}
package com.bolingcavalry.service; import com.lmax.disruptor.EventHandler; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import java.util.function.Consumer; @Slf4j public class StringEventHandler implements EventHandler<StringEvent> { public StringEventHandler(Consumer<?> consumer) { this.consumer = consumer; } // 外部可以传入Consumer实现类,每处理一条消息的时候,consumer的accept方法就会被执行一次 private Consumer<?> consumer; @Override public void onEvent(StringEvent event, long sequence, boolean endOfBatch) throws Exception { log.info("sequence [{}], endOfBatch [{}], event : {}", sequence, endOfBatch, event); // 这里延时100ms,模拟消费事件的逻辑的耗时 Thread.sleep(100); // 如果外部传入了consumer,就要执行一次accept方法 if (null!=consumer) { consumer.accept(null); } } }
package com.bolingcavalry.service; import com.lmax.disruptor.RingBuffer; public class StringEventProducer { // 存储数据的环形队列 private final RingBuffer<StringEvent> ringBuffer; public StringEventProducer(RingBuffer<StringEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(String content) { // ringBuffer是个队列,其next方法返回的是下最后一条记录之后的位置,这是个可用位置 long sequence = ringBuffer.next(); try { // sequence位置取出的事件是空事件 StringEvent stringEvent = ringBuffer.get(sequence); // 空事件添加业务信息 stringEvent.setValue(content); } finally { // 发布 ringBuffer.publish(sequence); } } }
package com.bolingcavalry.service.impl; import com.bolingcavalry.service.*; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.time.LocalDateTime; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @Service @Slf4j public class BasicEventServiceImpl implements BasicEventService { private static final int BUFFER_SIZE = 16; private Disruptor<StringEvent> disruptor; private StringEventProducer producer; /** * 统计消息总数 */ private final AtomicLong eventCount = new AtomicLong(); @PostConstruct private void init() { Executor executor = Executors.newCachedThreadPool(); // 实例化 disruptor = new Disruptor<>(new StringEventFactory(), BUFFER_SIZE, new CustomizableThreadFactory("event-handler-")); // 准备一个匿名类,传给disruptor的事件处理类, // 这样每次处理事件时,都会将已经处理事件的总数打印出来 Consumer<?> eventCountPrinter = new Consumer<Object>() { @Override public void accept(Object o) { long count = eventCount.incrementAndGet(); log.info("receive [{}] event", count); } }; // 指定处理类 disruptor.handleEventsWith(new StringEventHandler(eventCountPrinter)); // 启动 disruptor.start(); // 生产者 producer = new StringEventProducer(disruptor.getRingBuffer()); } @Override public void publish(String value) { producer.onData(value); } @Override public long eventCount() { return eventCount.get(); } }
再写一个web接口类,这样就可以通过浏览器验证前面的代码了:
package com.bolingcavalry.controller;

import com.bolingcavalry.service.BasicEventService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;

/**
 * REST endpoint that publishes the path value as an event.
 */
@RestController
public class BasicEventController {

    @Autowired
    BasicEventService basicEventService;

    /**
     * Publishes the value taken from the request path as one event.
     *
     * @param value the path segment to publish
     * @return the text {@code "success, "} followed by the current local date-time
     */
    @RequestMapping(method = RequestMethod.GET, value = "/{value}")
    public String publish(@PathVariable("value") String value) {
        basicEventService.publish(value);

        final String timestamp = LocalDateTime.now().toString();
        return "success, " + timestamp;
    }
}
现在生产事件的接口已准备好,消费事件的代码也完成了,接下来就是如何调用生产事件的接口来验证生产和消费是否正常,这里我选择使用单元测试来验证;
在disruptor-tutorials\basic-event\src\test\java目录下新增测试类BasicEventServiceImplTest.java,测试逻辑是发布了一百个事件,再验证消费事件的数量是否也等于一百:
package com.bolingcavalry.service.impl; import com.bolingcavalry.service.BasicEventService; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import static org.junit.Assert.assertEquals; @RunWith(SpringRunner.class) @SpringBootTest @Slf4j public class BasicEventServiceImplTest { @Autowired BasicEventService basicEventService; @Test public void publish() throws InterruptedException { log.info("start publich test"); int count = 100; for(int i=0;i<count;i++) { log.info("publich {}", i); basicEventService.publish(String.valueOf(i)); } // 异步消费,因此需要延时等待 Thread.sleep(1000); // 消费的事件总数应该等于发布的事件数 assertEquals(count, basicEventService.eventCount()); } }
我是欣宸,期待与您一同畅游Java世界…