基于Java提供的对象输入/输出流ObjectInputStream
和ObjectOutputStream
,可以直接把Java对象作为可存储的字节数组,写入文件,也可以传输到网络上。对程序员来说,基于JDK默认的序列化机制,可以避免操作底层的字节数组,从而提高开发效率。
Java序列化的目的主要有两个:
Netty的NIO网络开发,主要关注的是网络传输方面。
当进行跨进程服务调用时,需要把被传输的Java对象编码为字节数组或者ByteBuffer对象。当远程服务读取到ByteBuffer或者字节数组时,需要将其解码为发送时的Java对象,这被称为Java对象编解码技术。
Java序列化仅仅是Java编解码技术的一种,由于它的种种缺陷,衍生出了很多种编解码技术和框架,后续章节我们会结合Netty介绍几种业界主流的编解码技术和框架。
Java序列化从JDK1.1版本就已经提供,它不需要添加额外的类库,只需要实现 java.io.Serializable 并生成序列ID即可,因此,它从诞生之初就得到了广泛的应用。
但是在RPC(远程服务调用)时,很少直接使用Java序列化进行消息的编解码和传输。
无法跨语言是Java序列化最致命的问题,对于跨进程的服务调用,服务提供者可能会使用C++或者其他语言开发,当我们需要和异构语言进行交互时,Java序列化就难以胜任。
由于Java序列化技术是Java语言内部的私有协议,其他语言并不支持,对于用户来说他完全是黑盒。对于Java序列化后的字节数组,别的语言无法进行反序列化,这就严重阻碍了它的应用。事实上,目前几乎所有流行的Java RPC 通信框架,都没有使用Java序列化作为编解码框架,原因就在于它无法跨语言,而这些RPC框架往往需要支持跨语言调用。
我们通过一个实例来看下Java序列化后的字节数组大小:
先创建一个Java对象:
package com.lsh.serializable; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; import java.nio.ByteBuffer; /** * @author :LiuShihao * @date :Created in 2021/4/9 12:28 下午 * @desc : * UserInfo是一个普通的Java对象,实现了Serializable接口,并生成了序列化ID,可以通过JDK的Java序列化机制进行序列化和反序列化 * */ //getset方法 @Data //全参构造 @AllArgsConstructor //无参构造 @NoArgsConstructor public class UserInfo implements Serializable { private static final long serialVersionUID = -8018991226271912056L; private String userName; private int userID; /** * * 使用基于ByteBuffer的通用二进制编解码技术对UserInfo对象进行编码,编码结果仍然是byte数组, * 可以与传统的JDK序列化后的码流大小进行对比 * * Buffer的属性: * 容量(capacity):缓冲区能够容纳的数据元素的最大数量。这一容量在缓冲区创建时被设定,并且永远不能被改变 * 上界(limit):缓冲区的第一个不能被读或写的元素。或者说,缓冲区中现存元素的计数 * 位置(position):下一个要被读或写的元素的索引。位置会自动由相应的 get( )和 put( )函数更新 * 标记(mark):下一个要被读或写的元素的索引。位置会自动由相应的 get( )和 put( )函数更新一个备忘位置。调用 mark( )来设定 mark = postion。调用 reset( )设定 position =mark。标记在设定前是未定义的(undefined)。这四个属性之间总是遵循以下关系:0 <= mark <= position <= limit <= capacity * * @return */ public byte[] codeC(){ ByteBuffer buffer = ByteBuffer.allocate(1024); byte[] value = this.getUserName().getBytes(); buffer.putInt(value.length); //put() :相对写,向position的位置写入一个byte,并将postion+1,为下次读写作准备 buffer.put(value); buffer.putInt(this.userID); //Buffer有两种模式,写模式和读模式。在写模式下调用flip()之后,Buffer从写模式变成读模式。 buffer.flip(); //remaining() 返回limit和position之间相对位置差 byte[] result = new byte[buffer.remaining()]; buffer.get(result); return result; } }
然后在对比两种序列化编码后的码流的大小:
package com.lsh.serializable; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; /** * @author :LiuShihao * @date :Created in 2021/4/9 12:40 下午 * @desc : */ public class TestUserInfo { /** * 先调用两种编码接口对UserInfo对象编码,然后分别打印两者编码后的码流的大小 * @param args * @throws IOException */ public static void main(String[] args) throws IOException { UserInfo userInfo = new UserInfo("Welcomde to Netty",100); ByteArrayOutputStream bos = new ByteArrayOutputStream(); ObjectOutputStream os = new ObjectOutputStream(bos); os.writeObject(userInfo); os.flush(); os.close(); byte[] b = bos.toByteArray(); System.out.println("The jdk serializable length is :"+b.length); bos.close(); System.out.println("------------------------------------------"); System.out.println("The byte array serializable length is :"+userInfo.codeC().length); } }
运行结果:
测试结果发现,采用JDK序列化机制编码后的二进制数组大小竟然是二进制编码的5倍。
我们评判一个编解码框架的优劣时,往往会考虑一下几个因素:
在同等情况下,编码后的字节数组越大,存储的时候越占用空间,存储的硬件成本越高,并且在网络传输时更占宽带,导致系统的吞吐量降低,Java序列化后的码流偏大,也一直被行业所诟病,导致它的应用范围受到了很大限制。
对UserInfo进行改造,新增一个方法,在创建一个性能测试版本的UserInfo测试程序:
在UserInfo类中新增下面这个方法:
public byte[] codeC(ByteBuffer buffer){ buffer.clear(); byte[] value = this.userName.getBytes(); buffer.putInt(value.length); buffer.put(value); buffer.putInt(this.userID); buffer.flip(); value = null; byte[] result = new byte[buffer.remaining()]; buffer.get(result); return result; }
编码性能测试类:
package com.lsh.serializable; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; import java.nio.ByteBuffer; /** * @author :LiuShihao * @date :Created in 2021/4/12 10:51 上午 * @desc :编码性能测试类 */ public class PerformTestUserInfo { public static void main(String[] args) throws IOException { UserInfo userInfo = new UserInfo("Welcome Netty",100); int loop = 1000000; ByteArrayOutputStream bos = null; ObjectOutputStream os = null; long startTime = System.currentTimeMillis(); for (int i = 0; i < loop; i++) { bos = new ByteArrayOutputStream(); os = new ObjectOutputStream(bos); os.writeObject(userInfo); os.flush(); os.close(); byte[] b = bos.toByteArray(); bos.close(); } long endTime = System.currentTimeMillis(); System.out.println("The JDK serializable cost time is :"+(endTime-startTime) + "ms"); System.out.println("------------------------------------------------------------"); ByteBuffer buffer = ByteBuffer.allocate(1024); startTime = System.currentTimeMillis(); for (int i = 0; i < loop ; i++) { byte[] b = userInfo.codeC(buffer); } endTime = System.currentTimeMillis(); System.out.println("The byte array serializable cost time is :"+(endTime-startTime) +"ms"); } }
运行结果:
结果也非常明显,Java序列化的性能只有二进制编码的3.7%左右。可见Java序列化的性能是在太差。
总结:
所以:无论是序列化后的码流大小,还是序列化的性能,JDK默认的序列化机制表现的都很差,因此我们通常不会使用Java序列化作为远程跨节点调用的编解码框架。
Protobuf全称Google Protocol Buffers ,它是有谷歌开源而来,在谷歌内部久经考验。它的数据结构以 .proto
文件 进行描述。
它的特点如下:
XML不适合做高性能的通信协议,因为尽管XML的可读性和可扩展性非常好,也非常适合描述数据结构,但是XML解析的时间开销和XML为了可读性而牺牲的空间开销都非常大,因此不适合做高性能的通信协议,Protobuf使用二进制编码,在空间和性能上具有更大的优势。
Thrift源于Facebook,在2007年Facebook将Thrift 作为一个开源项目交给了Apache基金会。对于当时的Facebook来说,创造Thrift是为了解决Facebook各系统间大数据量的传输通信以及系统之间语言环境不同需要跨平台的特性,因此Thrift可以支持多种程序语言,如C++、C#、Cocoa、Erlang、Haskell、Java、Ocami、Perl、PHP、Python、Ruby和Smalltalk。
在多种不同语言之间通信,Thrift可以作为高性能的通信中间件使用,它支持数据(对象)序列化和多种类型的RPC服务。Thrift适用于搭建大型数据交换及存储的通用工具,对于大型系统中的内部数据传输,相对于JSON和XML在性能和传输大小上都有明显的优势。
JBoss Marshalling 是一个Java对象的序列化API包,修正了JDK自带的序列化包的很多问题,但又保持跟java.io.Serializable 接口的兼容,同时又增加了一些可调的参数和附加的特性,并且这些参数和特性可通过工厂类进行配置。
相比于传统的Java序列化机制,它的优点如下:
相比于前面介绍的两种编解码框架,JBoss Marshalling 更多的是在JBoss 内部使用,应用范围有限。
MessagePack
是一个高效的二进制序列化框架,它想JSON一样支持不同语言间的数据交换,但是它的性能更快,序列化之后的码流也更小。MessagePack 在业界得到了非常广泛的应用。
MessagePack的特点如下:
MessagePack 提供了对多语言的支持,官方支持的语言如下:Java、Python、Ruby、Haskell、C#、OCaml、Lua、GO、C、C++等。
MessagePack 的Maven坐标:
<!-- https://mvnrepository.com/artifact/org.msgpack/msgpack --> <dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack</artifactId> <version>0.6.12</version> </dependency>
package com.lsh.msgpack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import org.msgpack.MessagePack; /** * @author :LiuShihao * @date :Created in 2021/4/12 2:36 下午 * @desc :MessagePack 编码器 * * MsgpackEncoder 继承 MessageToByteEncoder ,它阿静负责将Object类型的Java对象 编码为byte数组,然后写入到ByteBuf中 */ public class MsgpackEncode extends MessageToByteEncoder<Object> { @Override protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { MessagePack msgpack = new MessagePack(); byte[] raw = msgpack.write(o); byteBuf.writeBytes(raw); } }
package com.lsh.msgpack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import org.msgpack.MessagePack; import java.util.List; /** * @author :LiuShihao * @date :Created in 2021/4/12 2:39 下午 * @desc :MessagePack 解码器 * * 首先从数据报byteBuf中获取需要解码的byte数组,然后调用MessagePack 的read()方法将其反序列化为 Object对象 * 将解码后的 对象加入到List列表中,这样就完成了MessagePack 的解码操作 */ public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception { final byte[] array; final int length = byteBuf.readableBytes(); array = new byte[length]; byteBuf.getBytes(byteBuf.readerIndex(),array,0,length); MessagePack msgpack = new MessagePack(); list.add(msgpack.read(array)); } }
我们以Netty原生的Echo程序为例,进行测试,对Echo进行简单改造,传输的对象由字符串改为Java对象,利用MessagePack 对Java对象进行序列化。
直接加入了Netty 提供的LengthFieldPrepender
和 LengthFieldBasedFreamDecoder
解决了TCP粘包和半包问题:在消息头中新增报文长度字段,利用该字段进行半包的编解码。
package com.lsh.netty.echo.mgspackecho; import com.lsh.msgpack.MsgpackDecoder; import com.lsh.msgpack.MsgpackEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; /** * @author :LiuShihao * @date :Created in 2021年04月12日17:18:40 * @desc : * //注意:巨坑1:使用MessagePack编解码框架 传输的Java对象 一定要加上org.msgpack.annotation.Message 注解@Message 才能 传输 * */ public class EchoClient { private final String host; private final int port; private final int sendNumber; public EchoClient (String host,int port,int sendNumber ){ this.host = host; this.port = port; this.sendNumber = sendNumber; } public void run() throws Exception{ //配置Nio线程组 NioEventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,3000) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { /** * 在消息头中新增报文长度字段,利用该字段进行半包的编解码 * 下面利用了Netty 提供的LengthFieldPrepender 和 LengthFieldBasedFrameDecoder * 结合新开发的MessagePack 编解码框架,实现对TCP粘包/半包的支持。 * 在MessagePack编码器之前,增加LengthFieldPrepender 它将在ByteBuf之前增加2个字节的消息长度字段 * 如: * +-----------------+ +---------+--------------+ * | "HELLO,WORLD" | ---> | 0X000C |"HELLO,WORLD" | * +-----------------+ +---------+--------------+ * 在MessagePack解码器之前增加LengthFieldBasedFrameDecoder,用于处理半包消息,这样后面的MessagePack接收到的永远是整包消息, * +---------+--------------+ +-----------------+ * | 0X000C |"HELLO,WORLD" | ---> | "HELLO,WORLD" | * +---------+--------------+ +-----------------+ */ ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535,0,2,0,2)); ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder()); ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2)); ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder()); ch.pipeline().addLast(new EchoClientHandler(sendNumber)); } }); //发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); //等待客户端链路关闭 f.channel().closeFuture().sync(); }finally { //优雅退出,释放线程组资源 group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; String host = "127.0.0.1"; int sendNumber = 10; new EchoClient(host,port,sendNumber).run(); } }
package com.lsh.netty.echo.mgspackecho; import com.lsh.serializable.UserInfo; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * @author :LiuShihao * @date :Created in 2021年04月12日17:18:40 * @desc : */ public class EchoClientHandler extends ChannelHandlerAdapter { private final int sendNuber ; public EchoClientHandler(int sendNuber){ this.sendNuber = sendNuber; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { UserInfo[] infos = UserInfo(); for (UserInfo info : infos) { ctx.write(info); } ctx.flush(); } private UserInfo[] UserInfo(){ UserInfo[] userInfos = new UserInfo[sendNuber]; UserInfo userInfo = null; for (int i = 0; i < sendNuber; i++) { userInfo = new UserInfo(); userInfo.setUserID(i); userInfo.setUserName("ABCDEFG --->"+i); userInfos[i] = userInfo; } System.out.println("userInfos.length : "+userInfos.length); return userInfos; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{ System.out.println("Client receive the magpack message :"+msg); // ctx.write(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
package com.lsh.netty.echo.mgspackecho; import com.lsh.msgpack.MsgpackDecoder; import com.lsh.msgpack.MsgpackEncoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * @author :LiuShihao * @date :Created in 2021年04月12日17:18:40 * @desc : */ public class EchoServer { private final int port; public EchoServer(int port){ this.port = port; } public void run() throws Exception{ //配置服务端的NIO线程组 NioEventLoopGroup acceptorGroup = new NioEventLoopGroup(); NioEventLoopGroup IOGroup = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(acceptorGroup,IOGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { /** * 利用Netty 的半包编码和解码器 LengthFieldPrepender 和LengthFieldBasedFrameDecoder 可以轻松解决TCP粘包和半包问题。 */ ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535,0,2,0,2)); ch.pipeline().addLast("msgpack decoder",new MsgpackDecoder()); ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2)); ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder()); ch.pipeline().addLast(new EchoServerHandler()); } }); //绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); //等待服务端监听端口关闭 f.channel().closeFuture().sync(); }finally { //优雅退出 释放线程池资源 acceptorGroup.shutdownGracefully(); IOGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new EchoServer(port).run(); } }
package com.lsh.netty.echo.mgspackecho; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * @author :LiuShihao * @date :Created in 2021年04月12日17:18:40 * @desc : */ public class EchoServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //注意 坑2:此处不能将 msg 转换为 UserInfo类型 否则无法传输 // UserInfo userInfo = (UserInfo)msg; System.out.println("Server receive the magpack message :"+msg); ctx.writeAndFlush(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); //发生异常,关闭链路; ctx.close(); } }
package com.lsh.serializable; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.msgpack.annotation.Message; import java.io.Serializable; import java.nio.ByteBuffer; /** * @author :LiuShihao * @date :Created in 2021/4/9 12:28 下午 * @desc : * UserInfo是一个普通的Java对象,实现了Serializable接口,并生成了序列化ID,可以通过JDK的Java序列化机制进行序列化和反序列化 * */ //注意:巨坑:使用MessagePack编解码框架 传输的Java对象 一定要加上org.msgpack.annotation.Message 注解 才能 传输 @Message //getset方法 @Data //全参构造 @AllArgsConstructor //无参构造 @NoArgsConstructor public class UserInfo implements Serializable { private static final long serialVersionUID = -8018991226271912056L; private String userName; private int userID; }
@Message
注解(import org.msgpack.annotation.Message;);
服务端控制台打印情况:
客户端控制台打印情况: