Browse Source

fix 6

devab
赵明涛 4 weeks ago
parent
commit
506d2ffae8
  1. 4
      app/build.gradle
  2. 447
      app/src/main/java/com/ouxuan/oxface/device/MqttManager.java

4
app/build.gradle

@ -101,6 +101,10 @@ dependencies {
// IoT Hub SDK
implementation 'com.tencent.iot.hub:hub-device-android:3.3.23'
// MQTT客户端库
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
testImplementation 'junit:junit:4.13.2'
androidTestImplementation 'androidx.test.ext:junit:1.1.5'

447
app/src/main/java/com/ouxuan/oxface/device/MqttManager.java

@ -5,26 +5,32 @@ import android.os.Handler;
import android.os.Looper;
import com.ouxuan.oxface.utils.LogManager;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONObject;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 腾讯云MQTT管理器 - 简化版本
* 腾讯云MQTT管理器 - 真实实现版本
* 负责MQTT连接管理自动重连消息订阅和发布
*
* 功能特性
* 1. 初始化后自动连接
* 2. 网络断线自动重连机制
* 3. 消息订阅和处理
* 4. 连接状态监控和上报
* 5. 门闸控制和设备重启命令处理
*
* @author AI Assistant
* @version 2.1
* @version 3.0
* @date 2024/09/16
*/
public class MqttManager {
@ -39,6 +45,12 @@ public class MqttManager {
private static final String DEV_PSK = "7udrYcfTVThbzdMlLT9fHQ==";
private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O";
// MQTT连接配置
private static final String BROKER_URL = "ssl://iotcloud-mqtt.gz.tencentcloudapi.com:8883";
private static final int KEEP_ALIVE_INTERVAL = 120;
private static final int CONNECTION_TIMEOUT = 10;
private static final int QOS = 1;
// 重连配置
private static final int MAX_RECONNECT_ATTEMPTS = 5;
private static final long RECONNECT_DELAY = 5000;
@ -46,8 +58,10 @@ public class MqttManager {
// 上下文和状态管理
private Context context;
private MqttAndroidClient mqttClient;
private String deviceName;
private String subscribeTopic;
private String clientId;
// 连接状态管理
private boolean isConnected = false;
@ -114,30 +128,35 @@ public class MqttManager {
this.context = context.getApplicationContext();
this.gateABController = GateABController.getInstance();
// 生成设备名称
generateDeviceName();
// 生成设备名称和客户端ID
generateDeviceInfo();
// 构建MQTT配置
buildMqttConfig();
// 创建MQTT客户端
createMqttClient();
LogManager.logInfo(TAG, "MQTT管理器初始化完成");
LogManager.logInfo(TAG, "设备名称: " + deviceName);
LogManager.logInfo(TAG, "客户端ID: " + clientId);
LogManager.logInfo(TAG, "订阅主题: " + subscribeTopic);
// 启动健康检查
startHealthCheck();
// 模拟连接成功
simulateConnection();
// 自动连接
connectAsync();
}
/**
* 生成设备名称
* 生成设备信息
*/
private void generateDeviceName() {
private void generateDeviceInfo() {
String androidId = DeviceUtils.getAndroidID(context);
deviceName = "PadV6" + androidId;
LogManager.logInfo(TAG, "生成设备名称: " + deviceName);
clientId = PRODUCT_ID + deviceName;
LogManager.logInfo(TAG, "生成设备信息 - 设备名: " + deviceName + ", 客户端ID: " + clientId);
}
/**
@ -149,38 +168,177 @@ public class MqttManager {
}
/**
* 模拟连接 - 简化版本
* 创建MQTT客户端
*/
private void createMqttClient() {
try {
mqttClient = new MqttAndroidClient(context, BROKER_URL, clientId);
mqttClient.setCallback(new MqttCallbackHandler());
LogManager.logInfo(TAG, "MQTT客户端创建成功");
} catch (Exception e) {
LogManager.logError(TAG, "创建MQTT客户端失败", e);
}
}
/**
* 异步连接MQTT
*/
private void simulateConnection() {
public void connectAsync() {
if (isConnecting || isConnected) {
LogManager.logInfo(TAG, "MQTT正在连接或已连接,跳过连接请求");
return;
}
executorService.execute(() -> {
try {
Thread.sleep(2000); // 模拟连接延迟
isConnected = true;
isConnecting = false;
reconnectAttempts = 0;
LogManager.logInfo(TAG, "MQTT连接成功(模拟)");
if (connectionStatusListener != null) {
mainHandler.post(() -> connectionStatusListener.onConnected());
}
connectMqtt();
} catch (Exception e) {
LogManager.logError(TAG, "MQTT连接失败", e);
LogManager.logError(TAG, "异步连接MQTT失败", e);
scheduleReconnect();
}
});
}
/**
* 连接MQTT服务器
*/
private void connectMqtt() {
LogManager.logInfo(TAG, "开始连接MQTT服务器...");
isConnecting = true;
try {
MqttConnectOptions options = new MqttConnectOptions();
options.setConnectionTimeout(CONNECTION_TIMEOUT);
options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
options.setAutomaticReconnect(false); // 手动控制重连
options.setCleanSession(false);
// 设置用户名和密码腾讯云IoT认证
options.setUserName(deviceName);
options.setPassword(DEV_PSK.toCharArray());
// SSL配置
options.setSocketFactory(mqttClient.getSSLSocketFactory(null, null));
LogManager.logInfo(TAG, "使用PSK认证连接腾讯云IoT平台");
// 连接MQTT
IMqttToken token = mqttClient.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
LogManager.logInfo(TAG, "MQTT连接成功");
isConnected = true;
isConnecting = false;
reconnectAttempts = 0;
// 订阅主题
subscribeToTopic();
// 配置断线缓冲
configureDisconnectedBuffer();
// 通知连接成功
if (connectionStatusListener != null) {
mainHandler.post(() -> connectionStatusListener.onConnected());
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
LogManager.logError(TAG, "MQTT连接失败", exception);
isConnecting = false;
// 通知连接失败
if (connectionStatusListener != null) {
String reason = exception != null ? exception.getMessage() : "未知错误";
mainHandler.post(() -> connectionStatusListener.onConnectionFailed(reason));
}
// 调度重连
scheduleReconnect();
}
});
} catch (MqttException e) {
LogManager.logError(TAG, "创建MQTT连接失败", e);
isConnecting = false;
scheduleReconnect();
}
}
/**
* 订阅主题
*/
private void subscribeToTopic() {
if (mqttClient != null && isConnected) {
try {
mqttClient.subscribe(subscribeTopic, QOS, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
LogManager.logInfo(TAG, "主题订阅成功: " + subscribeTopic);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
LogManager.logError(TAG, "主题订阅失败", exception);
}
});
} catch (MqttException e) {
LogManager.logError(TAG, "订阅主题异常", e);
}
}
}
/**
* 配置断线缓冲
*/
private void configureDisconnectedBuffer() {
try {
DisconnectedBufferOptions bufferOptions = new DisconnectedBufferOptions();
bufferOptions.setBufferEnabled(true);
bufferOptions.setBufferSize(1024);
bufferOptions.setDeleteOldestMessages(true);
bufferOptions.setPersistBuffer(true);
mqttClient.setBufferOpts(bufferOptions);
LogManager.logInfo(TAG, "断线缓冲配置完成");
} catch (Exception e) {
LogManager.logError(TAG, "配置断线缓冲失败", e);
}
}
/**
* 调度重连
*/
private void scheduleReconnect() {
if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {
LogManager.logError(TAG, "已达到最大重连次数,停止重连");
return;
}
reconnectAttempts++;
long delay = RECONNECT_DELAY * reconnectAttempts;
LogManager.logInfo(TAG, "调度第" + reconnectAttempts + "次重连,延迟" + delay + "ms");
if (connectionStatusListener != null) {
mainHandler.post(() -> connectionStatusListener.onReconnecting(reconnectAttempts));
}
scheduledExecutor.schedule(() -> {
if (!isConnected) {
LogManager.logInfo(TAG, "执行第" + reconnectAttempts + "次重连");
connectAsync();
}
}, delay, TimeUnit.MILLISECONDS);
}
/**
* 启动健康检查
*/
private void startHealthCheck() {
scheduledExecutor.scheduleWithFixedDelay(() -> {
try {
// 简单的健康检查
if (!isConnected && !isConnecting) {
LogManager.logInfo(TAG, "健康检查:尝试重连");
simulateConnection();
}
checkConnectionHealth();
} catch (Exception e) {
LogManager.logError(TAG, "健康检查异常", e);
}
@ -190,31 +348,176 @@ public class MqttManager {
}
/**
* 检查连接健康状态
*/
private void checkConnectionHealth() {
if (mqttClient != null) {
boolean actuallyConnected = mqttClient.isConnected();
if (isConnected != actuallyConnected) {
LogManager.logWarning(TAG, "连接状态不一致,更新状态");
isConnected = actuallyConnected;
}
if (!isConnected && !isConnecting) {
LogManager.logInfo(TAG, "健康检查:连接断开,尝试重连");
connectAsync();
}
} else if (!isConnecting) {
LogManager.logInfo(TAG, "健康检查:客户端为空,重新创建");
createMqttClient();
connectAsync();
}
}
/**
* MQTT回调处理器
*/
private class MqttCallbackHandler implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
LogManager.logError(TAG, "MQTT连接丢失", cause);
isConnected = false;
isConnecting = false;
String reason = cause != null ? cause.getMessage() : "未知原因";
if (connectionStatusListener != null) {
mainHandler.post(() -> connectionStatusListener.onConnectionLost(reason));
}
// 触发重连
scheduleReconnect();
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String messageContent = new String(message.getPayload());
String timestamp = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(new Date());
LogManager.logInfo(TAG, timestamp + " " + TAG + " D 接收到MQTT消息,主题: " + topic + ", 内容: " + messageContent);
// 处理消息
processMessage(topic, messageContent);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
LogManager.logDebug(TAG, "消息发送完成");
}
}
/**
* 处理接收到的消息
*/
private void processMessage(String topic, String messageContent) {
try {
if (messageContent.contains("\"gate\"")) {
handleGateCommand(messageContent);
} else if (messageContent.contains("reboot-pad")) {
handleRebootCommand();
} else if (messageContent.contains("get_log_level")) {
handleLogLevelQuery();
} else {
handleOtherMessage(topic, messageContent);
}
} catch (Exception e) {
LogManager.logError(TAG, "处理MQTT消息异常", e);
}
}
/**
* 处理门闸控制命令
*/
private void handleGateCommand(String messageContent) {
LogManager.logInfo(TAG, "处理门闸控制命令: " + messageContent);
try {
if (gateABController != null) {
gateABController.openGateAB(new GateABController.GateControlCallback() {
@Override
public void onSuccess(String message) {
LogManager.logInfo(TAG, "MQTT门闸控制成功: " + message);
}
@Override
public void onError(String errorMessage) {
LogManager.logError(TAG, "MQTT门闸控制失败: " + errorMessage);
}
});
String gateCommand = extractGateCommand(messageContent);
if (gateCommand != null) {
LogManager.logInfo(TAG, "解析门闸命令: " + gateCommand);
if (gateABController != null) {
gateABController.openGateAB(new GateABController.GateControlCallback() {
@Override
public void onSuccess(String message) {
LogManager.logInfo(TAG, "MQTT门闸控制成功: " + message);
}
@Override
public void onError(String errorMessage) {
LogManager.logError(TAG, "MQTT门闸控制失败: " + errorMessage);
}
});
}
if (messageReceivedListener != null) {
mainHandler.post(() -> messageReceivedListener.onGateCommandReceived(gateCommand));
}
}
} catch (Exception e) {
LogManager.logError(TAG, "处理门闸控制命令异常", e);
}
}
/**
* 提取门闸命令
*/
private String extractGateCommand(String messageContent) {
try {
int gateIndex = messageContent.indexOf("\"gate\"");
if (gateIndex != -1) {
int startIndex = messageContent.indexOf(":", gateIndex) + 1;
int endIndex = messageContent.indexOf("}", startIndex);
if (startIndex > 0 && endIndex > startIndex) {
return messageContent.substring(startIndex, endIndex).trim();
}
}
} catch (Exception e) {
LogManager.logError(TAG, "提取门闸命令异常", e);
}
return null;
}
/**
* 处理设备重启命令
*/
private void handleRebootCommand() {
LogManager.logInfo(TAG, "接收到设备重启命令");
try {
if (messageReceivedListener != null) {
mainHandler.post(() -> messageReceivedListener.onGateCommandReceived(messageContent));
mainHandler.post(() -> messageReceivedListener.onRebootCommandReceived());
}
LogManager.logWarning(TAG, "设备重启功能待实现");
} catch (Exception e) {
LogManager.logError(TAG, "处理门闸控制命令异常", e);
LogManager.logError(TAG, "处理设备重启命令异常", e);
}
}
/**
* 处理日志级别查询
*/
private void handleLogLevelQuery() {
LogManager.logInfo(TAG, "接收到日志级别查询命令");
try {
if (messageReceivedListener != null) {
mainHandler.post(() -> messageReceivedListener.onLogLevelQueryReceived());
}
uploadDeviceInfo();
} catch (Exception e) {
LogManager.logError(TAG, "处理日志级别查询异常", e);
}
}
/**
* 处理其他消息
*/
private void handleOtherMessage(String topic, String messageContent) {
LogManager.logInfo(TAG, "接收到其他消息,主题: " + topic + ", 内容: " + messageContent);
if (messageReceivedListener != null) {
mainHandler.post(() -> messageReceivedListener.onOtherMessageReceived(topic, messageContent));
}
}
@ -230,7 +533,10 @@ public class MqttManager {
deviceInfo.put("androidVersion", DeviceUtils.getAndroidVersion());
deviceInfo.put("timestamp", System.currentTimeMillis());
LogManager.logInfo(TAG, "设备信息上报: " + deviceInfo.toString());
String dataTopic = PRODUCT_ID + "/" + deviceName + "/data";
publishMessage(dataTopic, deviceInfo.toString());
LogManager.logInfo(TAG, "设备信息上报完成: " + deviceInfo.toString());
} catch (Exception e) {
LogManager.logError(TAG, "设备信息上报失败", e);
}
@ -240,37 +546,34 @@ public class MqttManager {
* 发布消息
*/
public void publishMessage(String topic, String messageContent) {
if (isConnected) {
LogManager.logInfo(TAG, "发布消息到主题: " + topic + ", 内容: " + messageContent);
if (mqttClient != null && isConnected) {
try {
MqttMessage message = new MqttMessage();
message.setQos(QOS);
message.setPayload(messageContent.getBytes());
mqttClient.publish(topic, message);
LogManager.logInfo(TAG, "消息发布成功,主题: " + topic);
} catch (MqttException e) {
LogManager.logError(TAG, "发布消息异常", e);
}
} else {
LogManager.logWarning(TAG, "MQTT未连接,无法发布消息");
}
}
/**
* 异步连接MQTT - 对外公开接口
*/
public void connectAsync() {
LogManager.logInfo(TAG, "手动触发MQTT连接");
if (!isConnected && !isConnecting) {
simulateConnection();
} else {
LogManager.logInfo(TAG, "MQTT已连接或正在连接中");
}
}
/**
* 获取连接状态
*/
public boolean isConnected() {
return isConnected;
return isConnected && mqttClient != null && mqttClient.isConnected();
}
/**
* 获取连接状态详情
*/
public String getConnectionStatusDetail() {
if (isConnected) {
if (isConnected()) {
return "MQTT已连接";
} else if (isConnecting) {
return "MQTT连接中...";
@ -299,9 +602,18 @@ public class MqttManager {
* 断开连接
*/
public void disconnect() {
LogManager.logInfo(TAG, "断开MQTT连接");
LogManager.logInfo(TAG, "开始断开MQTT连接");
isConnected = false;
isConnecting = false;
if (mqttClient != null) {
try {
mqttClient.disconnect();
} catch (MqttException e) {
LogManager.logError(TAG, "断开MQTT连接异常", e);
}
}
}
/**
@ -320,6 +632,7 @@ public class MqttManager {
scheduledExecutor.shutdown();
}
mqttClient = null;
connectionStatusListener = null;
messageReceivedListener = null;

Loading…
Cancel
Save