Java原生结合MQTTX---完成心跳对话(附带源码)

06-01 665阅读

简言:✨当Java遇上MQTT:打造会"隔空传话"的魔法程序✨

导语:想不想让两个Java程序像哈利波特里的双面镜一样实时对话?今天我们将用MQTT协议+EMQX,在Ubuntu上搭建一个魔法邮局,再亲手编写会传信的Java程序!gitCode平台附赠【案例源码】🔥


源码地址:https://blog.csdn.net/huangzhe0701/article/details/145205822

参考文档:

  1. 在 Ubuntu 上安装 EMQX:https://docs.emqx.com/zh/emqx/latest/deploy/install-ubuntu.html
  2. MQTTX 下载:https://mqttx.app/zh/downloads

Java原生结合MQTTX---完成心跳对话(附带源码)

一、🛠️ 搭建魔法邮局(EMQX服务器)

扩展:在安装EMQX前记得先更新先软件包

apt update

1. 安装EMQX企业版

在Ubuntu终端输入以下咒语:

# 下载魔法卷轴(安装包)
wget https://www.emqx.com/zh/downloads/enterprise/5.9.0/emqx-enterprise-5.9.0-ubuntu24.04-amd64.deb

Java原生结合MQTTX---完成心跳对话(附带源码)

# 解开卷轴封印
sudo dpkg -i emqx-enterprise-5.9.0-ubuntu20.04-amd64.deb	

Java原生结合MQTTX---完成心跳对话(附带源码)

# 启动邮局服务
sudo systemctl start emqx

2. 打开魔法管理台

浏览器访问 http://localhost:18083,默认账号admin/public,你将看到:

Java原生结合MQTTX---完成心跳对话(附带源码)

Java原生结合MQTTX---完成心跳对话(附带源码)


二、📱 准备第一个信使(MQTTX客户端)

安装MQTTX桌面版

安装地址:https://mqttx.app/zh/downloads

打开后新建连接:

  • 名称:魔法邮箱_varin.cn
  • 服务器:varin:1883

    Java原生结合MQTTX---完成心跳对话(附带源码)

    Java原生结合MQTTX---完成心跳对话(附带源码)

    Java原生结合MQTTX---完成心跳对话(附带源码)


    三、⚡ 编写会魔法的Java程序

    1. 添加咒语依赖(Maven)

        org.eclipse.paho
        org.eclipse.paho.client.mqttv3
        1.2.5
    
    

    2. MQTT连接核心代码

    package cn.varin;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.junit.jupiter.api.Test;
    public class MqttConnectionTest {
        /**
         *
         *
         * String serverURI,(mqtt服务端地址)
         * String clientId,(客户端id)
         * MqttClientPersistence persistence(内存持久类)
         *
         *
         */
        public static String serviceURL= "tcp://varin.cn:1883";
        public static String clientId="varya_test_01";
        public static MqttClient client;
        String user="varya";
        String password= "123456";
        static {
            try {
                // 建立一个mqqt客户端类
                client = new MqttClient(serviceURL,clientId,new MemoryPersistence());
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
        }
        // 创建mqtt连接
        @Test
        public void createConnectionTest() throws MqttException {
            MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setUserName(user);
            mqttConnectOptions.setPassword(password.toCharArray());
            client.connect(mqttConnectOptions);
    //        注意:因为用test类方法执行的话,太快,可能看不出来,是否正真的建立的连接,所以添加一个死循环来保持程序的存在。
            while (true);
        }
    }
    

    Java原生结合MQTTX---完成心跳对话(附带源码)

    2. MQTT发送消息

    package cn.varin;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.junit.jupiter.api.Test;
    public class MqttSendMessageTest {
        /**
         *
         *
         * String serverURI,(mqtt服务端地址)
         * String clientId,(客户端id)
         * MqttClientPersistence persistence(内存持久类)
         *
         *
         */
        public static String serviceURL= "tcp://varin.cn:1883";
        public static String clientId="varya_test_01";
        public static MqttClient client;
        public static String user="varya";
       public static String password= "123456";
        static {
            try {
                // 建立一个mqqt客户端类
                client = new MqttClient(serviceURL,clientId,new MemoryPersistence());
                MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
                mqttConnectOptions.setUserName(user);
                mqttConnectOptions.setPassword(password.toCharArray());
                client.connect(mqttConnectOptions);
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
        }
        // 发送消息
        @Test
        public void SendMessageTest() throws MqttException {
            // 设置消息
            String message = "hello mqttx Client";
            MqttMessage mqttMessage = new MqttMessage(message.getBytes());
            mqttMessage.setQos(2);
            // 当一个主题的消息设置了 setRetained(true) 后,这条消息会存储在 Broker 中。如果后续有新的客户端订阅这个主题,则无论何时订阅,都会立即收到最近的一条带有 retained 属性的消息作为初始数据2。如果没有设置 retained 或者之前的消息未被保留,则新订阅者不会接收到任何历史消息。,
            mqttMessage.setRetained(true);
            // 发送消息
            client.publish("java_and_mqttx_conn", mqttMessage);
            //发送完,关闭连接
            client.disconnect();
            client.close();
        }
    }
        
    

    Java原生结合MQTTX---完成心跳对话(附带源码)

    3. MQTT接收消息

    package cn.varin;
    import org.eclipse.paho.client.mqttv3.*;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    import org.junit.jupiter.api.Test;
    public class MqttReceiveMessageTest {
        /**
         *
         *
         * String serverURI,(mqtt服务端地址)
         * String clientId,(客户端id)
         * MqttClientPersistence persistence(内存持久类)
         *
         *
         */
        public static String serviceURL= "tcp://varin.cn:1883";
        public static String clientId="varya_test_01";
        public static MqttClient client;
        public static String user="varya";
       public static String password= "123456";
        static {
            try {
                // 建立一个mqqt客户端类
                client = new MqttClient(serviceURL,clientId,new MemoryPersistence());
                MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
                mqttConnectOptions.setUserName(user);
                mqttConnectOptions.setPassword(password.toCharArray());
                client.connect(mqttConnectOptions);
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
        }
        // 接受消息
        @Test
        public void SendMessageTest() throws MqttException {
           client.subscribe("java_and_mqttx_conn",2);
           // 建立接收消息回调
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable throwable) {
                    // 连接丢失时调用
                    System.out.println("cooection error");
                }
                @Override
                public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                    // 接收到消息时调用
                    System.out.println("来自主题:"+s);
                    System.out.println("接收到的消息为:"+new String(mqttMessage.getPayload()));
                }
                @Override
                public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                    // 解释消息结束时调用
                    System.out.println("deliveryComplete");
                }
            });
            // 为了保持test类的连接,建立一个死循环语句
            while (true);
        }
    }
    

    Java原生结合MQTTX---完成心跳对话(附带源码)



    四、🐉 彩蛋:会喷火的测试恐龙

    参考文章:https://blog.csdn.net/huangzhe0701/article/details/145205822

    在终端运行:

    mosquitto_pub -h localhost -t "java_and_mqttx_conn" -m "恐龙喷火啦~🔥"
    

    观察Java程序是否输出火焰日志!


    五、💡 常见魔法失效对策

    1. 检查1883端口是否被麻瓜防火墙阻挡
    2. 确认EMQX服务像打人柳一样活跃(sudo systemctl status emqx)
    3. Java依赖是否像魔药材料一样齐全

    结语:现在你的Java程序已经获得了通信魔法!快来用MQTT实现更多神奇功能吧~ 如果(程序)不显形,欢迎在评论区召唤帮忙!🎩

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

相关阅读

目录[+]

取消
微信二维码
微信二维码
支付宝二维码