第一次读源码,理解不到位,请多批评
首先找到MQTT的模块,./common/transport/mqtt,我们可以看到该模块是一个使用Netty封装的mqttServer,通过读取配置文件来初始化这个mqttServer
@Service("MqttTransportService") @ConditionalOnExpression("'${service.type:null}'=='tb-transport' || ('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true' && '${transport.mqtt.enabled}'=='true')") @Slf4j public class MqttTransportService { @Value("${transport.mqtt.bind_address}") private String host; @Value("${transport.mqtt.bind_port}") private Integer port; @Value("${transport.mqtt.netty.leak_detector_level}") private String leakDetectorLevel; @Value("${transport.mqtt.netty.boss_group_thread_count}") private Integer bossGroupThreadCount; @Value("${transport.mqtt.netty.worker_group_thread_count}") private Integer workerGroupThreadCount; @Value("${transport.mqtt.netty.so_keep_alive}") private boolean keepAlive; @Autowired private MqttTransportContext context; private Channel serverChannel; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; @PostConstruct public void init() throws Exception { log.info("Setting resource leak detector level to {}", leakDetectorLevel); ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase())); log.info("Starting MQTT transport..."); bossGroup = new NioEventLoopGroup(bossGroupThreadCount); workerGroup = new NioEventLoopGroup(workerGroupThreadCount); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new MqttTransportServerInitializer(context)) .childOption(ChannelOption.SO_KEEPALIVE, keepAlive); serverChannel = b.bind(host, port).sync().channel(); log.info("Mqtt transport started!"); } @PreDestroy public void shutdown() throws InterruptedException { log.info("Stopping MQTT transport!"); try { serverChannel.close().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } log.info("MQTT transport stopped!"); } }
代码中这里初始化了一个handler,来具体处理接收到的消息。(好像是netty的使用方式)
Degbug启动Thingsboard,并让MqttClinet发送连接请求。发现消息是从channelRead函数读入。
进入processMqttMsgh函数
看到此时header中的消息类型是CONNECT
进入processConnect函数
我们这里使用的authToken的方式进行连接的,也就是在设备中定义的token。该token作为用户名传入。不需要密码。
进入processAuthTokenConnect函数
在本函数中,消息被封装成了ValidateBasicMqttCredRequestMsg类型的消息,封装完成之后,调用transportService的process函数,进行消息的传输,将消息传到core模块进行权限校验。
我们看到在process函数中传入了传输类型MQTT,封装好的登录信息,以及校验完成的回调函数。
至此,MQTT登录消息就处理完毕了,封装好的消息将被传输至core模块进行处理。
在process函数中,消息被进一步封装起来,封装成了一个TbProtoQueueMsg<TransportApiRequestMsg>
类型的信息,并增加了一个UUID。
消息被传输到了doProcess函数
我们先将细节屏蔽看看结构。
第一步是下面的代码
第一个参数是函数的原返回值,第二个参数是转换函数(也就是如何将原返回值进行转化的函数),第三个参数是Executor(详见Guava)
也就是第一步,将send函数的返回结果从ListenableFuture<Response>
转换成了ListenableFuture<ValidateDeviceCredentialsResponse>
类型的结果
第二步是
点进去发现实际上是为上一步转换来的response注册listener,也就是注册回调函数。
而这回调函数,就是之前process函数中传入的回调。
综上,doProcess函数将返回值转换成合适的格式,并设置回调函数。下面我们进入transportApiRequestTemplate.send(protoMsg)函数
(TbQueueRequestTemplate实现类)
消息传输准备阶段,我们从里向外看,DefaultTbQueueRequestTemplate的send函数生成了一个可手动设置值的future操作句柄,并这次请求的requestId为key,包含操作句柄future和本次请求的过期时间的对象为value存储在pendingRequests中(pendingRequests可以保证requestId唯一)。而后send函数将这个future句柄返回,在上层的调用函数中被转换格式并设置了对应的回调函数。
从DefaultTbQueueRequestTemplate的send函数中的responseTemplate.send()看起。
可以看到有三个入参
应该是topic等相关信息
可以看到消息将会被发到一个tb_transport.api.requests
的topic中。
reuqest
就是前面封装后的登录信息
队列回调
应该是看是否成功发送消息到指定topic的,成功做什么事,失败做什么事。可以看到失败的时候,pendingRequests删掉了对应requestId的信息,并直接给future设置了失败信息。
可以看到信息传输可以通过很多种方式,我们这里先选择内存队列来看。
可以看到send函数将前面传入的request放入了topicName对应的队列中。topicName和其对应的队列存在一个单例的ConcurrentHashMap中。
这样就相当于将消息放到了topicName对应的队列中了。
responseTemplate.send()有很多中实现,其结果就是将封装好的登录信息送到对应topic的队列中(可能是内存的也可能是消息中间件或其他形式)等待校验逻辑的消费。当传送成功时,会返回成功信息。失败时将会直接给future句柄设置为失败。
前文说明了使用内存队列消息的生产过程,接下来说明一下消息的消费过程。
消息消费的入口在DefaultTbQueueResponseTemplate的init函数中。
init函数中有一个while循环,当stopped不为true的时候,while循环将一直运行
requestTemplate是一个下面类型的接口。
调用其poll(轮询)函数。对于我们的内存队列,其有单独的实现类。
其中关键一步就是
根据topic的名称,从单例的storage中取出该topic下的所有之前消息生产中存放的请求。
这里的handler是transportApiService,在核心服务初始化的时候被设置的。
在DefaultTransportApiService是其实现类。其handle函数如下。
我们找到我们对应的类型
进入validateCredentials函数
终于我们看到了校验的逻辑,在这里使用userName也就是我们传入的auth_token,经过业务校验获取到相关设备信息。
校验成功之后,将返回一个包含设备信息的future句柄。
这一块是成功后的回调。回调主要是要将设备信息发送回去。
我们可以看到,response的header里设置了REQUEST_ID_HEADER
,这个request_id是之前消息发送过来的request中设置的。
responseTopic也是requestHeader中带的,是mqtt模块接收core信息的topic。
response就是handle里面返回的信息。
DefaultTbQueueResponseTemplate的init函数将不停地获取对应topic的队列中的request信息,并将所有的request进行处理校验,之后,将带有requestId和设备信息的response重新发送到内存队列中responseTopic对应的队列里。等待进一步处理。
responseTemplate.send()函数,同样我们使用内存队列。
与消息发送过来的方式相同,将对应的responseTopic的队列中,放置刚获得的设备信息。
TbQueueRequestTemplat接口中同样有一个init方法
其中有一个while循环,不停地获取responseTopic对应队列的信息,这些response信息就是设备相关信息
对于每个response信息,我们获取其header中的requestId,并通过requestId获取pendingRequests中的对应的ResponseMetaData<Response> 类型的
expectedResponse信息,并将其中的future句柄的内容设置为response中的设备信息。
再看看最开始的回调信息
当成功获取到response之后,调用onValidateDeviceResponse进行后续的工作
总的来说大致的流程是这样的,mqtt连接发送到mqtt-server,mqtt-server通过消息队列,将连接请求相关信息发送至core进行权限校验和认证,core进行校验之后,如果成功将设备信息发送到消息队列,消息消费后被预先注册的成功回调函数处理。否则被失败回调函数处理。
https://www.baeldung.com/guava-futures-listenablefuture
https://www.jianshu.com/p/33ac5d394f68