diff --git a/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java b/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java index 8867421..264f887 100644 --- a/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java +++ b/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java @@ -3,17 +3,17 @@ package com.ouxuan.oxface.device; import android.content.Context; import android.os.Handler; import android.os.Looper; +import android.util.Log; import com.ouxuan.oxface.utils.LogManager; +import com.tencent.iot.hub.device.android.core.gateway.TXGatewayConnection; +import com.tencent.iot.hub.device.java.core.common.Status; +import com.tencent.iot.hub.device.java.core.log.TXMqttLogCallBack; +import com.tencent.iot.hub.device.java.core.mqtt.TXMqttActionCallBack; -import org.eclipse.paho.android.service.MqttAndroidClient; import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; -import org.eclipse.paho.client.mqttv3.IMqttActionListener; -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; -import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.json.JSONObject; @@ -24,13 +24,21 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** - * 腾讯云MQTT管理器 - 真实实现版本 + * 腾讯云MQTT管理器 - 基于TXGatewayConnection实现 * 负责MQTT连接管理、自动重连、消息订阅和发布 * + * 功能特性: + * 1. 初始化后自动连接 + * 2. 网络断线自动重连机制 + * 3. 消息订阅和处理 + * 4. 连接状态监控和上报 + * 5. 门闸控制和设备重启命令处理 + * * @author AI Assistant - * @version 3.0 + * @version 4.0 * @date 2024/09/16 */ public class MqttManager { @@ -46,7 +54,7 @@ public class MqttManager { private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O"; // MQTT连接配置 - private static final String BROKER_URL = "ssl://iotcloud-mqtt.gz.tencentcloudapi.com:8883"; + private static final String BROKER_URL = null; // 传入null,使用腾讯云IoT默认地址 private static final int KEEP_ALIVE_INTERVAL = 120; private static final int CONNECTION_TIMEOUT = 10; private static final int QOS = 1; @@ -58,10 +66,10 @@ public class MqttManager { // 上下文和状态管理 private Context context; - private MqttAndroidClient mqttClient; + private TXGatewayConnection mqttConnection; private String deviceName; private String subscribeTopic; - private String clientId; + private String path2Store; // 连接状态管理 private boolean isConnected = false; @@ -71,6 +79,9 @@ public class MqttManager { private ExecutorService executorService; private ScheduledExecutorService scheduledExecutor; + // 请求ID生成器 + private static AtomicInteger requestID = new AtomicInteger(0); + // 监听器接口 private ConnectionStatusListener connectionStatusListener; private MessageReceivedListener messageReceivedListener; @@ -99,6 +110,24 @@ public class MqttManager { } /** + * MQTT请求上下文 + */ + private static class MQTTRequest { + public String requestType; + public int requestId; + + public MQTTRequest(String requestType, int requestId) { + this.requestType = requestType; + this.requestId = requestId; + } + + @Override + public String toString() { + return "MQTTRequest{requestType='" + requestType + "', requestId=" + requestId + "}"; + } + } + + /** * 获取单例实例 */ public static MqttManager getInstance() { @@ -127,19 +156,16 @@ public class MqttManager { public void initialize(Context context) { this.context = context.getApplicationContext(); this.gateABController = GateABController.getInstance(); + this.path2Store = context.getCacheDir().getAbsolutePath(); - // 生成设备名称和客户端ID - generateDeviceInfo(); + // 生成设备名称 + generateDeviceName(); // 构建MQTT配置 buildMqttConfig(); - // 创建MQTT客户端 - createMqttClient(); - LogManager.logInfo(TAG, "MQTT管理器初始化完成"); LogManager.logInfo(TAG, "设备名称: " + deviceName); - LogManager.logInfo(TAG, "客户端ID: " + clientId); LogManager.logInfo(TAG, "订阅主题: " + subscribeTopic); // 启动健康检查 @@ -150,13 +176,12 @@ public class MqttManager { } /** - * 生成设备信息 + * 生成设备名称 */ - private void generateDeviceInfo() { + private void generateDeviceName() { String androidId = DeviceUtils.getAndroidID(context); deviceName = "PadV6" + androidId; - clientId = PRODUCT_ID + deviceName; - LogManager.logInfo(TAG, "生成设备信息 - 设备名: " + deviceName + ", 客户端ID: " + clientId); + LogManager.logInfo(TAG, "生成设备名称: " + deviceName); } /** @@ -168,19 +193,6 @@ public class MqttManager { } /** - * 创建MQTT客户端 - */ - private void createMqttClient() { - try { - mqttClient = new MqttAndroidClient(context, BROKER_URL, clientId); - mqttClient.setCallback(new MqttCallbackHandler()); - LogManager.logInfo(TAG, "MQTT客户端创建成功"); - } catch (Exception e) { - LogManager.logError(TAG, "创建MQTT客户端失败", e); - } - } - - /** * 异步连接MQTT */ public void connectAsync() { @@ -207,59 +219,36 @@ public class MqttManager { isConnecting = true; try { + // 创建TXGatewayConnection实例 + mqttConnection = new TXGatewayConnection( + context, + BROKER_URL, + PRODUCT_ID, + deviceName, + DEV_PSK, + null, // devCert + null, // devPriv + false, // mqttLogFlag + null, // logCallBack + new MqttActionCallback() + ); + + // 配置连接选项 MqttConnectOptions options = new MqttConnectOptions(); options.setConnectionTimeout(CONNECTION_TIMEOUT); options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL); options.setAutomaticReconnect(false); // 手动控制重连 options.setCleanSession(false); - // 设置用户名和密码(腾讯云IoT认证) - options.setUserName(deviceName); - options.setPassword(DEV_PSK.toCharArray()); - - // SSL配置 - options.setSocketFactory(mqttClient.getSSLSocketFactory(null, null)); - LogManager.logInfo(TAG, "使用PSK认证连接腾讯云IoT平台"); // 连接MQTT - IMqttToken token = mqttClient.connect(options, null, new IMqttActionListener() { - @Override - public void onSuccess(IMqttToken asyncActionToken) { - LogManager.logInfo(TAG, "MQTT连接成功"); - isConnected = true; - isConnecting = false; - reconnectAttempts = 0; - - // 订阅主题 - subscribeToTopic(); - - // 配置断线缓冲 - configureDisconnectedBuffer(); - - // 通知连接成功 - if (connectionStatusListener != null) { - mainHandler.post(() -> connectionStatusListener.onConnected()); - } - } - - @Override - public void onFailure(IMqttToken asyncActionToken, Throwable exception) { - LogManager.logError(TAG, "MQTT连接失败", exception); - isConnecting = false; - - // 通知连接失败 - if (connectionStatusListener != null) { - String reason = exception != null ? exception.getMessage() : "未知错误"; - mainHandler.post(() -> connectionStatusListener.onConnectionFailed(reason)); - } - - // 调度重连 - scheduleReconnect(); - } - }); + MQTTRequest mqttRequest = new MQTTRequest("connect", requestID.getAndIncrement()); + mqttConnection.connect(options, mqttRequest); - } catch (MqttException e) { + LogManager.logInfo(TAG, "MQTT连接请求已发送"); + + } catch (Exception e) { LogManager.logError(TAG, "创建MQTT连接失败", e); isConnecting = false; scheduleReconnect(); @@ -270,20 +259,12 @@ public class MqttManager { * 订阅主题 */ private void subscribeToTopic() { - if (mqttClient != null && isConnected) { + if (mqttConnection != null && isConnected) { try { - mqttClient.subscribe(subscribeTopic, QOS, null, new IMqttActionListener() { - @Override - public void onSuccess(IMqttToken asyncActionToken) { - LogManager.logInfo(TAG, "主题订阅成功: " + subscribeTopic); - } - - @Override - public void onFailure(IMqttToken asyncActionToken, Throwable exception) { - LogManager.logError(TAG, "主题订阅失败", exception); - } - }); - } catch (MqttException e) { + MQTTRequest mqttRequest = new MQTTRequest("subscribe", requestID.getAndIncrement()); + mqttConnection.subscribe(subscribeTopic, QOS, mqttRequest); + LogManager.logInfo(TAG, "开始订阅主题: " + subscribeTopic); + } catch (Exception e) { LogManager.logError(TAG, "订阅主题异常", e); } } @@ -299,7 +280,7 @@ public class MqttManager { bufferOptions.setBufferSize(1024); bufferOptions.setDeleteOldestMessages(true); bufferOptions.setPersistBuffer(true); - mqttClient.setBufferOpts(bufferOptions); + mqttConnection.setBufferOpts(bufferOptions); LogManager.logInfo(TAG, "断线缓冲配置完成"); } catch (Exception e) { LogManager.logError(TAG, "配置断线缓冲失败", e); @@ -351,31 +332,65 @@ public class MqttManager { * 检查连接健康状态 */ private void checkConnectionHealth() { - if (mqttClient != null) { - boolean actuallyConnected = mqttClient.isConnected(); - if (isConnected != actuallyConnected) { - LogManager.logWarning(TAG, "连接状态不一致,更新状态"); - isConnected = actuallyConnected; - } - + if (mqttConnection != null) { + // 检查连接状态 if (!isConnected && !isConnecting) { - LogManager.logInfo(TAG, "健康检查:连接断开,尝试重连"); + LogManager.logInfo(TAG, "健康检查:连接已断开,尝试重连"); connectAsync(); } } else if (!isConnecting) { - LogManager.logInfo(TAG, "健康检查:客户端为空,重新创建"); - createMqttClient(); + LogManager.logInfo(TAG, "健康检查:客户端为空,尝试重连"); connectAsync(); } } /** - * MQTT回调处理器 + * MQTT动作回调处理器 */ - private class MqttCallbackHandler implements MqttCallback { + private class MqttActionCallback extends TXMqttActionCallBack { @Override - public void connectionLost(Throwable cause) { + public void onConnectCompleted(Status status, boolean reconnect, Object userContext, String msg, Throwable cause) { + String userContextInfo = ""; + if (userContext instanceof MQTTRequest) { + userContextInfo = userContext.toString(); + } + String logInfo = String.format("onConnectCompleted, status[%s], reconnect[%b], userContext[%s], msg[%s]", + status.name(), reconnect, userContextInfo, msg); + LogManager.logInfo(TAG, "连接回调:" + logInfo); + + if (status.equals(Status.OK)) { + LogManager.logInfo(TAG, "MQTT连接成功"); + isConnected = true; + isConnecting = false; + reconnectAttempts = 0; + + // 订阅主题 + subscribeToTopic(); + + // 配置断线缓冲 + configureDisconnectedBuffer(); + + // 通知连接成功 + if (connectionStatusListener != null) { + mainHandler.post(() -> connectionStatusListener.onConnected()); + } + } else { + LogManager.logError(TAG, "MQTT连接失败: " + msg); + isConnecting = false; + + // 通知连接失败 + if (connectionStatusListener != null) { + mainHandler.post(() -> connectionStatusListener.onConnectionFailed(msg)); + } + + // 调度重连 + scheduleReconnect(); + } + } + + @Override + public void onConnectionLost(Throwable cause) { LogManager.logError(TAG, "MQTT连接丢失", cause); isConnected = false; isConnecting = false; @@ -390,7 +405,37 @@ public class MqttManager { } @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { + public void onDisconnectCompleted(Status status, Object userContext, String msg, Throwable cause) { + LogManager.logInfo(TAG, "MQTT连接已断开: " + msg); + isConnected = false; + isConnecting = false; + } + + @Override + public void onPublishCompleted(Status status, IMqttToken token, Object userContext, String errMsg) { + if (status == Status.OK) { + LogManager.logDebug(TAG, "消息发布成功"); + } else { + LogManager.logError(TAG, "消息发布失败: " + errMsg); + } + } + + @Override + public void onSubscribeCompleted(Status status, IMqttToken token, Object userContext, String errMsg) { + if (status == Status.OK) { + LogManager.logInfo(TAG, "主题订阅成功: " + subscribeTopic); + } else { + LogManager.logError(TAG, "主题订阅失败: " + errMsg); + } + } + + @Override + public void onUnSubscribeCompleted(Status status, IMqttToken token, Object userContext, String errMsg) { + LogManager.logInfo(TAG, "取消订阅完成: " + errMsg); + } + + @Override + public void onMessageReceived(String topic, MqttMessage message) { String messageContent = new String(message.getPayload()); String timestamp = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(new Date()); @@ -399,11 +444,6 @@ public class MqttManager { // 处理消息 processMessage(topic, messageContent); } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) { - LogManager.logDebug(TAG, "消息发送完成"); - } } /** @@ -546,15 +586,17 @@ public class MqttManager { * 发布消息 */ public void publishMessage(String topic, String messageContent) { - if (mqttClient != null && isConnected) { + if (mqttConnection != null && isConnected) { try { MqttMessage message = new MqttMessage(); message.setQos(QOS); message.setPayload(messageContent.getBytes()); - mqttClient.publish(topic, message); + MQTTRequest mqttRequest = new MQTTRequest("publish", requestID.getAndIncrement()); + mqttConnection.publish(topic, message, mqttRequest); + LogManager.logInfo(TAG, "消息发布成功,主题: " + topic); - } catch (MqttException e) { + } catch (Exception e) { LogManager.logError(TAG, "发布消息异常", e); } } else { @@ -566,7 +608,7 @@ public class MqttManager { * 获取连接状态 */ public boolean isConnected() { - return isConnected && mqttClient != null && mqttClient.isConnected(); + return isConnected && mqttConnection != null; } /** @@ -607,10 +649,11 @@ public class MqttManager { isConnected = false; isConnecting = false; - if (mqttClient != null) { + if (mqttConnection != null) { try { - mqttClient.disconnect(); - } catch (MqttException e) { + MQTTRequest mqttRequest = new MQTTRequest("disconnect", requestID.getAndIncrement()); + mqttConnection.disConnect(mqttRequest); + } catch (Exception e) { LogManager.logError(TAG, "断开MQTT连接异常", e); } } @@ -632,7 +675,7 @@ public class MqttManager { scheduledExecutor.shutdown(); } - mqttClient = null; + mqttConnection = null; connectionStatusListener = null; messageReceivedListener = null;