步骤:
pd-netty服务的作用是接收司机端上报的车辆定位信息并将信息发送到kafka队列。pd-netty共提供两种方式来接收司机端上报的定位信息:基于netty实现的TCP方式、HTTP接口方式。
配置:
spring: # jackson时间格式化 jackson: time-zone: ${spring.jackson.time-zone} date-format: ${spring.jackson.date-format} servlet: multipart: max-file-size: ${spring.servlet.multipart.max-file-size} max-request-size: ${spring.servlet.multipart.max-request-size} enabled: ${spring.servlet.multipart.enabled} # kafka kafka: bootstrap-servers: ${spring.kafka.bootstrap-servers} listener: # 指定listener 容器中的线程数,用于提高并发量 concurrency: ${spring.kafka.listener.concurrency} producer: retries: ${spring.kafka.producer.retries} batch-size: ${spring.kafka.producer.batch-size} buffer-memory: ${spring.kafka.producer.buffer-memory} key-serializer: ${spring.kafka.producer.key-serializer} value-serializer: ${spring.kafka.producer.value-serializer} consumer: group-id: ${spring.kafka.consumer.group-id}
//LocationEntity实体 package com.itheima.pinda.entity; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; @Data @ApiModel("位置信息") public class LocationEntity { public String getId() { return businessId + "#" + type + "#" + currentTime; } /** * 车辆Id */ @ApiModelProperty("业务id, 快递员id 或者 车辆id") private String businessId; /** * 司机名称 */ @ApiModelProperty("司机名称") private String name; /** * 司机电话 */ @ApiModelProperty("司机电话") private String phone; /** * 车牌号 */ @ApiModelProperty("licensePlate") private String licensePlate; /** * 类型 */ @ApiModelProperty("类型,车辆:truck,快递员:courier") private String type; /** * 经度 */ @ApiModelProperty("经度") private String lng; /** * 维度 */ @ApiModelProperty("维度") private String lat; /** * 当前时间 */ @ApiModelProperty("当前时间 格式:yyyyMMddHHmmss") private String currentTime; @ApiModelProperty("所属车队") private String team; @ApiModelProperty("运输任务id") private String transportTaskId; }
package com.itheima.pinda.service; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component @Slf4j public class KafkaSender { public final static String MSG_TOPIC = "tms_order_location"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; private static KafkaTemplate<String, String> template; @PostConstruct public void init() { KafkaSender.template = this.kafkaTemplate; } //发送消息到kafka队列 public static boolean send(String topic, String message) { try { template.send(topic, message); log.info("消息发送成功:{} , {}", topic, message); } catch (Exception e) { log.error("消息发送失败:{} , {}", topic, message, e); return false; } return true; } }
Netty 是一款基于 NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于 BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。难能可贵的是,在保证快速和易用性的同时,并没有丧失可维护性和性能等优势。
TCP方式
Netty是由JBOSS提供的一个Java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
. Netty的特点?
• 一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持
• 使用更高效的socket底层,对epoll空轮询引起的cpu占用飙升在内部进行了处理,避免了直接使用NIO的陷阱,简化了NIO的处理方式。
• 采用多种decoder/encoder 支持,对TCP粘包/分包进行自动化处理
• 可使用接受/处理线程池,提高连接效率,对重连、心跳检测的简单支持
• 可配置IO线程数、TCP参数, TCP接收和发送缓冲区使用直接内存代替堆内存,通过内存池的方式循环利用ByteBuf
• 通过引用计数器及时申请释放不再引用的对象,降低了GC频率
• 使用单线程串行化的方式,高效的Reactor线程模型
• 大量使用了volitale、使用了CAS和原子类、线程安全类的使用、读写锁的使用
Netty 中的重要组件?
Channel:Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 等。
EventLoop:主要是配合 Channel 处理 I/O 操作,用来处理连接的生命周期中所发生的事情。
ChannelFuture:Netty 框架中所有的 I/O 操作都为异步的,因此我们需要 ChannelFuture 的 addListener()注册一个 ChannelFutureListener 监听事件,当操作执行成功或者失败时,监听就会自动触发返回结果。
ChannelHandler:充当了所有处理入站和出站数据的逻辑容器。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
ChannelPipeline:为 ChannelHandler 链提供了容器,当 channel 创建时,就会被自动分配到它专属的 ChannelPipeline,这个关联是永久性的。
package com.itheima.pinda.config; import com.itheima.pinda.service.NettyServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * netty 服务启动类 */ @Component @Slf4j public class NettyServer implements CommandLineRunner { private static NettyServer nettyServer; @PostConstruct public void init() { nettyServer = this; } @Value("${netty.port}") private int port; private EventLoopGroup mainGroup; private EventLoopGroup subGroup; private ServerBootstrap server; private ChannelFuture future; public NettyServer() { // NIO线程组,用于处理网络事件 mainGroup = new NioEventLoopGroup(); subGroup = new NioEventLoopGroup(); // 服务初始化工具,封装初始化服务的复杂代码 server = new ServerBootstrap(); server.group(mainGroup, subGroup) .option(ChannelOption.SO_BACKLOG, 128)// 设置缓存 .childOption(ChannelOption.SO_KEEPALIVE, true) .channel(NioServerSocketChannel.class)// 指定使用NioServerSocketChannel产生一个Channel用来接收连接 .childHandler(new NettyServerHandler());//具体处理网络IO事件 } public void start() { // 启动服务端,绑定端口 this.future = server.bind(nettyServer.port); log.info("Netty Server 启动完毕!!!! 端口:" + nettyServer.port); } @Override public void run(String... args) { this.start(); } }
import com.alibaba.fastjson.JSON; import com.itheima.pinda.entity.LocationEntity; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import java.io.UnsupportedEncodingException; /** * netty 业务处理 */ @Slf4j @ChannelHandler.Sharable //@ChannelHandler.Sharable:表示可以将带这个注释的 ChannelHandler 的同一实例多次添加到一个或多个 ChannelPipelines 中,而不会出现竞争条件。 // 如果未指定此注解,则每次将其添加到管道时都必须创建一个新的处理程序实例,因为它具有成员变量等非共享状态 //处理 I/O 事件或拦截 I/O 操作,并将其转发到其ChannelPipeline下一个处理程序 public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("ServerHandler.channelRead()"); ByteBuf in = (ByteBuf) msg; try { //接收报文 String body = getRequestBody(in); log.info("报文内容:{}", body); //解析报文 String message = parseMessage(body); if (StringUtils.isBlank(message)) { log.info("报文解析失败"); return; } //发送至kafka队列 KafkaSender.send(KafkaSender.MSG_TOPIC, message); } catch (Exception e) { log.error(e.getMessage()); } finally { //使用完ByteBuf之后,需要主动去释放资源,否则,资源一直在内存中加载,容易造成内存泄漏 ReferenceCountUtil.release(msg); } if (null != in) { //把当前的写指针 writerIndex 恢复到之前保存的 markedWriterIndex值 in.resetWriterIndex(); } } /** * 解析请求内容 * * @param in * @return * @throws UnsupportedEncodingException */ private String getRequestBody(ByteBuf in) throws UnsupportedEncodingException { if (in.readableBytes() <= 0) { return null; } byte[] req = new byte[in.readableBytes()]; in.readBytes(req); return new String(req, "UTF-8"); } /** * 解析报文 * <p> * 设备不同报文也不同,本次设备为移动端,直接使用json格式传输 */ private String parseMessage(String body) { if (StringUtils.isBlank(body)) { log.warn("报文为空"); return null; } body = body.trim(); // 其它格式的报文需要解析后放入MessageEntity实体 LocationEntity message = JSON.parseObject(body, LocationEntity.class); if (message == null || StringUtils.isBlank(message.getType()) || StringUtils.isBlank(message.getBusinessId()) || StringUtils.isBlank(message.getLat()) || StringUtils.isBlank(message.getLng()) || StringUtils.isBlank(message.getId())) { log.warn("报文内容异常"); return null; } String result = JSON.toJSONString(message); return result; } @Override public void channelReadComplete(ChannelHandlerContext ctx) { // 写一个空的buf,并刷新写出区域。完成后关闭sock channel连接。 ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 关闭发生异常的连接 ctx.close(); } }
编写NettyController,通过HTTP接口方式接受司机端上报的车辆位置信息
package com.itheima.pinda.controller; import com.alibaba.fastjson.JSON; import com.itheima.pinda.common.utils.Result; import com.itheima.pinda.entity.LocationEntity; import com.itheima.pinda.service.KafkaSender; import io.swagger.annotations.Api; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @Api(tags = "车辆轨迹服务") @RequestMapping("netty") @Slf4j public class NettyController { @PostMapping(value = "/push") public Result push(@RequestBody LocationEntity locationEntity) { String message = JSON.toJSONString(locationEntity); log.info("HTTP 方式推送位置信息:{}", message); KafkaSender.send(KafkaSender.MSG_TOPIC, message); return Result.ok(); } }