Java原生结合MQTTX---完成心跳对话(附带源码)
简言:✨当Java遇上MQTT:打造会"隔空传话"的魔法程序✨
导语:想不想让两个Java程序像哈利波特里的双面镜一样实时对话?今天我们将用MQTT协议+EMQX,在Ubuntu上搭建一个魔法邮局,再亲手编写会传信的Java程序!gitCode平台附赠【案例源码】🔥
源码地址:https://blog.csdn.net/huangzhe0701/article/details/145205822
参考文档:
- 在 Ubuntu 上安装 EMQX:https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html
- MQTTX 下载:https://mqttx.app/zh/downloads
一、🛠️ 搭建魔法邮局(EMQX服务器)
扩展:在安装EMQX前记得先更新先软件包
apt update
1. 安装EMQX企业版
在Ubuntu终端输入以下咒语:
# 下载魔法卷轴(安装包) wget https://www.emqx.com/zh/downloads/enterprise/5.9.0/emqx-enterprise-5.9.0-ubuntu24.04-amd64.deb
# 解开卷轴封印 sudo dpkg -i emqx-enterprise-5.9.0-ubuntu20.04-amd64.deb
# 启动邮局服务 sudo systemctl start emqx
2. 打开魔法管理台
浏览器访问 http://localhost:18083,默认账号admin/public,你将看到:
二、📱 准备第一个信使(MQTTX客户端)
安装MQTTX桌面版
安装地址:https://mqttx.app/zh/downloads
打开后新建连接:
- 名称:魔法邮箱_varin.cn
- 服务器:varin:1883
三、⚡ 编写会魔法的Java程序
1. 添加咒语依赖(Maven)
org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.5
2. MQTT连接核心代码
package cn.varin; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.jupiter.api.Test; public class MqttConnectionTest { /** * * * String serverURI,(mqtt服务端地址) * String clientId,(客户端id) * MqttClientPersistence persistence(内存持久类) * * */ public static String serviceURL= "tcp://varin.cn:1883"; public static String clientId="varya_test_01"; public static MqttClient client; String user="varya"; String password= "123456"; static { try { // 建立一个mqqt客户端类 client = new MqttClient(serviceURL,clientId,new MemoryPersistence()); } catch (MqttException e) { throw new RuntimeException(e); } } // 创建mqtt连接 @Test public void createConnectionTest() throws MqttException { MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(user); mqttConnectOptions.setPassword(password.toCharArray()); client.connect(mqttConnectOptions); // 注意:因为用test类方法执行的话,太快,可能看不出来,是否正真的建立的连接,所以添加一个死循环来保持程序的存在。 while (true); } }
2. MQTT发送消息
package cn.varin; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.jupiter.api.Test; public class MqttSendMessageTest { /** * * * String serverURI,(mqtt服务端地址) * String clientId,(客户端id) * MqttClientPersistence persistence(内存持久类) * * */ public static String serviceURL= "tcp://varin.cn:1883"; public static String clientId="varya_test_01"; public static MqttClient client; public static String user="varya"; public static String password= "123456"; static { try { // 建立一个mqqt客户端类 client = new MqttClient(serviceURL,clientId,new MemoryPersistence()); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(user); mqttConnectOptions.setPassword(password.toCharArray()); client.connect(mqttConnectOptions); } catch (MqttException e) { throw new RuntimeException(e); } } // 发送消息 @Test public void SendMessageTest() throws MqttException { // 设置消息 String message = "hello mqttx Client"; MqttMessage mqttMessage = new MqttMessage(message.getBytes()); mqttMessage.setQos(2); // 当一个主题的消息设置了 setRetained(true) 后,这条消息会存储在 Broker 中。如果后续有新的客户端订阅这个主题,则无论何时订阅,都会立即收到最近的一条带有 retained 属性的消息作为初始数据2。如果没有设置 retained 或者之前的消息未被保留,则新订阅者不会接收到任何历史消息。, mqttMessage.setRetained(true); // 发送消息 client.publish("java_and_mqttx_conn", mqttMessage); //发送完,关闭连接 client.disconnect(); client.close(); } }
3. MQTT接收消息
package cn.varin; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.junit.jupiter.api.Test; public class MqttReceiveMessageTest { /** * * * String serverURI,(mqtt服务端地址) * String clientId,(客户端id) * MqttClientPersistence persistence(内存持久类) * * */ public static String serviceURL= "tcp://varin.cn:1883"; public static String clientId="varya_test_01"; public static MqttClient client; public static String user="varya"; public static String password= "123456"; static { try { // 建立一个mqqt客户端类 client = new MqttClient(serviceURL,clientId,new MemoryPersistence()); MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); mqttConnectOptions.setUserName(user); mqttConnectOptions.setPassword(password.toCharArray()); client.connect(mqttConnectOptions); } catch (MqttException e) { throw new RuntimeException(e); } } // 接受消息 @Test public void SendMessageTest() throws MqttException { client.subscribe("java_and_mqttx_conn",2); // 建立接收消息回调 client.setCallback(new MqttCallback() { @Override public void connectionLost(Throwable throwable) { // 连接丢失时调用 System.out.println("cooection error"); } @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { // 接收到消息时调用 System.out.println("来自主题:"+s); System.out.println("接收到的消息为:"+new String(mqttMessage.getPayload())); } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { // 解释消息结束时调用 System.out.println("deliveryComplete"); } }); // 为了保持test类的连接,建立一个死循环语句 while (true); } }
四、🐉 彩蛋:会喷火的测试恐龙
参考文章:https://blog.csdn.net/huangzhe0701/article/details/145205822
在终端运行:
mosquitto_pub -h localhost -t "java_and_mqttx_conn" -m "恐龙喷火啦~🔥"
观察Java程序是否输出火焰日志!
五、💡 常见魔法失效对策
- 检查1883端口是否被麻瓜防火墙阻挡
- 确认EMQX服务像打人柳一样活跃(sudo systemctl status emqx)
- Java依赖是否像魔药材料一样齐全
结语:现在你的Java程序已经获得了通信魔法!快来用MQTT实现更多神奇功能吧~ 如果(程序)不显形,欢迎在评论区召唤帮忙!🎩
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。