Browse Source

make change mqtt

devab
赵明涛 4 weeks ago
parent
commit
f7ff21c08e
  1. 275
      app/src/main/java/com/ouxuan/oxface/device/MqttManager.java

275
app/src/main/java/com/ouxuan/oxface/device/MqttManager.java

@ -3,17 +3,17 @@ package com.ouxuan.oxface.device;
import android.content.Context; import android.content.Context;
import android.os.Handler; import android.os.Handler;
import android.os.Looper; import android.os.Looper;
import android.util.Log;
import com.ouxuan.oxface.utils.LogManager; import com.ouxuan.oxface.utils.LogManager;
import com.tencent.iot.hub.device.android.core.gateway.TXGatewayConnection;
import com.tencent.iot.hub.device.java.core.common.Status;
import com.tencent.iot.hub.device.java.core.log.TXMqttLogCallBack;
import com.tencent.iot.hub.device.java.core.mqtt.TXMqttActionCallBack;
import org.eclipse.paho.android.service.MqttAndroidClient;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions; import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken; import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.json.JSONObject; import org.json.JSONObject;
@ -24,13 +24,21 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
* 腾讯云MQTT管理器 - 真实实现版本
* 腾讯云MQTT管理器 - 基于TXGatewayConnection实现
* 负责MQTT连接管理自动重连消息订阅和发布 * 负责MQTT连接管理自动重连消息订阅和发布
* *
* 功能特性
* 1. 初始化后自动连接
* 2. 网络断线自动重连机制
* 3. 消息订阅和处理
* 4. 连接状态监控和上报
* 5. 门闸控制和设备重启命令处理
*
* @author AI Assistant * @author AI Assistant
* @version 3.0
* @version 4.0
* @date 2024/09/16 * @date 2024/09/16
*/ */
public class MqttManager { public class MqttManager {
@ -46,7 +54,7 @@ public class MqttManager {
private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O"; private static final String PRODUCT_KEY = "qr3rximCZnT6ZU0NsAAiTC7O";
// MQTT连接配置 // MQTT连接配置
private static final String BROKER_URL = "ssl://iotcloud-mqtt.gz.tencentcloudapi.com:8883";
private static final String BROKER_URL = null; // 传入null使用腾讯云IoT默认地址
private static final int KEEP_ALIVE_INTERVAL = 120; private static final int KEEP_ALIVE_INTERVAL = 120;
private static final int CONNECTION_TIMEOUT = 10; private static final int CONNECTION_TIMEOUT = 10;
private static final int QOS = 1; private static final int QOS = 1;
@ -58,10 +66,10 @@ public class MqttManager {
// 上下文和状态管理 // 上下文和状态管理
private Context context; private Context context;
private MqttAndroidClient mqttClient;
private TXGatewayConnection mqttConnection;
private String deviceName; private String deviceName;
private String subscribeTopic; private String subscribeTopic;
private String clientId;
private String path2Store;
// 连接状态管理 // 连接状态管理
private boolean isConnected = false; private boolean isConnected = false;
@ -71,6 +79,9 @@ public class MqttManager {
private ExecutorService executorService; private ExecutorService executorService;
private ScheduledExecutorService scheduledExecutor; private ScheduledExecutorService scheduledExecutor;
// 请求ID生成器
private static AtomicInteger requestID = new AtomicInteger(0);
// 监听器接口 // 监听器接口
private ConnectionStatusListener connectionStatusListener; private ConnectionStatusListener connectionStatusListener;
private MessageReceivedListener messageReceivedListener; private MessageReceivedListener messageReceivedListener;
@ -99,6 +110,24 @@ public class MqttManager {
} }
/** /**
* MQTT请求上下文
*/
private static class MQTTRequest {
public String requestType;
public int requestId;
public MQTTRequest(String requestType, int requestId) {
this.requestType = requestType;
this.requestId = requestId;
}
@Override
public String toString() {
return "MQTTRequest{requestType='" + requestType + "', requestId=" + requestId + "}";
}
}
/**
* 获取单例实例 * 获取单例实例
*/ */
public static MqttManager getInstance() { public static MqttManager getInstance() {
@ -127,19 +156,16 @@ public class MqttManager {
public void initialize(Context context) { public void initialize(Context context) {
this.context = context.getApplicationContext(); this.context = context.getApplicationContext();
this.gateABController = GateABController.getInstance(); this.gateABController = GateABController.getInstance();
this.path2Store = context.getCacheDir().getAbsolutePath();
// 生成设备名称和客户端ID
generateDeviceInfo();
// 生成设备名称
generateDeviceName();
// 构建MQTT配置 // 构建MQTT配置
buildMqttConfig(); buildMqttConfig();
// 创建MQTT客户端
createMqttClient();
LogManager.logInfo(TAG, "MQTT管理器初始化完成"); LogManager.logInfo(TAG, "MQTT管理器初始化完成");
LogManager.logInfo(TAG, "设备名称: " + deviceName); LogManager.logInfo(TAG, "设备名称: " + deviceName);
LogManager.logInfo(TAG, "客户端ID: " + clientId);
LogManager.logInfo(TAG, "订阅主题: " + subscribeTopic); LogManager.logInfo(TAG, "订阅主题: " + subscribeTopic);
// 启动健康检查 // 启动健康检查
@ -150,13 +176,12 @@ public class MqttManager {
} }
/** /**
* 生成设备信息
* 生成设备名称
*/ */
private void generateDeviceInfo() {
private void generateDeviceName() {
String androidId = DeviceUtils.getAndroidID(context); String androidId = DeviceUtils.getAndroidID(context);
deviceName = "PadV6" + androidId; deviceName = "PadV6" + androidId;
clientId = PRODUCT_ID + deviceName;
LogManager.logInfo(TAG, "生成设备信息 - 设备名: " + deviceName + ", 客户端ID: " + clientId);
LogManager.logInfo(TAG, "生成设备名称: " + deviceName);
} }
/** /**
@ -168,19 +193,6 @@ public class MqttManager {
} }
/** /**
* 创建MQTT客户端
*/
private void createMqttClient() {
try {
mqttClient = new MqttAndroidClient(context, BROKER_URL, clientId);
mqttClient.setCallback(new MqttCallbackHandler());
LogManager.logInfo(TAG, "MQTT客户端创建成功");
} catch (Exception e) {
LogManager.logError(TAG, "创建MQTT客户端失败", e);
}
}
/**
* 异步连接MQTT * 异步连接MQTT
*/ */
public void connectAsync() { public void connectAsync() {
@ -207,59 +219,36 @@ public class MqttManager {
isConnecting = true; isConnecting = true;
try { try {
// 创建TXGatewayConnection实例
mqttConnection = new TXGatewayConnection(
context,
BROKER_URL,
PRODUCT_ID,
deviceName,
DEV_PSK,
null, // devCert
null, // devPriv
false, // mqttLogFlag
null, // logCallBack
new MqttActionCallback()
);
// 配置连接选项
MqttConnectOptions options = new MqttConnectOptions(); MqttConnectOptions options = new MqttConnectOptions();
options.setConnectionTimeout(CONNECTION_TIMEOUT); options.setConnectionTimeout(CONNECTION_TIMEOUT);
options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL); options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
options.setAutomaticReconnect(false); // 手动控制重连 options.setAutomaticReconnect(false); // 手动控制重连
options.setCleanSession(false); options.setCleanSession(false);
// 设置用户名和密码腾讯云IoT认证
options.setUserName(deviceName);
options.setPassword(DEV_PSK.toCharArray());
// SSL配置
options.setSocketFactory(mqttClient.getSSLSocketFactory(null, null));
LogManager.logInfo(TAG, "使用PSK认证连接腾讯云IoT平台"); LogManager.logInfo(TAG, "使用PSK认证连接腾讯云IoT平台");
// 连接MQTT // 连接MQTT
IMqttToken token = mqttClient.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
LogManager.logInfo(TAG, "MQTT连接成功");
isConnected = true;
isConnecting = false;
reconnectAttempts = 0;
// 订阅主题
subscribeToTopic();
MQTTRequest mqttRequest = new MQTTRequest("connect", requestID.getAndIncrement());
mqttConnection.connect(options, mqttRequest);
// 配置断线缓冲
configureDisconnectedBuffer();
// 通知连接成功
if (connectionStatusListener != null) {
mainHandler.post(() -> connectionStatusListener.onConnected());
}
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
LogManager.logError(TAG, "MQTT连接失败", exception);
isConnecting = false;
// 通知连接失败
if (connectionStatusListener != null) {
String reason = exception != null ? exception.getMessage() : "未知错误";
mainHandler.post(() -> connectionStatusListener.onConnectionFailed(reason));
}
LogManager.logInfo(TAG, "MQTT连接请求已发送");
// 调度重连
scheduleReconnect();
}
});
} catch (MqttException e) {
} catch (Exception e) {
LogManager.logError(TAG, "创建MQTT连接失败", e); LogManager.logError(TAG, "创建MQTT连接失败", e);
isConnecting = false; isConnecting = false;
scheduleReconnect(); scheduleReconnect();
@ -270,20 +259,12 @@ public class MqttManager {
* 订阅主题 * 订阅主题
*/ */
private void subscribeToTopic() { private void subscribeToTopic() {
if (mqttClient != null && isConnected) {
if (mqttConnection != null && isConnected) {
try { try {
mqttClient.subscribe(subscribeTopic, QOS, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
LogManager.logInfo(TAG, "主题订阅成功: " + subscribeTopic);
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
LogManager.logError(TAG, "主题订阅失败", exception);
}
});
} catch (MqttException e) {
MQTTRequest mqttRequest = new MQTTRequest("subscribe", requestID.getAndIncrement());
mqttConnection.subscribe(subscribeTopic, QOS, mqttRequest);
LogManager.logInfo(TAG, "开始订阅主题: " + subscribeTopic);
} catch (Exception e) {
LogManager.logError(TAG, "订阅主题异常", e); LogManager.logError(TAG, "订阅主题异常", e);
} }
} }
@ -299,7 +280,7 @@ public class MqttManager {
bufferOptions.setBufferSize(1024); bufferOptions.setBufferSize(1024);
bufferOptions.setDeleteOldestMessages(true); bufferOptions.setDeleteOldestMessages(true);
bufferOptions.setPersistBuffer(true); bufferOptions.setPersistBuffer(true);
mqttClient.setBufferOpts(bufferOptions);
mqttConnection.setBufferOpts(bufferOptions);
LogManager.logInfo(TAG, "断线缓冲配置完成"); LogManager.logInfo(TAG, "断线缓冲配置完成");
} catch (Exception e) { } catch (Exception e) {
LogManager.logError(TAG, "配置断线缓冲失败", e); LogManager.logError(TAG, "配置断线缓冲失败", e);
@ -351,31 +332,65 @@ public class MqttManager {
* 检查连接健康状态 * 检查连接健康状态
*/ */
private void checkConnectionHealth() { private void checkConnectionHealth() {
if (mqttClient != null) {
boolean actuallyConnected = mqttClient.isConnected();
if (isConnected != actuallyConnected) {
LogManager.logWarning(TAG, "连接状态不一致,更新状态");
isConnected = actuallyConnected;
}
if (mqttConnection != null) {
// 检查连接状态
if (!isConnected && !isConnecting) { if (!isConnected && !isConnecting) {
LogManager.logInfo(TAG, "健康检查:连接断开,尝试重连");
LogManager.logInfo(TAG, "健康检查:连接已断开,尝试重连");
connectAsync(); connectAsync();
} }
} else if (!isConnecting) { } else if (!isConnecting) {
LogManager.logInfo(TAG, "健康检查:客户端为空,重新创建");
createMqttClient();
LogManager.logInfo(TAG, "健康检查:客户端为空,尝试重连");
connectAsync(); connectAsync();
} }
} }
/** /**
* MQTT回调处理器
* MQTT动作回调处理器
*/ */
private class MqttCallbackHandler implements MqttCallback {
private class MqttActionCallback extends TXMqttActionCallBack {
@Override @Override
public void connectionLost(Throwable cause) {
public void onConnectCompleted(Status status, boolean reconnect, Object userContext, String msg, Throwable cause) {
String userContextInfo = "";
if (userContext instanceof MQTTRequest) {
userContextInfo = userContext.toString();
}
String logInfo = String.format("onConnectCompleted, status[%s], reconnect[%b], userContext[%s], msg[%s]",
status.name(), reconnect, userContextInfo, msg);
LogManager.logInfo(TAG, "连接回调:" + logInfo);
if (status.equals(Status.OK)) {
LogManager.logInfo(TAG, "MQTT连接成功");
isConnected = true;
isConnecting = false;
reconnectAttempts = 0;
// 订阅主题
subscribeToTopic();
// 配置断线缓冲
configureDisconnectedBuffer();
// 通知连接成功
if (connectionStatusListener != null) {
mainHandler.post(() -> connectionStatusListener.onConnected());
}
} else {
LogManager.logError(TAG, "MQTT连接失败: " + msg);
isConnecting = false;
// 通知连接失败
if (connectionStatusListener != null) {
mainHandler.post(() -> connectionStatusListener.onConnectionFailed(msg));
}
// 调度重连
scheduleReconnect();
}
}
@Override
public void onConnectionLost(Throwable cause) {
LogManager.logError(TAG, "MQTT连接丢失", cause); LogManager.logError(TAG, "MQTT连接丢失", cause);
isConnected = false; isConnected = false;
isConnecting = false; isConnecting = false;
@ -390,7 +405,37 @@ public class MqttManager {
} }
@Override @Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
public void onDisconnectCompleted(Status status, Object userContext, String msg, Throwable cause) {
LogManager.logInfo(TAG, "MQTT连接已断开: " + msg);
isConnected = false;
isConnecting = false;
}
@Override
public void onPublishCompleted(Status status, IMqttToken token, Object userContext, String errMsg) {
if (status == Status.OK) {
LogManager.logDebug(TAG, "消息发布成功");
} else {
LogManager.logError(TAG, "消息发布失败: " + errMsg);
}
}
@Override
public void onSubscribeCompleted(Status status, IMqttToken token, Object userContext, String errMsg) {
if (status == Status.OK) {
LogManager.logInfo(TAG, "主题订阅成功: " + subscribeTopic);
} else {
LogManager.logError(TAG, "主题订阅失败: " + errMsg);
}
}
@Override
public void onUnSubscribeCompleted(Status status, IMqttToken token, Object userContext, String errMsg) {
LogManager.logInfo(TAG, "取消订阅完成: " + errMsg);
}
@Override
public void onMessageReceived(String topic, MqttMessage message) {
String messageContent = new String(message.getPayload()); String messageContent = new String(message.getPayload());
String timestamp = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(new Date()); String timestamp = new SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(new Date());
@ -399,11 +444,6 @@ public class MqttManager {
// 处理消息 // 处理消息
processMessage(topic, messageContent); processMessage(topic, messageContent);
} }
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
LogManager.logDebug(TAG, "消息发送完成");
}
} }
/** /**
@ -546,15 +586,17 @@ public class MqttManager {
* 发布消息 * 发布消息
*/ */
public void publishMessage(String topic, String messageContent) { public void publishMessage(String topic, String messageContent) {
if (mqttClient != null && isConnected) {
if (mqttConnection != null && isConnected) {
try { try {
MqttMessage message = new MqttMessage(); MqttMessage message = new MqttMessage();
message.setQos(QOS); message.setQos(QOS);
message.setPayload(messageContent.getBytes()); message.setPayload(messageContent.getBytes());
mqttClient.publish(topic, message);
MQTTRequest mqttRequest = new MQTTRequest("publish", requestID.getAndIncrement());
mqttConnection.publish(topic, message, mqttRequest);
LogManager.logInfo(TAG, "消息发布成功,主题: " + topic); LogManager.logInfo(TAG, "消息发布成功,主题: " + topic);
} catch (MqttException e) {
} catch (Exception e) {
LogManager.logError(TAG, "发布消息异常", e); LogManager.logError(TAG, "发布消息异常", e);
} }
} else { } else {
@ -566,7 +608,7 @@ public class MqttManager {
* 获取连接状态 * 获取连接状态
*/ */
public boolean isConnected() { public boolean isConnected() {
return isConnected && mqttClient != null && mqttClient.isConnected();
return isConnected && mqttConnection != null;
} }
/** /**
@ -607,10 +649,11 @@ public class MqttManager {
isConnected = false; isConnected = false;
isConnecting = false; isConnecting = false;
if (mqttClient != null) {
if (mqttConnection != null) {
try { try {
mqttClient.disconnect();
} catch (MqttException e) {
MQTTRequest mqttRequest = new MQTTRequest("disconnect", requestID.getAndIncrement());
mqttConnection.disConnect(mqttRequest);
} catch (Exception e) {
LogManager.logError(TAG, "断开MQTT连接异常", e); LogManager.logError(TAG, "断开MQTT连接异常", e);
} }
} }
@ -632,7 +675,7 @@ public class MqttManager {
scheduledExecutor.shutdown(); scheduledExecutor.shutdown();
} }
mqttClient = null;
mqttConnection = null;
connectionStatusListener = null; connectionStatusListener = null;
messageReceivedListener = null; messageReceivedListener = null;

Loading…
Cancel
Save