- release-3.2分支源码
以MQTT协议接入为例,分析设备如何连接Thingsboard。一般情况下,使用Mqtt客户端连接Mqtt服务端只需发送CONNECT消息,等待服务端返回CONNACK消息即可,示意图如下:
入口:MqttTransportService
,核心处理类为MqttTransportHandler
通过 init方法进行初始化,主要是使用Netty构造服务端并完成监听。
log.info("Setting resource leak detector level to {}", leakDetectorLevel);
//设置内存泄漏检测级别
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));
log.info("Starting MQTT transport...");
//创建主循环组
bossGroup = new NioEventLoopGroup(bossGroupThreadCount);
//创建工作循环组
workerGroup = new NioEventLoopGroup(workerGroupThreadCount);
//创建服务端引导实例并设置
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new MqttTransportServerInitializer(context))
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
//监听地址和端口
serverChannel = b.bind(host, port).sync().channel();
log.info("Mqtt transport started!");
当有客户端连接时,MqttTransportServerInitializer初始化管道处理链
//获取channel管道
ChannelPipeline pipeline = ch.pipeline();
String remoteAddress = ch.remoteAddress().toString();
log.info("init channel for address: [{}]",remoteAddress);
SslHandler sslHandler = null;
//如果有SslHandlerProvider,增加ssl handler
if (context.getSslHandlerProvider() != null) {
sslHandler = context.getSslHandlerProvider().getSslHandler();
pipeline.addLast(sslHandler);
}
//增加Mqtt解码器到管道
pipeline.addLast("decoder", new MqttDecoder(context.getMaxPayloadSize()));
//增加Mqtt编码器到管道
pipeline.addLast("encoder", MqttEncoder.INSTANCE);
//新建Mqtt处理Handler,用于处理Mqtt消息
MqttTransportHandler handler = new MqttTransportHandler(context, sslHandler);
//增加MqttHandler到管道
pipeline.addLast(handler);
//channel关闭监听增加MqttHandler
ch.closeFuture().addListener(handler);
在MqttTransportHandler中进行Mqtt消息处理,以一个认证为AccessToken的设备进行连接举例,核心处理流程如下:
//MqttTransportHandler 132
processMqttMsg(ctx, (MqttMessage) msg);
//MqttTransportHandler 154
processConnect(ctx, (MqttConnectMessage) msg);
//MqttTransportHandler 474
processAuthTokenConnect(ctx, msg);
//MqttTransportHandler 492
//构造请求消息,调用transportService处理请求消息
transportService.process(DeviceTransportType.MQTT, request.build(),
//DefaultTransportService 271
//调用doProcess方法处理protoMsg消息,protoMsg中包含请求消息。
doProcess(transportType, protoMsg, callback);
//DefaultTransportService 283
//调用transportApiRequestTemplate的send方法处理消息
ListenableFuture<ValidateDeviceCredentialsResponse> response = Futures.transform(transportApiRequestTemplate.send(protoMsg), tmp -> {
//DefaultTbQueueRequestTemplate 180
//构造TopicPartitionInfo,使用requestTemplate(类型为TbQueueProducer)发送请求到指定消息队列的指定TOPIC中
//TbQueueProducer有多种实现,常见的是InMemoryTbQueueProducer和TbKafkaProducerTemplate
requestTemplate.send(TopicPartitionInfo.builder().topic(requestTemplate.getDefaultTopic()).build(), request, new TbQueueCallback() {
//InMemoryTbQueueProducer 42 (作为requestTemplate的实现,在单体架构下调用)
//调用storage(ConcurrentHashMap+LinkedBlockingQueue)存放消息,根据结果设置callback
boolean result = storage.put(tpi.getFullTopicName(), msg);
if (result) {
if (callback != null) {
callback.onSuccess(null);
}
} else {
if (callback != null) {
callback.onFailure(new RuntimeException("Failure add msg to InMemoryQueue"));
}
}
//TbKafkaProducerTemplate 82 (作为requestTemplate的实现,在微服务架构下设置消息队列为kafka时使用,其他消息队列类似)
//使用Kafka生产者向Kafka Borker中发送消息,等待处理结果,然后根据异常是否为空callback对应属性。
producer.send(record, (metadata, exception) -> {
if (exception == null) {
if (callback != null) {
callback.onSuccess(new KafkaTbQueueMsgMetadata(metadata));
}
} else {
if (callback != null) {
callback.onFailure(exception);
} else {
log.warn("Producer template failure: {}", exception.getMessage(), exception);
}
}
});
//DefaultTbQueueResponseTemplate 116
//调用handler(类型为TbQueueHandler,具体实现为DefaultTransportApiService)处理请求(request),异步获取响应(response)结果
AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(request),
//DefaultTbQueueResponseTemplate 120
//调用responseTemplate(类型为TbQueueProducer)发送响应结果(response)到指定消息中间件的指定TOPIC中,TbQueueProducer有多种实现,常见的是InMemoryTbQueueProducer和TbKafkaProducerTemplate
responseTemplate.send(TopicPartitionInfo.builder().topic(responseTopic).build(), response, null);
//InMemoryTbQueueProducer 42 (作为responseTemplate的实现,在单体架构下调用)
//同上
//TbKafkaProducerTemplate 82 (作为responseTemplate的实现,在微服务架构下设置消息队列为kafka时使用,其他消息队列类似)
//同上
//DefaultTbQueueRequestTemplate 94
//调用responseTemplate(类型为TbQueueConsumer)获取消息
List<Response> responses = responseTemplate.poll(pollInterval);
//DefaultTbQueueRequestTemplate 106
//设置future为response
expectedResponse.future.set(response);
//DefaultTransportService 303
//异步调用TransportServiceCallback onSuccess方法
AsyncCallbackTemplate.withCallback(response, callback::onSuccess, callback::onError, transportCallbackExecutor);
//MqttTransportHandler 494
//调用onValidateDeviceResponse验证返回信息
onValidateDeviceResponse(msg, ctx, connectMessage);
//MqttTransportHandler 646
//调用transportService处理结果消息
transportService.process(deviceSessionCtx.getSessionInfo(),DefaultTransportService.getSessionEventMsg(SessionEvent.OPEN), new TransportServiceCallback<Void>() {
//DefaultTransportService 360
//调用sendToDeviceActor处理会话信息
sendToDeviceActor(sessionInfo, TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
.setSessionEvent(msg).build(), callback);
//DefaultTransportService 760
//调用tbCoreMsgProducer(类型为TbQueueProducer)往消息中间件发送请求
tbCoreMsgProducer.send(tpi,
new TbProtoQueueMsg<>(getRoutingKey(sessionInfo),
ToCoreMsg.newBuilder().setToDeviceActorMsg(toDeviceActorMsg).build()),
wrappedCallback);
//InMemoryTbQueueProducer 42 (作为tbCoreMsgProducer的具体实现,在单体架构下调用)
//同上
//TbKafkaProducerTemplate 82 (作为tbCoreMsgProducer的具体实现,在微服务架构下设置消息队列为kafka时使用,其他消息队列类似)
//同上
//MqttTransportHandler 651
//channel上下文中写入并刷新CONNACK消息。
ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED, connectMessage));
设备认证示意图:
由于很多太多的异步操作,导致时序图太复杂,暂时没有太好的办法绘制,有一张半成品在TIPS中可下载,感兴趣的同学可以了解下.
- 设备连接时序图半成品
- TOPIC信息:请求TOPIC:
tb_transport.api.requests
,可配置TB_QUEUE_TRANSPORT_API_REQUEST_TOPIC
进行修改,返回结果TOPIC:tb_transport.api.response
+service.id
,可配置TB_QUEUE_TRANSPORT_API_RESPONSE_TOPIC
进行修改。 - 消息队列key计算:使用DeviceId作为key,可参考
DefaultTransportService
733行,以Kafka为例,消费者可根据hash(key)进行消费,确保单个设备消息被同一消费者消费,