本文将介绍在spring boot项目中如何集成mqtt协议。

pom文件中添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
 <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>

在application-dev.yml配置MQTT服务器信息

1
2
3
4
5
6
7
8
9
10
mqtt:
username: clientTest
password: 123456
url: tcp://192.168.199.249:1883
producer:
clientId: serve_producer
defaultTopic: topic1
consumer:
clientId: serve_consumer
defaultTopic: topic1

编写spring boot的mqtt配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
@Slf4j
@Configuration
public class MqttConfig {

@Autowired
private MqttEntity mqttEntity;

@Autowired
private MqttCallbackHandle mqttCallbackHandle;

/**
* 订阅的bean名称
*/
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
/**
* 发布的bean名称
*/
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";


private static final byte[] WILL_DATA;

static {
WILL_DATA = "offline".getBytes();
}

/**
* mqtt 连接配置
*
* @return
*/
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
// 这里设置为true表示每次连接到服务器都以新的身份连接
mqttConnectOptions.setCleanSession(true);
// 设置连接的用户名
mqttConnectOptions.setUserName(mqttEntity.getUsername());
// 设置连接的密码
mqttConnectOptions.setPassword(mqttEntity.getPassword().toCharArray());
// 设置连接mqtt服务器地址
mqttConnectOptions.setServerURIs(mqttEntity.getUrl().split(","));
// 设置超时时间 单位为秒
mqttConnectOptions.setConnectionTimeout(30);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
mqttConnectOptions.setKeepAliveInterval(60);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
return mqttConnectOptions;
}

/**
* mqtt 客户端
* @return
*/
@Bean
public MqttPahoClientFactory mqttClientFactory () {
DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory();
defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions());
return defaultMqttPahoClientFactory;
}

/**
* MQTT信息通道(生产者)
* @return
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutBoundChannel(){
return new DirectChannel();
}

/**
* MQTT消息处理器(生产者)
* @return
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound(){
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
mqttEntity.getProducerClientId(),
mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttEntity.getProducerDefaultTopic());
return messageHandler;
}

/**
* 生产者 用于消息发送
*/
@MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT)
public interface MqttProducer {
//用于消息发送
void sendToMqtt(String payload);

// 指定topic进行消息发送
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

// 指定topic qos 进行消息发送
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}


/**
* MQTT消息订阅绑定
* @return
*/
@Bean
public MessageProducer inbound() {
// 可以同时消费(订阅)多个Topic
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
mqttEntity.getConsumerClientId(), mqttClientFactory(),
mqttEntity.getConsumerDefaultTopic().split(","));

adapter.addTopic(DefContants.CONSUMER_TEST_TOPIC);
adapter.addTopic(DefContants.CONSUMER_SYS_CLIENTS_TOPIC);
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
// 设置订阅通道
adapter.setOutputChannel(mqttInBoundChannel());
return adapter;
}

/**
* MQTT信息通道(消费者)
* @return
*/
@Bean(name = CHANNEL_NAME_IN)
public MessageChannel mqttInBoundChannel(){
return new DirectChannel();
}


/**
* MQTT消息处理器(消费者)
* ServiceActivator注解表明当前方法用于处理MQTT消息,inputChannel参数指定了用于接收消息信息的channel
* @return
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_IN)
public MessageHandler handler() {
return message -> {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String payload = message.getPayload().toString();
mqttCallbackHandle.handle(topic, payload);
};
}

}

MQTT消息推送API

1
2
3
4
5
6
7
8
9
10
11
12
@ApiOperation(value = "发送指定数据", notes = "发送指定数据")
@GetMapping(value = "/device/syncdata")
public Result<?> addOwner(@RequestParam String params) {
if (StringUtils.isEmpty(params)) {
return Result.error("发送参数不能为空");
}

Map<String, Object> map = new HashMap<>();
map.put("params", params);
mqttProducer.sendToMqtt(DefContants.PRODUCER_TEST_TOPIC , JacksonUtils.writeValue(map));
return Result.ok();
}

MQTT消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void handle(String topic, String payload){
log.info("MqttCallbackHandle:" + topic + "---"+ payload);

// 根据topic分别进行消息处理。
if (topic.endsWith(DefContants.CONSUMER_SYS_CLIENTS_CONNECTED_TOPIC)){
// 设备上线
log.info("设备上线");

} else if (topic.endsWith(DefContants.CONSUMER_SYS_CLIENTS_DISCONNECTED_TOPIC)){
// 设备离线
log.info("设备离线");

} else if (topic.equals(DefContants.CONSUMER_TEST_TOPIC)){
// 接收到自定义消息
log.info("自定义定义消息: " + payload);
}
}

mqttx 客户端消息发送

程序日志输出

1
2
3
4
5
6
2020-01-19 23:23:15.506  INFO 11232 --- [ serve_consumer] cn.dreamchan.mqtt.MqttCallbackHandle     : MqttCallbackHandle:comsumer_topic---{
"msg": "hello"
}
2020-01-19 23:23:15.507 INFO 11232 --- [ serve_consumer] cn.dreamchan.mqtt.MqttCallbackHandle : 自定义定义消息: {
"msg": "hello"
}

打开http://localhost:9080/swagger-ui.html 地址,调用API向mqttx 客户端发送消息。

文中代码详见 springboot-mqtt-web