package com.ouxuan.oxface.device; import android.content.Context; import android.os.Handler; import android.os.Looper; import android.util.Log; import com.ouxuan.oxface.utils.LogManager; import com.tencent.iot.hub.device.android.core.gateway.TXGatewayConnection; import com.tencent.iot.hub.device.java.core.common.Status; import com.tencent.iot.hub.device.java.core.log.TXMqttLogCallBack; import com.tencent.iot.hub.device.java.core.mqtt.TXMqttActionCallBack; import com.tencent.iot.hub.device.java.core.dynreg.TXMqttDynreg; import com.tencent.iot.hub.device.java.core.dynreg.TXMqttDynregCallback; import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.json.JSONObject; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Locale; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 腾讯云MQTT管理器 - 基于TXGatewayConnection实现 * 负责MQTT连接管理、自动重连、消息订阅和发布 * * 功能特性: * 1. 初始化后自动连接 * 2. 网络断线自动重连机制 * 3. 消息订阅和处理 * 4. 连接状态监控和上报 * 5. 门闸控制和设备重启命令处理 * * @author AI Assistant * @version 4.0 * @date 2024/09/16 */ public class MqttManager { private static final String TAG = "MqttManager"; // 单例实例 private static volatile MqttManager instance; // 腾讯云IoT参数配置 private static final String PRODUCT_ID = "WZX68L5I75"; private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O"; // 动态注册用的ProductKey // MQTT连接配置 - 根据腾讯云IoT官方文档配置 // MQTTS(8883): ${productid}.iotcloud.tencentdevices.com private static final String BROKER_URL = "ssl://" + PRODUCT_ID + ".iotcloud.tencentdevices.com:8883"; // 腾讯云IoT Hub SSL地址 private static final int KEEP_ALIVE_INTERVAL = 240; // 增加心跳间隔 private static final int CONNECTION_TIMEOUT = 30; // 增加连接超时时间 private static final int QOS = 1; // 重连配置 private static final int MAX_RECONNECT_ATTEMPTS = 5; private static final long RECONNECT_DELAY = 5000; private static final long HEALTH_CHECK_INTERVAL = 30000; // 上下文和状态管理 private Context context; private TXGatewayConnection mqttConnection; private String deviceName; private String subscribeTopic; private String path2Store; private String dynamicDevicePSK; // 动态注册获取的设备PSK private boolean isDynamicRegisterCompleted = false; // 动态注册是否完成 // 连接状态管理 private boolean isConnected = false; private boolean isConnecting = false; private int reconnectAttempts = 0; private Handler mainHandler; private ExecutorService executorService; private ScheduledExecutorService scheduledExecutor; // 请求ID生成器 private static AtomicInteger requestID = new AtomicInteger(0); // 监听器接口 private ConnectionStatusListener connectionStatusListener; private MessageReceivedListener messageReceivedListener; // 外部依赖 private GateABController gateABController; /** * MQTT连接状态监听器 */ public interface ConnectionStatusListener { void onConnected(); void onConnectionFailed(String reason); void onConnectionLost(String reason); void onReconnecting(int attempt); } /** * MQTT消息接收监听器 */ public interface MessageReceivedListener { void onGateCommandReceived(String gateCommand); void onRebootCommandReceived(); void onLogLevelQueryReceived(); void onOtherMessageReceived(String topic, String message); } /** * MQTT请求上下文 */ private static class MQTTRequest { public String requestType; public int requestId; public MQTTRequest(String requestType, int requestId) { this.requestType = requestType; this.requestId = requestId; } @Override public String toString() { return "MQTTRequest{requestType='" + requestType + "', requestId=" + requestId + "}"; } } /** * 获取单例实例 */ public static MqttManager getInstance() { if (instance == null) { synchronized (MqttManager.class) { if (instance == null) { instance = new MqttManager(); } } } return instance; } /** * 私有构造函数 */ private MqttManager() { mainHandler = new Handler(Looper.getMainLooper()); executorService = Executors.newCachedThreadPool(); scheduledExecutor = Executors.newScheduledThreadPool(2); } /** * 初始化MQTT管理器 */ public void initialize(Context context) { this.context = context.getApplicationContext(); this.gateABController = GateABController.getInstance(); this.path2Store = context.getCacheDir().getAbsolutePath(); // 生成设备名称 generateDeviceName(); // 构建MQTT配置 buildMqttConfig(); LogManager.logInfo(TAG, "MQTT管理器初始化完成"); LogManager.logInfo(TAG, "设备名称: " + deviceName); LogManager.logInfo(TAG, "订阅主题: " + subscribeTopic); // 启动健康检查 startHealthCheck(); // 自动连接 connectAsync(); } /** * 生成设备名称 */ private void generateDeviceName() { String androidId = DeviceUtils.getAndroidID(context); deviceName = "PadV6" + androidId; LogManager.logInfo(TAG, "生成设备名称: " + deviceName); } /** * 构建MQTT配置 */ private void buildMqttConfig() { subscribeTopic = PRODUCT_ID + "/" + deviceName + "/control"; LogManager.logInfo(TAG, "MQTT配置构建完成"); } /** * 开始动态注册 */ private void startDynamicRegister() { LogManager.logInfo(TAG, "开始动态注册 MQTT 设备..."); LogManager.logInfo(TAG, "ProductID: " + PRODUCT_ID); LogManager.logInfo(TAG, "ProductKey: " + PRODUCT_KEY); LogManager.logInfo(TAG, "DeviceName: " + deviceName); try { TXMqttDynreg dynreg = new TXMqttDynreg(PRODUCT_ID, PRODUCT_KEY, deviceName, new DynamicRegisterCallback()); dynreg.doDynamicRegister(); // 调起动态注册 LogManager.logInfo(TAG, "MQTT动态注册请求已发送"); } catch (Exception e) { LogManager.logError(TAG, "动态注册失败", e); scheduleReconnect(); } } /** * 动态注册回调处理器 */ private class DynamicRegisterCallback extends TXMqttDynregCallback { @Override public void onGetDevicePSK(String devicePsk) { LogManager.logInfo(TAG, "动态注册成功,获取到设备PSK: " + devicePsk.substring(0, Math.min(8, devicePsk.length())) + "..."); // 保存动态获取的PSK dynamicDevicePSK = devicePsk; isDynamicRegisterCompleted = true; // 使用获取到的PSK连接MQTT try { connectMqtt(); } catch (Exception e) { LogManager.logError(TAG, "动态注册成功后连接MQTT失败", e); scheduleReconnect(); } } @Override public void onGetDeviceCert(String deviceCert, String devicePriv) { LogManager.logInfo(TAG, "动态注册成功,获取到设备证书认证信息"); // 本项目使用PSK认证,不使用证书认证 } @Override public void onFailedDynreg(Throwable cause, String errMsg) { LogManager.logError(TAG, "动态注册失败: " + errMsg, cause); // 通知连接失败 if (connectionStatusListener != null) { mainHandler.post(() -> connectionStatusListener.onConnectionFailed("动态注册失败: " + errMsg)); } // 调度重连(包括重新动态注册) scheduleReconnect(); } @Override public void onFailedDynreg(Throwable cause) { String errorMsg = cause != null ? cause.getMessage() : "未知错误"; LogManager.logError(TAG, "动态注册失败: " + errorMsg, cause); // 通知连接失败 if (connectionStatusListener != null) { mainHandler.post(() -> connectionStatusListener.onConnectionFailed("动态注册失败: " + errorMsg)); } // 调度重连(包括重新动态注册) scheduleReconnect(); } } /** * 开始动态注册和连接流程 */ public void connectAsync() { if (isConnecting || isConnected) { LogManager.logInfo(TAG, "MQTT正在连接或已连接,跳过连接请求"); return; } executorService.execute(() -> { try { if (!isDynamicRegisterCompleted) { // 先进行动态注册 startDynamicRegister(); } else { // 已有PSK,直接连接MQTT connectMqtt(); } } catch (Exception e) { LogManager.logError(TAG, "异步连接MQTT失败", e); scheduleReconnect(); } }); } /** * 连接MQTT服务器(使用动态注册获取的PSK) */ private void connectMqtt() { LogManager.logInfo(TAG, "开始连接MQTT服务器..."); LogManager.logInfo(TAG, "连接地址: " + BROKER_URL); LogManager.logInfo(TAG, "产品ID: " + PRODUCT_ID); LogManager.logInfo(TAG, "设备名称: " + deviceName); if (dynamicDevicePSK == null || dynamicDevicePSK.isEmpty()) { LogManager.logError(TAG, "动态注册获取的PSK为空,无法连接MQTT"); scheduleReconnect(); return; } isConnecting = true; try { // 创建TXGatewayConnection实例,使用动态注册获取的PSK mqttConnection = new TXGatewayConnection( context, BROKER_URL, PRODUCT_ID, deviceName, dynamicDevicePSK, // 使用动态注册获取的PSK null, // devCert null, // devPriv true, // 启用MQTT日志以便调试 new MqttLogCallback(), // 添加日志回调 new MqttActionCallback() ); // 配置连接选项 MqttConnectOptions options = new MqttConnectOptions(); options.setConnectionTimeout(CONNECTION_TIMEOUT); options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL); options.setAutomaticReconnect(false); // 手动控制重连 options.setCleanSession(true); // 改为true,避免会话问题 options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); // 明确指定MQTT版本 LogManager.logInfo(TAG, "使用动态注册PSK认证连接腾讯云IoT平台"); LogManager.logInfo(TAG, "连接超时: " + CONNECTION_TIMEOUT + "秒"); LogManager.logInfo(TAG, "心跳间隔: " + KEEP_ALIVE_INTERVAL + "秒"); // 连接MQTT MQTTRequest mqttRequest = new MQTTRequest("connect", requestID.getAndIncrement()); mqttConnection.connect(options, mqttRequest); LogManager.logInfo(TAG, "MQTT连接请求已发送"); } catch (Exception e) { LogManager.logError(TAG, "创建MQTT连接失败", e); isConnecting = false; scheduleReconnect(); } } /** * 订阅主题 */ private void subscribeToTopic() { if (mqttConnection != null && isConnected) { try { MQTTRequest mqttRequest = new MQTTRequest("subscribe", requestID.getAndIncrement()); mqttConnection.subscribe(subscribeTopic, QOS, mqttRequest); LogManager.logInfo(TAG, "开始订阅主题: " + subscribeTopic); } catch (Exception e) { LogManager.logError(TAG, "订阅主题异常", e); } } } /** * 配置断线缓冲 */ private void configureDisconnectedBuffer() { try { DisconnectedBufferOptions bufferOptions = new DisconnectedBufferOptions(); bufferOptions.setBufferEnabled(true); bufferOptions.setBufferSize(1024); bufferOptions.setDeleteOldestMessages(true); bufferOptions.setPersistBuffer(true); mqttConnection.setBufferOpts(bufferOptions); LogManager.logInfo(TAG, "断线缓冲配置完成"); } catch (Exception e) { LogManager.logError(TAG, "配置断线缓冲失败", e); } } /** * 调度重连(包括重新动态注册) */ private void scheduleReconnect() { if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) { LogManager.logError(TAG, "已达到最大重连次数,停止重连"); LogManager.logError(TAG, "请检查网络连接和腾讯云IoT设备配置"); return; } reconnectAttempts++; // 渐进式延迟:5s, 10s, 20s, 40s, 80s long delay = RECONNECT_DELAY * (1L << (reconnectAttempts - 1)); LogManager.logInfo(TAG, "调度第" + reconnectAttempts + "次重连(包括动态注册),延迟" + delay + "ms"); LogManager.logInfo(TAG, "重连原因: 代理程序不可用或动态注册失败,可能是网络问题或认证失败"); if (connectionStatusListener != null) { mainHandler.post(() -> connectionStatusListener.onReconnecting(reconnectAttempts)); } scheduledExecutor.schedule(() -> { if (!isConnected) { LogManager.logInfo(TAG, "执行第" + reconnectAttempts + "次重连(包括重新动态注册)"); // 清理旧连接和动态注册状态 if (mqttConnection != null) { try { mqttConnection = null; } catch (Exception e) { LogManager.logError(TAG, "清理旧连接失败", e); } } // 重置动态注册状态,重新执行动态注册流程 isDynamicRegisterCompleted = false; dynamicDevicePSK = null; connectAsync(); } }, delay, TimeUnit.MILLISECONDS); } /** * 启动健康检查 */ private void startHealthCheck() { scheduledExecutor.scheduleWithFixedDelay(() -> { try { checkConnectionHealth(); } catch (Exception e) { LogManager.logError(TAG, "健康检查异常", e); } }, HEALTH_CHECK_INTERVAL, HEALTH_CHECK_INTERVAL, TimeUnit.MILLISECONDS); LogManager.logInfo(TAG, "MQTT健康检查已启动"); } /** * 检查连接健康状态 */ private void checkConnectionHealth() { if (mqttConnection != null) { // 检查连接状态 if (!isConnected && !isConnecting) { LogManager.logInfo(TAG, "健康检查:连接已断开,尝试重连"); connectAsync(); } } else if (!isConnecting) { LogManager.logInfo(TAG, "健康检查:客户端为空,尝试重连"); connectAsync(); } } /** * MQTT日志回调处理器 */ private class MqttLogCallback extends TXMqttLogCallBack { @Override public String setSecretKey() { return null; // 不需要加密 } @Override public void printDebug(String message) { LogManager.logDebug("TXMQTT", message); } @Override public boolean saveLogOffline(String log) { // 简化实现,返回false表示不支持离线日志保存 return false; } @Override public String readOfflineLog() { // 简化实现,返回null表示没有离线日志 return null; } @Override public boolean delOfflineLog() { // 简化实现,返回true表示删除成功 return true; } } /** * MQTT动作回调处理器 */ private class MqttActionCallback extends TXMqttActionCallBack { @Override public void onConnectCompleted(Status status, boolean reconnect, Object userContext, String msg, Throwable cause) { String userContextInfo = ""; if (userContext instanceof MQTTRequest) { userContextInfo = userContext.toString(); } String logInfo = String.format("onConnectCompleted, status[%s], reconnect[%b], userContext[%s], msg[%s]", status.name(), reconnect, userContextInfo, msg); LogManager.logInfo(TAG, "连接回调:" + logInfo); if (cause != null) { LogManager.logError(TAG, "连接异常信息", cause); } if (status.equals(Status.OK)) { LogManager.logInfo(TAG, "MQTT连接成功"); isConnected = true; isConnecting = false; reconnectAttempts = 0; // 订阅主题 subscribeToTopic(); // 配置断线缓冲 configureDisconnectedBuffer(); // 通知连接成功 if (connectionStatusListener != null) { mainHandler.post(() -> connectionStatusListener.onConnected()); } } else { String errorMsg = "连接失败: " + msg; if (msg != null && msg.contains("代理程序不可用")) { errorMsg += "\n请检查:\n1. 网络连接是否正常\n2. 腾讯云IoT设备信息是否正确\n3. PSK密钥是否有效"; } LogManager.logError(TAG, "MQTT" + errorMsg); isConnecting = false; // 通知连接失败 if (connectionStatusListener != null) { final String finalErrorMsg = errorMsg; mainHandler.post(() -> connectionStatusListener.onConnectionFailed(finalErrorMsg)); } // 调度重连 scheduleReconnect(); } } @Override public void onConnectionLost(Throwable cause) { LogManager.logError(TAG, "MQTT连接丢失", cause); isConnected = false; isConnecting = false; String reason = cause != null ? cause.getMessage() : "未知原因"; // 检查是否是由于腾讯云SDK内部的NullPointerException导致的断连 if (reason.contains("java.lang.NullPointerException") && reason.contains("String.contains")) { LogManager.logError(TAG, "检测到由于腾讯云IoT Hub SDK内部问题导致的断连"); LogManager.logError(TAG, "这可能是SDK版本兼容性问题,将适当延迟重连"); // 对于这种情况,我们增加一些延迟重连的逻辑 scheduledExecutor.schedule(() -> { if (!isConnected && !isConnecting) { LogManager.logInfo(TAG, "尝试从腾讯云SDK内部错误中恢复连接"); scheduleReconnect(); } }, 2000, TimeUnit.MILLISECONDS); // 2秒延迟 } else { // 正常的连接丢失,立即重连 scheduleReconnect(); } if (connectionStatusListener != null) { mainHandler.post(() -> connectionStatusListener.onConnectionLost(reason)); } } @Override public void onDisconnectCompleted(Status status, Object userContext, String msg, Throwable cause) { LogManager.logInfo(TAG, "MQTT连接已断开: " + msg); isConnected = false; isConnecting = false; } @Override public void onPublishCompleted(Status status, IMqttToken token, Object userContext, String errMsg, Throwable cause) { if (status == Status.OK) { LogManager.logDebug(TAG, "消息发布成功"); } else { String errorMsg = errMsg != null ? errMsg : "未知错误"; LogManager.logError(TAG, "消息发布失败: " + errorMsg); // 记录异常信息以便调试 if (cause != null) { LogManager.logError(TAG, "发布异常详情", cause); } } } @Override public void onSubscribeCompleted(Status status, IMqttToken asyncActionToken, Object userContext, String errMsg, Throwable cause) { // 记录详细的调试信息 String tokenInfo = "null"; String userContextInfo = "null"; try { if (asyncActionToken != null && asyncActionToken.getTopics() != null) { tokenInfo = java.util.Arrays.toString(asyncActionToken.getTopics()); } } catch (Exception e) { LogManager.logError(TAG, "获取token信息失败", e); } try { if (userContext instanceof MQTTRequest) { userContextInfo = userContext.toString(); } else if (userContext != null) { userContextInfo = userContext.toString(); } } catch (Exception e) { LogManager.logError(TAG, "获取userContext信息失败", e); } String debugInfo = String.format("onSubscribeCompleted: status[%s], topics[%s], userContext[%s], errMsg[%s]", status != null ? status.name() : "null", tokenInfo, userContextInfo, errMsg != null ? errMsg : "null"); LogManager.logInfo(TAG, "订阅回调详情: " + debugInfo); if (status == Status.OK) { LogManager.logInfo(TAG, "主题订阅成功: " + subscribeTopic); } else { String errorMsg = errMsg != null ? errMsg : "未知错误"; LogManager.logError(TAG, "主题订阅失败: " + errorMsg); // 检查是否是腾讯云SDK内部的NullPointerException if (errorMsg != null && errorMsg.contains("java.lang.NullPointerException")) { LogManager.logError(TAG, "检测到腾讯云IoT Hub SDK内部的NullPointerException,这可能是SDK的问题"); LogManager.logError(TAG, "尝试忽略这个错误并继续保持连接"); // 对于腾讯云SDK内部的gateway result主题订阅失败,我们可以忽略 if (tokenInfo.contains("$gateway/operation/result")) { LogManager.logWarning(TAG, "忽略gateway result主题订阅失败,这不影响主要功能"); return; // 不触发重连 } } // 检查是否是由于代理程序不可用导致的错误 if (errorMsg != null && errorMsg.contains("代理程序不可用")) { LogManager.logError(TAG, "MQTT代理程序不可用,请检查网络连接和腾讯云IoT设备配置"); } // 记录异常信息以便调试 if (cause != null) { LogManager.logError(TAG, "订阅异常详情", cause); } } } @Override public void onUnSubscribeCompleted(Status status, IMqttToken asyncActionToken, Object userContext, String errMsg, Throwable cause) { String message = errMsg != null ? errMsg : "取消订阅完成"; LogManager.logInfo(TAG, "取消订阅完成: " + message); // 记录异常信息以便调试 if (cause != null) { LogManager.logError(TAG, "取消订阅异常详情", cause); } } @Override public void onMessageReceived(String topic, MqttMessage message) { String messageContent = new String(message.getPayload()); String timestamp = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(new Date()); LogManager.logInfo(TAG, timestamp + " " + TAG + " D 接收到MQTT消息,主题: " + topic + ", 内容: " + messageContent); // 处理消息 processMessage(topic, messageContent); } } /** * 处理接收到的消息 */ private void processMessage(String topic, String messageContent) { try { if (messageContent.contains("\"gate\"")) { handleGateCommand(messageContent); } else if (messageContent.contains("reboot-pad")) { handleRebootCommand(); } else if (messageContent.contains("get_log_level")) { handleLogLevelQuery(); } else { handleOtherMessage(topic, messageContent); } } catch (Exception e) { LogManager.logError(TAG, "处理MQTT消息异常", e); } } /** * 处理门闸控制命令 */ private void handleGateCommand(String messageContent) { LogManager.logInfo(TAG, "处理门闸控制命令: " + messageContent); try { String gateCommand = extractGateCommand(messageContent); if (gateCommand != null) { LogManager.logInfo(TAG, "解析门闸命令: " + gateCommand); if (gateABController != null) { gateABController.openGateAB(new GateABController.GateControlCallback() { @Override public void onSuccess(String message) { LogManager.logInfo(TAG, "MQTT门闸控制成功: " + message); } @Override public void onError(String errorMessage) { LogManager.logError(TAG, "MQTT门闸控制失败: " + errorMessage); } }); } if (messageReceivedListener != null) { mainHandler.post(() -> messageReceivedListener.onGateCommandReceived(gateCommand)); } } } catch (Exception e) { LogManager.logError(TAG, "处理门闸控制命令异常", e); } } /** * 提取门闸命令 */ private String extractGateCommand(String messageContent) { try { int gateIndex = messageContent.indexOf("\"gate\""); if (gateIndex != -1) { int startIndex = messageContent.indexOf(":", gateIndex) + 1; int endIndex = messageContent.indexOf("}", startIndex); if (startIndex > 0 && endIndex > startIndex) { return messageContent.substring(startIndex, endIndex).trim(); } } } catch (Exception e) { LogManager.logError(TAG, "提取门闸命令异常", e); } return null; } /** * 处理设备重启命令 */ private void handleRebootCommand() { LogManager.logInfo(TAG, "接收到设备重启命令"); try { if (messageReceivedListener != null) { mainHandler.post(() -> messageReceivedListener.onRebootCommandReceived()); } LogManager.logWarning(TAG, "设备重启功能待实现"); } catch (Exception e) { LogManager.logError(TAG, "处理设备重启命令异常", e); } } /** * 处理日志级别查询 */ private void handleLogLevelQuery() { LogManager.logInfo(TAG, "接收到日志级别查询命令"); try { if (messageReceivedListener != null) { mainHandler.post(() -> messageReceivedListener.onLogLevelQueryReceived()); } uploadDeviceInfo(); } catch (Exception e) { LogManager.logError(TAG, "处理日志级别查询异常", e); } } /** * 处理其他消息 */ private void handleOtherMessage(String topic, String messageContent) { LogManager.logInfo(TAG, "接收到其他消息,主题: " + topic + ", 内容: " + messageContent); if (messageReceivedListener != null) { mainHandler.post(() -> messageReceivedListener.onOtherMessageReceived(topic, messageContent)); } } /** * 上报设备信息 */ private void uploadDeviceInfo() { try { JSONObject deviceInfo = new JSONObject(); deviceInfo.put("deviceId", DeviceUtils.getAndroidID(context)); deviceInfo.put("deviceModel", DeviceUtils.getDeviceModel()); deviceInfo.put("deviceBrand", DeviceUtils.getDeviceBrand()); deviceInfo.put("androidVersion", DeviceUtils.getAndroidVersion()); deviceInfo.put("timestamp", System.currentTimeMillis()); String dataTopic = PRODUCT_ID + "/" + deviceName + "/data"; publishMessage(dataTopic, deviceInfo.toString()); LogManager.logInfo(TAG, "设备信息上报完成: " + deviceInfo.toString()); } catch (Exception e) { LogManager.logError(TAG, "设备信息上报失败", e); } } /** * 发布消息 */ public void publishMessage(String topic, String messageContent) { if (mqttConnection != null && isConnected) { try { MqttMessage message = new MqttMessage(); message.setQos(QOS); message.setPayload(messageContent.getBytes()); MQTTRequest mqttRequest = new MQTTRequest("publish", requestID.getAndIncrement()); mqttConnection.publish(topic, message, mqttRequest); LogManager.logInfo(TAG, "消息发布成功,主题: " + topic); } catch (Exception e) { LogManager.logError(TAG, "发布消息异常", e); } } else { LogManager.logWarning(TAG, "MQTT未连接,无法发布消息"); } } /** * 获取连接状态 */ public boolean isConnected() { return isConnected && mqttConnection != null; } /** * 获取连接状态详情 */ public String getConnectionStatusDetail() { if (isConnected()) { return "MQTT已连接"; } else if (isConnecting) { return "MQTT连接中..."; } else if (reconnectAttempts > 0) { return "MQTT重连中(" + reconnectAttempts + "/" + MAX_RECONNECT_ATTEMPTS + ")"; } else { return "MQTT未连接"; } } /** * 设置连接状态监听器 */ public void setConnectionStatusListener(ConnectionStatusListener listener) { this.connectionStatusListener = listener; } /** * 设置消息接收监听器 */ public void setMessageReceivedListener(MessageReceivedListener listener) { this.messageReceivedListener = listener; } /** * 断开连接 */ public void disconnect() { LogManager.logInfo(TAG, "开始断开MQTT连接"); isConnected = false; isConnecting = false; if (mqttConnection != null) { try { MQTTRequest mqttRequest = new MQTTRequest("disconnect", requestID.getAndIncrement()); mqttConnection.disConnect(mqttRequest); } catch (Exception e) { LogManager.logError(TAG, "断开MQTT连接异常", e); } } } /** * 释放资源 */ public void release() { LogManager.logInfo(TAG, "释放MQTT管理器资源"); disconnect(); if (executorService != null && !executorService.isShutdown()) { executorService.shutdown(); } if (scheduledExecutor != null && !scheduledExecutor.isShutdown()) { scheduledExecutor.shutdown(); } mqttConnection = null; connectionStatusListener = null; messageReceivedListener = null; LogManager.logInfo(TAG, "MQTT管理器资源释放完成"); } }