|
|
@ -10,6 +10,8 @@ import com.tencent.iot.hub.device.android.core.gateway.TXGatewayConnection; |
|
|
|
import com.tencent.iot.hub.device.java.core.common.Status; |
|
|
|
import com.tencent.iot.hub.device.java.core.log.TXMqttLogCallBack; |
|
|
|
import com.tencent.iot.hub.device.java.core.mqtt.TXMqttActionCallBack; |
|
|
|
import com.tencent.iot.hub.device.java.core.dynreg.TXMqttDynreg; |
|
|
|
import com.tencent.iot.hub.device.java.core.dynreg.TXMqttDynregCallback; |
|
|
|
|
|
|
|
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; |
|
|
|
import org.eclipse.paho.client.mqttv3.IMqttToken; |
|
|
@ -50,8 +52,7 @@ public class MqttManager { |
|
|
|
|
|
|
|
// 腾讯云IoT参数配置 |
|
|
|
private static final String PRODUCT_ID = "WZX68L5I75"; |
|
|
|
private static final String DEV_PSK = "7udrYcfTVThbzdMlLT9fHQ=="; |
|
|
|
private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O"; |
|
|
|
private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O"; // 动态注册用的ProductKey |
|
|
|
|
|
|
|
// MQTT连接配置 |
|
|
|
private static final String BROKER_URL = "ssl://" + PRODUCT_ID + ".iotcloud.tencentdevices.com:8883"; // 腾讯云IoT Hub SSL地址 |
|
|
@ -70,6 +71,8 @@ public class MqttManager { |
|
|
|
private String deviceName; |
|
|
|
private String subscribeTopic; |
|
|
|
private String path2Store; |
|
|
|
private String dynamicDevicePSK; // 动态注册获取的设备PSK |
|
|
|
private boolean isDynamicRegisterCompleted = false; // 动态注册是否完成 |
|
|
|
|
|
|
|
// 连接状态管理 |
|
|
|
private boolean isConnected = false; |
|
|
@ -193,7 +196,82 @@ public class MqttManager { |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 异步连接MQTT |
|
|
|
* 开始动态注册 |
|
|
|
*/ |
|
|
|
private void startDynamicRegister() { |
|
|
|
LogManager.logInfo(TAG, "开始动态注册 MQTT 设备..."); |
|
|
|
LogManager.logInfo(TAG, "ProductID: " + PRODUCT_ID); |
|
|
|
LogManager.logInfo(TAG, "ProductKey: " + PRODUCT_KEY); |
|
|
|
LogManager.logInfo(TAG, "DeviceName: " + deviceName); |
|
|
|
|
|
|
|
try { |
|
|
|
TXMqttDynreg dynreg = new TXMqttDynreg(PRODUCT_ID, PRODUCT_KEY, deviceName, new DynamicRegisterCallback()); |
|
|
|
dynreg.doDynamicRegister(); // 调起动态注册 |
|
|
|
LogManager.logInfo(TAG, "MQTT动态注册请求已发送"); |
|
|
|
} catch (Exception e) { |
|
|
|
LogManager.logError(TAG, "动态注册失败", e); |
|
|
|
scheduleReconnect(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 动态注册回调处理器 |
|
|
|
*/ |
|
|
|
private class DynamicRegisterCallback extends TXMqttDynregCallback { |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onGetDevicePSK(String devicePsk) { |
|
|
|
LogManager.logInfo(TAG, "动态注册成功,获取到设备PSK: " + devicePsk.substring(0, Math.min(8, devicePsk.length())) + "..."); |
|
|
|
|
|
|
|
// 保存动态获取的PSK |
|
|
|
dynamicDevicePSK = devicePsk; |
|
|
|
isDynamicRegisterCompleted = true; |
|
|
|
|
|
|
|
// 使用获取到的PSK连接MQTT |
|
|
|
try { |
|
|
|
connectMqtt(); |
|
|
|
} catch (Exception e) { |
|
|
|
LogManager.logError(TAG, "动态注册成功后连接MQTT失败", e); |
|
|
|
scheduleReconnect(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onGetDeviceCert(String deviceCert, String devicePriv) { |
|
|
|
LogManager.logInfo(TAG, "动态注册成功,获取到设备证书认证信息"); |
|
|
|
// 本项目使用PSK认证,不使用证书认证 |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailedDynreg(Throwable cause, String errMsg) { |
|
|
|
LogManager.logError(TAG, "动态注册失败: " + errMsg, cause); |
|
|
|
|
|
|
|
// 通知连接失败 |
|
|
|
if (connectionStatusListener != null) { |
|
|
|
mainHandler.post(() -> connectionStatusListener.onConnectionFailed("动态注册失败: " + errMsg)); |
|
|
|
} |
|
|
|
|
|
|
|
// 调度重连(包括重新动态注册) |
|
|
|
scheduleReconnect(); |
|
|
|
} |
|
|
|
|
|
|
|
@Override |
|
|
|
public void onFailedDynreg(Throwable cause) { |
|
|
|
String errorMsg = cause != null ? cause.getMessage() : "未知错误"; |
|
|
|
LogManager.logError(TAG, "动态注册失败: " + errorMsg, cause); |
|
|
|
|
|
|
|
// 通知连接失败 |
|
|
|
if (connectionStatusListener != null) { |
|
|
|
mainHandler.post(() -> connectionStatusListener.onConnectionFailed("动态注册失败: " + errorMsg)); |
|
|
|
} |
|
|
|
|
|
|
|
// 调度重连(包括重新动态注册) |
|
|
|
scheduleReconnect(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 开始动态注册和连接流程 |
|
|
|
*/ |
|
|
|
public void connectAsync() { |
|
|
|
if (isConnecting || isConnected) { |
|
|
@ -203,7 +281,13 @@ public class MqttManager { |
|
|
|
|
|
|
|
executorService.execute(() -> { |
|
|
|
try { |
|
|
|
connectMqtt(); |
|
|
|
if (!isDynamicRegisterCompleted) { |
|
|
|
// 先进行动态注册 |
|
|
|
startDynamicRegister(); |
|
|
|
} else { |
|
|
|
// 已有PSK,直接连接MQTT |
|
|
|
connectMqtt(); |
|
|
|
} |
|
|
|
} catch (Exception e) { |
|
|
|
LogManager.logError(TAG, "异步连接MQTT失败", e); |
|
|
|
scheduleReconnect(); |
|
|
@ -212,23 +296,30 @@ public class MqttManager { |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 连接MQTT服务器 |
|
|
|
* 连接MQTT服务器(使用动态注册获取的PSK) |
|
|
|
*/ |
|
|
|
private void connectMqtt() { |
|
|
|
LogManager.logInfo(TAG, "开始连接MQTT服务器..."); |
|
|
|
LogManager.logInfo(TAG, "连接地址: " + BROKER_URL); |
|
|
|
LogManager.logInfo(TAG, "产品ID: " + PRODUCT_ID); |
|
|
|
LogManager.logInfo(TAG, "设备名称: " + deviceName); |
|
|
|
|
|
|
|
if (dynamicDevicePSK == null || dynamicDevicePSK.isEmpty()) { |
|
|
|
LogManager.logError(TAG, "动态注册获取的PSK为空,无法连接MQTT"); |
|
|
|
scheduleReconnect(); |
|
|
|
return; |
|
|
|
} |
|
|
|
|
|
|
|
isConnecting = true; |
|
|
|
|
|
|
|
try { |
|
|
|
// 创建TXGatewayConnection实例 |
|
|
|
// 创建TXGatewayConnection实例,使用动态注册获取的PSK |
|
|
|
mqttConnection = new TXGatewayConnection( |
|
|
|
context, |
|
|
|
BROKER_URL, |
|
|
|
PRODUCT_ID, |
|
|
|
deviceName, |
|
|
|
DEV_PSK, |
|
|
|
dynamicDevicePSK, // 使用动态注册获取的PSK |
|
|
|
null, // devCert |
|
|
|
null, // devPriv |
|
|
|
true, // 启用MQTT日志以便调试 |
|
|
@ -244,7 +335,7 @@ public class MqttManager { |
|
|
|
options.setCleanSession(true); // 改为true,避免会话问题 |
|
|
|
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); // 明确指定MQTT版本 |
|
|
|
|
|
|
|
LogManager.logInfo(TAG, "使用PSK认证连接腾讯云IoT平台"); |
|
|
|
LogManager.logInfo(TAG, "使用动态注册PSK认证连接腾讯云IoT平台"); |
|
|
|
LogManager.logInfo(TAG, "连接超时: " + CONNECTION_TIMEOUT + "秒"); |
|
|
|
LogManager.logInfo(TAG, "心跳间隔: " + KEEP_ALIVE_INTERVAL + "秒"); |
|
|
|
|
|
|
@ -294,7 +385,7 @@ public class MqttManager { |
|
|
|
} |
|
|
|
|
|
|
|
/** |
|
|
|
* 调度重连 |
|
|
|
* 调度重连(包括重新动态注册) |
|
|
|
*/ |
|
|
|
private void scheduleReconnect() { |
|
|
|
if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) { |
|
|
@ -307,8 +398,8 @@ public class MqttManager { |
|
|
|
// 渐进式延迟:5s, 10s, 20s, 40s, 80s |
|
|
|
long delay = RECONNECT_DELAY * (1L << (reconnectAttempts - 1)); |
|
|
|
|
|
|
|
LogManager.logInfo(TAG, "调度第" + reconnectAttempts + "次重连,延迟" + delay + "ms"); |
|
|
|
LogManager.logInfo(TAG, "重连原因: 代理程序不可用,可能是网络问题或认证失败"); |
|
|
|
LogManager.logInfo(TAG, "调度第" + reconnectAttempts + "次重连(包括动态注册),延迟" + delay + "ms"); |
|
|
|
LogManager.logInfo(TAG, "重连原因: 代理程序不可用或动态注册失败,可能是网络问题或认证失败"); |
|
|
|
|
|
|
|
if (connectionStatusListener != null) { |
|
|
|
mainHandler.post(() -> connectionStatusListener.onReconnecting(reconnectAttempts)); |
|
|
@ -316,8 +407,8 @@ public class MqttManager { |
|
|
|
|
|
|
|
scheduledExecutor.schedule(() -> { |
|
|
|
if (!isConnected) { |
|
|
|
LogManager.logInfo(TAG, "执行第" + reconnectAttempts + "次重连"); |
|
|
|
// 清理旧连接 |
|
|
|
LogManager.logInfo(TAG, "执行第" + reconnectAttempts + "次重连(包括重新动态注册)"); |
|
|
|
// 清理旧连接和动态注册状态 |
|
|
|
if (mqttConnection != null) { |
|
|
|
try { |
|
|
|
mqttConnection = null; |
|
|
@ -325,6 +416,9 @@ public class MqttManager { |
|
|
|
LogManager.logError(TAG, "清理旧连接失败", e); |
|
|
|
} |
|
|
|
} |
|
|
|
// 重置动态注册状态,重新执行动态注册流程 |
|
|
|
isDynamicRegisterCompleted = false; |
|
|
|
dynamicDevicePSK = null; |
|
|
|
connectAsync(); |
|
|
|
} |
|
|
|
}, delay, TimeUnit.MILLISECONDS); |
|
|
|