SpringBoot + Netty + Vue + WebSocket实现在线聊天

06-01 1067阅读

最近想学学WebSocket做一个实时通讯的练手项目

主要用到的技术栈是WebSocket Netty Vue Pinia MySQL SpringBoot,实现一个持久化数据,单一群聊,支持多用户的聊天界面

下面是实现的过程

后端

SpringBoot启动的时候会占用一个端口,而Netty也会占用一个端口,这两个端口不能重复,并且因为Netty启动后会阻塞当前线程,因此需要另开一个线程防止阻塞住SpringBoot

1. 编写Netty服务器

个人认为,Netty最关键的就是channel,可以代表一个客户端

我在这使用的是@PostConstruct注解,在Bean初始化后调用里面的方法,新开一个线程运行Netty,因为希望Netty受Spring管理,所以加上了spring的注解,也可以直接在启动类里注入Netty然后手动启动

@Service
public class NettyService {
    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private EventLoopGroup workGroup = new NioEventLoopGroup();
    @Autowired
    private WebSocketHandler webSocketHandler;
    @Autowired
    private HeartBeatHandler heartBeatHandler;
    @PostConstruct
    public void initNetty() throws BaseException {
        new Thread(()->{
            try {
                start();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
    @PreDestroy
    public void destroy() throws BaseException {
        bossGroup.shutdownGracefully();
        workGroup.shutdownGracefully();
    }
    @Async
    public void start() throws BaseException {
        try {
            ChannelFuture channelFuture = new ServerBootstrap()
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            nioSocketChannel.pipeline()
// http解码编码器
                                    .addLast(new HttpServerCodec())
// 处理完整的 HTTP 消息
                    .addLast(new HttpObjectAggregator(64 * 1024))
// 心跳检测时长
                                    .addLast(new IdleStateHandler(300, 0, 0, TimeUnit.SECONDS))
// 心跳检测处理器
                                    .addLast(heartBeatHandler)
// 支持ws协议(自定义)
                                    .addLast(new WebSocketServerProtocolHandler("/ws",null,true,64*1024,true,true,10000))
// ws请求处理器(自定义)
                                    .addLast(webSocketHandler)
                            ;
                        }
                    }).bind(8081).sync();
            System.out.println("Netty启动成功");
            ChannelFuture future = channelFuture.channel().closeFuture().sync();
        }
        catch (InterruptedException e){
            throw new InterruptedException ();
        }
        finally {
//优雅关闭
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}

服务器类只是指明一些基本信息,包含处理器类,支持的协议等等,具体的处理逻辑需要再自定义类来实现

2. 心跳检测处理器

心跳检测是指 服务器无法主动确定客户端的状态(用户可能关闭了网页,但是服务端没办法知道),为了确定客户端是否在线,需要客户端定时发送一条消息,消息内容不重要,重要的是发送消息代表该客户端仍然在线,当客户端长时间没有发送数据时,代表客户端已经下线

package org.example.payroll_management.websocket.netty.handler;
@Component
@ChannelHandler.Sharable
public class HeartBeatHandler extends ChannelDuplexHandler {
    @Autowired
    private ChannelContext channelContext;
    private static final Logger logger =  LoggerFactory.getLogger(HeartBeatHandler.class);
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            // 心跳检测超时
            IdleStateEvent e = (IdleStateEvent) evt;
            logger.info("心跳检测超时");
            if (e.state() == IdleState.READER_IDLE){
                Attribute attr = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));
                Integer userId = attr.get();
                // 读超时,当前已经下线,主动断开连接
                ChannelContext.removeChannel(userId);
                ctx.close();
            } else if (e.state() == IdleState.WRITER_IDLE){
                ctx.writeAndFlush("心跳检测");
            }
        }
        super.userEventTriggered(ctx, evt);
    }
}

3. webSocket处理器

当客户端发送消息,消息的内容会发送当webSocket处理器中,可以对对应的方法进行处理,我这里偷懒了,就做了一个群组,全部用户只能在同一群中聊天,不过创建多个群组,或单对单聊天也不复杂,只需要将群组的ID进行保存就可以

这里就产生第一个问题了,就是SpringMVC的拦截器不会拦截其他端口的请求,解决方法是将token放置到请求参数中,在userEventTriggered方法中重新进行一次token检验

第二个问题,我是在拦截器中通过ThreadLocal保存用户ID,不走拦截器在其他地方拿不到用户ID,解决方法是,在userEventTriggered方法中重新保存,或者channel中可以保存附件(自身携带的数据),直接将id保存到附件中

第三个问题,消息的持久化,当用户重新打开界面时,肯定希望消息仍然存在,鉴于webSocket的实时性,数据持久化肯定不能在同一个线程中完成,我在这使用BlockingQueue+线程池完成对消息的异步保存,或者也可以用mq实现

SpringBoot + Netty + Vue + WebSocket实现在线聊天
(图片来源网络,侵删)

不过用的Executors.newSingleThreadExecutor();可能会产生OOM的问题,后面可以自定义一个线程池,当任务满了之后,指定拒绝策略为抛出异常,再通过全局异常捕捉拿到对应的数据保存到数据库中,不过俺这种小项目应该不会产生这种问题

第四个问题,消息内容,这个需要前后端统一一下,确定一下传输格式就OK了,然后从JSON中取出数据处理

SpringBoot + Netty + Vue + WebSocket实现在线聊天
(图片来源网络,侵删)

最后就是在线用户统计,这个没什么好说的,里面有对应的方法,当退出时,直接把channel踢出去就可以了

package org.example.payroll_management.websocket.netty.handler;
@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler {
    @Autowired
    private ChannelContext channelContext;
    @Autowired
    private MessageMapper messageMapper;
    @Autowired
    private UserService userService;
    private static final Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);
    private static final BlockingQueue blockingQueue = new ArrayBlockingQueue(1024 * 1024);
    private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadExecutor();
    // 提交线程
    @PostConstruct
    private void init(){
        EXECUTOR_SERVICE.submit(new MessageHandler());
    }
    private class MessageHandler implements Runnable{
        // 异步保存
        @Override
        public void run() {
            while(true){
                WebSocketMessageDto message = null;
                try {
                    message = blockingQueue.take();
                    logger.info("消息持久化");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                Integer success = messageMapper.saveMessage(message);
                if (success  

4. 工具类

SpringBoot + Netty + Vue + WebSocket实现在线聊天
(图片来源网络,侵删)

主要用来保存用户信息的

不要问我为什么又有static又有普通方法,问就是懒得改,这里我直接保存的同一个群组,如果需要多群组的话,就需要建立SQL数据了

package org.example.payroll_management.websocket;
@Component
public class ChannelContext {
    private static final Map USER_CHANNEL_MAP = new ConcurrentHashMap();
    private static final Map USER_CHANNELGROUP_MAP = new ConcurrentHashMap();
    private static final Integer GROUP_ID = 10086;
    private static final Logger logger = LoggerFactory.getLogger(ChannelContext.class);
    public void addContext(Integer userId,Channel channel){
        String channelId = channel.id().toString();
        AttributeKey attributeKey = null;
        if (AttributeKey.exists(channelId)){
            attributeKey = AttributeKey.valueOf(channelId);
        } else{
            attributeKey = AttributeKey.newInstance(channelId);
        }
        channel.attr(attributeKey).set(userId);
    }
    public static List getAllUserId(){
        return new ArrayList(USER_CHANNEL_MAP.keySet());
    }
    public static void setChannel(Integer userId,Channel channel){
        USER_CHANNEL_MAP.put(userId,channel);
    }
    public static Channel getChannel(Integer userId){
        return USER_CHANNEL_MAP.get(userId);
    }
    public static void removeChannel(Integer userId){
        USER_CHANNEL_MAP.remove(userId);
    }
    public static void setChannelGroup(Integer groupId,Channel channel){
        if(groupId == null){
            groupId = GROUP_ID;
        }
        ChannelGroup channelGroup = USER_CHANNELGROUP_MAP.get(groupId);
        if (channelGroup == null){
            channelGroup =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
            USER_CHANNELGROUP_MAP.put(GROUP_ID, channelGroup);
        }
        if (channel == null){
            return ;
        }
        channelGroup.add(channel);
        logger.info("向group中添加channel,ChannelGroup已有Channel数量:{}",channelGroup.size());
    }
    public static ChannelGroup getChannelGroup(Integer groupId){
        if (groupId == null){
            groupId = GROUP_ID;
        }
        return USER_CHANNELGROUP_MAP.get(groupId);
    }
    public static void removeChannelGroup(Integer groupId){
        if (groupId == null){
            groupId = GROUP_ID;
        }
         USER_CHANNELGROUP_MAP.remove(groupId);
    }
}

写到这里,Netty服务就搭建完成了,后面就可以等着前端的请求建立了

前端

前端我使用的vue,因为我希望当用户登录后自动建立ws连接,所以我在登录成功后添加上了ws建立请求,然后我发现,如果用户关闭网页后重新打开,因为跳过了登录界面,ws请求不会自动建立,所以需要一套全局的ws请求

不过我前端不是很好(其实后端也一般),所以很多地方肯定有更优的写法

1. pinia

使用pinia保存ws请求,方便在其他组件中调用

定义WebSocket实例(ws)和一个请求建立判断(wsConnect)

后面就可以通过ws接收服务的消息

import { defineStore } from 'pinia'
export const useWebSocketStore = defineStore('webSocket', {
    state() {
        return {
            ws: null,
            wsConnect: false,
        }
    },
    actions: {
        wsInit() {
            if (this.ws === null) {
                const token = localStorage.getItem("token")
                if (token === null)  return;
                this.ws = new WebSocket(`ws://localhost:8081/ws?token=${token}`)
                  
                this.ws.onopen = () => {
                    this.wsConnect = true;
                    console.log("ws协议建立成功")
                    // 发送心跳
                    const intervalId = setInterval(() => {
                        if (!this.wsConnect) {
                            clearInterval(intervalId)
                        }
                        const webSocketMessageDto = {
                            type: "心跳检测"
                        }
                        this.sendMessage(JSON.stringify(webSocketMessageDto));
                    }, 1000 * 3 * 60);
                }
                this.ws.onclose = () => {
                    this.ws = null;
                    this.wsConnect = false;
                }
            }
        },
        sendMessage(message) {
            if (message == null || message == '') {
                return;
            }
            if (!this.wsConnect) {
                console.log("ws协议没有建立")
                this.wsInit();
            }
            this.ws.send(message);
        },
        wsClose() {
            if (this.wsConnect) {
                this.ws.close();
                this.wsConnect = false;
            }
        }
    }
})

然后再app.vue中循环建立连接(建立请求重试)

 const wsConnect = function () {
        const token = localStorage.getItem("token")
        if (token === null) {
            return;
        }
        try {
            if (!webSocket.wsConnect) {
                console.log("尝试建立ws请求")
                webSocket.wsInit();
            } else {
                return;
            }
        } catch {
            wsConnect();
        }
    }

2. 聊天组件

界面相信大伙都会画,主要说一下我遇到的问题

第一个 上拉刷新,也就是加载历史记录的功能,我用的element-plus UI,也不知道是不是我的问题,UI里面的无限滚动不是重复发送请求就是无限发送请求,而且好像没有上拉加载的功能。于是我用了IntersectionObserver来解决,在页面底部加上一个div,当观察到这个div时,触发请求

第二个 滚动条到达顶部时,请求数据并放置数据,滚动条会自动滚动到顶部,并且由于观察的元素始终在顶端导致无限请求,这个其实也不是什么大问题,因为聊天的消息是有限的,没有数据之后我设置了停止观察,主要是用户体验不是很好。这是我是添加了display: flex; flex-direction: column-reverse;解决这个问题的(flex很神奇吧)。大致原理好像是垂直翻转了(例如上面我将观察元素放到div第一个子元素位置,添加flex后观察元素会到最后一个子元素位置上),也就是说当滚动条在最底部时,添加数据后,滚动条会自动滚动到最底部,不过这样体验感非常的不错

不要问我为什么数据要加 || 问就是数据懒得统一了

    .chatBox {
        border-radius: 20px;
        box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
        width: 1200px;
        height: 600px;
        background-color: white;
        display: flex;
        .chat {
            width: 1000px;
            height: inherit;
            .chatBackground {
                height: 500px;
                overflow: auto;
                display: flex;
                flex-direction: column-reverse;
                .loading {
                    text-align: center;
                    font-size: 12px;
                    margin-top: 20px;
                    color: gray;
                }
                .chatItem {
                    width: 100%;
                    padding-bottom: 20px;
                    .avatar {
                        margin-left: 20px;
                        display: flex;
                        align-items: center;
                        .username {
                            margin-left: 10px;
                            color: rgb(153, 153, 153);
                            font-size: 13px;
                        }
                    }
                    .chatItemMessage {
                        margin-left: 60px;
                        padding: 10px;
                        font-size: 14px;
                        width: 200px;
                        word-break: break-all;
                        max-width: 400px;
                        line-height: 25px;
                        width: fit-content;
                        border-radius: 10px;
                        height: auto;
                        /* background-color: skyblue; */
                        box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
                    }
                    .sendDate {
                        font-size: 12px;
                        margin-top: 10px;
                        margin-left: 60px;
                        color: rgb(187, 187, 187);
                    }
                }
            }
            .chatBottom {
                height: 100px;
                background-color: #F3F3F3;
                border-radius: 20px;
                display: flex;
                box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
                .messageInput {
                    border-radius: 20px;
                    width: 400px;
                    height: 40px;
                }
            }
        }
        .userList {
            width: 200px;
            height: inherit;
            border-radius: 20px;
            box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
            .user {
                width: inherit;
                height: 50px;
                line-height: 50px;
                text-indent: 2em;
                border-radius: 20px;
                transition: all 0.5s ease;
            }
        }
    }
    .user:hover {
        box-shadow: rgba(0, 0, 0, 0.05) -2px 0px 8px 0px;
        transform: translateX(-5px) translateY(-5px);
    }


    {{hasMessage}}
    
        
            
                
                    
                        
                        {{i.username || i.userId}}
                    
                    
                        {{i.text || i.content}}
                    
                    
                        {{i.date || i.sendDate}}
                    
                
                
                    显示更多内容
                
            
            
                
                发送消息
            
        
        
        
            
                
                    {{user.userName}}
                
            
        
    


    import { ref, onMounted, nextTick } from 'vue'
    import request from '@/utils/request.js'
    import { useWebSocketStore } from '@/stores/useWebSocketStore'
    import imageUrl from '@/assets/默认头像.jpg'
    const webSocketStore = useWebSocketStore();
    const chatBackgroundRef = ref(null)
    const userList = ref([])
    const message = ref('')
    const messageList = ref([
    ])
    const loading = ref(null)
    const page = ref(1);
    const size = 10;
    const hasMessage = ref(true);
    const observer = new IntersectionObserver((entries, observer) => {
        entries.forEach(async entry => {
            if (entry.isIntersecting) {
                observer.unobserve(entry.target)
                await pageQueryMessage();
            }
        })
    })
    onMounted(() => {
        observer.observe(loading.value)
        getOnlineUserList();
        if (!webSocketStore.wsConnect) {
            webSocketStore.wsInit();
        }
        const ws = webSocketStore.ws;
        ws.onmessage = async (e) => {
            // console.log(e);
            const webSocketMessage = JSON.parse(e.data);
            const messageObj = {
                username: webSocketMessage.sender,
                text: webSocketMessage.text,
                date: webSocketMessage.sendDate,
                type: webSocketMessage.type
            }
            console.log("###")
            // console.log(JSON.parse(messageObj.text))
            if (messageObj.type === "群发") {
                messageList.value.unshift(messageObj)
            } else if (messageObj.type === "用户变更") {
                userList.value = JSON.parse(messageObj.text)
            }
            await nextTick();
            // 当发送新消息时,自动滚动到页面最底部,可以替换成消息提示的样式
            // chatBackgroundRef.value.scrollTop = chatBackgroundRef.value.scrollHeight;
            console.log(webSocketMessage)
        }
    })
    const pageQueryMessage = function () {
        request({
            url: '/api/message/pageQueryMessage',
            method: 'post',
            data: {
                page: page.value,
                size: size
            }
        }).then((res) => {
            console.log(res)
            if (res.data.data.length === 0) {
                hasMessage.value = false;
            }
            else {
                observer.observe(loading.value)
                page.value = page.value + 1;
                messageList.value.push(...res.data.data)
            }
        })
    }
    function getOnlineUserList() {
        request({
            url: '/api/user/getOnlineUser',
            method: 'get'
        }).then((res) => {
            console.log(res)
            userList.value = res.data.data;
        })
    }
    const sendMessage = function () {
        if (!webSocketStore.wsConnect) {
            webSocketStore.wsInit();
        }
        const webSocketMessageDto = {
            type: "群发",
            text: message.value
        }
        webSocketStore.sendMessage(JSON.stringify(webSocketMessageDto));
    }

这样就实现了一个简易的聊天数据持久化,支持在线聊天的界面,总的来说WebSocket用起来还是十分方便的

后面我看看能不能做下上传图片,上传文件之类的功能

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码