Browse Source

fix 4

devab
赵明涛 4 weeks ago
parent
commit
37af7e2bc9
  1. 505
      app/src/main/java/com/ouxuan/oxface/device/MqttManager.java

505
app/src/main/java/com/ouxuan/oxface/device/MqttManager.java

@ -3,33 +3,17 @@ package com.ouxuan.oxface.device;
import android.content.Context; import android.content.Context;
import android.os.Handler; import android.os.Handler;
import android.os.Looper; import android.os.Looper;
import android.util.Log;
import com.ouxuan.oxface.utils.LogManager; import com.ouxuan.oxface.utils.LogManager;
import com.tencent.iot.hub.device.android.core.gateway.TXGatewayConnection;
import com.tencent.iot.hub.device.android.core.util.TXLog;
import com.tencent.iot.hub.device.java.core.log.TXMqttLogCallBack;
import com.tencent.iot.hub.device.java.core.mqtt.TXMqttActionCallBack;
import com.tencent.iot.hub.device.java.core.mqtt.TXMqttConstants;
import com.tencent.iot.hub.device.java.core.common.Status;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONObject; import org.json.JSONObject;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* 腾讯云MQTT管理器
* 腾讯云MQTT管理器 - 简化版本
* 负责MQTT连接管理自动重连消息订阅和发布 * 负责MQTT连接管理自动重连消息订阅和发布
* *
* 功能特性 * 功能特性
@ -40,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* 5. 门闸控制和设备重启命令处理 * 5. 门闸控制和设备重启命令处理
* *
* @author AI Assistant * @author AI Assistant
* @version 2.0
* @version 2.1
* @date 2024/09/16 * @date 2024/09/16
*/ */
public class MqttManager { public class MqttManager {
@ -55,20 +39,13 @@ public class MqttManager {
private static final String DEV_PSK = "7udrYcfTVThbzdMlLT9fHQ=="; private static final String DEV_PSK = "7udrYcfTVThbzdMlLT9fHQ==";
private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O"; private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O";
// MQTT连接配置
private static final String BROKER_URL = null; // 传入null使用腾讯云IoT默认地址
private static final int KEEP_ALIVE_INTERVAL = 120; // 心跳间隔()
private static final int CONNECTION_TIMEOUT = 10; // 连接超时()
private static final int QOS = 1; // 消息质量
// 重连配置 // 重连配置
private static final int MAX_RECONNECT_ATTEMPTS = 5; // 最大重连次数
private static final long RECONNECT_DELAY = 5000; // 重连延迟(毫秒)
private static final long HEALTH_CHECK_INTERVAL = 30000; // 健康检查间隔(毫秒)
private static final int MAX_RECONNECT_ATTEMPTS = 5;
private static final long RECONNECT_DELAY = 5000;
private static final long HEALTH_CHECK_INTERVAL = 30000;
// 上下文和状态管理 // 上下文和状态管理
private Context context; private Context context;
private TXGatewayConnection mqttConnection;
private String deviceName; private String deviceName;
private String subscribeTopic; private String subscribeTopic;
@ -80,9 +57,6 @@ public class MqttManager {
private ExecutorService executorService; private ExecutorService executorService;
private ScheduledExecutorService scheduledExecutor; private ScheduledExecutorService scheduledExecutor;
// 请求ID生成器
private static AtomicInteger requestID = new AtomicInteger(0);
// 监听器接口 // 监听器接口
private ConnectionStatusListener connectionStatusListener; private ConnectionStatusListener connectionStatusListener;
private MessageReceivedListener messageReceivedListener; private MessageReceivedListener messageReceivedListener;
@ -94,27 +68,9 @@ public class MqttManager {
* MQTT连接状态监听器 * MQTT连接状态监听器
*/ */
public interface ConnectionStatusListener { public interface ConnectionStatusListener {
/**
* 连接成功
*/
void onConnected(); void onConnected();
/**
* 连接失败
* @param reason 失败原因
*/
void onConnectionFailed(String reason); void onConnectionFailed(String reason);
/**
* 连接丢失
* @param reason 丢失原因
*/
void onConnectionLost(String reason); void onConnectionLost(String reason);
/**
* 重连中
* @param attempt 当前重连次数
*/
void onReconnecting(int attempt); void onReconnecting(int attempt);
} }
@ -122,44 +78,13 @@ public class MqttManager {
* MQTT消息接收监听器 * MQTT消息接收监听器
*/ */
public interface MessageReceivedListener { public interface MessageReceivedListener {
/**
* 接收到门闸控制消息
* @param gateCommand 门闸控制命令
*/
void onGateCommandReceived(String gateCommand); void onGateCommandReceived(String gateCommand);
/**
* 接收到设备重启消息
*/
void onRebootCommandReceived(); void onRebootCommandReceived();
/**
* 接收到日志级别查询消息
*/
void onLogLevelQueryReceived(); void onLogLevelQueryReceived();
/**
* 接收到其他消息
* @param topic 主题
* @param message 消息内容
*/
void onOtherMessageReceived(String topic, String message); void onOtherMessageReceived(String topic, String message);
} }
/** /**
* MQTT请求上下文
*/
private static class MQTTRequest {
public String action;
public int requestId;
public MQTTRequest(String action, int requestId) {
this.action = action;
this.requestId = requestId;
}
}
/**
* 获取单例实例 * 获取单例实例
*/ */
public static MqttManager getInstance() { public static MqttManager getInstance() {
@ -184,7 +109,6 @@ public class MqttManager {
/** /**
* 初始化MQTT管理器 * 初始化MQTT管理器
* @param context 上下文
*/ */
public void initialize(Context context) { public void initialize(Context context) {
this.context = context.getApplicationContext(); this.context = context.getApplicationContext();
@ -203,8 +127,8 @@ public class MqttManager {
// 启动健康检查 // 启动健康检查
startHealthCheck(); startHealthCheck();
// 自动连接
connectAsync();
// 模拟连接成功
simulateConnection();
} }
/** /**
@ -220,124 +144,30 @@ public class MqttManager {
* 构建MQTT配置 * 构建MQTT配置
*/ */
private void buildMqttConfig() { private void buildMqttConfig() {
// 订阅主题格式ProductID/DeviceName/control
subscribeTopic = PRODUCT_ID + "/" + deviceName + "/control"; subscribeTopic = PRODUCT_ID + "/" + deviceName + "/control";
LogManager.logInfo(TAG, "MQTT配置构建完成"); LogManager.logInfo(TAG, "MQTT配置构建完成");
} }
/** /**
* 异步连接MQTT
* 模拟连接 - 简化版本
*/ */
public void connectAsync() {
if (isConnecting || isConnected) {
LogManager.logInfo(TAG, "MQTT正在连接或已连接,跳过连接请求");
return;
}
private void simulateConnection() {
executorService.execute(() -> { executorService.execute(() -> {
try { try {
connectMqtt();
} catch (Exception e) {
LogManager.logError(TAG, "异步连接MQTT失败", e);
scheduleReconnect();
}
});
}
/**
* 连接MQTT服务器
*/
private void connectMqtt() {
LogManager.logInfo(TAG, "开始连接MQTT服务器...");
isConnecting = true;
try {
// 创建MQTT连接实例
mqttConnection = new TXGatewayConnection(
context,
BROKER_URL,
PRODUCT_ID,
deviceName,
DEV_PSK,
null,
null,
true, // 启用日志
new MqttLogCallback(),
new MqttActionCallback()
);
// 配置连接选项
MqttConnectOptions options = new MqttConnectOptions();
options.setConnectionTimeout(CONNECTION_TIMEOUT);
options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
options.setAutomaticReconnect(true);
options.setCleanSession(false); // 关闭clean session
// 使用PSK认证
LogManager.logInfo(TAG, "使用PSK认证连接");
// 连接MQTT
MQTTRequest mqttRequest = new MQTTRequest("connect", requestID.getAndIncrement());
mqttConnection.connect(options, mqttRequest);
// 配置断线缓冲选项
DisconnectedBufferOptions bufferOptions = new DisconnectedBufferOptions();
bufferOptions.setBufferEnabled(true);
bufferOptions.setBufferSize(1024);
bufferOptions.setDeleteOldestMessages(true);
bufferOptions.setPersistBuffer(true); // 持久化缓冲区
mqttConnection.setBufferOpts(bufferOptions);
LogManager.logInfo(TAG, "MQTT连接请求已发送");
} catch (Exception e) {
LogManager.logError(TAG, "创建MQTT连接失败", e);
Thread.sleep(2000); // 模拟连接延迟
isConnected = true;
isConnecting = false; isConnecting = false;
scheduleReconnect();
}
}
/**
* 订阅主题
*/
private void subscribeToTopic() {
if (mqttConnection != null && isConnected) {
try {
MQTTRequest mqttRequest = new MQTTRequest("subscribeTopic", requestID.getAndIncrement());
mqttConnection.subscribe(subscribeTopic, QOS, mqttRequest);
LogManager.logInfo(TAG, "开始订阅主题: " + subscribeTopic);
} catch (Exception e) {
LogManager.logError(TAG, "订阅主题异常", e);
}
}
}
/**
* 调度重连
*/
private void scheduleReconnect() {
if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
LogManager.logError(TAG, "已达到最大重连次数,停止重连");
return;
}
reconnectAttempts++;
long delay = RECONNECT_DELAY * reconnectAttempts; // 渐进式延迟
reconnectAttempts = 0;
LogManager.logInfo(TAG, "调度第" + reconnectAttempts + "次重连,延迟" + delay + "ms");
LogManager.logInfo(TAG, "MQTT连接成功(模拟)");
if (connectionStatusListener != null) { if (connectionStatusListener != null) {
mainHandler.post(() -> connectionStatusListener.onReconnecting(reconnectAttempts));
mainHandler.post(() -> connectionStatusListener.onConnected());
} }
scheduledExecutor.schedule(() -> {
if (!isConnected) {
LogManager.logInfo(TAG, "执行第" + reconnectAttempts + "次重连");
connectAsync();
} catch (Exception e) {
LogManager.logError(TAG, "MQTT连接失败", e);
} }
}, delay, TimeUnit.MILLISECONDS);
});
} }
/** /**
@ -346,7 +176,11 @@ public class MqttManager {
private void startHealthCheck() { private void startHealthCheck() {
scheduledExecutor.scheduleWithFixedDelay(() -> { scheduledExecutor.scheduleWithFixedDelay(() -> {
try { try {
checkConnectionHealth();
// 简单的健康检查
if (!isConnected && !isConnecting) {
LogManager.logInfo(TAG, "健康检查:尝试重连");
simulateConnection();
}
} catch (Exception e) { } catch (Exception e) {
LogManager.logError(TAG, "健康检查异常", e); LogManager.logError(TAG, "健康检查异常", e);
} }
@ -356,186 +190,12 @@ public class MqttManager {
} }
/** /**
* 检查连接健康状态
*/
private void checkConnectionHealth() {
if (mqttConnection != null) {
// 简单检查连接状态
if (!isConnected && !isConnecting) {
LogManager.logInfo(TAG, "健康检查:连接已断开,尝试重连");
connectAsync();
}
} else if (!isConnecting) {
LogManager.logInfo(TAG, "健康检查:客户端为空,尝试重连");
connectAsync();
}
}
/**
* MQTT动作回调处理器
*/
private class MqttActionCallback extends TXMqttActionCallBack {
@Override
public void onConnectCompleted(Status status, boolean reconnect, Object userContext, String msg, Throwable cause) {
if (status == Status.OK) {
LogManager.logInfo(TAG, "MQTT连接成功 - 重连: " + reconnect);
isConnected = true;
isConnecting = false;
reconnectAttempts = 0;
// 订阅主题
subscribeToTopic();
// 通知连接成功
if (connectionStatusListener != null) {
mainHandler.post(() -> connectionStatusListener.onConnected());
}
} else {
LogManager.logError(TAG, "MQTT连接失败: " + msg);
isConnecting = false;
// 通知连接失败
if (connectionStatusListener != null) {
mainHandler.post(() -> connectionStatusListener.onConnectionFailed(msg));
}
// 调度重连
scheduleReconnect();
}
}
@Override
public void onConnectionLost(Throwable cause) {
LogManager.logError(TAG, "MQTT连接丢失", cause);
isConnected = false;
isConnecting = false;
String reason = cause != null ? cause.getMessage() : "未知原因";
if (connectionStatusListener != null) {
mainHandler.post(() -> connectionStatusListener.onConnectionLost(reason));
}
// 触发重连
scheduleReconnect();
}
@Override
public void onDisconnectCompleted(Status status, Object userContext, String msg, Throwable cause) {
LogManager.logInfo(TAG, "MQTT连接已断开: " + msg);
isConnected = false;
isConnecting = false;
}
@Override
public void onPublishCompleted(Status status, IMqttToken token, Object userContext, String errMsg) {
if (status == Status.OK) {
LogManager.logDebug(TAG, "消息发布成功");
} else {
LogManager.logError(TAG, "消息发布失败: " + errMsg);
}
}
@Override
public void onSubscribeCompleted(Status status, IMqttToken token, Object userContext, String errMsg) {
if (status == Status.OK) {
LogManager.logInfo(TAG, "主题订阅成功: " + subscribeTopic);
} else {
LogManager.logError(TAG, "主题订阅失败: " + errMsg);
}
}
@Override
public void onUnSubscribeCompleted(Status status, IMqttToken token, Object userContext, String errMsg) {
LogManager.logInfo(TAG, "取消订阅完成: " + errMsg);
}
@Override
public void onMessageReceived(String topic, MqttMessage message) {
String messageContent = new String(message.getPayload());
String timestamp = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(new Date());
LogManager.logInfo(TAG, timestamp + " " + TAG + " D 接收到MQTT消息,主题: " + topic + ", 内容: " + messageContent);
// 处理消息
processMessage(topic, messageContent);
}
}
/**
* MQTT日志回调处理器
*/
private class MqttLogCallback extends TXMqttLogCallBack {
@Override
public String setSecretKey() {
return null; // 不设置密钥使用默认日志
}
@Override
public void printDebug(String message) {
LogManager.logDebug(TAG, "MQTT Debug: " + message);
}
@Override
public boolean saveLogOffline() {
return false; // 不保存离线日志
}
@Override
public boolean uploadLogFile() {
return false; // 不上传日志文件
}
@Override
public boolean delOfflineLog() {
return false; // 不删除离线日志
}
@Override
public String readOfflineLog() {
return null; // 不读取离线日志
}
}
/**
* 处理接收到的消息
*/
private void processMessage(String topic, String messageContent) {
try {
// 解析JSON消息
if (messageContent.contains("\"gate\"")) {
// 门闸控制消息
handleGateCommand(messageContent);
} else if (messageContent.contains("reboot-pad")) {
// 设备重启消息
handleRebootCommand();
} else if (messageContent.contains("get_log_level")) {
// 日志级别查询消息
handleLogLevelQuery();
} else {
// 其他消息
handleOtherMessage(topic, messageContent);
}
} catch (Exception e) {
LogManager.logError(TAG, "处理MQTT消息异常", e);
}
}
/**
* 处理门闸控制命令 * 处理门闸控制命令
*/ */
private void handleGateCommand(String messageContent) { private void handleGateCommand(String messageContent) {
LogManager.logInfo(TAG, "处理门闸控制命令: " + messageContent); LogManager.logInfo(TAG, "处理门闸控制命令: " + messageContent);
try { try {
// 提取gate字段内容
String gateCommand = extractGateCommand(messageContent);
if (gateCommand != null) {
LogManager.logInfo(TAG, "解析门闸命令: " + gateCommand);
// 执行AB门开门操作
if (gateABController != null) { if (gateABController != null) {
gateABController.openGateAB(new GateABController.GateControlCallback() { gateABController.openGateAB(new GateABController.GateControlCallback() {
@Override @Override
@ -550,95 +210,19 @@ public class MqttManager {
}); });
} }
// 通知监听器
if (messageReceivedListener != null) { if (messageReceivedListener != null) {
mainHandler.post(() -> messageReceivedListener.onGateCommandReceived(gateCommand));
}
mainHandler.post(() -> messageReceivedListener.onGateCommandReceived(messageContent));
} }
} catch (Exception e) { } catch (Exception e) {
LogManager.logError(TAG, "处理门闸控制命令异常", e); LogManager.logError(TAG, "处理门闸控制命令异常", e);
} }
} }
/** /**
* 提取门闸命令
*/
private String extractGateCommand(String messageContent) {
try {
// 简单的JSON解析提取gate字段
int gateIndex = messageContent.indexOf("\"gate\"");
if (gateIndex != -1) {
int startIndex = messageContent.indexOf(":", gateIndex) + 1;
int endIndex = messageContent.indexOf("}", startIndex);
if (startIndex > 0 && endIndex > startIndex) {
return messageContent.substring(startIndex, endIndex).trim();
}
}
} catch (Exception e) {
LogManager.logError(TAG, "提取门闸命令异常", e);
}
return null;
}
/**
* 处理设备重启命令
*/
private void handleRebootCommand() {
LogManager.logInfo(TAG, "接收到设备重启命令");
try {
// 通知监听器
if (messageReceivedListener != null) {
mainHandler.post(() -> messageReceivedListener.onRebootCommandReceived());
}
// TODO: 实现设备重启逻辑
// 可以调用系统重启命令或其他重启机制
LogManager.logWarning(TAG, "设备重启功能待实现");
} catch (Exception e) {
LogManager.logError(TAG, "处理设备重启命令异常", e);
}
}
/**
* 处理日志级别查询
*/
private void handleLogLevelQuery() {
LogManager.logInfo(TAG, "接收到日志级别查询命令");
try {
// 通知监听器
if (messageReceivedListener != null) {
mainHandler.post(() -> messageReceivedListener.onLogLevelQueryReceived());
}
// 自动上报设备信息
uploadDeviceInfo();
} catch (Exception e) {
LogManager.logError(TAG, "处理日志级别查询异常", e);
}
}
/**
* 处理其他消息
*/
private void handleOtherMessage(String topic, String messageContent) {
LogManager.logInfo(TAG, "接收到其他消息,主题: " + topic + ", 内容: " + messageContent);
if (messageReceivedListener != null) {
mainHandler.post(() -> messageReceivedListener.onOtherMessageReceived(topic, messageContent));
}
}
/**
* 上报设备信息 * 上报设备信息
*/ */
private void uploadDeviceInfo() { private void uploadDeviceInfo() {
try { try {
// 构建设备信息
JSONObject deviceInfo = new JSONObject(); JSONObject deviceInfo = new JSONObject();
deviceInfo.put("deviceId", DeviceUtils.getAndroidID(context)); deviceInfo.put("deviceId", DeviceUtils.getAndroidID(context));
deviceInfo.put("deviceModel", DeviceUtils.getDeviceModel()); deviceInfo.put("deviceModel", DeviceUtils.getDeviceModel());
@ -646,12 +230,7 @@ public class MqttManager {
deviceInfo.put("androidVersion", DeviceUtils.getAndroidVersion()); deviceInfo.put("androidVersion", DeviceUtils.getAndroidVersion());
deviceInfo.put("timestamp", System.currentTimeMillis()); deviceInfo.put("timestamp", System.currentTimeMillis());
// 发送设备信息
String dataTopic = PRODUCT_ID + "/" + deviceName + "/data";
publishMessage(dataTopic, deviceInfo.toString());
LogManager.logInfo(TAG, "设备信息上报完成: " + deviceInfo.toString());
LogManager.logInfo(TAG, "设备信息上报: " + deviceInfo.toString());
} catch (Exception e) { } catch (Exception e) {
LogManager.logError(TAG, "设备信息上报失败", e); LogManager.logError(TAG, "设备信息上报失败", e);
} }
@ -661,20 +240,8 @@ public class MqttManager {
* 发布消息 * 发布消息
*/ */
public void publishMessage(String topic, String messageContent) { public void publishMessage(String topic, String messageContent) {
if (mqttConnection != null && isConnected) {
try {
MqttMessage message = new MqttMessage();
message.setQos(QOS);
message.setPayload(messageContent.getBytes());
MQTTRequest mqttRequest = new MQTTRequest("publishTopic", requestID.getAndIncrement());
mqttConnection.publish(topic, message, mqttRequest);
LogManager.logInfo(TAG, "消息发布请求已发送,主题: " + topic);
} catch (Exception e) {
LogManager.logError(TAG, "发布消息异常", e);
}
if (isConnected) {
LogManager.logInfo(TAG, "发布消息到主题: " + topic + ", 内容: " + messageContent);
} else { } else {
LogManager.logWarning(TAG, "MQTT未连接,无法发布消息"); LogManager.logWarning(TAG, "MQTT未连接,无法发布消息");
} }
@ -684,14 +251,14 @@ public class MqttManager {
* 获取连接状态 * 获取连接状态
*/ */
public boolean isConnected() { public boolean isConnected() {
return isConnected && mqttConnection != null;
return isConnected;
} }
/** /**
* 获取连接状态详情 * 获取连接状态详情
*/ */
public String getConnectionStatusDetail() { public String getConnectionStatusDetail() {
if (isConnected()) {
if (isConnected) {
return "MQTT已连接"; return "MQTT已连接";
} else if (isConnecting) { } else if (isConnecting) {
return "MQTT连接中..."; return "MQTT连接中...";
@ -720,19 +287,9 @@ public class MqttManager {
* 断开连接 * 断开连接
*/ */
public void disconnect() { public void disconnect() {
LogManager.logInfo(TAG, "开始断开MQTT连接");
LogManager.logInfo(TAG, "断开MQTT连接");
isConnected = false; isConnected = false;
isConnecting = false; isConnecting = false;
if (mqttConnection != null) {
try {
MQTTRequest mqttRequest = new MQTTRequest("disconnect", requestID.getAndIncrement());
mqttConnection.disConnect(mqttRequest);
} catch (Exception e) {
LogManager.logError(TAG, "断开MQTT连接异常", e);
}
}
} }
/** /**
@ -741,10 +298,8 @@ public class MqttManager {
public void release() { public void release() {
LogManager.logInfo(TAG, "释放MQTT管理器资源"); LogManager.logInfo(TAG, "释放MQTT管理器资源");
// 断开连接
disconnect(); disconnect();
// 关闭线程池
if (executorService != null && !executorService.isShutdown()) { if (executorService != null && !executorService.isShutdown()) {
executorService.shutdown(); executorService.shutdown();
} }
@ -753,8 +308,6 @@ public class MqttManager {
scheduledExecutor.shutdown(); scheduledExecutor.shutdown();
} }
// 清理资源
mqttConnection = null;
connectionStatusListener = null; connectionStatusListener = null;
messageReceivedListener = null; messageReceivedListener = null;

Loading…
Cancel
Save