JAVA(SpringBoot)集成Netty实现(TCP、Websocket)服务端与客户端。
SpringBoot集成 Netty 。
- 一、Netty 简介
- 二、Netty功能
- 1. 网络通信支持
- 2. 高性能与低资源消耗
- 3. 易于使用和定制
- 4. 内存管理
- 5. 安全性
- 三、POM依赖
- 四、TCP
- 1、服务端
- 1.1 创建一个Netty服务端类,NettyTcpServer
- 1.2 创建一个 NettyTcpServerHandler继承自 ChannelInboundHandlerAdapter,主要负责处理 Netty TCP 服务端各种事件和消息。
- 2、客户端
- 2.1 创建一个Netty客户端类,NettyTcpClient
- 2.2 创建一个 NettyTcpClientHandler 客户端处理器,继承自 SimpleChannelInboundHandler 处理服务器发送的数据。
- 五、WebSocket
- 1、服务端
- 1.1 创建一个Netty WebSocket 服务端类,WebSocketServer
- 1.2 创建一个 WebSocketHandler 继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息。
- 六、TCP 与 WebSocket 同时监听不同的端口
- 1、 NettyConfig 用于配置和管理 Netty 服务器的启动和关闭
- 2、 AbstractNettyServer 通过继承该抽象类,子类可以实现特定的服务器逻辑,TCP 服务器或 WebSocket 服务器。
- 3、 TcpServer 类是一个基于 Netty 的 TCP 服务器实现类,继承自 AbstractNettyServer。
- 4、 TcpServerHandler 类,主要负责处理 Netty TCP 服务端各种事件和消息。
- 5、 WebSocketServer 类,继承自 AbstractNettyServer,基于 Netty 框架实现的 WebSocket 服务器类。
- 6、 WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息
一、Netty 简介
Netty 是一款基于 Java 语言开发的高性能、异步事件驱动型网络应用框架,它为开发人员提供了对 TCP、UDP、HTTP、WebSocket 等多种网络协议的支持,极大地简化了网络编程工作,无论是开发服务器端应用还是客户端应用都十分便捷。其主要特点包括:
高性能:通过使用异步 I/O 和事件驱动模型,减少线程切换和上下文开销,提升系统性能。
易用性:提供了高层次的抽象,使得开发者可以专注于业务逻辑,而不必过多关注底层网络细节。
灵活性:支持多种协议和编解码方式,易于定制和扩展。
二、Netty功能
Netty是一个基于Java的高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能网络服务器和客户端程序。以下是其一些主要功能:
1. 网络通信支持
- 多种协议:
- Netty支持广泛的网络协议,如TCP、UDP、HTTP、HTTP/2、WebSocket等。这使得开发人员能够轻松构建支持不同应用层协议的网络应用。例如,开发基于HTTP协议的Web服务器,或者基于WebSocket协议的实时通信应用。
- 以HTTP协议为例,Netty提供了专门的编解码器和处理器,方便处理HTTP请求和响应,包括解析HTTP头、处理HTTP内容体等。
- 跨平台:它能够在不同的操作系统和Java版本上运行,确保应用的可移植性。无论是在Linux、Windows还是Mac OS系统上,Netty都能发挥其高性能的优势。
2. 高性能与低资源消耗
- 异步和事件驱动:Netty采用异步I/O和事件驱动模型,这使得它能够在处理大量并发连接时,避免线程阻塞,提高系统的吞吐量和响应能力。例如,当一个新的连接建立或者数据可读时,Netty会触发相应的事件,由预先注册的事件处理器进行处理,而不是让线程一直等待I/O操作完成。
- 零拷贝技术:Netty通过使用零拷贝技术,减少了数据在内存中的拷贝次数,提高了数据传输的效率。例如,在文件传输场景中,Netty可以直接将文件内容从磁盘传输到网络,而不需要将文件内容先拷贝到应用程序的内存中。
3. 易于使用和定制
- 简单的API:Netty提供了简洁直观的API,使得开发人员能够快速上手,构建复杂的网络应用。例如,通过ChannelPipeline机制,开发人员可以方便地添加和管理各种网络处理逻辑,如编解码、业务逻辑处理等。
- 高度可定制:它的架构设计非常灵活,允许开发人员根据具体需求对其进行定制。比如,可以自定义编解码器来处理特定格式的协议数据,或者定制线程模型以适应不同的应用场景。
4. 内存管理
- 池化内存分配:Netty提供了池化内存分配器,能够有效地管理内存,减少内存碎片,提高内存的使用效率。例如,在高并发场景下,频繁地创建和销毁对象会导致内存碎片问题,而Netty的池化内存分配器可以复用已分配的内存块,避免这种情况的发生。
- 自动内存回收:它具备自动内存回收机制,能够根据应用的运行情况,自动调整内存的使用,降低内存泄漏的风险。
5. 安全性
- SSL/TLS支持:Netty内置了对SSL/TLS协议的支持,使得开发人员能够轻松地为网络应用添加安全加密功能,保护数据在传输过程中的机密性和完整性。例如,在开发金融类网络应用时,可以使用Netty的SSL/TLS支持,确保用户的交易数据安全传输。
三、POM依赖
io.netty netty-transport 4.1.94.Final io.netty netty-codec 4.1.94.Final io.netty netty-all 4.1.92.Final
四、TCP
1、服务端
1.1 创建一个Netty服务端类,NettyTcpServer
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; /** * Netty TCP服务类 * * @author chenlei */ @Slf4j @Component public class NettyTcpServer implements CommandLineRunner { /** * 端口号 */ private int port = 8972; @Override public void run(String... args) { // 接收连接 EventLoopGroup boss = new NioEventLoopGroup(); // 处理信息 EventLoopGroup worker = new NioEventLoopGroup(); try { // 定义server ServerBootstrap serverBootstrap = new ServerBootstrap(); // 添加分组 serverBootstrap.group(boss, worker) // 添加通道设置非阻塞 .channel(NioServerSocketChannel.class) // 服务端可连接队列数量 .option(ChannelOption.SO_BACKLOG, 128) // 开启长连接 .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 流程处理 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ch.pipeline().addLast(new NettyTcpServerHandler()); } }); // 绑定端口 ChannelFuture cf = serverBootstrap.bind(port).sync(); // 优雅关闭连接 cf.channel().closeFuture().sync(); } catch (Exception e) { log.error("连接错误:{}", e.getMessage()); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
1.2 创建一个 NettyTcpServerHandler继承自 ChannelInboundHandlerAdapter,主要负责处理 Netty TCP 服务端各种事件和消息。
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; /** * @author chenlei */ @Slf4j public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter { /** * 保存连接到服务端的通道信息,对连接的客户端进行管理,包括连接的添加、删除、查找等操作。 */ private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap(); /** * 当有客户端连接到服务器时,此方法会被触发。 * 它会记录客户端的 IP 地址、端口号以及连接的 ChannelId,并将该连接添加到 CHANNEL_MAP 中。 * 如果连接已经存在于 CHANNEL_MAP 中,会打印相应的日志信息;如果不存在,则添加到映射中并记录连接信息。 * * @param ctx 通道处理器上下文,包含了通道的信息和操作通道的方法 */ @Override public void channelActive(ChannelHandlerContext ctx) { // 获取客户端的网络地址信息 InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); // 获取客户端的 IP 地址 String clientIp = insocket.getAddress().getHostAddress(); // 获取客户端的端口号 int clientPort = insocket.getPort(); // 获取连接通道的唯一标识 ChannelId channelId = ctx.channel().id(); // 如果该连接通道已经在映射中,打印连接状态信息 if (CHANNEL_MAP.containsKey(channelId)) { log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size()); } else { // 将新的连接添加到映射中 CHANNEL_MAP.put(channelId, ctx); log.info("客户端【" + channelId + "】连接 netty 服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]"); log.info("连接通道数量: " + CHANNEL_MAP.size()); } } /** * 当有客户端终止连接服务器时,此方法会被触发。 * 它会从 CHANNEL_MAP 中移除该客户端的连接信息,并打印相应的退出信息和更新后的连接通道数量。 * 首先检查该连接是否存在于 CHANNEL_MAP 中,如果存在则进行移除操作。 * * @param ctx 通道处理器上下文 */ @Override public void channelInactive(ChannelHandlerContext ctx) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); ChannelId channelId = ctx.channel().id(); // 检查映射中是否包含该客户端连接 if (CHANNEL_MAP.containsKey(channelId)) { // 从映射中移除连接 CHANNEL_MAP.remove(channelId); log.info("客户端【" + channelId + "】退出 netty 服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]"); log.info("连接通道数量: " + CHANNEL_MAP.size()); } } /** * 当有客户端向服务器发送消息时,此方法会被触发。 * 它会打印接收到的客户端消息,并调用 channelWrite 方法将消息返回给客户端。 * 首先会打印接收到客户端报文的日志信息,然后调用 channelWrite 方法进行响应。 * * @param ctx 通道处理器上下文 * @param msg 从客户端接收到的消息对象 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("加载客户端报文......"); log.info("【" + ctx.channel().id() + "】" + " :" + msg); // 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入 write 函数 // 调用 channelWrite 方法将消息返回给客户端 this.channelWrite(ctx.channel().id(), msg); } /** * 服务端给客户端发送消息的方法。 * 首先根据传入的 ChannelId 从 CHANNEL_MAP 中获取对应的 ChannelHandlerContext, * 然后检查消息是否为空以及 ChannelHandlerContext 是否存在,若存在则将消息写入通道并刷新缓冲区。 * * @param channelId 连接通道的唯一标识 * @param msg 需要发送的消息内容 */ public void channelWrite(ChannelId channelId, Object msg) { // 获取与 ChannelId 对应的 ChannelHandlerContext ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId); if (ctx == null) { log.info("通道【" + channelId + "】不存在"); return; } if (msg == null || msg == "") { log.info("服务端响应空的消息"); return; } // 将消息写入通道 ctx.write(msg); // 刷新通道的输出缓冲区,确保消息被发送出去 ctx.flush(); } /** * 当触发用户事件时,此方法会被调用,主要用于处理空闲状态事件。 * 根据不同的空闲状态(读、写、总超时)进行相应的处理,如断开连接。 * 首先检查触发的事件是否是 IdleStateEvent,如果是则判断具体的空闲状态并进行相应处理。 * * @param ctx 通道处理器上下文 * @param evt 触发的事件对象 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("Client: " + socketString + " READER_IDLE 读超时"); // 读超时,断开连接 ctx.disconnect(); } else if (event.state() == IdleState.WRITER_IDLE) { log.info("Client: " + socketString + " WRITER_IDLE 写超时"); // 写超时,断开连接 ctx.disconnect(); } else if (event.state() == IdleState.ALL_IDLE) { log.info("Client: " + socketString + " ALL_IDLE 总超时"); // 总超时,断开连接 ctx.disconnect(); } } } /** * 当发生异常时,此方法会被触发。 * 它会关闭通道并打印相应的错误信息,同时打印当前的连接通道数量。 * 异常发生时,会关闭通道以防止资源泄漏,并且打印异常发生的通道信息和当前的连接数量。 * * @param ctx 通道处理器上下文 * @param cause 引发异常的原因 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size()); } }
2、客户端
2.1 创建一个Netty客户端类,NettyTcpClient
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * Netty TCP客户端 * * @author chenlei */ @Slf4j @Component public class NettyTcpClient implements CommandLineRunner { /** * 最大连接次数 */ private final int maxConnectTimes = 10; /** * 地址 */ private final String host = "192.168.2.154"; /** * 端口 */ private final int port = 1250; @Override public void run(String... args) { // 创建一个处理 I/O 操作的 EventLoopGroup,用于处理客户端的 I/O 操作和事件 EventLoopGroup group = new NioEventLoopGroup(); try { // 创建 Bootstrap 对象,用于配置和启动 Netty 客户端 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) // 使用 NioSocketChannel 作为通道类型,基于 NIO 的 Socket 通道 .channel(NioSocketChannel.class) // 设置 TCP 选项,如 TCP_NODELAY 开启无延迟模式,提高性能 .option(ChannelOption.TCP_NODELAY, true) // 为 ChannelPipeline 添加处理器,用于处理通道的数据 .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { // 添加帧解码器,处理粘包和拆包问题,这里以 \n 作为分隔符 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, Unpooled.copiedBuffer("\n".getBytes()))); // 添加字符串解码器,将字节数据解码为字符串 ch.pipeline().addLast(new StringDecoder()); // 添加字符串编码器,将字符串编码为字节数据 ch.pipeline().addLast(new StringEncoder()); // 添加自定义的处理器,用于处理接收到的数据 ch.pipeline().addLast(new NettyTcpClientHandler(bootstrap, host, NettyTcpClient.this)); } }); // 连接到服务器的指定端口范围 final AtomicInteger connectTimes = new AtomicInteger(0); // 尝试连接到服务器,并添加连接结果监听器 bootstrap.connect(host, port).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { // 连接成功时打印日志 log.info("成功连接到端口: {}", port); } else { // 连接失败时打印日志 log.error("连接到服务器时出错,端口: {}", port); // 连接失败时,使用 eventLoop 安排重连任务,60 秒后重连 if (connectTimes.get() { // 重连逻辑,再次尝试连接 bootstrap.connect(host, port); }, 60, TimeUnit.SECONDS); } else { log.error("已达到最大连接次数,停止重连,端口: {}", port); } } }); } catch (Exception e) { // 发生异常时打印错误日志 log.error("Netty 客户端出错", e); } } /** * 定义重新连接方法 * * @param bootstrap 用于配置和启动 Netty 客户端的 Bootstrap 对象 * @param host 服务器的主机地址 * @param port 要连接的服务器端口号 * @param future 表示连接操作的 ChannelFuture 对象 */ void reconnect(Bootstrap bootstrap, String host, int port, ChannelFuture future) { final AtomicInteger connectTimes = new AtomicInteger(0); try { bootstrap.connect(host, port).addListener((ChannelFutureListener) f -> { if (f.isSuccess()) { log.info("重连成功,端口: {}", port); connectTimes.set(0); } else { if (connectTimes.get() reconnect(bootstrap, host, port, f), 60, TimeUnit.SECONDS); } else { log.error("已达到最大连接次数,停止重连,端口: {}", port); } } }); } catch (Exception e) { log.error("重连时发生异常,端口: {}", port); reconnect(bootstrap, host, port, future); } } }
2.2 创建一个 NettyTcpClientHandler 客户端处理器,继承自 SimpleChannelInboundHandler 处理服务器发送的数据。
import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; /** * Netty 客户端处理器类,继承自 SimpleChannelInboundHandler 以处理服务器发送的字符串数据 * * @author chenlei */ @Slf4j public class NettyTcpClientHandler extends SimpleChannelInboundHandler { /** * Bootstrap 对象,用于客户端连接的配置和启动 */ private final Bootstrap bootstrap; /** * 服务器的主机地址 */ private final String host; /** * Netty 客户端实例,用于调用重连等操作 */ private final NettyTcpClient nettyTcpClient; /** * 初始化相关参数 * * @param bootstrap 用于客户端连接的 Bootstrap 对象 * @param host 服务器的主机地址 * @param nettyClient Netty 客户端实例 */ public NettyTcpClientHandler(Bootstrap bootstrap, String host, NettyTcpClient nettyTcpClient) { this.bootstrap = bootstrap; this.host = host; this.nettyTcpClient= nettyTcpClient; } /** * 当接收到服务器发送的数据时调用此方法 * * @param ctx 通道处理器上下文,可用于执行通道操作,如发送数据、关闭通道等 * @param response 从服务器接收到的响应字符串 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String response) { log.info("接收处理服务器响应数据"); //以下进行具体的业务操作 } /** * 当发生异常时调用此方法 * * @param ctx 通道处理器上下文 * @param cause 异常对象 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 打印异常信息 log.error("处理服务器响应时出错,异常信息: {}", cause.getMessage(), cause); // 关闭当前通道 ctx.close(); } /** * 当通道关闭时调用此方法 * * @param ctx 通道处理器上下文 */ @Override public void channelInactive(ChannelHandlerContext ctx) { final Channel channel = ctx.channel(); // 获取远程服务器的端口号 int port = ((InetSocketAddress) channel.remoteAddress()).getPort(); log.info("通道处于非活动状态,正在尝试在端口上重新连接: {}", port); // 获取 NettyTcpClientHandler 中存储的 NettyClient 实例 NettyTcpClient nettyTcpClient = ((NettyTcpClientHandler) ctx.handler()).nettyTcpClient; // 调用 nettyTcpClient 中的 reconnect 方法进行重连 nettyTcpClient .reconnect(bootstrap, host, port, ctx.channel().newFailedFuture(new RuntimeException("频道处于非活动状态"))); } }
五、WebSocket
1、服务端
1.1 创建一个Netty WebSocket 服务端类,WebSocketServer
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; /** * WebSocket 服务类 * * @author chenlei */ @Slf4j @Component public class WebSocketServer implements CommandLineRunner { /** * 端口号 */ private int port = 897; @Override public void run(String... args) { // 接收 WebSocket 连接的主 EventLoopGroup,用于处理接收新连接的请求 EventLoopGroup webSocketBoss = new NioEventLoopGroup(); // 处理 WebSocket 信息的从 EventLoopGroup,用于处理连接建立后的读写操作 EventLoopGroup webSocketWorker = new NioEventLoopGroup(); try { // 定义 WebSocket 服务器启动器 ServerBootstrap webSocketServerBootstrap = new ServerBootstrap(); webSocketServerBootstrap.group(webSocketBoss, webSocketWorker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { // 在通道的处理器链中添加 HttpServerCodec,用于处理 HTTP 协议的编解码 ch.pipeline().addLast(new HttpServerCodec()); // 添加 HttpObjectAggregator,用于将 HTTP 消息聚合成完整的请求或响应 ch.pipeline().addLast(new HttpObjectAggregator(65536)); // 添加 WebSocketServerProtocolHandler,用于处理 WebSocket 的握手、控制帧等 ch.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket")); // 添加自定义的处理器,用于处理 WebSocket 数据 ch.pipeline().addLast(new WebSocketHandler()); } }); // 绑定端口 ChannelFuture cf = serverBootstrap.bind(port).sync(); // 优雅关闭连接 cf.channel().closeFuture().sync(); } catch (Exception e) { log.error("连接错误:{}", e.getMessage()); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
1.2 创建一个 WebSocketHandler 继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息。
import com.casictime.system.domain.vo.DeviceInfo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; /** * WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息 * * @author chenlei */ @Slf4j public class WebSocketHandler extends SimpleChannelInboundHandler { /** * 处理接收到的消息 * * @param ctx 通道处理上下文,包含了与通道相关的信息,如通道本身、管道等 * @param msg 接收到的 TextWebSocketFrame 消息,包含了客户端发送的文本消息内容 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { String message = msg.text(); // 接收到客户端消息时记录日志,使用 debug 级别,方便在生产环境中调整日志输出 log.debug("收到客户端的消息: {}", message); // 将信息返回给客户端 ctx.writeAndFlush(msg); // 回复消息示例(如果需要) // ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到您的消息: " + message)); } /** * 记录客户端的连接信息 * * @param ctx 通道处理上下文 */ @Override public void channelActive(ChannelHandlerContext ctx) { // 当有新的 WebSocket 客户端连接时记录日志 log.info("一个新的 WebSocket 客户端已连接: {}", ctx.channel().remoteAddress()); } /** * 当 WebSocket 客户端断开连接时调用此方法 * * @param ctx 通道处理上下文 */ @Override public void channelInactive(ChannelHandlerContext ctx) { // 当 WebSocket 客户端断开连接时记录日志 log.info("WebSocket 客户端已断开连接: {}", ctx.channel().remoteAddress()); } /** * 当处理 WebSocket 连接过程中发生异常时调用此方法 * 该方法主要负责记录异常信息,并关闭连接以防止异常扩散影响其他客户端 * * @param ctx 通道处理上下文 * @param cause 引发异常的原因,包含了异常的详细信息 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 当处理过程中发生异常时记录日志 log.error("WebSocket 连接中发生错误: {}", cause.getMessage()); // 关闭连接以避免异常扩散影响其他客户端 ctx.close(); } }
六、TCP 与 WebSocket 同时监听不同的端口
1、 NettyConfig 用于配置和管理 Netty 服务器的启动和关闭
import com.casictime.framework.netty.server.tcp.TcpServer; import com.casictime.framework.netty.server.websocket.WebSocketServer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; /** * NettyConfig 用于配置和管理 Netty 服务器的启动和关闭。 * 负责启动 Netty 的 TCP 服务器和 WebSocket 服务器,并使用线程池来执行启动任务。 * * @author chenlei */ @Slf4j @Configuration public class NettyConfig { /** * TcpServer 用于启动 TCP 服务器 */ @Autowired private TcpServer tcpServer; /** * WebSocketServer 用于启动 WebSocket 服务器 */ @Autowired private WebSocketServer webSocketServer; /** * ThreadPoolTaskExecutor 执行服务器启动任务的线程池 (可根据具体情况修改,这个是我自己配置的线程池) */ @Autowired private ThreadPoolTaskExecutor taskExecutor; /** * init 方法在 Spring 容器创建并初始化该类的实例后被调用。 * 该方法使用线程池来执行启动 Netty TCP 服务器和 WebSocket 服务器的任务。 */ @PostConstruct public void init() { // 启动 Netty TCP 服务器 taskExecutor.execute(() -> { try { // 调用 TcpServer 的 start 方法启动 TCP 服务器 tcpServer.start(); } catch (Exception e) { log.error("启动 Netty TCP 服务器 失败,{}", e.getMessage()); e.printStackTrace(); } }); // 启动 Netty Websocket 服务器 taskExecutor.execute(() -> { try { // 调用 WebSocketServer 的 start 方法启动 WebSocket 服务器 webSocketServer.start(); } catch (Exception e) { log.error("启动 Netty WebSocketServer 服务器 失败,{}", e.getMessage()); e.printStackTrace(); } }); } /** * destroy 方法在 Spring 容器销毁该类的实例前被调用。 * 该方法负责关闭线程池,以释放资源。 */ @PreDestroy public void destroy() { // 关闭线程池 if (taskExecutor != null) { taskExecutor.shutdown(); } } }
2、 AbstractNettyServer 通过继承该抽象类,子类可以实现特定的服务器逻辑,TCP 服务器或 WebSocket 服务器。
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; /** * 启动和管理功能。 * 通过继承该抽象类,子类可以实现特定的服务器逻辑,如 TCP 服务器或 WebSocket 服务器。 * * @author chenlei */ @Slf4j @Component public abstract class AbstractNettyServer { /** * 存储绑定服务器的 ChannelFuture 对象列表,以便后续对服务器启动过程中的操作和状态进行管理 */ protected final List channelFutures = new ArrayList(); /** * 启动 Netty 服务器的方法 */ public void start() { // 创建一个 NioEventLoopGroup 作为服务器的 boss 线程组,用于接收客户端的连接请求 EventLoopGroup boss = new NioEventLoopGroup(); // 创建一个 NioEventLoopGroup 作为服务器的 worker 线程组,用于处理客户端的业务逻辑 EventLoopGroup worker = new NioEventLoopGroup(); // 创建一个 ServerBootstrap 实例,用于配置和启动 Netty 服务器 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker) // 设置服务器的通道类型为 NioServerSocketChannel,即使用 NIO 进行网络通信 .channel(NioServerSocketChannel.class) // 设置服务器的 TCP 选项,SO_BACKLOG 表示服务器可接收的最大连接请求队列长度,用于处理大量并发连接时的请求堆积 .option(ChannelOption.SO_BACKLOG, 128) // 设置子通道的 TCP 选项,SO_KEEPALIVE 表示启用 TCP 的 Keep-Alive 机制,保持长连接 .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) // 为子通道添加处理器,使用 ChannelInitializer 对每个新的 SocketChannel 进行初始化 .childHandler(new ChannelInitializer() { @Override // 初始化新的 SocketChannel 的处理器链 protected void initChannel(SocketChannel ch) { // 调用抽象方法,由子类实现具体的通道处理器初始化逻辑 initChannelHandlers(ch); } }); //端口 int port = getPort(); //地址 String host = getHost(); try { // 尝试将服务器绑定到当前端口 ChannelFuture cf = serverBootstrap.bind(port); // 为绑定操作添加监听器,监听绑定操作的成功和失败状态 cf.addListener(future -> { if (future.isSuccess()) { // 绑定成功时,记录日志,显示服务器绑定的端口和主机地址 log.info("服务器在端口 {} 上成功绑定,主机地址为 {}", host, getHost()); } else { // 绑定失败时,记录错误日志,显示绑定的端口和失败原因 log.error("绑定端口 {} 时发生连接错误: {}", host, future.cause().getMessage()); } }); // 将 ChannelFuture 添加到列表中,以便后续管理 channelFutures.add(cf); } catch (Exception e) { // 处理绑定端口时发生的异常,记录错误日志 log.error("绑定端口 {} 时发生连接错误: {}", port, e.getMessage()); } // 遍历已绑定的 ChannelFuture 列表,等待通道关闭 for (ChannelFuture cf : channelFutures) { try { cf.channel().closeFuture().sync(); } catch (InterruptedException e) { // 处理等待通道关闭时发生的异常,记录错误日志 log.error("等待通道关闭时发生错误: {}", e.getMessage()); // 中断当前线程状态,避免潜在的异常 Thread.currentThread().interrupt(); } } // 关闭 boss 线程组,释放资源 boss.shutdownGracefully(); // 关闭 worker 线程组,释放资源 worker.shutdownGracefully(); } /** * 获取服务器的主机地址,由子类实现具体逻辑 * * @return 服务器的主机地址 */ protected abstract String getHost(); /** * 获取服务器的端口范围,由子类实现具体逻辑 * * @return 服务器的端口范围,多个端口使用逗号分隔 */ protected abstract int getPort(); /** * 初始化通道处理器,由子类实现具体逻辑,根据不同的服务器类型添加不同的处理器 * * @param ch 要初始化的 SocketChannel */ protected abstract void initChannelHandlers(SocketChannel ch); }
3、 TcpServer 类是一个基于 Netty 的 TCP 服务器实现类,继承自 AbstractNettyServer。
import io.netty.buffer.Unpooled; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * TcpServer 类是一个基于 Netty 的 TCP 服务器实现类,继承自 AbstractNettyServer。 * 启动时配置相应的通道处理器,以处理 TCP 连接和数据传输。 * 当客户端与服务器建立 TCP 连接时,该类将对连接进行一系列的处理操作。 * * @author chenlei */ @Slf4j @Component public class TcpServer extends AbstractNettyServer { /** * TCP 服务器的主机地址。 */ @Value("${tcp.host}") private String host; /** * TCP 服务器的端口。 */ @Value("${tcp.port}") private int port; /** * 获取 TCP 服务器的主机地址。 * * @return 从配置文件中获取的 TCP 服务器的主机地址 */ @Override protected String getHost() { return host; } /** * 获取 TCP 服务器的端口范围。 * * @return 从配置文件中获取的 TCP 服务器的端口范围 */ @Override protected int getPort() { return port; } /** * 初始化 TCP 服务器的通道处理器。 * 为每个新建立的 SocketChannel 配置处理器链。 * * @param ch 新建立的 SocketChannel,用于与客户端通信 */ @Override protected void initChannelHandlers(SocketChannel ch) { // 优化:可以使用更灵活的分隔符,如可配置的分隔符,提高代码的可扩展性 // 添加帧解码器,解决粘包和拆包问题,使用 "\n" 作为分隔符,最大帧长度为 8192 字节 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(8192, Unpooled.copiedBuffer("\n".getBytes()))); // 添加字符串解码器,将字节数据解码为字符串 ch.pipeline().addLast(new StringDecoder()); // 添加字符串编码器,将字符串编码为字节数据 ch.pipeline().addLast(new StringEncoder()); // 添加自定义的处理器,用于处理 TCP 数据 ch.pipeline().addLast(new TcpServerHandler()); } }
4、 TcpServerHandler 类,主要负责处理 Netty TCP 服务端各种事件和消息。
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; /** * @author chenlei */ @Slf4j public class TcpServerHandler extends ChannelInboundHandlerAdapter { /** * 保存连接到服务端的通道信息,对连接的客户端进行管理,包括连接的添加、删除、查找等操作。 */ private static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap(); /** * 当有客户端连接到服务器时,此方法会被触发。 * 它会记录客户端的 IP 地址、端口号以及连接的 ChannelId,并将该连接添加到 CHANNEL_MAP 中。 * 如果连接已经存在于 CHANNEL_MAP 中,会打印相应的日志信息;如果不存在,则添加到映射中并记录连接信息。 * * @param ctx 通道处理器上下文,包含了通道的信息和操作通道的方法 */ @Override public void channelActive(ChannelHandlerContext ctx) { // 获取客户端的网络地址信息 InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); // 获取客户端的 IP 地址 String clientIp = insocket.getAddress().getHostAddress(); // 获取客户端的端口号 int clientPort = insocket.getPort(); // 获取连接通道的唯一标识 ChannelId channelId = ctx.channel().id(); // 如果该连接通道已经在映射中,打印连接状态信息 if (CHANNEL_MAP.containsKey(channelId)) { log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size()); } else { // 将新的连接添加到映射中 CHANNEL_MAP.put(channelId, ctx); log.info("客户端【" + channelId + "】连接 netty 服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]"); log.info("连接通道数量: " + CHANNEL_MAP.size()); } } /** * 当有客户端终止连接服务器时,此方法会被触发。 * 它会从 CHANNEL_MAP 中移除该客户端的连接信息,并打印相应的退出信息和更新后的连接通道数量。 * 首先检查该连接是否存在于 CHANNEL_MAP 中,如果存在则进行移除操作。 * * @param ctx 通道处理器上下文 */ @Override public void channelInactive(ChannelHandlerContext ctx) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); ChannelId channelId = ctx.channel().id(); // 检查映射中是否包含该客户端连接 if (CHANNEL_MAP.containsKey(channelId)) { // 从映射中移除连接 CHANNEL_MAP.remove(channelId); log.info("客户端【" + channelId + "】退出 netty 服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]"); log.info("连接通道数量: " + CHANNEL_MAP.size()); } } /** * 当有客户端向服务器发送消息时,此方法会被触发。 * 它会打印接收到的客户端消息,并调用 channelWrite 方法将消息返回给客户端。 * 首先会打印接收到客户端报文的日志信息,然后调用 channelWrite 方法进行响应。 * * @param ctx 通道处理器上下文 * @param msg 从客户端接收到的消息对象 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // 将信息返回给客户端 ctx.writeAndFlush(msg); } /** * 当触发用户事件时,此方法会被调用,主要用于处理空闲状态事件。 * 根据不同的空闲状态(读、写、总超时)进行相应的处理,如断开连接。 * 首先检查触发的事件是否是 IdleStateEvent,如果是则判断具体的空闲状态并进行相应处理。 * * @param ctx 通道处理器上下文 * @param evt 触发的事件对象 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("Client: " + socketString + " READER_IDLE 读超时"); // 读超时,断开连接 ctx.disconnect(); } else if (event.state() == IdleState.WRITER_IDLE) { log.info("Client: " + socketString + " WRITER_IDLE 写超时"); // 写超时,断开连接 ctx.disconnect(); } else if (event.state() == IdleState.ALL_IDLE) { log.info("Client: " + socketString + " ALL_IDLE 总超时"); // 总超时,断开连接 ctx.disconnect(); } } } /** * 当发生异常时,此方法会被触发。 * 它会关闭通道并打印相应的错误信息,同时打印当前的连接通道数量。 * 异常发生时,会关闭通道以防止资源泄漏,并且打印异常发生的通道信息和当前的连接数量。 * * @param ctx 通道处理器上下文 * @param cause 引发异常的原因 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size()); } }
5、 WebSocketServer 类,继承自 AbstractNettyServer,基于 Netty 框架实现的 WebSocket 服务器类。
import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * WebSocketServer 类,继承自 AbstractNettyServer,基于 Netty 框架实现的 WebSocket 服务器类。 * 它负责从配置文件中获取 WebSocket 服务器的配置信息(主机地址和端口范围)。 * * @author chenlei */ @Slf4j @Component public class WebSocketServer extends AbstractNettyServer { /** * WebSocket 服务器的主机地址 */ @Value("${websocket.host}") private String host; /** * WebSocket 服务器的端口范围,多个端口使用逗号分隔。 */ @Value("${websocket.port}") private int port; /** * 获取 WebSocket 服务器的主机地址。 * * @return 从配置文件中获取的 WebSocket 服务器的主机地址。 */ @Override protected String getHost() { return host; } /** * 获取 WebSocket 服务器的端口范围。 * * @return 从配置文件中获取的 WebSocket 服务器的端口范围。 */ @Override protected int getPort() { return port; } /** * 初始化 WebSocket 服务器的通道处理器。 * 方法会在服务器启动时为每个新建立的 SocketChannel 配置处理器链,以处理 WebSocket 通信的各个阶段。 * * @param ch 新建立的 SocketChannel,用于与客户端进行通信。 */ @Override protected void initChannelHandlers(SocketChannel ch) { // 在通道的处理器链中添加 HttpServerCodec,用于处理 HTTP 协议的编解码 ch.pipeline().addLast(new HttpServerCodec()); // 添加 HttpObjectAggregator,用于将 HTTP 消息聚合成完整的请求或响应 ch.pipeline().addLast(new HttpObjectAggregator(65536)); // 添加 WebSocketServerProtocolHandler,用于处理 WebSocket 的握手、控制帧等 ch.pipeline().addLast(new WebSocketServerProtocolHandler("/websocket")); // 添加自定义的处理器,用于处理 WebSocket 数据 ch.pipeline().addLast(new WebSocketHandler()); } }
6、 WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; /** * WebSocketHandler 类,继承自 SimpleChannelInboundHandler,用于处理 WebSocket 文本消息 * * @author chenlei */ @Slf4j public class WebSocketHandler extends SimpleChannelInboundHandler { /** * 处理接收到的消息 * * @param ctx 通道处理上下文,包含了与通道相关的信息,如通道本身、管道等 * @param msg 接收到的 TextWebSocketFrame 消息,包含了客户端发送的文本消息内容 */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) { String message = msg.text(); // 创建一个新的 TextWebSocketFrame 来存储要发送回客户端的消息 TextWebSocketFrame responseFrame = new TextWebSocketFrame(message); // 将消息发送回客户端 ctx.channel().writeAndFlush(responseFrame); } /** * 记录客户端的连接信息 * * @param ctx 通道处理上下文 */ @Override public void channelActive(ChannelHandlerContext ctx) { // 当有新的 WebSocket 客户端连接时记录日志 log.info("一个新的 WebSocket 客户端已连接: {}", ctx.channel().remoteAddress()); } /** * 当 WebSocket 客户端断开连接时调用此方法 * * @param ctx 通道处理上下文 */ @Override public void channelInactive(ChannelHandlerContext ctx) { // 当 WebSocket 客户端断开连接时记录日志 log.info("WebSocket 客户端已断开连接: {}", ctx.channel().remoteAddress()); } /** * 当处理 WebSocket 连接过程中发生异常时调用此方法 * 该方法主要负责记录异常信息,并关闭连接以防止异常扩散影响其他客户端 * * @param ctx 通道处理上下文 * @param cause 引发异常的原因,包含了异常的详细信息 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 当处理过程中发生异常时记录日志 log.error("WebSocket 连接中发生错误: {}", cause.getMessage()); // 关闭连接以避免异常扩散影响其他客户端 ctx.close(); } }
(图片来源网络,侵删)(图片来源网络,侵删)
- SSL/TLS支持:Netty内置了对SSL/TLS协议的支持,使得开发人员能够轻松地为网络应用添加安全加密功能,保护数据在传输过程中的机密性和完整性。例如,在开发金融类网络应用时,可以使用Netty的SSL/TLS支持,确保用户的交易数据安全传输。
- 多种协议:
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们。