实现Reactor&Netty进行反应式tcp网络通信。
<!-- netty --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.21.Final</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.4.5</version> </dependency> <dependency> <groupId>io.projectreactor.addons</groupId> <artifactId>reactor-extra</artifactId> <version>3.4.3</version> </dependency> <dependency> <groupId>org.reactivestreams</groupId> <artifactId>reactive-streams</artifactId> <version>1.0.3</version> </dependency> <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> <version>1.0.6</version> </dependency> <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty-core</artifactId> <version>1.0.6</version> </dependency>
package reactornettyexamples.tcp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.netty.Connection; import java.time.Duration; public class MyServerConnection { private Logger log = LoggerFactory.getLogger(this.getClass()); final Connection conn; public MyServerConnection(Connection conn) { this.conn = conn; } public void handle() { conn.inbound().receiveObject() // 从StringToIntegerDecoder接收到作为结果接收到的对象 .log("MyServerConnection") .delayElements(Duration.ofSeconds(1)) // 将下一个元素的处理延迟1秒 .doOnNext(s -> log.info("Current received and decoded element: " + s)) .take(5) // 在收到五个元素后取消(实际上断开了客户端的连接) .flatMap(s -> conn.outbound().sendString( Mono.just(String.format("byte count: %d", (Integer) s)) ).then() ) .subscribe(conn.disposeSubscriber()); // 我们必须按顺序使用该核心订阅者,以使连接在take(5)之后断开连接 } }
package reactornettyexamples.tcp; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.netty.Connection; import reactor.netty.tcp.TcpServer; import java.security.cert.CertificateException; import java.time.Duration; import java.util.List; public class MyTcpServer { private static Logger log = LoggerFactory.getLogger(MyTcpServer.class); public static void main(String[] args) throws CertificateException, InterruptedException { SelfSignedCertificate cert = new SelfSignedCertificate(); SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(cert.certificate(), cert.privateKey()); TcpServer.create() // 准备要配置的TCP服务器。 .port(1551) // 配置端口号 .secure(spec -> spec.sslContext(sslContextBuilder)) // 使用自签名证书。 //.wiretap() .doOnConnection(MyTcpServer::onConnect) .bindUntilJavaShutdown(Duration.ofSeconds(30), null); // 以阻塞方式启动服务器,并等待其完成初始化。 } private static void onConnect(Connection conn) { conn.addHandler(new StringToIntegerDecoder()); // 将处理程序添加到netty管道 MyServerConnection myConn = new MyServerConnection(conn); log.info("New client connected: {}", conn); myConn.handle(); } /** * 返回接收字节数的解码器。 */ public static class StringToIntegerDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { int n = in.readableBytes(); if (in.readableBytes() > 0) { in.readBytes(n); out.add(n); // 将解码器的结果存储在此处 } } } }
package reactornettyexamples.tcp; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.netty.tcp.TcpClient; import java.time.Duration; import java.util.concurrent.CountDownLatch; public class MyTcpClient { private static Logger log = LoggerFactory.getLogger(MyTcpClient.class); public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); StringBuilder toSend = new StringBuilder("a"); TcpClient.create() // 准备要配置的TCP客户端 .port(1551) // 服务端口 // 配置SSL,以提供已配置的SslContext。 .secure(spec -> spec .sslContext(SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE))) //.wiretap() .doOnConnected(con -> { log.info("Client connected successfully!"); // 下一个导致Publisher <Void>的运算符的下一个序列永远不会自己完成,因此客户端保持永久连接 con.outbound().sendString(Mono.just(toSend.toString())) .then(con.inbound() .receive() .asString() .log("tcp-connection") .doOnNext(s -> log.info("Server returned: " + s)) .flatMap(s -> con.outbound() .sendString(Mono.just(toSend.append("a").toString())) .then() ) ) .then() .subscribe(); }) .doOnDisconnected(con -> { log.info("Server disconnected!"); latch.countDown(); }) .connect() // 方式1:简单重试,retry()动作是当操作序列发生错误后重新订阅序列。 // 重试3次 //.retry(3) // 方式2:backoff方法返回就其实是Retry的子类RetryBackoffSpec。它需要两个参数:最大重试次数和最小间隔时间。 // 最多重试3次,每次的最短时间间隔为5秒 //.retryWhen(Retry.backoff(3, Duration.ofSeconds(5))) // 方式3:fixedDelay方法返回的也是RetryBackoffSpec。它需要两个参数:最大重试次数和固定的间隔时间。 // 最大重试3次,固定延迟5秒 //.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(5))) // 方式4:from方法。它需要一个Function函数。 /*.retryWhen(Retry.from((retrySignals) -> { return retrySignals.map(rs -> getNumberOfTries(rs)); }))*/ // 方式5:withThrowable方法。它和from有一点类似也是接收一个Function,但是它的参数是异常。 /*.retryWhen(Retry.withThrowable((retrySignals) -> { return retrySignals.map(rs -> { if(rs instanceof Exception) { throw new RuntimeException("重试错误"); } else { return rs; } }); }))*/ .log("tcp-client") .doOnError(e -> log.error("Error connecting to server ... " + e.getMessage())) //.retryBackoff(Long.MAX_VALUE, Duration.ofSeconds(3), Duration.ofSeconds(10)) // 重试服务 .block(); latch.await(); // 客户端正在运行,直到服务器断开与客户端的连接 } private Long getNumberOfTries(Retry.RetrySignal rs) { log.info("重试:" + rs.totalRetries()); if (rs.totalRetries() < 3) { return rs.totalRetries(); } else { log.error("retries exhausted"); throw Exceptions.propagate(rs.failure()); } } }