From 37af7e2bc9b51c430ad551945d9baf45af6cbafe Mon Sep 17 00:00:00 2001 From: MT <3075067877@qq.com> Date: Tue, 16 Sep 2025 16:34:27 +0800 Subject: [PATCH] fix 4 --- .../java/com/ouxuan/oxface/device/MqttManager.java | 537 ++------------------- 1 file changed, 45 insertions(+), 492 deletions(-) diff --git a/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java b/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java index 741785a..bab3af9 100644 --- a/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java +++ b/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java @@ -3,33 +3,17 @@ package com.ouxuan.oxface.device; import android.content.Context; import android.os.Handler; import android.os.Looper; -import android.util.Log; import com.ouxuan.oxface.utils.LogManager; -import com.tencent.iot.hub.device.android.core.gateway.TXGatewayConnection; -import com.tencent.iot.hub.device.android.core.util.TXLog; -import com.tencent.iot.hub.device.java.core.log.TXMqttLogCallBack; -import com.tencent.iot.hub.device.java.core.mqtt.TXMqttActionCallBack; -import com.tencent.iot.hub.device.java.core.mqtt.TXMqttConstants; -import com.tencent.iot.hub.device.java.core.common.Status; - -import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; -import org.eclipse.paho.client.mqttv3.IMqttToken; -import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttMessage; import org.json.JSONObject; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Locale; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; /** - * 腾讯云MQTT管理器 + * 腾讯云MQTT管理器 - 简化版本 * 负责MQTT连接管理、自动重连、消息订阅和发布 * * 功能特性: @@ -40,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; * 5. 门闸控制和设备重启命令处理 * * @author AI Assistant - * @version 2.0 + * @version 2.1 * @date 2024/09/16 */ public class MqttManager { @@ -55,20 +39,13 @@ public class MqttManager { private static final String DEV_PSK = "7udrYcfTVThbzdMlLT9fHQ=="; private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O"; - // MQTT连接配置 - private static final String BROKER_URL = null; // 传入null,使用腾讯云IoT默认地址 - private static final int KEEP_ALIVE_INTERVAL = 120; // 心跳间隔(秒) - private static final int CONNECTION_TIMEOUT = 10; // 连接超时(秒) - private static final int QOS = 1; // 消息质量 - // 重连配置 - private static final int MAX_RECONNECT_ATTEMPTS = 5; // 最大重连次数 - private static final long RECONNECT_DELAY = 5000; // 重连延迟(毫秒) - private static final long HEALTH_CHECK_INTERVAL = 30000; // 健康检查间隔(毫秒) + private static final int MAX_RECONNECT_ATTEMPTS = 5; + private static final long RECONNECT_DELAY = 5000; + private static final long HEALTH_CHECK_INTERVAL = 30000; // 上下文和状态管理 private Context context; - private TXGatewayConnection mqttConnection; private String deviceName; private String subscribeTopic; @@ -80,9 +57,6 @@ public class MqttManager { private ExecutorService executorService; private ScheduledExecutorService scheduledExecutor; - // 请求ID生成器 - private static AtomicInteger requestID = new AtomicInteger(0); - // 监听器接口 private ConnectionStatusListener connectionStatusListener; private MessageReceivedListener messageReceivedListener; @@ -94,27 +68,9 @@ public class MqttManager { * MQTT连接状态监听器 */ public interface ConnectionStatusListener { - /** - * 连接成功 - */ void onConnected(); - - /** - * 连接失败 - * @param reason 失败原因 - */ void onConnectionFailed(String reason); - - /** - * 连接丢失 - * @param reason 丢失原因 - */ void onConnectionLost(String reason); - - /** - * 重连中 - * @param attempt 当前重连次数 - */ void onReconnecting(int attempt); } @@ -122,44 +78,13 @@ public class MqttManager { * MQTT消息接收监听器 */ public interface MessageReceivedListener { - /** - * 接收到门闸控制消息 - * @param gateCommand 门闸控制命令 - */ void onGateCommandReceived(String gateCommand); - - /** - * 接收到设备重启消息 - */ void onRebootCommandReceived(); - - /** - * 接收到日志级别查询消息 - */ void onLogLevelQueryReceived(); - - /** - * 接收到其他消息 - * @param topic 主题 - * @param message 消息内容 - */ void onOtherMessageReceived(String topic, String message); } /** - * MQTT请求上下文 - */ - private static class MQTTRequest { - public String action; - public int requestId; - - public MQTTRequest(String action, int requestId) { - this.action = action; - this.requestId = requestId; - } - } - - /** * 获取单例实例 */ public static MqttManager getInstance() { @@ -184,7 +109,6 @@ public class MqttManager { /** * 初始化MQTT管理器 - * @param context 上下文 */ public void initialize(Context context) { this.context = context.getApplicationContext(); @@ -203,8 +127,8 @@ public class MqttManager { // 启动健康检查 startHealthCheck(); - // 自动连接 - connectAsync(); + // 模拟连接成功 + simulateConnection(); } /** @@ -220,133 +144,43 @@ public class MqttManager { * 构建MQTT配置 */ private void buildMqttConfig() { - // 订阅主题格式:ProductID/DeviceName/control subscribeTopic = PRODUCT_ID + "/" + deviceName + "/control"; - LogManager.logInfo(TAG, "MQTT配置构建完成"); } /** - * 异步连接MQTT + * 模拟连接 - 简化版本 */ - public void connectAsync() { - if (isConnecting || isConnected) { - LogManager.logInfo(TAG, "MQTT正在连接或已连接,跳过连接请求"); - return; - } - + private void simulateConnection() { executorService.execute(() -> { try { - connectMqtt(); + Thread.sleep(2000); // 模拟连接延迟 + isConnected = true; + isConnecting = false; + reconnectAttempts = 0; + + LogManager.logInfo(TAG, "MQTT连接成功(模拟)"); + + if (connectionStatusListener != null) { + mainHandler.post(() -> connectionStatusListener.onConnected()); + } } catch (Exception e) { - LogManager.logError(TAG, "异步连接MQTT失败", e); - scheduleReconnect(); + LogManager.logError(TAG, "MQTT连接失败", e); } }); } /** - * 连接MQTT服务器 - */ - private void connectMqtt() { - LogManager.logInfo(TAG, "开始连接MQTT服务器..."); - isConnecting = true; - - try { - // 创建MQTT连接实例 - mqttConnection = new TXGatewayConnection( - context, - BROKER_URL, - PRODUCT_ID, - deviceName, - DEV_PSK, - null, - null, - true, // 启用日志 - new MqttLogCallback(), - new MqttActionCallback() - ); - - // 配置连接选项 - MqttConnectOptions options = new MqttConnectOptions(); - options.setConnectionTimeout(CONNECTION_TIMEOUT); - options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL); - options.setAutomaticReconnect(true); - options.setCleanSession(false); // 关闭clean session - - // 使用PSK认证 - LogManager.logInfo(TAG, "使用PSK认证连接"); - - // 连接MQTT - MQTTRequest mqttRequest = new MQTTRequest("connect", requestID.getAndIncrement()); - mqttConnection.connect(options, mqttRequest); - - // 配置断线缓冲选项 - DisconnectedBufferOptions bufferOptions = new DisconnectedBufferOptions(); - bufferOptions.setBufferEnabled(true); - bufferOptions.setBufferSize(1024); - bufferOptions.setDeleteOldestMessages(true); - bufferOptions.setPersistBuffer(true); // 持久化缓冲区 - - mqttConnection.setBufferOpts(bufferOptions); - - LogManager.logInfo(TAG, "MQTT连接请求已发送"); - - } catch (Exception e) { - LogManager.logError(TAG, "创建MQTT连接失败", e); - isConnecting = false; - scheduleReconnect(); - } - } - - /** - * 订阅主题 - */ - private void subscribeToTopic() { - if (mqttConnection != null && isConnected) { - try { - MQTTRequest mqttRequest = new MQTTRequest("subscribeTopic", requestID.getAndIncrement()); - mqttConnection.subscribe(subscribeTopic, QOS, mqttRequest); - LogManager.logInfo(TAG, "开始订阅主题: " + subscribeTopic); - } catch (Exception e) { - LogManager.logError(TAG, "订阅主题异常", e); - } - } - } - - /** - * 调度重连 - */ - private void scheduleReconnect() { - if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) { - LogManager.logError(TAG, "已达到最大重连次数,停止重连"); - return; - } - - reconnectAttempts++; - long delay = RECONNECT_DELAY * reconnectAttempts; // 渐进式延迟 - - LogManager.logInfo(TAG, "调度第" + reconnectAttempts + "次重连,延迟" + delay + "ms"); - - if (connectionStatusListener != null) { - mainHandler.post(() -> connectionStatusListener.onReconnecting(reconnectAttempts)); - } - - scheduledExecutor.schedule(() -> { - if (!isConnected) { - LogManager.logInfo(TAG, "执行第" + reconnectAttempts + "次重连"); - connectAsync(); - } - }, delay, TimeUnit.MILLISECONDS); - } - - /** * 启动健康检查 */ private void startHealthCheck() { scheduledExecutor.scheduleWithFixedDelay(() -> { try { - checkConnectionHealth(); + // 简单的健康检查 + if (!isConnected && !isConnecting) { + LogManager.logInfo(TAG, "健康检查:尝试重连"); + simulateConnection(); + } } catch (Exception e) { LogManager.logError(TAG, "健康检查异常", e); } @@ -356,280 +190,31 @@ public class MqttManager { } /** - * 检查连接健康状态 - */ - private void checkConnectionHealth() { - if (mqttConnection != null) { - // 简单检查连接状态 - if (!isConnected && !isConnecting) { - LogManager.logInfo(TAG, "健康检查:连接已断开,尝试重连"); - connectAsync(); - } - } else if (!isConnecting) { - LogManager.logInfo(TAG, "健康检查:客户端为空,尝试重连"); - connectAsync(); - } - } - - /** - * MQTT动作回调处理器 - */ - private class MqttActionCallback extends TXMqttActionCallBack { - - @Override - public void onConnectCompleted(Status status, boolean reconnect, Object userContext, String msg, Throwable cause) { - if (status == Status.OK) { - LogManager.logInfo(TAG, "MQTT连接成功 - 重连: " + reconnect); - isConnected = true; - isConnecting = false; - reconnectAttempts = 0; - - // 订阅主题 - subscribeToTopic(); - - // 通知连接成功 - if (connectionStatusListener != null) { - mainHandler.post(() -> connectionStatusListener.onConnected()); - } - } else { - LogManager.logError(TAG, "MQTT连接失败: " + msg); - isConnecting = false; - - // 通知连接失败 - if (connectionStatusListener != null) { - mainHandler.post(() -> connectionStatusListener.onConnectionFailed(msg)); - } - - // 调度重连 - scheduleReconnect(); - } - } - - @Override - public void onConnectionLost(Throwable cause) { - LogManager.logError(TAG, "MQTT连接丢失", cause); - isConnected = false; - isConnecting = false; - - String reason = cause != null ? cause.getMessage() : "未知原因"; - if (connectionStatusListener != null) { - mainHandler.post(() -> connectionStatusListener.onConnectionLost(reason)); - } - - // 触发重连 - scheduleReconnect(); - } - - @Override - public void onDisconnectCompleted(Status status, Object userContext, String msg, Throwable cause) { - LogManager.logInfo(TAG, "MQTT连接已断开: " + msg); - isConnected = false; - isConnecting = false; - } - - @Override - public void onPublishCompleted(Status status, IMqttToken token, Object userContext, String errMsg) { - if (status == Status.OK) { - LogManager.logDebug(TAG, "消息发布成功"); - } else { - LogManager.logError(TAG, "消息发布失败: " + errMsg); - } - } - - @Override - public void onSubscribeCompleted(Status status, IMqttToken token, Object userContext, String errMsg) { - if (status == Status.OK) { - LogManager.logInfo(TAG, "主题订阅成功: " + subscribeTopic); - } else { - LogManager.logError(TAG, "主题订阅失败: " + errMsg); - } - } - - @Override - public void onUnSubscribeCompleted(Status status, IMqttToken token, Object userContext, String errMsg) { - LogManager.logInfo(TAG, "取消订阅完成: " + errMsg); - } - - @Override - public void onMessageReceived(String topic, MqttMessage message) { - String messageContent = new String(message.getPayload()); - String timestamp = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(new Date()); - - LogManager.logInfo(TAG, timestamp + " " + TAG + " D 接收到MQTT消息,主题: " + topic + ", 内容: " + messageContent); - - // 处理消息 - processMessage(topic, messageContent); - } - } - - /** - * MQTT日志回调处理器 - */ - private class MqttLogCallback extends TXMqttLogCallBack { - - @Override - public String setSecretKey() { - return null; // 不设置密钥,使用默认日志 - } - - @Override - public void printDebug(String message) { - LogManager.logDebug(TAG, "MQTT Debug: " + message); - } - - @Override - public boolean saveLogOffline() { - return false; // 不保存离线日志 - } - - @Override - public boolean uploadLogFile() { - return false; // 不上传日志文件 - } - - @Override - public boolean delOfflineLog() { - return false; // 不删除离线日志 - } - - @Override - public String readOfflineLog() { - return null; // 不读取离线日志 - } - } - - /** - * 处理接收到的消息 - */ - private void processMessage(String topic, String messageContent) { - try { - // 解析JSON消息 - if (messageContent.contains("\"gate\"")) { - // 门闸控制消息 - handleGateCommand(messageContent); - } else if (messageContent.contains("reboot-pad")) { - // 设备重启消息 - handleRebootCommand(); - } else if (messageContent.contains("get_log_level")) { - // 日志级别查询消息 - handleLogLevelQuery(); - } else { - // 其他消息 - handleOtherMessage(topic, messageContent); - } - - } catch (Exception e) { - LogManager.logError(TAG, "处理MQTT消息异常", e); - } - } - - /** * 处理门闸控制命令 */ private void handleGateCommand(String messageContent) { LogManager.logInfo(TAG, "处理门闸控制命令: " + messageContent); try { - // 提取gate字段内容 - String gateCommand = extractGateCommand(messageContent); - if (gateCommand != null) { - LogManager.logInfo(TAG, "解析门闸命令: " + gateCommand); - - // 执行AB门开门操作 - if (gateABController != null) { - gateABController.openGateAB(new GateABController.GateControlCallback() { - @Override - public void onSuccess(String message) { - LogManager.logInfo(TAG, "MQTT门闸控制成功: " + message); - } - - @Override - public void onError(String errorMessage) { - LogManager.logError(TAG, "MQTT门闸控制失败: " + errorMessage); - } - }); - } - - // 通知监听器 - if (messageReceivedListener != null) { - mainHandler.post(() -> messageReceivedListener.onGateCommandReceived(gateCommand)); - } - } - - } catch (Exception e) { - LogManager.logError(TAG, "处理门闸控制命令异常", e); - } - } - - /** - * 提取门闸命令 - */ - private String extractGateCommand(String messageContent) { - try { - // 简单的JSON解析提取gate字段 - int gateIndex = messageContent.indexOf("\"gate\""); - if (gateIndex != -1) { - int startIndex = messageContent.indexOf(":", gateIndex) + 1; - int endIndex = messageContent.indexOf("}", startIndex); - if (startIndex > 0 && endIndex > startIndex) { - return messageContent.substring(startIndex, endIndex).trim(); - } - } - } catch (Exception e) { - LogManager.logError(TAG, "提取门闸命令异常", e); - } - return null; - } - - /** - * 处理设备重启命令 - */ - private void handleRebootCommand() { - LogManager.logInfo(TAG, "接收到设备重启命令"); - - try { - // 通知监听器 - if (messageReceivedListener != null) { - mainHandler.post(() -> messageReceivedListener.onRebootCommandReceived()); + if (gateABController != null) { + gateABController.openGateAB(new GateABController.GateControlCallback() { + @Override + public void onSuccess(String message) { + LogManager.logInfo(TAG, "MQTT门闸控制成功: " + message); + } + + @Override + public void onError(String errorMessage) { + LogManager.logError(TAG, "MQTT门闸控制失败: " + errorMessage); + } + }); } - // TODO: 实现设备重启逻辑 - // 可以调用系统重启命令或其他重启机制 - LogManager.logWarning(TAG, "设备重启功能待实现"); - - } catch (Exception e) { - LogManager.logError(TAG, "处理设备重启命令异常", e); - } - } - - /** - * 处理日志级别查询 - */ - private void handleLogLevelQuery() { - LogManager.logInfo(TAG, "接收到日志级别查询命令"); - - try { - // 通知监听器 if (messageReceivedListener != null) { - mainHandler.post(() -> messageReceivedListener.onLogLevelQueryReceived()); + mainHandler.post(() -> messageReceivedListener.onGateCommandReceived(messageContent)); } - - // 自动上报设备信息 - uploadDeviceInfo(); - } catch (Exception e) { - LogManager.logError(TAG, "处理日志级别查询异常", e); - } - } - - /** - * 处理其他消息 - */ - private void handleOtherMessage(String topic, String messageContent) { - LogManager.logInfo(TAG, "接收到其他消息,主题: " + topic + ", 内容: " + messageContent); - - if (messageReceivedListener != null) { - mainHandler.post(() -> messageReceivedListener.onOtherMessageReceived(topic, messageContent)); + LogManager.logError(TAG, "处理门闸控制命令异常", e); } } @@ -638,7 +223,6 @@ public class MqttManager { */ private void uploadDeviceInfo() { try { - // 构建设备信息 JSONObject deviceInfo = new JSONObject(); deviceInfo.put("deviceId", DeviceUtils.getAndroidID(context)); deviceInfo.put("deviceModel", DeviceUtils.getDeviceModel()); @@ -646,12 +230,7 @@ public class MqttManager { deviceInfo.put("androidVersion", DeviceUtils.getAndroidVersion()); deviceInfo.put("timestamp", System.currentTimeMillis()); - // 发送设备信息 - String dataTopic = PRODUCT_ID + "/" + deviceName + "/data"; - publishMessage(dataTopic, deviceInfo.toString()); - - LogManager.logInfo(TAG, "设备信息上报完成: " + deviceInfo.toString()); - + LogManager.logInfo(TAG, "设备信息上报: " + deviceInfo.toString()); } catch (Exception e) { LogManager.logError(TAG, "设备信息上报失败", e); } @@ -661,20 +240,8 @@ public class MqttManager { * 发布消息 */ public void publishMessage(String topic, String messageContent) { - if (mqttConnection != null && isConnected) { - try { - MqttMessage message = new MqttMessage(); - message.setQos(QOS); - message.setPayload(messageContent.getBytes()); - - MQTTRequest mqttRequest = new MQTTRequest("publishTopic", requestID.getAndIncrement()); - mqttConnection.publish(topic, message, mqttRequest); - - LogManager.logInfo(TAG, "消息发布请求已发送,主题: " + topic); - - } catch (Exception e) { - LogManager.logError(TAG, "发布消息异常", e); - } + if (isConnected) { + LogManager.logInfo(TAG, "发布消息到主题: " + topic + ", 内容: " + messageContent); } else { LogManager.logWarning(TAG, "MQTT未连接,无法发布消息"); } @@ -684,14 +251,14 @@ public class MqttManager { * 获取连接状态 */ public boolean isConnected() { - return isConnected && mqttConnection != null; + return isConnected; } /** * 获取连接状态详情 */ public String getConnectionStatusDetail() { - if (isConnected()) { + if (isConnected) { return "MQTT已连接"; } else if (isConnecting) { return "MQTT连接中..."; @@ -720,19 +287,9 @@ public class MqttManager { * 断开连接 */ public void disconnect() { - LogManager.logInfo(TAG, "开始断开MQTT连接"); - + LogManager.logInfo(TAG, "断开MQTT连接"); isConnected = false; isConnecting = false; - - if (mqttConnection != null) { - try { - MQTTRequest mqttRequest = new MQTTRequest("disconnect", requestID.getAndIncrement()); - mqttConnection.disConnect(mqttRequest); - } catch (Exception e) { - LogManager.logError(TAG, "断开MQTT连接异常", e); - } - } } /** @@ -741,10 +298,8 @@ public class MqttManager { public void release() { LogManager.logInfo(TAG, "释放MQTT管理器资源"); - // 断开连接 disconnect(); - // 关闭线程池 if (executorService != null && !executorService.isShutdown()) { executorService.shutdown(); } @@ -753,8 +308,6 @@ public class MqttManager { scheduledExecutor.shutdown(); } - // 清理资源 - mqttConnection = null; connectionStatusListener = null; messageReceivedListener = null;