JAVA(SpringBoot)集成Netty实现(TCP、Websocket)服务端与客户端。

06-01 1366阅读

SpringBoot集成 Netty 。

  • 一、Netty 简介
  • 二、Netty功能
    • 1. 网络通信支持
    • 2. 高性能与低资源消耗
    • 3. 易于使用和定制
    • 4. 内存管理
    • 5. 安全性
    • 三、POM依赖
    • 四、TCP
      • 1、服务端
        • 1.1 创建一个Netty服务端类,NettyTcpServer
        • 1.2 创建一个 NettyTcpServerHandler继承自 ChannelInboundHandlerAdapter,主要负责处理 Netty TCP 服务端各种事件和消息。
        • 2、客户端
          • 2.1 创建一个Netty客户端类,NettyTcpClient
          • 2.2 创建一个 NettyTcpClientHandler 客户端处理器,继承自 SimpleChannelInboundHandler 处理服务器发送的数据。
          • 五、WebSocket
            • 1、服务端
              • 1.1 创建一个Netty WebSocket 服务端类,WebSocketServer
              • 1.2 创建一个 WebSocketHandler 继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息。
              • 六、TCP 与 WebSocket 同时监听不同的端口
                • 1、 NettyConfig 用于配置和管理 Netty 服务器的启动和关闭
                • 2、 AbstractNettyServer 通过继承该抽象类,子类可以实现特定的服务器逻辑,TCP 服务器或 WebSocket 服务器。
                • 3、 TcpServer 类是一个基于 Netty 的 TCP 服务器实现类,继承自 AbstractNettyServer。
                • 4、 TcpServerHandler 类,主要负责处理 Netty TCP 服务端各种事件和消息。
                • 5、 WebSocketServer 类,继承自 AbstractNettyServer,基于 Netty 框架实现的 WebSocket 服务器类。
                • 6、 WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息

                  一、Netty 简介

                  Netty 是一款基于 Java 语言开发的高性能、异步事件驱动型网络应用框架,它为开发人员提供了对 TCP、UDP、HTTP、WebSocket 等多种网络协议的支持,极大地简化了网络编程工作,无论是开发服务器端应用还是客户端应用都十分便捷。其主要特点包括:

                  高性能:通过使用异步 I/O 和事件驱动模型,减少线程切换和上下文开销,提升系统性能。

                  易用性:提供了高层次的抽象,使得开发者可以专注于业务逻辑,而不必过多关注底层网络细节。

                  灵活性:支持多种协议和编解码方式,易于定制和扩展。

                  二、Netty功能

                  Netty是一个基于Java的高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能网络服务器和客户端程序。以下是其一些主要功能:

                  1. 网络通信支持

                  • 多种协议:
                    • Netty支持广泛的网络协议,如TCP、UDP、HTTP、HTTP/2、WebSocket等。这使得开发人员能够轻松构建支持不同应用层协议的网络应用。例如,开发基于HTTP协议的Web服务器,或者基于WebSocket协议的实时通信应用。
                    • 以HTTP协议为例,Netty提供了专门的编解码器和处理器,方便处理HTTP请求和响应,包括解析HTTP头、处理HTTP内容体等。
                    • 跨平台:它能够在不同的操作系统和Java版本上运行,确保应用的可移植性。无论是在Linux、Windows还是Mac OS系统上,Netty都能发挥其高性能的优势。

                      2. 高性能与低资源消耗

                      • 异步和事件驱动:Netty采用异步I/O和事件驱动模型,这使得它能够在处理大量并发连接时,避免线程阻塞,提高系统的吞吐量和响应能力。例如,当一个新的连接建立或者数据可读时,Netty会触发相应的事件,由预先注册的事件处理器进行处理,而不是让线程一直等待I/O操作完成。
                      • 零拷贝技术:Netty通过使用零拷贝技术,减少了数据在内存中的拷贝次数,提高了数据传输的效率。例如,在文件传输场景中,Netty可以直接将文件内容从磁盘传输到网络,而不需要将文件内容先拷贝到应用程序的内存中。

                        3. 易于使用和定制

                        • 简单的API:Netty提供了简洁直观的API,使得开发人员能够快速上手,构建复杂的网络应用。例如,通过ChannelPipeline机制,开发人员可以方便地添加和管理各种网络处理逻辑,如编解码、业务逻辑处理等。
                        • 高度可定制:它的架构设计非常灵活,允许开发人员根据具体需求对其进行定制。比如,可以自定义编解码器来处理特定格式的协议数据,或者定制线程模型以适应不同的应用场景。

                          4. 内存管理

                          • 池化内存分配:Netty提供了池化内存分配器,能够有效地管理内存,减少内存碎片,提高内存的使用效率。例如,在高并发场景下,频繁地创建和销毁对象会导致内存碎片问题,而Netty的池化内存分配器可以复用已分配的内存块,避免这种情况的发生。
                          • 自动内存回收:它具备自动内存回收机制,能够根据应用的运行情况,自动调整内存的使用,降低内存泄漏的风险。

                            5. 安全性

                            • SSL/TLS支持:Netty内置了对SSL/TLS协议的支持,使得开发人员能够轻松地为网络应用添加安全加密功能,保护数据在传输过程中的机密性和完整性。例如,在开发金融类网络应用时,可以使用Netty的SSL/TLS支持,确保用户的交易数据安全传输。

                              三、POM依赖

                                  
                                      io.netty
                                      netty-transport
                                      4.1.94.Final
                                  
                                  
                                      io.netty
                                      netty-codec
                                      4.1.94.Final
                                  
                                  
                                      io.netty
                                      netty-all
                                      4.1.92.Final
                                  
                              

                              四、TCP

                              1、服务端

                              1.1 创建一个Netty服务端类,NettyTcpServer

                              import io.netty.bootstrap.ServerBootstrap;
                              import io.netty.channel.ChannelFuture;
                              import io.netty.channel.ChannelInitializer;
                              import io.netty.channel.ChannelOption;
                              import io.netty.channel.EventLoopGroup;
                              import io.netty.channel.nio.NioEventLoopGroup;
                              import io.netty.channel.socket.SocketChannel;
                              import io.netty.channel.socket.nio.NioServerSocketChannel;
                              import lombok.extern.slf4j.Slf4j;
                              import org.springframework.boot.CommandLineRunner;
                              import org.springframework.stereotype.Component;
                              /**
                               * Netty TCP服务类
                               *
                               * @author chenlei
                               */
                              @Slf4j
                              @Component
                              public class NettyTcpServer implements CommandLineRunner {
                                  /**
                                   * 端口号
                                   */
                                  private int port = 8972;
                                  @Override
                                  public void run(String... args) {
                                      // 接收连接
                                      EventLoopGroup boss = new NioEventLoopGroup();
                                      // 处理信息
                                      EventLoopGroup worker = new NioEventLoopGroup();
                                      try {
                                          // 定义server
                                          ServerBootstrap serverBootstrap = new ServerBootstrap();
                                          // 添加分组
                                          serverBootstrap.group(boss, worker)
                                                  // 添加通道设置非阻塞
                                                  .channel(NioServerSocketChannel.class)
                                                  // 服务端可连接队列数量
                                                  .option(ChannelOption.SO_BACKLOG, 128)
                                                  // 开启长连接
                                                  .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                                                  // 流程处理
                                                  .childHandler(new ChannelInitializer() {
                                                      @Override
                                                      protected void initChannel(SocketChannel ch) {
                                                          ch.pipeline().addLast(new NettyTcpServerHandler());
                                                      }
                                                  });
                                          // 绑定端口
                                          ChannelFuture cf = serverBootstrap.bind(port).sync();
                                          // 优雅关闭连接
                                          cf.channel().closeFuture().sync();
                                      } catch (Exception e) {
                                          log.error("连接错误:{}", e.getMessage());
                                      } finally {
                                          boss.shutdownGracefully();
                                          worker.shutdownGracefully();
                                      }
                                  }
                              }
                              

                              1.2 创建一个 NettyTcpServerHandler继承自 ChannelInboundHandlerAdapter,主要负责处理 Netty TCP 服务端各种事件和消息。

                              import io.netty.channel.ChannelHandlerContext;
                              import io.netty.channel.ChannelId;
                              import io.netty.channel.ChannelInboundHandlerAdapter;
                              import io.netty.handler.timeout.IdleState;
                              import io.netty.handler.timeout.IdleStateEvent;
                              import lombok.extern.slf4j.Slf4j;
                              import java.net.InetSocketAddress;
                              import java.util.concurrent.ConcurrentHashMap;
                              /**
                               * @author chenlei
                               */
                              @Slf4j
                              public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter {
                                  /**
                                   * 保存连接到服务端的通道信息,对连接的客户端进行管理,包括连接的添加、删除、查找等操作。
                                   */
                                  private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap();
                                  /**
                                   * 当有客户端连接到服务器时,此方法会被触发。
                                   * 它会记录客户端的 IP 地址、端口号以及连接的 ChannelId,并将该连接添加到 CHANNEL_MAP 中。
                                   * 如果连接已经存在于 CHANNEL_MAP 中,会打印相应的日志信息;如果不存在,则添加到映射中并记录连接信息。
                                   *
                                   * @param ctx 通道处理器上下文,包含了通道的信息和操作通道的方法
                                   */
                                  @Override
                                  public void channelActive(ChannelHandlerContext ctx) {
                                      // 获取客户端的网络地址信息
                                      InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
                                      // 获取客户端的 IP 地址
                                      String clientIp = insocket.getAddress().getHostAddress();
                                      // 获取客户端的端口号
                                      int clientPort = insocket.getPort();
                                      // 获取连接通道的唯一标识
                                      ChannelId channelId = ctx.channel().id();
                                      // 如果该连接通道已经在映射中,打印连接状态信息
                                      if (CHANNEL_MAP.containsKey(channelId)) {
                                          log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
                                      } else {
                                          // 将新的连接添加到映射中
                                          CHANNEL_MAP.put(channelId, ctx);
                                          log.info("客户端【" + channelId + "】连接 netty 服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
                                          log.info("连接通道数量: " + CHANNEL_MAP.size());
                                      }
                                  }
                                  /**
                                   * 当有客户端终止连接服务器时,此方法会被触发。
                                   * 它会从 CHANNEL_MAP 中移除该客户端的连接信息,并打印相应的退出信息和更新后的连接通道数量。
                                   * 首先检查该连接是否存在于 CHANNEL_MAP 中,如果存在则进行移除操作。
                                   *
                                   * @param ctx 通道处理器上下文
                                   */
                                  @Override
                                  public void channelInactive(ChannelHandlerContext ctx) {
                                      InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
                                      String clientIp = insocket.getAddress().getHostAddress();
                                      ChannelId channelId = ctx.channel().id();
                                      // 检查映射中是否包含该客户端连接
                                      if (CHANNEL_MAP.containsKey(channelId)) {
                                          // 从映射中移除连接
                                          CHANNEL_MAP.remove(channelId);
                                          log.info("客户端【" + channelId + "】退出 netty 服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
                                          log.info("连接通道数量: " + CHANNEL_MAP.size());
                                      }
                                  }
                                  /**
                                   * 当有客户端向服务器发送消息时,此方法会被触发。
                                   * 它会打印接收到的客户端消息,并调用 channelWrite 方法将消息返回给客户端。
                                   * 首先会打印接收到客户端报文的日志信息,然后调用 channelWrite 方法进行响应。
                                   *
                                   * @param ctx 通道处理器上下文
                                   * @param msg 从客户端接收到的消息对象
                                   */
                                  @Override
                                  public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                      log.info("加载客户端报文......");
                                      log.info("【" + ctx.channel().id() + "】" + " :" + msg);
                                      // 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入 write 函数
                                      // 调用 channelWrite 方法将消息返回给客户端
                                      this.channelWrite(ctx.channel().id(), msg);
                                  }
                                  /**
                                   * 服务端给客户端发送消息的方法。
                                   * 首先根据传入的 ChannelId 从 CHANNEL_MAP 中获取对应的 ChannelHandlerContext,
                                   * 然后检查消息是否为空以及 ChannelHandlerContext 是否存在,若存在则将消息写入通道并刷新缓冲区。
                                   *
                                   * @param channelId 连接通道的唯一标识
                                   * @param msg       需要发送的消息内容
                                   */
                                  public void channelWrite(ChannelId channelId, Object msg) {
                                      // 获取与 ChannelId 对应的 ChannelHandlerContext
                                      ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
                                      if (ctx == null) {
                                          log.info("通道【" + channelId + "】不存在");
                                          return;
                                      }
                                      if (msg == null || msg == "") {
                                          log.info("服务端响应空的消息");
                                          return;
                                      }
                                      // 将消息写入通道
                                      ctx.write(msg);
                                      // 刷新通道的输出缓冲区,确保消息被发送出去
                                      ctx.flush();
                                  }
                                  /**
                                   * 当触发用户事件时,此方法会被调用,主要用于处理空闲状态事件。
                                   * 根据不同的空闲状态(读、写、总超时)进行相应的处理,如断开连接。
                                   * 首先检查触发的事件是否是 IdleStateEvent,如果是则判断具体的空闲状态并进行相应处理。
                                   *
                                   * @param ctx 通道处理器上下文
                                   * @param evt 触发的事件对象
                                   */
                                  @Override
                                  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                                      String socketString = ctx.channel().remoteAddress().toString();
                                      if (evt instanceof IdleStateEvent) {
                                          IdleStateEvent event = (IdleStateEvent) evt;
                                          if (event.state() == IdleState.READER_IDLE) {
                                              log.info("Client: " + socketString + " READER_IDLE 读超时");
                                              // 读超时,断开连接
                                              ctx.disconnect();
                                          } else if (event.state() == IdleState.WRITER_IDLE) {
                                              log.info("Client: " + socketString + " WRITER_IDLE 写超时");
                                              // 写超时,断开连接
                                              ctx.disconnect();
                                          } else if (event.state() == IdleState.ALL_IDLE) {
                                              log.info("Client: " + socketString + " ALL_IDLE 总超时");
                                              // 总超时,断开连接
                                              ctx.disconnect();
                                          }
                                      }
                                  }
                                  /**
                                   * 当发生异常时,此方法会被触发。
                                   * 它会关闭通道并打印相应的错误信息,同时打印当前的连接通道数量。
                                   * 异常发生时,会关闭通道以防止资源泄漏,并且打印异常发生的通道信息和当前的连接数量。
                                   *
                                   * @param ctx   通道处理器上下文
                                   * @param cause 引发异常的原因
                                   */
                                  @Override
                                  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                      ctx.close();
                                      log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
                                  }
                              }
                              

                              2、客户端

                              2.1 创建一个Netty客户端类,NettyTcpClient

                              import io.netty.bootstrap.Bootstrap;
                              import io.netty.buffer.Unpooled;
                              import io.netty.channel.*;
                              import io.netty.channel.nio.NioEventLoopGroup;
                              import io.netty.channel.socket.SocketChannel;
                              import io.netty.channel.socket.nio.NioSocketChannel;
                              import io.netty.handler.codec.DelimiterBasedFrameDecoder;
                              import io.netty.handler.codec.string.StringDecoder;
                              import io.netty.handler.codec.string.StringEncoder;
                              import lombok.extern.slf4j.Slf4j;
                              import org.springframework.boot.CommandLineRunner;
                              import org.springframework.stereotype.Component;
                              import java.util.concurrent.TimeUnit;
                              import java.util.concurrent.atomic.AtomicInteger;
                              /**
                               * Netty TCP客户端
                               *
                               * @author chenlei
                               */
                              @Slf4j
                              @Component
                              public class NettyTcpClient implements CommandLineRunner {
                                  /**
                                   * 最大连接次数
                                   */
                                  private final int maxConnectTimes = 10;
                                  /**
                                   * 地址
                                   */
                                  private final String host = "192.168.2.154";
                                  /**
                                   * 端口
                                   */
                                  private final int port = 1250;
                                  @Override
                                  public void run(String... args) {
                                      // 创建一个处理 I/O 操作的 EventLoopGroup,用于处理客户端的 I/O 操作和事件
                                      EventLoopGroup group = new NioEventLoopGroup();
                                      try {
                                          // 创建 Bootstrap 对象,用于配置和启动 Netty 客户端
                                          Bootstrap bootstrap = new Bootstrap();
                                          bootstrap.group(group)
                                                  // 使用 NioSocketChannel 作为通道类型,基于 NIO 的 Socket 通道
                                                  .channel(NioSocketChannel.class)
                                                  // 设置 TCP 选项,如 TCP_NODELAY 开启无延迟模式,提高性能
                                                  .option(ChannelOption.TCP_NODELAY, true)
                                                  // 为 ChannelPipeline 添加处理器,用于处理通道的数据
                                                  .handler(new ChannelInitializer() {
                                                      @Override
                                                      protected void initChannel(SocketChannel ch) {
                                                          // 添加帧解码器,处理粘包和拆包问题,这里以 \n 作为分隔符
                                                          ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, Unpooled.copiedBuffer("\n".getBytes())));
                                                          // 添加字符串解码器,将字节数据解码为字符串
                                                          ch.pipeline().addLast(new StringDecoder());
                                                          // 添加字符串编码器,将字符串编码为字节数据
                                                          ch.pipeline().addLast(new StringEncoder());
                                                          // 添加自定义的处理器,用于处理接收到的数据
                                                          ch.pipeline().addLast(new NettyTcpClientHandler(bootstrap, host, NettyTcpClient.this));
                                                      }
                                                  });
                                          // 连接到服务器的指定端口范围
                                          final AtomicInteger connectTimes = new AtomicInteger(0);
                                          // 尝试连接到服务器,并添加连接结果监听器
                                          bootstrap.connect(host, port).addListener((ChannelFutureListener) future -> {
                                              if (future.isSuccess()) {
                                                  // 连接成功时打印日志
                                                  log.info("成功连接到端口: {}", port);
                                              } else {
                                                  // 连接失败时打印日志
                                                  log.error("连接到服务器时出错,端口: {}", port);
                                                  // 连接失败时,使用 eventLoop 安排重连任务,60 秒后重连
                                                  if (connectTimes.get()  {
                                                          // 重连逻辑,再次尝试连接
                                                          bootstrap.connect(host, port);
                                                      }, 60, TimeUnit.SECONDS);
                                                  } else {
                                                      log.error("已达到最大连接次数,停止重连,端口: {}", port);
                                                  }
                                              }
                                          });
                                      } catch (Exception e) {
                                          // 发生异常时打印错误日志
                                          log.error("Netty 客户端出错", e);
                                      }
                                  }
                                  /**
                                   * 定义重新连接方法
                                   *
                                   * @param bootstrap 用于配置和启动 Netty 客户端的 Bootstrap 对象
                                   * @param host      服务器的主机地址
                                   * @param port      要连接的服务器端口号
                                   * @param future    表示连接操作的 ChannelFuture 对象
                                   */
                                  void reconnect(Bootstrap bootstrap, String host, int port, ChannelFuture future) {
                                      final AtomicInteger connectTimes = new AtomicInteger(0);
                                      try {
                                          bootstrap.connect(host, port).addListener((ChannelFutureListener) f -> {
                                              if (f.isSuccess()) {
                                                  log.info("重连成功,端口: {}", port);
                                                  connectTimes.set(0);
                                              } else {
                                                  if (connectTimes.get()  reconnect(bootstrap, host, port, f), 60, TimeUnit.SECONDS);
                                                  } else {
                                                      log.error("已达到最大连接次数,停止重连,端口: {}", port);
                                                  }
                                              }
                                          });
                                      } catch (Exception e) {
                                          log.error("重连时发生异常,端口: {}", port);
                                          reconnect(bootstrap, host, port, future);
                                      }
                                  }
                              }
                              

                              2.2 创建一个 NettyTcpClientHandler 客户端处理器,继承自 SimpleChannelInboundHandler 处理服务器发送的数据。

                              import io.netty.bootstrap.Bootstrap;
                              import io.netty.channel.Channel;
                              import io.netty.channel.ChannelHandlerContext;
                              import io.netty.channel.SimpleChannelInboundHandler;
                              import lombok.extern.slf4j.Slf4j;
                              import java.net.InetSocketAddress;
                              /**
                               * Netty 客户端处理器类,继承自 SimpleChannelInboundHandler 以处理服务器发送的字符串数据
                               *
                               * @author chenlei
                               */
                              @Slf4j
                              public class NettyTcpClientHandler extends SimpleChannelInboundHandler {
                                  /**
                                   * Bootstrap 对象,用于客户端连接的配置和启动
                                   */
                                  private final Bootstrap bootstrap;
                                  /**
                                   * 服务器的主机地址
                                   */
                                  private final String host;
                                  /**
                                   * Netty 客户端实例,用于调用重连等操作
                                   */
                                  private final NettyTcpClient nettyTcpClient;
                                  /**
                                   * 初始化相关参数
                                   *
                                   * @param bootstrap   用于客户端连接的 Bootstrap 对象
                                   * @param host        服务器的主机地址
                                   * @param nettyClient Netty 客户端实例
                                   */
                                  public NettyTcpClientHandler(Bootstrap bootstrap, String host, NettyTcpClient nettyTcpClient) {
                                      this.bootstrap = bootstrap;
                                      this.host = host;
                                      this.nettyTcpClient= nettyTcpClient;
                                  }
                                  /**
                                   * 当接收到服务器发送的数据时调用此方法
                                   *
                                   * @param ctx      通道处理器上下文,可用于执行通道操作,如发送数据、关闭通道等
                                   * @param response 从服务器接收到的响应字符串
                                   */
                                  @Override
                                  protected void channelRead0(ChannelHandlerContext ctx, String response) {
                                      log.info("接收处理服务器响应数据");
                                      //以下进行具体的业务操作
                                  }
                                  /**
                                   * 当发生异常时调用此方法
                                   *
                                   * @param ctx   通道处理器上下文
                                   * @param cause 异常对象
                                   */
                                  @Override
                                  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                      // 打印异常信息
                                      log.error("处理服务器响应时出错,异常信息: {}", cause.getMessage(), cause);
                                      // 关闭当前通道
                                      ctx.close();
                                  }
                                  /**
                                   * 当通道关闭时调用此方法
                                   *
                                   * @param ctx 通道处理器上下文
                                   */
                                  @Override
                                  public void channelInactive(ChannelHandlerContext ctx) {
                                      final Channel channel = ctx.channel();
                                      // 获取远程服务器的端口号
                                      int port = ((InetSocketAddress) channel.remoteAddress()).getPort();
                                      log.info("通道处于非活动状态,正在尝试在端口上重新连接: {}", port);
                                      // 获取 NettyTcpClientHandler 中存储的 NettyClient 实例
                                      NettyTcpClient nettyTcpClient = ((NettyTcpClientHandler) ctx.handler()).nettyTcpClient;
                                      // 调用 nettyTcpClient 中的 reconnect 方法进行重连
                                      nettyTcpClient .reconnect(bootstrap, host, port, ctx.channel().newFailedFuture(new RuntimeException("频道处于非活动状态")));
                                  }
                              }
                              

                              五、WebSocket

                              1、服务端

                              1.1 创建一个Netty WebSocket 服务端类,WebSocketServer

                              import io.netty.bootstrap.ServerBootstrap;
                              import io.netty.channel.ChannelFuture;
                              import io.netty.channel.ChannelInitializer;
                              import io.netty.channel.ChannelOption;
                              import io.netty.channel.EventLoopGroup;
                              import io.netty.channel.nio.NioEventLoopGroup;
                              import io.netty.channel.socket.SocketChannel;
                              import io.netty.channel.socket.nio.NioServerSocketChannel;
                              import lombok.extern.slf4j.Slf4j;
                              import org.springframework.boot.CommandLineRunner;
                              import org.springframework.stereotype.Component;
                              /**
                               * WebSocket 服务类
                               *
                               * @author chenlei
                               */
                              @Slf4j
                              @Component
                              public class WebSocketServer implements CommandLineRunner {
                                  /**
                                   * 端口号
                                   */
                                  private int port = 897;
                                  @Override
                                  public void run(String... args) {
                                      // 接收 WebSocket 连接的主 EventLoopGroup,用于处理接收新连接的请求
                                      EventLoopGroup webSocketBoss = new NioEventLoopGroup();
                                      // 处理 WebSocket 信息的从 EventLoopGroup,用于处理连接建立后的读写操作
                                      EventLoopGroup webSocketWorker = new NioEventLoopGroup();
                                      try {
                                         // 定义 WebSocket 服务器启动器
                                      ServerBootstrap webSocketServerBootstrap = new ServerBootstrap();
                                      webSocketServerBootstrap.group(webSocketBoss, webSocketWorker)
                                              .channel(NioServerSocketChannel.class)
                                              .option(ChannelOption.SO_BACKLOG, 128)
                                              .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                                              .childHandler(new ChannelInitializer() {
                                                  @Override
                                                  protected void initChannel(SocketChannel ch) {
                                                      // 在通道的处理器链中添加 HttpServerCodec,用于处理 HTTP 协议的编解码
                                                      ch.pipeline().addLast(new HttpServerCodec());
                                                      // 添加 HttpObjectAggregator,用于将 HTTP 消息聚合成完整的请求或响应
                                                      ch.pipeline().addLast(new HttpObjectAggregator(65536));
                                                      // 添加 WebSocketServerProtocolHandler,用于处理 WebSocket 的握手、控制帧等
                                                      ch.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket"));
                                                      // 添加自定义的处理器,用于处理 WebSocket 数据
                                                      ch.pipeline().addLast(new WebSocketHandler());
                                                  }
                                              });
                                          // 绑定端口
                                          ChannelFuture cf = serverBootstrap.bind(port).sync();
                                          // 优雅关闭连接
                                          cf.channel().closeFuture().sync();
                                      } catch (Exception e) {
                                          log.error("连接错误:{}", e.getMessage());
                                      } finally {
                                          boss.shutdownGracefully();
                                          worker.shutdownGracefully();
                                      }
                                  }
                              }
                              

                              1.2 创建一个 WebSocketHandler 继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息。

                              import com.casictime.system.domain.vo.DeviceInfo;
                              import io.netty.channel.ChannelHandlerContext;
                              import io.netty.channel.SimpleChannelInboundHandler;
                              import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
                              import lombok.extern.slf4j.Slf4j;
                              /**
                               * WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息
                               *
                               * @author chenlei
                               */
                              @Slf4j
                              public class WebSocketHandler extends SimpleChannelInboundHandler {
                                  /**
                                   * 处理接收到的消息
                                   *
                                   * @param ctx 通道处理上下文,包含了与通道相关的信息,如通道本身、管道等
                                   * @param msg 接收到的 TextWebSocketFrame 消息,包含了客户端发送的文本消息内容
                                   */
                                  @Override
                                  protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
                                      String message = msg.text();
                                      // 接收到客户端消息时记录日志,使用 debug 级别,方便在生产环境中调整日志输出
                                      log.debug("收到客户端的消息: {}", message);
                                       // 将信息返回给客户端
                                       ctx.writeAndFlush(msg);
                                      // 回复消息示例(如果需要)
                                      // ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到您的消息: " + message));
                                  }
                                  /**
                                   * 记录客户端的连接信息
                                   *
                                   * @param ctx 通道处理上下文
                                   */
                                  @Override
                                  public void channelActive(ChannelHandlerContext ctx) {
                                      // 当有新的 WebSocket 客户端连接时记录日志
                                      log.info("一个新的 WebSocket 客户端已连接: {}", ctx.channel().remoteAddress());
                                  }
                                  /**
                                   * 当 WebSocket 客户端断开连接时调用此方法
                                   *
                                   * @param ctx 通道处理上下文
                                   */
                                  @Override
                                  public void channelInactive(ChannelHandlerContext ctx) {
                                      // 当 WebSocket 客户端断开连接时记录日志
                                      log.info("WebSocket 客户端已断开连接: {}", ctx.channel().remoteAddress());
                                  }
                                  /**
                                   * 当处理 WebSocket 连接过程中发生异常时调用此方法
                                   * 该方法主要负责记录异常信息,并关闭连接以防止异常扩散影响其他客户端
                                   *
                                   * @param ctx   通道处理上下文
                                   * @param cause 引发异常的原因,包含了异常的详细信息
                                   */
                                  @Override
                                  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                      // 当处理过程中发生异常时记录日志
                                      log.error("WebSocket 连接中发生错误: {}", cause.getMessage());
                                      // 关闭连接以避免异常扩散影响其他客户端
                                      ctx.close();
                                  }
                              }
                              

                              六、TCP 与 WebSocket 同时监听不同的端口

                              1、 NettyConfig 用于配置和管理 Netty 服务器的启动和关闭

                              import com.casictime.framework.netty.server.tcp.TcpServer;
                              import com.casictime.framework.netty.server.websocket.WebSocketServer;
                              import lombok.extern.slf4j.Slf4j;
                              import org.springframework.beans.factory.annotation.Autowired;
                              import org.springframework.context.annotation.Configuration;
                              import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
                              import javax.annotation.PostConstruct;
                              import javax.annotation.PreDestroy;
                              /**
                               * NettyConfig 用于配置和管理 Netty 服务器的启动和关闭。
                               * 负责启动 Netty 的 TCP 服务器和 WebSocket 服务器,并使用线程池来执行启动任务。
                               *
                               * @author chenlei
                               */
                              @Slf4j
                              @Configuration
                              public class NettyConfig {
                                  /**
                                   * TcpServer 用于启动 TCP 服务器
                                   */
                                  @Autowired
                                  private TcpServer tcpServer;
                                  /**
                                   * WebSocketServer 用于启动 WebSocket 服务器
                                   */
                                  @Autowired
                                  private WebSocketServer webSocketServer;
                                  /**
                                   * ThreadPoolTaskExecutor 执行服务器启动任务的线程池 (可根据具体情况修改,这个是我自己配置的线程池)
                                   */
                                  @Autowired
                                  private ThreadPoolTaskExecutor taskExecutor;
                                  /**
                                   * init 方法在 Spring 容器创建并初始化该类的实例后被调用。
                                   * 该方法使用线程池来执行启动 Netty TCP 服务器和 WebSocket 服务器的任务。
                                   */
                                  @PostConstruct
                                  public void init() {
                                      // 启动 Netty TCP 服务器
                                      taskExecutor.execute(() -> {
                                          try {
                                              // 调用 TcpServer 的 start 方法启动 TCP 服务器
                                              tcpServer.start();
                                          } catch (Exception e) {
                                              log.error("启动 Netty TCP 服务器 失败,{}", e.getMessage());
                                              e.printStackTrace();
                                          }
                                      });
                                      // 启动 Netty Websocket 服务器
                                      taskExecutor.execute(() -> {
                                          try {
                                              // 调用 WebSocketServer 的 start 方法启动 WebSocket 服务器
                                              webSocketServer.start();
                                          } catch (Exception e) {
                                              log.error("启动 Netty WebSocketServer 服务器 失败,{}", e.getMessage());
                                              e.printStackTrace();
                                          }
                                      });
                                  }
                                  /**
                                   * destroy 方法在 Spring 容器销毁该类的实例前被调用。
                                   * 该方法负责关闭线程池,以释放资源。
                                   */
                                  @PreDestroy
                                  public void destroy() {
                                      // 关闭线程池
                                      if (taskExecutor != null) {
                                          taskExecutor.shutdown();
                                      }
                                  }
                              }
                              

                              2、 AbstractNettyServer 通过继承该抽象类,子类可以实现特定的服务器逻辑,TCP 服务器或 WebSocket 服务器。

                              import io.netty.bootstrap.ServerBootstrap;
                              import io.netty.channel.ChannelFuture;
                              import io.netty.channel.ChannelInitializer;
                              import io.netty.channel.ChannelOption;
                              import io.netty.channel.EventLoopGroup;
                              import io.netty.channel.nio.NioEventLoopGroup;
                              import io.netty.channel.socket.SocketChannel;
                              import io.netty.channel.socket.nio.NioServerSocketChannel;
                              import lombok.extern.slf4j.Slf4j;
                              import org.springframework.stereotype.Component;
                              import java.util.ArrayList;
                              import java.util.List;
                              /**
                               * 启动和管理功能。
                               * 通过继承该抽象类,子类可以实现特定的服务器逻辑,如 TCP 服务器或 WebSocket 服务器。
                               *
                               * @author chenlei
                               */
                              @Slf4j
                              @Component
                              public abstract class AbstractNettyServer {
                                  /**
                                   * 存储绑定服务器的 ChannelFuture 对象列表,以便后续对服务器启动过程中的操作和状态进行管理
                                   */
                                  protected final List channelFutures = new ArrayList();
                                  /**
                                   * 启动 Netty 服务器的方法
                                   */
                                  public void start() {
                                      // 创建一个 NioEventLoopGroup 作为服务器的 boss 线程组,用于接收客户端的连接请求
                                      EventLoopGroup boss = new NioEventLoopGroup();
                                      // 创建一个 NioEventLoopGroup 作为服务器的 worker 线程组,用于处理客户端的业务逻辑
                                      EventLoopGroup worker = new NioEventLoopGroup();
                                      // 创建一个 ServerBootstrap 实例,用于配置和启动 Netty 服务器
                                      ServerBootstrap serverBootstrap = new ServerBootstrap();
                                      serverBootstrap.group(boss, worker)
                                              // 设置服务器的通道类型为 NioServerSocketChannel,即使用 NIO 进行网络通信
                                              .channel(NioServerSocketChannel.class)
                                              // 设置服务器的 TCP 选项,SO_BACKLOG 表示服务器可接收的最大连接请求队列长度,用于处理大量并发连接时的请求堆积
                                              .option(ChannelOption.SO_BACKLOG, 128)
                                              // 设置子通道的 TCP 选项,SO_KEEPALIVE 表示启用 TCP 的 Keep-Alive 机制,保持长连接
                                              .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                                              // 为子通道添加处理器,使用 ChannelInitializer 对每个新的 SocketChannel 进行初始化
                                              .childHandler(new ChannelInitializer() {
                                                  @Override
                                                  // 初始化新的 SocketChannel 的处理器链
                                                  protected void initChannel(SocketChannel ch) {
                                                      // 调用抽象方法,由子类实现具体的通道处理器初始化逻辑
                                                      initChannelHandlers(ch);
                                                  }
                                              });
                                      //端口
                                      int port = getPort();
                                      //地址
                                      String host = getHost();
                                      try {
                                          // 尝试将服务器绑定到当前端口
                                          ChannelFuture cf = serverBootstrap.bind(port);
                                          // 为绑定操作添加监听器,监听绑定操作的成功和失败状态
                                          cf.addListener(future -> {
                                              if (future.isSuccess()) {
                                                  // 绑定成功时,记录日志,显示服务器绑定的端口和主机地址
                                                  log.info("服务器在端口 {} 上成功绑定,主机地址为 {}", host, getHost());
                                              } else {
                                                  // 绑定失败时,记录错误日志,显示绑定的端口和失败原因
                                                  log.error("绑定端口 {} 时发生连接错误: {}", host, future.cause().getMessage());
                                              }
                                          });
                                          // 将 ChannelFuture 添加到列表中,以便后续管理
                                          channelFutures.add(cf);
                                      } catch (Exception e) {
                                          // 处理绑定端口时发生的异常,记录错误日志
                                          log.error("绑定端口 {} 时发生连接错误: {}", port, e.getMessage());
                                      }
                                      // 遍历已绑定的 ChannelFuture 列表,等待通道关闭
                                      for (ChannelFuture cf : channelFutures) {
                                          try {
                                              cf.channel().closeFuture().sync();
                                          } catch (InterruptedException e) {
                                              // 处理等待通道关闭时发生的异常,记录错误日志
                                              log.error("等待通道关闭时发生错误: {}", e.getMessage());
                                              // 中断当前线程状态,避免潜在的异常
                                              Thread.currentThread().interrupt();
                                          }
                                      }
                                      // 关闭 boss 线程组,释放资源
                                      boss.shutdownGracefully();
                                      // 关闭 worker 线程组,释放资源
                                      worker.shutdownGracefully();
                                  }
                                  /**
                                   * 获取服务器的主机地址,由子类实现具体逻辑
                                   *
                                   * @return 服务器的主机地址
                                   */
                                  protected abstract String getHost();
                                  /**
                                   * 获取服务器的端口范围,由子类实现具体逻辑
                                   *
                                   * @return 服务器的端口范围,多个端口使用逗号分隔
                                   */
                                  protected abstract int getPort();
                                  /**
                                   * 初始化通道处理器,由子类实现具体逻辑,根据不同的服务器类型添加不同的处理器
                                   *
                                   * @param ch 要初始化的 SocketChannel
                                   */
                                  protected abstract void initChannelHandlers(SocketChannel ch);
                              }
                              

                              3、 TcpServer 类是一个基于 Netty 的 TCP 服务器实现类,继承自 AbstractNettyServer。

                              import io.netty.buffer.Unpooled;
                              import io.netty.channel.socket.SocketChannel;
                              import io.netty.handler.codec.DelimiterBasedFrameDecoder;
                              import io.netty.handler.codec.string.StringDecoder;
                              import io.netty.handler.codec.string.StringEncoder;
                              import lombok.extern.slf4j.Slf4j;
                              import org.springframework.beans.factory.annotation.Value;
                              import org.springframework.stereotype.Component;
                              /**
                               * TcpServer 类是一个基于 Netty 的 TCP 服务器实现类,继承自 AbstractNettyServer。
                               * 启动时配置相应的通道处理器,以处理 TCP 连接和数据传输。
                               * 当客户端与服务器建立 TCP 连接时,该类将对连接进行一系列的处理操作。
                               *
                               * @author chenlei
                               */
                              @Slf4j
                              @Component
                              public class TcpServer extends AbstractNettyServer {
                                  /**
                                   * TCP 服务器的主机地址。
                                   */
                                  @Value("${tcp.host}")
                                  private String host;
                                  /**
                                   * TCP 服务器的端口。
                                   */
                                  @Value("${tcp.port}")
                                  private int port;
                                  /**
                                   * 获取 TCP 服务器的主机地址。
                                   *
                                   * @return 从配置文件中获取的 TCP 服务器的主机地址
                                   */
                                  @Override
                                  protected String getHost() {
                                      return host;
                                  }
                                  /**
                                   * 获取 TCP 服务器的端口范围。
                                   *
                                   * @return 从配置文件中获取的 TCP 服务器的端口范围
                                   */
                                  @Override
                                  protected int getPort() {
                                      return port;
                                  }
                                  /**
                                   * 初始化 TCP 服务器的通道处理器。
                                   * 为每个新建立的 SocketChannel 配置处理器链。
                                   *
                                   * @param ch 新建立的 SocketChannel,用于与客户端通信
                                   */
                                  @Override
                                  protected void initChannelHandlers(SocketChannel ch) {
                                      // 优化:可以使用更灵活的分隔符,如可配置的分隔符,提高代码的可扩展性
                                      // 添加帧解码器,解决粘包和拆包问题,使用 "\n" 作为分隔符,最大帧长度为 8192 字节
                                      ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, Unpooled.copiedBuffer("\n".getBytes())));
                                      // 添加字符串解码器,将字节数据解码为字符串
                                      ch.pipeline().addLast(new StringDecoder());
                                      // 添加字符串编码器,将字符串编码为字节数据
                                      ch.pipeline().addLast(new StringEncoder());
                                      // 添加自定义的处理器,用于处理 TCP 数据
                                      ch.pipeline().addLast(new TcpServerHandler());
                                  }
                              }
                              

                              4、 TcpServerHandler 类,主要负责处理 Netty TCP 服务端各种事件和消息。

                              import io.netty.channel.ChannelHandlerContext;
                              import io.netty.channel.ChannelId;
                              import io.netty.channel.ChannelInboundHandlerAdapter;
                              import io.netty.handler.timeout.IdleState;
                              import io.netty.handler.timeout.IdleStateEvent;
                              import lombok.extern.slf4j.Slf4j;
                              import java.net.InetSocketAddress;
                              import java.util.concurrent.ConcurrentHashMap;
                              /**
                               * @author chenlei
                               */
                              @Slf4j
                              public class TcpServerHandler extends ChannelInboundHandlerAdapter {
                                  /**
                                   * 保存连接到服务端的通道信息,对连接的客户端进行管理,包括连接的添加、删除、查找等操作。
                                   */
                                  private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap();
                                  /**
                                   * 当有客户端连接到服务器时,此方法会被触发。
                                   * 它会记录客户端的 IP 地址、端口号以及连接的 ChannelId,并将该连接添加到 CHANNEL_MAP 中。
                                   * 如果连接已经存在于 CHANNEL_MAP 中,会打印相应的日志信息;如果不存在,则添加到映射中并记录连接信息。
                                   *
                                   * @param ctx 通道处理器上下文,包含了通道的信息和操作通道的方法
                                   */
                                  @Override
                                  public void channelActive(ChannelHandlerContext ctx) {
                                      // 获取客户端的网络地址信息
                                      InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
                                      // 获取客户端的 IP 地址
                                      String clientIp = insocket.getAddress().getHostAddress();
                                      // 获取客户端的端口号
                                      int clientPort = insocket.getPort();
                                      // 获取连接通道的唯一标识
                                      ChannelId channelId = ctx.channel().id();
                                      // 如果该连接通道已经在映射中,打印连接状态信息
                                      if (CHANNEL_MAP.containsKey(channelId)) {
                                          log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
                                      } else {
                                          // 将新的连接添加到映射中
                                          CHANNEL_MAP.put(channelId, ctx);
                                          log.info("客户端【" + channelId + "】连接 netty 服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
                                          log.info("连接通道数量: " + CHANNEL_MAP.size());
                                      }
                                  }
                                  /**
                                   * 当有客户端终止连接服务器时,此方法会被触发。
                                   * 它会从 CHANNEL_MAP 中移除该客户端的连接信息,并打印相应的退出信息和更新后的连接通道数量。
                                   * 首先检查该连接是否存在于 CHANNEL_MAP 中,如果存在则进行移除操作。
                                   *
                                   * @param ctx 通道处理器上下文
                                   */
                                  @Override
                                  public void channelInactive(ChannelHandlerContext ctx) {
                                      InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
                                      String clientIp = insocket.getAddress().getHostAddress();
                                      ChannelId channelId = ctx.channel().id();
                                      // 检查映射中是否包含该客户端连接
                                      if (CHANNEL_MAP.containsKey(channelId)) {
                                          // 从映射中移除连接
                                          CHANNEL_MAP.remove(channelId);
                                          log.info("客户端【" + channelId + "】退出 netty 服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
                                          log.info("连接通道数量: " + CHANNEL_MAP.size());
                                      }
                                  }
                                  /**
                                   * 当有客户端向服务器发送消息时,此方法会被触发。
                                   * 它会打印接收到的客户端消息,并调用 channelWrite 方法将消息返回给客户端。
                                   * 首先会打印接收到客户端报文的日志信息,然后调用 channelWrite 方法进行响应。
                                   *
                                   * @param ctx 通道处理器上下文
                                   * @param msg 从客户端接收到的消息对象
                                   */
                                  @Override
                                  public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                      // 将信息返回给客户端
                                      ctx.writeAndFlush(msg);
                                  }
                                  /**
                                   * 当触发用户事件时,此方法会被调用,主要用于处理空闲状态事件。
                                   * 根据不同的空闲状态(读、写、总超时)进行相应的处理,如断开连接。
                                   * 首先检查触发的事件是否是 IdleStateEvent,如果是则判断具体的空闲状态并进行相应处理。
                                   *
                                   * @param ctx 通道处理器上下文
                                   * @param evt 触发的事件对象
                                   */
                                  @Override
                                  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                                      String socketString = ctx.channel().remoteAddress().toString();
                                      if (evt instanceof IdleStateEvent) {
                                          IdleStateEvent event = (IdleStateEvent) evt;
                                          if (event.state() == IdleState.READER_IDLE) {
                                              log.info("Client: " + socketString + " READER_IDLE 读超时");
                                              // 读超时,断开连接
                                              ctx.disconnect();
                                          } else if (event.state() == IdleState.WRITER_IDLE) {
                                              log.info("Client: " + socketString + " WRITER_IDLE 写超时");
                                              // 写超时,断开连接
                                              ctx.disconnect();
                                          } else if (event.state() == IdleState.ALL_IDLE) {
                                              log.info("Client: " + socketString + " ALL_IDLE 总超时");
                                              // 总超时,断开连接
                                              ctx.disconnect();
                                          }
                                      }
                                  }
                                  /**
                                   * 当发生异常时,此方法会被触发。
                                   * 它会关闭通道并打印相应的错误信息,同时打印当前的连接通道数量。
                                   * 异常发生时,会关闭通道以防止资源泄漏,并且打印异常发生的通道信息和当前的连接数量。
                                   *
                                   * @param ctx   通道处理器上下文
                                   * @param cause 引发异常的原因
                                   */
                                  @Override
                                  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                      ctx.close();
                                      log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
                                  }
                              }
                              

                              5、 WebSocketServer 类,继承自 AbstractNettyServer,基于 Netty 框架实现的 WebSocket 服务器类。

                              import io.netty.channel.socket.SocketChannel;
                              import io.netty.handler.codec.http.HttpObjectAggregator;
                              import io.netty.handler.codec.http.HttpServerCodec;
                              import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
                              import lombok.extern.slf4j.Slf4j;
                              import org.springframework.beans.factory.annotation.Value;
                              import org.springframework.stereotype.Component;
                              /**
                               * WebSocketServer 类,继承自 AbstractNettyServer,基于 Netty 框架实现的 WebSocket 服务器类。
                               * 它负责从配置文件中获取 WebSocket 服务器的配置信息(主机地址和端口范围)。
                               *
                               * @author chenlei
                               */
                              @Slf4j
                              @Component
                              public class WebSocketServer extends AbstractNettyServer {
                                  /**
                                   * WebSocket 服务器的主机地址
                                   */
                                  @Value("${websocket.host}")
                                  private String host;
                                  /**
                                   * WebSocket 服务器的端口范围,多个端口使用逗号分隔。
                                   */
                                  @Value("${websocket.port}")
                                  private int port;
                                  /**
                                   * 获取 WebSocket 服务器的主机地址。
                                   *
                                   * @return 从配置文件中获取的 WebSocket 服务器的主机地址。
                                   */
                                  @Override
                                  protected String getHost() {
                                      return host;
                                  }
                                  /**
                                   * 获取 WebSocket 服务器的端口范围。
                                   *
                                   * @return 从配置文件中获取的 WebSocket 服务器的端口范围。
                                   */
                                  @Override
                                  protected int getPort() {
                                      return port;
                                  }
                                  /**
                                   * 初始化 WebSocket 服务器的通道处理器。
                                   * 方法会在服务器启动时为每个新建立的 SocketChannel 配置处理器链,以处理 WebSocket 通信的各个阶段。
                                   *
                                   * @param ch 新建立的 SocketChannel,用于与客户端进行通信。
                                   */
                                  @Override
                                  protected void initChannelHandlers(SocketChannel ch) {
                                      // 在通道的处理器链中添加 HttpServerCodec,用于处理 HTTP 协议的编解码
                                      ch.pipeline().addLast(new HttpServerCodec());
                                      // 添加 HttpObjectAggregator,用于将 HTTP 消息聚合成完整的请求或响应
                                      ch.pipeline().addLast(new HttpObjectAggregator(65536));
                                      // 添加 WebSocketServerProtocolHandler,用于处理 WebSocket 的握手、控制帧等
                                      ch.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket"));
                                      // 添加自定义的处理器,用于处理 WebSocket 数据
                                      ch.pipeline().addLast(new WebSocketHandler());
                                  }
                              }
                              

                              6、 WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息

                              import io.netty.channel.ChannelHandlerContext;
                              import io.netty.channel.SimpleChannelInboundHandler;
                              import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
                              import lombok.extern.slf4j.Slf4j;
                              /**
                               * WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息
                               *
                               * @author chenlei
                               */
                              @Slf4j
                              public class WebSocketHandler extends SimpleChannelInboundHandler {
                                  /**
                                   * 处理接收到的消息
                                   *
                                   * @param ctx 通道处理上下文,包含了与通道相关的信息,如通道本身、管道等
                                   * @param msg 接收到的 TextWebSocketFrame 消息,包含了客户端发送的文本消息内容
                                   */
                                  @Override
                                  protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
                                      String message = msg.text();
                                      // 创建一个新的 TextWebSocketFrame 来存储要发送回客户端的消息
                                      TextWebSocketFrame responseFrame = new TextWebSocketFrame(message);
                                      // 将消息发送回客户端
                                      ctx.channel().writeAndFlush(responseFrame);
                                  }
                                  /**
                                   * 记录客户端的连接信息
                                   *
                                   * @param ctx 通道处理上下文
                                   */
                                  @Override
                                  public void channelActive(ChannelHandlerContext ctx) {
                                      // 当有新的 WebSocket 客户端连接时记录日志
                                      log.info("一个新的 WebSocket 客户端已连接: {}", ctx.channel().remoteAddress());
                                  }
                                  /**
                                   * 当 WebSocket 客户端断开连接时调用此方法
                                   *
                                   * @param ctx 通道处理上下文
                                   */
                                  @Override
                                  public void channelInactive(ChannelHandlerContext ctx) {
                                      // 当 WebSocket 客户端断开连接时记录日志
                                      log.info("WebSocket 客户端已断开连接: {}", ctx.channel().remoteAddress());
                                  }
                                  /**
                                   * 当处理 WebSocket 连接过程中发生异常时调用此方法
                                   * 该方法主要负责记录异常信息,并关闭连接以防止异常扩散影响其他客户端
                                   *
                                   * @param ctx   通道处理上下文
                                   * @param cause 引发异常的原因,包含了异常的详细信息
                                   */
                                  @Override
                                  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                      // 当处理过程中发生异常时记录日志
                                      log.error("WebSocket 连接中发生错误: {}", cause.getMessage());
                                      // 关闭连接以避免异常扩散影响其他客户端
                                      ctx.close();
                                  }
                              }
                              
                              JAVA(SpringBoot)集成Netty实现(TCP、Websocket)服务端与客户端。
                              (图片来源网络,侵删)
                              JAVA(SpringBoot)集成Netty实现(TCP、Websocket)服务端与客户端。
                              (图片来源网络,侵删)
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。

目录[+]

取消
微信二维码
微信二维码
支付宝二维码