MQTT-SpringBoot整合
MQTT-SpringBoot
创建简单 SpringBoot 项目
导入必须依赖
pom.xml
4.0.0 com.study MqttDemo 0.0.1-SNAPSHOT SpringBootMqttDemo SpringBootMqttDemo 1.8 UTF-8 UTF-8 2.6.13 org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-integration org.springframework.integration spring-integration-mqtt 5.4.3 org.projectlombok lombok com.alibaba fastjson 1.2.83 org.springframework.boot spring-boot-dependencies ${spring-boot.version} pom import org.apache.maven.plugins maven-compiler-plugin 3.8.1 1.8 1.8 UTF-8 org.springframework.boot spring-boot-maven-plugin ${spring-boot.version} com.study.mqtt.demo.MqttDemoApplication true repackage repackage
增加MQTT相关配置
application.yml
spring: mqtt: # mqtt 服务器地址 url: tcp://192.168.40.128:1883 # 订阅客户端ID subClientId: sub_client_id_1 # 订阅主题 subTopic: lq/iot/demo/ # 发布客户端ID pubClientId: pub_client_id_1 # 用户名 username: admin # 密码 password: admin123456
编写对应Java类
配置类
MqttConfig.java
package com.study.mqtt.demo.domain; import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; @Data @ConfigurationProperties(prefix = "spring.mqtt") public class MqttConfig { private String username; private String password; private String url; private String subClientId ; private String subTopic ; private String pubClientId ; }
启动类增加开启配置
MqttDemoApplication.java
package com.study.mqtt.demo; import com.study.mqtt.demo.domain.MqttConfig; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication @EnableConfigurationProperties(value = MqttConfig.class) public class MqttDemoApplication { public static void main(String[] args) { SpringApplication.run(MqttDemoApplication.class, args); } }
创建MQTT连接工厂类
MqttFactory.java
package com.study.mqtt.demo.factory; import com.study.mqtt.demo.domain.MqttConfig; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; @Configuration public class MqttFactory { @Autowired private MqttConfig mqttConfig; @Bean public MqttPahoClientFactory mqttClientFactory() { // 创建客户端工厂 DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(mqttConfig.getUsername()); options.setPassword(mqttConfig.getPassword().toCharArray()); options.setServerURIs(new String[]{mqttConfig.getUrl()}); options.setCleanSession(true); factory.setConnectionOptions(options); return factory; } }
接收消息处理类
ReceiveMsgHandler.java
package com.study.mqtt.demo.handler; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessagingException; import org.springframework.stereotype.Component; @Component public class ReceiveMsgHandler implements MessageHandler { @Override public void handleMessage(Message message) throws MessagingException { System.out.println("接收到消息对象:" + message); // 消息内容 Object payload = message.getPayload(); MessageHeaders headers = message.getHeaders(); Object mqttReceivedTopic = headers.get("mqtt_receivedTopic"); System.out.println("接收的消息主题:" + mqttReceivedTopic); System.out.println("接收的消息内容:" + payload); } }
接收消息配置类
MqttInboundConfig.java
package com.study.mqtt.demo.inbound; import com.study.mqtt.demo.domain.MqttConfig; import com.study.mqtt.demo.factory.MqttFactory; import com.study.mqtt.demo.handler.ReceiveMsgHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @Configuration public class MqttInboundConfig { @Autowired private MqttConfig mqttConfig ; @Autowired private ReceiveMsgHandler receiveMsgHandler; /** * 配置消息接收通道 * @return */ @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 配置接收适配器 */ @Bean public MessageProducer messageProducer(MqttPahoClientFactory mqttPahoClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl() , mqttConfig.getSubClientId() , mqttPahoClientFactory , mqttConfig.getSubTopic().split(",")) ; adapter.setConverter(new DefaultPahoMessageConverter()); // 质量服务等级 adapter.setQos(1); adapter.setOutputChannel(mqttInputChannel()); return adapter ; } /** * 配置接收消息处理器 * @return */ @Bean @ServiceActivator(inputChannel = "mqttInputChannel") // 指定处理消息使用得通道 public MessageHandler messageHandler() { return this.receiveMsgHandler ; } }
发送消息配置类
MqttOutboundConfig.java
package com.study.mqtt.demo.outbound; import com.study.mqtt.demo.domain.MqttConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @Configuration public class MqttOutboundConfig { @Autowired private MqttConfig mqttConfig; @Autowired private MqttPahoClientFactory pahoClientFactory ; @Bean public MessageChannel mqttOutputChannel() { return new DirectChannel(); } @Bean @ServiceActivator(inputChannel = "mqttOutputChannel") public MessageHandler mqttOutboundMassageHandler() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getUrl() , mqttConfig.getPubClientId() , pahoClientFactory ) ; messageHandler.setAsync(true); messageHandler.setDefaultQos(0); messageHandler.setDefaultTopic("default"); return messageHandler ; } }
发送消息网关接口类
MqttGateway.java
package com.study.mqtt.demo.gateway; import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; @MessagingGateway(defaultRequestChannel = "mqttOutputChannel") public interface MqttGateway { /** * 发送mqtt消息 * @param topic 主题 * @param payload 内容 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload); /** * 发送包含qos的消息 * @param topic 主题 * @param qos 对消息处理的几种机制。 * * 0 发送成功就算完成,会出现消息丢失 * * 1 增加消息重试机制,消息发送失败会重新发送,会出现重复消息 * * 2 多了一次去重的动作,确保只有一次消息推给订阅者。 * @param payload 消息体 */ void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
发送消息服务类
MqttMsgSenderService.java
package com.study.mqtt.demo.service; import com.study.mqtt.demo.gateway.MqttGateway; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MqttMsgSenderService { @Autowired private MqttGateway mqttGateway; public void send(String topic, String payload) { mqttGateway.sendToMqtt(topic, payload); } public void send(String topic, int qos, String payload) { mqttGateway.sendToMqtt(topic, qos, payload); } }
测试验证
订阅消息验证
- 启动项目
- 发送消息
- 主题为配置文件中配置的订阅主题 lq/iot/demo/
- 发送时间: 2025-05-25 21:29:26:439
- 订阅收到消息
- 接收到消息的时间:Sun May 25 21:29:26 GMT+08:00 2025
- 接收到的主题:lq/iot/demo/
- 接收到的内容:{ "msg":"spring boot mqtt demo" }
发送消息验证
- 编写测试类
- 发送主题:sb/mqtt/test
- 发送内容:hello world !=> 当前时间
package com.study.mqtt.demo; import com.study.mqtt.demo.service.MqttMsgSenderService; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.util.Date; @SpringBootTest(classes = MqttDemoApplication.class) class MqttDemoApplicationTests { @Autowired private MqttMsgSenderService mqttMsgSenderService; @Test void contextLoads() { } @Test void sendMsg(){ mqttMsgSenderService.send("sb/mqtt/test", "hello world ! => " + new Date()); } }
- 编写测试类
- 订阅收到消息
- 发送消息
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。