Android WebSocket工具类:重连、心跳、消息队列一站式解决方案
- 依赖库
使用 OkHttp 的WebSocket支持。
在 build.gradle 中添加依赖:
implementation 'com.squareup.okhttp3:okhttp:4.9.3'
- WebSocket工具类实现
import okhttp3.*; import android.os.Handler; import android.os.Looper; import android.util.Log; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class WebSocketManager { private static final String TAG = "WebSocketManager"; private OkHttpClient client; private WebSocket webSocket; private String wsUrl; private AtomicInteger reconnectCount = new AtomicInteger(0); private boolean isConnecting = false; private boolean isConnected = false; private Handler mainHandler = new Handler(Looper.getMainLooper()); private Runnable heartbeatRunnable; private ConcurrentLinkedQueue messageQueue = new ConcurrentLinkedQueue(); // 线程安全的消息队列 private ExecutorService executorService = Executors.newSingleThreadExecutor(); // 线程池 private int maxReconnectCount = 5; // 最大重连次数 private long reconnectInterval = 5000; // 重连间隔时间(毫秒) private long heartbeatInterval = 30000; // 心跳间隔时间(毫秒) private long heartbeatTimeout = 60000; // 心跳超时时间(毫秒) private WebSocketCallback callback; private WebSocketListener webSocketListener = new WebSocketListener() { @Override public void onOpen(WebSocket webSocket, Response response) { Log.d(TAG, "WebSocket connected"); isConnecting = false; isConnected = true; reconnectCount.set(0); // 重置重连次数 startHeartbeat(); // 开始心跳 sendQueuedMessages(); // 发送缓存的消息 if (callback != null) { mainHandler.post(() -> callback.onConnected()); } } @Override public void onMessage(WebSocket webSocket, String text) { Log.d(TAG, "Received message: " + text); if (callback != null) { mainHandler.post(() -> callback.onMessage(text)); } } @Override public void onClosed(WebSocket webSocket, int code, String reason) { Log.d(TAG, "WebSocket closed: " + reason); isConnecting = false; isConnected = false; stopHeartbeat(); // 停止心跳 if (callback != null) { mainHandler.post(() -> callback.onDisconnected()); } } @Override public void onFailure(WebSocket webSocket, Throwable t, Response response) { Log.e(TAG, "WebSocket failed: " + t.getMessage()); isConnecting = false; isConnected = false; stopHeartbeat(); // 停止心跳 if (callback != null) { mainHandler.post(() -> callback.onError(t)); } reconnect(); // 尝试重连 } }; // 私有构造函数,使用Builder模式创建实例 private WebSocketManager(Builder builder) { this.wsUrl = builder.wsUrl; this.maxReconnectCount = builder.maxReconnectCount; this.reconnectInterval = builder.reconnectInterval; this.heartbeatInterval = builder.heartbeatInterval; this.heartbeatTimeout = builder.heartbeatTimeout; client = new OkHttpClient.Builder() .connectTimeout(10, TimeUnit.SECONDS) .writeTimeout(10, TimeUnit.SECONDS) .readTimeout(10, TimeUnit.SECONDS) .build(); } // 连接WebSocket public void connect() { if (!isConnecting && !isConnected) { isConnecting = true; executorService.submit(() -> { Request request = new Request.Builder().url(wsUrl).build(); webSocket = client.newWebSocket(request, webSocketListener); }); } } // 断开连接 public void disconnect() { if (webSocket != null) { executorService.submit(() -> webSocket.close(1000, "Normal closure")); } releaseResources(); } // 发送消息 public void sendMessage(String message) { if (webSocket != null && isConnected) { executorService.submit(() -> { boolean sent = webSocket.send(message); if (!sent) { // 发送失败,将消息加入队列 messageQueue.offer(message); } }); } else { // 网络未连接时,将消息加入队列 messageQueue.offer(message); } } // 重连机制 private void reconnect() { if (reconnectCount.get() { Log.d(TAG, "Reconnecting... Attempt: " + reconnectCount.get()); connect(); }, reconnectInterval); } else { Log.e(TAG, "Max reconnection attempts reached"); } } // 发送缓存的消息 private void sendQueuedMessages() { executorService.submit(() -> { while (!messageQueue.isEmpty()) { String message = messageQueue.poll(); if (message != null) { boolean sent = webSocket.send(message); if (!sent) { // 发送失败,将消息重新加入队列 messageQueue.offer(message); break; } } } }); } // 开始心跳 private void startHeartbeat() { heartbeatRunnable = new Runnable() { @Override public void run() { if (isConnected) { webSocket.send("Heartbeat"); // 发送心跳消息 mainHandler.postDelayed(this, heartbeatInterval); } } }; mainHandler.post(heartbeatRunnable); } // 停止心跳 private void stopHeartbeat() { mainHandler.removeCallbacks(heartbeatRunnable); } // 释放资源 private void releaseResources() { executorService.shutdown(); mainHandler.removeCallbacksAndMessages(null); } // 设置回调 public void setWebSocketCallback(WebSocketCallback callback) { this.callback = callback; } // Builder模式 public static class Builder { private String wsUrl; private int maxReconnectCount = 5; private long reconnectInterval = 5000; private long heartbeatInterval = 30000; private long heartbeatTimeout = 60000; public Builder(String wsUrl) { this.wsUrl = wsUrl; } public Builder setMaxReconnectCount(int maxReconnectCount) { this.maxReconnectCount = maxReconnectCount; return this; } public Builder setReconnectInterval(long reconnectInterval) { this.reconnectInterval = reconnectInterval; return this; } public Builder setHeartbeatInterval(long heartbeatInterval) { this.heartbeatInterval = heartbeatInterval; return this; } public Builder setHeartbeatTimeout(long heartbeatTimeout) { this.heartbeatTimeout = heartbeatTimeout; return this; } public WebSocketManager build() { return new WebSocketManager(this); } } // 回调接口 public interface WebSocketCallback { void onMessage(String message); void onConnected(); void onDisconnected(); void onError(Throwable t); } }
- 功能说明
3.1 连接状态管理
增加 isConnecting 状态,区分连接中和已连接状态。
3.2 消息重发机制
在发送失败时,将消息重新加入队列,等待重连后发送。
3.3 动态心跳机制
支持动态调整心跳间隔和超时时间。
3.4 资源释放
在断开连接时,释放线程池和Handler资源,避免内存泄漏。
3.5 日志分级
通过 Log.d 和 Log.e 区分调试日志和错误日志。
- 使用示例
4.1 初始化并连接WebSocket
WebSocketManager webSocketManager = new WebSocketManager.Builder("ws://your-websocket-url") .setMaxReconnectCount(10) .setReconnectInterval(3000) .setHeartbeatInterval(60000) .setHeartbeatTimeout(120000) .build(); webSocketManager.setWebSocketCallback(new WebSocketManager.WebSocketCallback() { @Override public void onMessage(String message) { Log.d(TAG, "Received: " + message); } @Override public void onConnected() { Log.d(TAG, "Connected"); } @Override public void onDisconnected() { Log.d(TAG, "Disconnected"); } @Override public void onError(Throwable t) { Log.e(TAG, "Error: " + t.getMessage()); } }); webSocketManager.connect();
4.2 发送消息
webSocketManager.sendMessage("Hello, WebSocket!");
4.3 断开连接
webSocketManager.disconnect();
4.4 处理生命周期
@Override protected void onDestroy() { super.onDestroy(); webSocketManager.disconnect(); }
在Activity中使用WebSocket工具类非常简单。我们需要确保WebSocket的生命周期与Activity的生命周期绑定,避免内存泄漏和资源浪费。以下是完整的示例代码,展示如何在Activity中初始化、使用和释放WebSocket工具类。
在Activity中使用WebSocket工具类
- Activity代码示例
import android.os.Bundle; import android.util.Log; import androidx.annotation.Nullable; import androidx.appcompat.app.AppCompatActivity; public class MainActivity extends AppCompatActivity { private static final String TAG = "MainActivity"; private WebSocketManager webSocketManager; @Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); // 初始化WebSocketManager webSocketManager = new WebSocketManager.Builder("ws://your-websocket-url") .setMaxReconnectCount(10) .setReconnectInterval(3000) .setHeartbeatInterval(60000) .setHeartbeatTimeout(120000) .build(); // 设置WebSocket回调 webSocketManager.setWebSocketCallback(new WebSocketManager.WebSocketCallback() { @Override public void onMessage(String message) { Log.d(TAG, "Received: " + message); // 在这里处理接收到的消息 } @Override public void onConnected() { Log.d(TAG, "Connected"); // 在这里处理连接成功事件 } @Override public void onDisconnected() { Log.d(TAG, "Disconnected"); // 在这里处理连接断开事件 } @Override public void onError(Throwable t) { Log.e(TAG, "Error: " + t.getMessage()); // 在这里处理错误事件 } }); // 连接WebSocket webSocketManager.connect(); // 发送消息 webSocketManager.sendMessage("Hello, WebSocket!"); } @Override protected void onDestroy() { super.onDestroy(); // 断开WebSocket连接并释放资源 if (webSocketManager != null) { webSocketManager.disconnect(); } } }
- 代码说明
2.1 初始化WebSocketManager
在 onCreate 方法中,使用 Builder 模式初始化 WebSocketManager,并设置最大重连次数、重连间隔、心跳间隔和超时时间。
webSocketManager = new WebSocketManager.Builder("ws://your-websocket-url") .setMaxReconnectCount(10) .setReconnectInterval(3000) .setHeartbeatInterval(60000) .setHeartbeatTimeout(120000) .build();
2.2 设置回调
通过 setWebSocketCallback 方法设置回调接口,处理WebSocket的连接、消息、断开和错误事件。
webSocketManager.setWebSocketCallback(new WebSocketManager.WebSocketCallback() { @Override public void onMessage(String message) { Log.d(TAG, "Received: " + message); // 在这里处理接收到的消息 } @Override public void onConnected() { Log.d(TAG, "Connected"); // 在这里处理连接成功事件 } @Override public void onDisconnected() { Log.d(TAG, "Disconnected"); // 在这里处理连接断开事件 } @Override public void onError(Throwable t) { Log.e(TAG, "Error: " + t.getMessage()); // 在这里处理错误事件 } });
2.3 连接WebSocket
在 onCreate 方法中调用 connect() 方法,启动WebSocket连接。
webSocketManager.connect();
2.4 发送消息
通过 sendMessage 方法发送消息。
webSocketManager.sendMessage("Hello, WebSocket!");
2.5 释放资源
在 onDestroy 方法中调用 disconnect() 方法,断开WebSocket连接并释放资源。
@Override protected void onDestroy() { super.onDestroy(); if (webSocketManager != null) { webSocketManager.disconnect(); } }
- 注意事项
3.1 生命周期绑定
确保WebSocket的生命周期与Activity的生命周期绑定,避免内存泄漏。
在 onDestroy 中释放资源。
3.2 线程安全
WebSocket的回调方法(如 onMessage)运行在后台线程,如果需要更新UI,请使用 Handler 或 runOnUiThread。
3.3 网络权限
在 AndroidManifest.xml 中添加网络权限:
3.4 SSL/TLS支持
如果WebSocket服务器使用 wss 协议(即基于SSL/TLS的安全连接),OkHttp 会自动处理SSL/TLS握手,无需额外配置。
- 扩展功能
4.1 动态调整心跳间隔
可以根据服务器响应动态调整心跳间隔。例如,在收到服务器的心跳响应后,延长心跳间隔。
@Override public void onMessage(String message) { if ("HeartbeatResponse".equals(message)) { // 动态调整心跳间隔 webSocketManager.setHeartbeatInterval(120000); } }
4.2 消息重发机制
在发送失败时,将消息加入队列,等待重连后重新发送。
webSocketManager.sendMessage("Hello, WebSocket!");
4.3 日志分级
通过 Log.d 和 Log.e 区分调试日志和错误日志,方便调试和生产环境使用。
- 总结
在Activity中使用优化后的WebSocket工具类非常简单。通过生命周期绑定、回调接口和资源释放机制,可以确保WebSocket的高效运行和资源管理。以下是完整的流程:
初始化:在 onCreate 中初始化 WebSocketManager。
设置回调:处理连接、消息、断开和错误事件。
连接WebSocket:调用 connect() 方法。
发送消息:通过 sendMessage 方法发送消息。
释放资源:在 onDestroy 中调用 disconnect() 方法。