本文将介绍在spring boot项目中如何集成mqtt协议。
pom文件中添加依赖
1 2 3 4 5 6 7 8 9 10 11 12
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
|
在application-dev.yml配置MQTT服务器信息
1 2 3 4 5 6 7 8 9 10
| mqtt: username: clientTest password: 123456 url: tcp: producer: clientId: serve_producer defaultTopic: topic1 consumer: clientId: serve_consumer defaultTopic: topic1
|
编写spring boot的mqtt配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
| @Slf4j @Configuration public class MqttConfig {
@Autowired private MqttEntity mqttEntity;
@Autowired private MqttCallbackHandle mqttCallbackHandle;
public static final String CHANNEL_NAME_IN = "mqttInboundChannel";
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
private static final byte[] WILL_DATA;
static { WILL_DATA = "offline".getBytes(); }
public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setCleanSession(true); mqttConnectOptions.setUserName(mqttEntity.getUsername()); mqttConnectOptions.setPassword(mqttEntity.getPassword().toCharArray()); mqttConnectOptions.setServerURIs(mqttEntity.getUrl().split(",")); mqttConnectOptions.setConnectionTimeout(30); mqttConnectOptions.setKeepAliveInterval(60); mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false); return mqttConnectOptions; }
@Bean public MqttPahoClientFactory mqttClientFactory () { DefaultMqttPahoClientFactory defaultMqttPahoClientFactory = new DefaultMqttPahoClientFactory(); defaultMqttPahoClientFactory.setConnectionOptions(getMqttConnectOptions()); return defaultMqttPahoClientFactory; }
@Bean(name = CHANNEL_NAME_OUT) public MessageChannel mqttOutBoundChannel(){ return new DirectChannel(); }
@Bean @ServiceActivator(inputChannel = CHANNEL_NAME_OUT) public MessageHandler mqttOutbound(){ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( mqttEntity.getProducerClientId(), mqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttEntity.getProducerDefaultTopic()); return messageHandler; }
@MessagingGateway(defaultRequestChannel = CHANNEL_NAME_OUT) public interface MqttProducer { void sendToMqtt(String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
@Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( mqttEntity.getConsumerClientId(), mqttClientFactory(), mqttEntity.getConsumerDefaultTopic().split(","));
adapter.addTopic(DefContants.CONSUMER_TEST_TOPIC); adapter.addTopic(DefContants.CONSUMER_SYS_CLIENTS_TOPIC); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(1); adapter.setOutputChannel(mqttInBoundChannel()); return adapter; }
@Bean(name = CHANNEL_NAME_IN) public MessageChannel mqttInBoundChannel(){ return new DirectChannel(); }
@Bean @ServiceActivator(inputChannel = CHANNEL_NAME_IN) public MessageHandler handler() { return message -> { String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String payload = message.getPayload().toString(); mqttCallbackHandle.handle(topic, payload); }; }
}
|
MQTT消息推送API
1 2 3 4 5 6 7 8 9 10 11 12
| @ApiOperation(value = "发送指定数据", notes = "发送指定数据") @GetMapping(value = "/device/syncdata") public Result<?> addOwner(@RequestParam String params) { if (StringUtils.isEmpty(params)) { return Result.error("发送参数不能为空"); }
Map<String, Object> map = new HashMap<>(); map.put("params", params); mqttProducer.sendToMqtt(DefContants.PRODUCER_TEST_TOPIC , JacksonUtils.writeValue(map)); return Result.ok(); }
|
MQTT消息处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public void handle(String topic, String payload){ log.info("MqttCallbackHandle:" + topic + "---"+ payload);
if (topic.endsWith(DefContants.CONSUMER_SYS_CLIENTS_CONNECTED_TOPIC)){ log.info("设备上线");
} else if (topic.endsWith(DefContants.CONSUMER_SYS_CLIENTS_DISCONNECTED_TOPIC)){ log.info("设备离线");
} else if (topic.equals(DefContants.CONSUMER_TEST_TOPIC)){ log.info("自定义定义消息: " + payload); } }
|
mqttx 客户端消息发送
程序日志输出
1 2 3 4 5 6
| 2020-01-19 23:23:15.506 INFO 11232 --- [ serve_consumer] cn.dreamchan.mqtt.MqttCallbackHandle : MqttCallbackHandle:comsumer_topic---{ "msg": "hello" } 2020-01-19 23:23:15.507 INFO 11232 --- [ serve_consumer] cn.dreamchan.mqtt.MqttCallbackHandle : 自定义定义消息: { "msg": "hello" }
|
打开http://localhost:9080/swagger-ui.html 地址,调用API向mqttx 客户端发送消息。
文中代码详见 springboot-mqtt-web