diff --git a/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java b/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java index 537633a..6f541df 100644 --- a/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java +++ b/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java @@ -10,6 +10,8 @@ import com.tencent.iot.hub.device.android.core.gateway.TXGatewayConnection; import com.tencent.iot.hub.device.java.core.common.Status; import com.tencent.iot.hub.device.java.core.log.TXMqttLogCallBack; import com.tencent.iot.hub.device.java.core.mqtt.TXMqttActionCallBack; +import com.tencent.iot.hub.device.java.core.dynreg.TXMqttDynreg; +import com.tencent.iot.hub.device.java.core.dynreg.TXMqttDynregCallback; import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; import org.eclipse.paho.client.mqttv3.IMqttToken; @@ -50,8 +52,7 @@ public class MqttManager { // 腾讯云IoT参数配置 private static final String PRODUCT_ID = "WZX68L5I75"; - private static final String DEV_PSK = "7udrYcfTVThbzdMlLT9fHQ=="; - private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O"; + private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O"; // 动态注册用的ProductKey // MQTT连接配置 private static final String BROKER_URL = "ssl://" + PRODUCT_ID + ".iotcloud.tencentdevices.com:8883"; // 腾讯云IoT Hub SSL地址 @@ -70,6 +71,8 @@ public class MqttManager { private String deviceName; private String subscribeTopic; private String path2Store; + private String dynamicDevicePSK; // 动态注册获取的设备PSK + private boolean isDynamicRegisterCompleted = false; // 动态注册是否完成 // 连接状态管理 private boolean isConnected = false; @@ -193,7 +196,82 @@ public class MqttManager { } /** - * 异步连接MQTT + * 开始动态注册 + */ + private void startDynamicRegister() { + LogManager.logInfo(TAG, "开始动态注册 MQTT 设备..."); + LogManager.logInfo(TAG, "ProductID: " + PRODUCT_ID); + LogManager.logInfo(TAG, "ProductKey: " + PRODUCT_KEY); + LogManager.logInfo(TAG, "DeviceName: " + deviceName); + + try { + TXMqttDynreg dynreg = new TXMqttDynreg(PRODUCT_ID, PRODUCT_KEY, deviceName, new DynamicRegisterCallback()); + dynreg.doDynamicRegister(); // 调起动态注册 + LogManager.logInfo(TAG, "MQTT动态注册请求已发送"); + } catch (Exception e) { + LogManager.logError(TAG, "动态注册失败", e); + scheduleReconnect(); + } + } + + /** + * 动态注册回调处理器 + */ + private class DynamicRegisterCallback extends TXMqttDynregCallback { + + @Override + public void onGetDevicePSK(String devicePsk) { + LogManager.logInfo(TAG, "动态注册成功,获取到设备PSK: " + devicePsk.substring(0, Math.min(8, devicePsk.length())) + "..."); + + // 保存动态获取的PSK + dynamicDevicePSK = devicePsk; + isDynamicRegisterCompleted = true; + + // 使用获取到的PSK连接MQTT + try { + connectMqtt(); + } catch (Exception e) { + LogManager.logError(TAG, "动态注册成功后连接MQTT失败", e); + scheduleReconnect(); + } + } + + @Override + public void onGetDeviceCert(String deviceCert, String devicePriv) { + LogManager.logInfo(TAG, "动态注册成功,获取到设备证书认证信息"); + // 本项目使用PSK认证,不使用证书认证 + } + + @Override + public void onFailedDynreg(Throwable cause, String errMsg) { + LogManager.logError(TAG, "动态注册失败: " + errMsg, cause); + + // 通知连接失败 + if (connectionStatusListener != null) { + mainHandler.post(() -> connectionStatusListener.onConnectionFailed("动态注册失败: " + errMsg)); + } + + // 调度重连(包括重新动态注册) + scheduleReconnect(); + } + + @Override + public void onFailedDynreg(Throwable cause) { + String errorMsg = cause != null ? cause.getMessage() : "未知错误"; + LogManager.logError(TAG, "动态注册失败: " + errorMsg, cause); + + // 通知连接失败 + if (connectionStatusListener != null) { + mainHandler.post(() -> connectionStatusListener.onConnectionFailed("动态注册失败: " + errorMsg)); + } + + // 调度重连(包括重新动态注册) + scheduleReconnect(); + } + } + + /** + * 开始动态注册和连接流程 */ public void connectAsync() { if (isConnecting || isConnected) { @@ -203,7 +281,13 @@ public class MqttManager { executorService.execute(() -> { try { - connectMqtt(); + if (!isDynamicRegisterCompleted) { + // 先进行动态注册 + startDynamicRegister(); + } else { + // 已有PSK,直接连接MQTT + connectMqtt(); + } } catch (Exception e) { LogManager.logError(TAG, "异步连接MQTT失败", e); scheduleReconnect(); @@ -212,23 +296,30 @@ public class MqttManager { } /** - * 连接MQTT服务器 + * 连接MQTT服务器(使用动态注册获取的PSK) */ private void connectMqtt() { LogManager.logInfo(TAG, "开始连接MQTT服务器..."); LogManager.logInfo(TAG, "连接地址: " + BROKER_URL); LogManager.logInfo(TAG, "产品ID: " + PRODUCT_ID); LogManager.logInfo(TAG, "设备名称: " + deviceName); + + if (dynamicDevicePSK == null || dynamicDevicePSK.isEmpty()) { + LogManager.logError(TAG, "动态注册获取的PSK为空,无法连接MQTT"); + scheduleReconnect(); + return; + } + isConnecting = true; try { - // 创建TXGatewayConnection实例 + // 创建TXGatewayConnection实例,使用动态注册获取的PSK mqttConnection = new TXGatewayConnection( context, BROKER_URL, PRODUCT_ID, deviceName, - DEV_PSK, + dynamicDevicePSK, // 使用动态注册获取的PSK null, // devCert null, // devPriv true, // 启用MQTT日志以便调试 @@ -244,7 +335,7 @@ public class MqttManager { options.setCleanSession(true); // 改为true,避免会话问题 options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); // 明确指定MQTT版本 - LogManager.logInfo(TAG, "使用PSK认证连接腾讯云IoT平台"); + LogManager.logInfo(TAG, "使用动态注册PSK认证连接腾讯云IoT平台"); LogManager.logInfo(TAG, "连接超时: " + CONNECTION_TIMEOUT + "秒"); LogManager.logInfo(TAG, "心跳间隔: " + KEEP_ALIVE_INTERVAL + "秒"); @@ -294,7 +385,7 @@ public class MqttManager { } /** - * 调度重连 + * 调度重连(包括重新动态注册) */ private void scheduleReconnect() { if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) { @@ -307,8 +398,8 @@ public class MqttManager { // 渐进式延迟:5s, 10s, 20s, 40s, 80s long delay = RECONNECT_DELAY * (1L << (reconnectAttempts - 1)); - LogManager.logInfo(TAG, "调度第" + reconnectAttempts + "次重连,延迟" + delay + "ms"); - LogManager.logInfo(TAG, "重连原因: 代理程序不可用,可能是网络问题或认证失败"); + LogManager.logInfo(TAG, "调度第" + reconnectAttempts + "次重连(包括动态注册),延迟" + delay + "ms"); + LogManager.logInfo(TAG, "重连原因: 代理程序不可用或动态注册失败,可能是网络问题或认证失败"); if (connectionStatusListener != null) { mainHandler.post(() -> connectionStatusListener.onReconnecting(reconnectAttempts)); @@ -316,8 +407,8 @@ public class MqttManager { scheduledExecutor.schedule(() -> { if (!isConnected) { - LogManager.logInfo(TAG, "执行第" + reconnectAttempts + "次重连"); - // 清理旧连接 + LogManager.logInfo(TAG, "执行第" + reconnectAttempts + "次重连(包括重新动态注册)"); + // 清理旧连接和动态注册状态 if (mqttConnection != null) { try { mqttConnection = null; @@ -325,6 +416,9 @@ public class MqttManager { LogManager.logError(TAG, "清理旧连接失败", e); } } + // 重置动态注册状态,重新执行动态注册流程 + isDynamicRegisterCompleted = false; + dynamicDevicePSK = null; connectAsync(); } }, delay, TimeUnit.MILLISECONDS);