diff --git a/app/build.gradle b/app/build.gradle index da17911..8e506d4 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -101,6 +101,10 @@ dependencies { // 腾讯云IoT Hub SDK implementation 'com.tencent.iot.hub:hub-device-android:3.3.23' + + // 备用MQTT客户端库 + implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' + implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' testImplementation 'junit:junit:4.13.2' androidTestImplementation 'androidx.test.ext:junit:1.1.5' diff --git a/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java b/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java index 248944e..8867421 100644 --- a/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java +++ b/app/src/main/java/com/ouxuan/oxface/device/MqttManager.java @@ -5,26 +5,32 @@ import android.os.Handler; import android.os.Looper; import com.ouxuan.oxface.utils.LogManager; + +import org.eclipse.paho.android.service.MqttAndroidClient; +import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; +import org.eclipse.paho.client.mqttv3.IMqttActionListener; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.IMqttToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; import org.json.JSONObject; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Locale; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** - * 腾讯云MQTT管理器 - 简化版本 + * 腾讯云MQTT管理器 - 真实实现版本 * 负责MQTT连接管理、自动重连、消息订阅和发布 * - * 功能特性: - * 1. 初始化后自动连接 - * 2. 网络断线自动重连机制 - * 3. 消息订阅和处理 - * 4. 连接状态监控和上报 - * 5. 门闸控制和设备重启命令处理 - * * @author AI Assistant - * @version 2.1 + * @version 3.0 * @date 2024/09/16 */ public class MqttManager { @@ -39,6 +45,12 @@ public class MqttManager { private static final String DEV_PSK = "7udrYcfTVThbzdMlLT9fHQ=="; private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O"; + // MQTT连接配置 + private static final String BROKER_URL = "ssl://iotcloud-mqtt.gz.tencentcloudapi.com:8883"; + private static final int KEEP_ALIVE_INTERVAL = 120; + private static final int CONNECTION_TIMEOUT = 10; + private static final int QOS = 1; + // 重连配置 private static final int MAX_RECONNECT_ATTEMPTS = 5; private static final long RECONNECT_DELAY = 5000; @@ -46,8 +58,10 @@ public class MqttManager { // 上下文和状态管理 private Context context; + private MqttAndroidClient mqttClient; private String deviceName; private String subscribeTopic; + private String clientId; // 连接状态管理 private boolean isConnected = false; @@ -114,30 +128,35 @@ public class MqttManager { this.context = context.getApplicationContext(); this.gateABController = GateABController.getInstance(); - // 生成设备名称 - generateDeviceName(); + // 生成设备名称和客户端ID + generateDeviceInfo(); // 构建MQTT配置 buildMqttConfig(); + // 创建MQTT客户端 + createMqttClient(); + LogManager.logInfo(TAG, "MQTT管理器初始化完成"); LogManager.logInfo(TAG, "设备名称: " + deviceName); + LogManager.logInfo(TAG, "客户端ID: " + clientId); LogManager.logInfo(TAG, "订阅主题: " + subscribeTopic); // 启动健康检查 startHealthCheck(); - // 模拟连接成功 - simulateConnection(); + // 自动连接 + connectAsync(); } /** - * 生成设备名称 + * 生成设备信息 */ - private void generateDeviceName() { + private void generateDeviceInfo() { String androidId = DeviceUtils.getAndroidID(context); deviceName = "PadV6" + androidId; - LogManager.logInfo(TAG, "生成设备名称: " + deviceName); + clientId = PRODUCT_ID + deviceName; + LogManager.logInfo(TAG, "生成设备信息 - 设备名: " + deviceName + ", 客户端ID: " + clientId); } /** @@ -149,38 +168,177 @@ public class MqttManager { } /** - * 模拟连接 - 简化版本 + * 创建MQTT客户端 + */ + private void createMqttClient() { + try { + mqttClient = new MqttAndroidClient(context, BROKER_URL, clientId); + mqttClient.setCallback(new MqttCallbackHandler()); + LogManager.logInfo(TAG, "MQTT客户端创建成功"); + } catch (Exception e) { + LogManager.logError(TAG, "创建MQTT客户端失败", e); + } + } + + /** + * 异步连接MQTT */ - private void simulateConnection() { + public void connectAsync() { + if (isConnecting || isConnected) { + LogManager.logInfo(TAG, "MQTT正在连接或已连接,跳过连接请求"); + return; + } + executorService.execute(() -> { try { - Thread.sleep(2000); // 模拟连接延迟 - isConnected = true; - isConnecting = false; - reconnectAttempts = 0; - - LogManager.logInfo(TAG, "MQTT连接成功(模拟)"); - - if (connectionStatusListener != null) { - mainHandler.post(() -> connectionStatusListener.onConnected()); - } + connectMqtt(); } catch (Exception e) { - LogManager.logError(TAG, "MQTT连接失败", e); + LogManager.logError(TAG, "异步连接MQTT失败", e); + scheduleReconnect(); } }); } /** + * 连接MQTT服务器 + */ + private void connectMqtt() { + LogManager.logInfo(TAG, "开始连接MQTT服务器..."); + isConnecting = true; + + try { + MqttConnectOptions options = new MqttConnectOptions(); + options.setConnectionTimeout(CONNECTION_TIMEOUT); + options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL); + options.setAutomaticReconnect(false); // 手动控制重连 + options.setCleanSession(false); + + // 设置用户名和密码(腾讯云IoT认证) + options.setUserName(deviceName); + options.setPassword(DEV_PSK.toCharArray()); + + // SSL配置 + options.setSocketFactory(mqttClient.getSSLSocketFactory(null, null)); + + LogManager.logInfo(TAG, "使用PSK认证连接腾讯云IoT平台"); + + // 连接MQTT + IMqttToken token = mqttClient.connect(options, null, new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + LogManager.logInfo(TAG, "MQTT连接成功"); + isConnected = true; + isConnecting = false; + reconnectAttempts = 0; + + // 订阅主题 + subscribeToTopic(); + + // 配置断线缓冲 + configureDisconnectedBuffer(); + + // 通知连接成功 + if (connectionStatusListener != null) { + mainHandler.post(() -> connectionStatusListener.onConnected()); + } + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + LogManager.logError(TAG, "MQTT连接失败", exception); + isConnecting = false; + + // 通知连接失败 + if (connectionStatusListener != null) { + String reason = exception != null ? exception.getMessage() : "未知错误"; + mainHandler.post(() -> connectionStatusListener.onConnectionFailed(reason)); + } + + // 调度重连 + scheduleReconnect(); + } + }); + + } catch (MqttException e) { + LogManager.logError(TAG, "创建MQTT连接失败", e); + isConnecting = false; + scheduleReconnect(); + } + } + + /** + * 订阅主题 + */ + private void subscribeToTopic() { + if (mqttClient != null && isConnected) { + try { + mqttClient.subscribe(subscribeTopic, QOS, null, new IMqttActionListener() { + @Override + public void onSuccess(IMqttToken asyncActionToken) { + LogManager.logInfo(TAG, "主题订阅成功: " + subscribeTopic); + } + + @Override + public void onFailure(IMqttToken asyncActionToken, Throwable exception) { + LogManager.logError(TAG, "主题订阅失败", exception); + } + }); + } catch (MqttException e) { + LogManager.logError(TAG, "订阅主题异常", e); + } + } + } + + /** + * 配置断线缓冲 + */ + private void configureDisconnectedBuffer() { + try { + DisconnectedBufferOptions bufferOptions = new DisconnectedBufferOptions(); + bufferOptions.setBufferEnabled(true); + bufferOptions.setBufferSize(1024); + bufferOptions.setDeleteOldestMessages(true); + bufferOptions.setPersistBuffer(true); + mqttClient.setBufferOpts(bufferOptions); + LogManager.logInfo(TAG, "断线缓冲配置完成"); + } catch (Exception e) { + LogManager.logError(TAG, "配置断线缓冲失败", e); + } + } + + /** + * 调度重连 + */ + private void scheduleReconnect() { + if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) { + LogManager.logError(TAG, "已达到最大重连次数,停止重连"); + return; + } + + reconnectAttempts++; + long delay = RECONNECT_DELAY * reconnectAttempts; + + LogManager.logInfo(TAG, "调度第" + reconnectAttempts + "次重连,延迟" + delay + "ms"); + + if (connectionStatusListener != null) { + mainHandler.post(() -> connectionStatusListener.onReconnecting(reconnectAttempts)); + } + + scheduledExecutor.schedule(() -> { + if (!isConnected) { + LogManager.logInfo(TAG, "执行第" + reconnectAttempts + "次重连"); + connectAsync(); + } + }, delay, TimeUnit.MILLISECONDS); + } + + /** * 启动健康检查 */ private void startHealthCheck() { scheduledExecutor.scheduleWithFixedDelay(() -> { try { - // 简单的健康检查 - if (!isConnected && !isConnecting) { - LogManager.logInfo(TAG, "健康检查:尝试重连"); - simulateConnection(); - } + checkConnectionHealth(); } catch (Exception e) { LogManager.logError(TAG, "健康检查异常", e); } @@ -190,31 +348,176 @@ public class MqttManager { } /** + * 检查连接健康状态 + */ + private void checkConnectionHealth() { + if (mqttClient != null) { + boolean actuallyConnected = mqttClient.isConnected(); + if (isConnected != actuallyConnected) { + LogManager.logWarning(TAG, "连接状态不一致,更新状态"); + isConnected = actuallyConnected; + } + + if (!isConnected && !isConnecting) { + LogManager.logInfo(TAG, "健康检查:连接断开,尝试重连"); + connectAsync(); + } + } else if (!isConnecting) { + LogManager.logInfo(TAG, "健康检查:客户端为空,重新创建"); + createMqttClient(); + connectAsync(); + } + } + + /** + * MQTT回调处理器 + */ + private class MqttCallbackHandler implements MqttCallback { + + @Override + public void connectionLost(Throwable cause) { + LogManager.logError(TAG, "MQTT连接丢失", cause); + isConnected = false; + isConnecting = false; + + String reason = cause != null ? cause.getMessage() : "未知原因"; + if (connectionStatusListener != null) { + mainHandler.post(() -> connectionStatusListener.onConnectionLost(reason)); + } + + // 触发重连 + scheduleReconnect(); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + String messageContent = new String(message.getPayload()); + String timestamp = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(new Date()); + + LogManager.logInfo(TAG, timestamp + " " + TAG + " D 接收到MQTT消息,主题: " + topic + ", 内容: " + messageContent); + + // 处理消息 + processMessage(topic, messageContent); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + LogManager.logDebug(TAG, "消息发送完成"); + } + } + + /** + * 处理接收到的消息 + */ + private void processMessage(String topic, String messageContent) { + try { + if (messageContent.contains("\"gate\"")) { + handleGateCommand(messageContent); + } else if (messageContent.contains("reboot-pad")) { + handleRebootCommand(); + } else if (messageContent.contains("get_log_level")) { + handleLogLevelQuery(); + } else { + handleOtherMessage(topic, messageContent); + } + } catch (Exception e) { + LogManager.logError(TAG, "处理MQTT消息异常", e); + } + } + + /** * 处理门闸控制命令 */ private void handleGateCommand(String messageContent) { LogManager.logInfo(TAG, "处理门闸控制命令: " + messageContent); try { - if (gateABController != null) { - gateABController.openGateAB(new GateABController.GateControlCallback() { - @Override - public void onSuccess(String message) { - LogManager.logInfo(TAG, "MQTT门闸控制成功: " + message); - } - - @Override - public void onError(String errorMessage) { - LogManager.logError(TAG, "MQTT门闸控制失败: " + errorMessage); - } - }); + String gateCommand = extractGateCommand(messageContent); + if (gateCommand != null) { + LogManager.logInfo(TAG, "解析门闸命令: " + gateCommand); + + if (gateABController != null) { + gateABController.openGateAB(new GateABController.GateControlCallback() { + @Override + public void onSuccess(String message) { + LogManager.logInfo(TAG, "MQTT门闸控制成功: " + message); + } + + @Override + public void onError(String errorMessage) { + LogManager.logError(TAG, "MQTT门闸控制失败: " + errorMessage); + } + }); + } + + if (messageReceivedListener != null) { + mainHandler.post(() -> messageReceivedListener.onGateCommandReceived(gateCommand)); + } } - + } catch (Exception e) { + LogManager.logError(TAG, "处理门闸控制命令异常", e); + } + } + + /** + * 提取门闸命令 + */ + private String extractGateCommand(String messageContent) { + try { + int gateIndex = messageContent.indexOf("\"gate\""); + if (gateIndex != -1) { + int startIndex = messageContent.indexOf(":", gateIndex) + 1; + int endIndex = messageContent.indexOf("}", startIndex); + if (startIndex > 0 && endIndex > startIndex) { + return messageContent.substring(startIndex, endIndex).trim(); + } + } + } catch (Exception e) { + LogManager.logError(TAG, "提取门闸命令异常", e); + } + return null; + } + + /** + * 处理设备重启命令 + */ + private void handleRebootCommand() { + LogManager.logInfo(TAG, "接收到设备重启命令"); + + try { if (messageReceivedListener != null) { - mainHandler.post(() -> messageReceivedListener.onGateCommandReceived(messageContent)); + mainHandler.post(() -> messageReceivedListener.onRebootCommandReceived()); } + LogManager.logWarning(TAG, "设备重启功能待实现"); } catch (Exception e) { - LogManager.logError(TAG, "处理门闸控制命令异常", e); + LogManager.logError(TAG, "处理设备重启命令异常", e); + } + } + + /** + * 处理日志级别查询 + */ + private void handleLogLevelQuery() { + LogManager.logInfo(TAG, "接收到日志级别查询命令"); + + try { + if (messageReceivedListener != null) { + mainHandler.post(() -> messageReceivedListener.onLogLevelQueryReceived()); + } + uploadDeviceInfo(); + } catch (Exception e) { + LogManager.logError(TAG, "处理日志级别查询异常", e); + } + } + + /** + * 处理其他消息 + */ + private void handleOtherMessage(String topic, String messageContent) { + LogManager.logInfo(TAG, "接收到其他消息,主题: " + topic + ", 内容: " + messageContent); + + if (messageReceivedListener != null) { + mainHandler.post(() -> messageReceivedListener.onOtherMessageReceived(topic, messageContent)); } } @@ -230,7 +533,10 @@ public class MqttManager { deviceInfo.put("androidVersion", DeviceUtils.getAndroidVersion()); deviceInfo.put("timestamp", System.currentTimeMillis()); - LogManager.logInfo(TAG, "设备信息上报: " + deviceInfo.toString()); + String dataTopic = PRODUCT_ID + "/" + deviceName + "/data"; + publishMessage(dataTopic, deviceInfo.toString()); + + LogManager.logInfo(TAG, "设备信息上报完成: " + deviceInfo.toString()); } catch (Exception e) { LogManager.logError(TAG, "设备信息上报失败", e); } @@ -240,37 +546,34 @@ public class MqttManager { * 发布消息 */ public void publishMessage(String topic, String messageContent) { - if (isConnected) { - LogManager.logInfo(TAG, "发布消息到主题: " + topic + ", 内容: " + messageContent); + if (mqttClient != null && isConnected) { + try { + MqttMessage message = new MqttMessage(); + message.setQos(QOS); + message.setPayload(messageContent.getBytes()); + + mqttClient.publish(topic, message); + LogManager.logInfo(TAG, "消息发布成功,主题: " + topic); + } catch (MqttException e) { + LogManager.logError(TAG, "发布消息异常", e); + } } else { LogManager.logWarning(TAG, "MQTT未连接,无法发布消息"); } } /** - * 异步连接MQTT - 对外公开接口 - */ - public void connectAsync() { - LogManager.logInfo(TAG, "手动触发MQTT连接"); - if (!isConnected && !isConnecting) { - simulateConnection(); - } else { - LogManager.logInfo(TAG, "MQTT已连接或正在连接中"); - } - } - - /** * 获取连接状态 */ public boolean isConnected() { - return isConnected; + return isConnected && mqttClient != null && mqttClient.isConnected(); } /** * 获取连接状态详情 */ public String getConnectionStatusDetail() { - if (isConnected) { + if (isConnected()) { return "MQTT已连接"; } else if (isConnecting) { return "MQTT连接中..."; @@ -299,9 +602,18 @@ public class MqttManager { * 断开连接 */ public void disconnect() { - LogManager.logInfo(TAG, "断开MQTT连接"); + LogManager.logInfo(TAG, "开始断开MQTT连接"); + isConnected = false; isConnecting = false; + + if (mqttClient != null) { + try { + mqttClient.disconnect(); + } catch (MqttException e) { + LogManager.logError(TAG, "断开MQTT连接异常", e); + } + } } /** @@ -320,6 +632,7 @@ public class MqttManager { scheduledExecutor.shutdown(); } + mqttClient = null; connectionStatusListener = null; messageReceivedListener = null;