APM/Assets/Scripting/MQTT/MqttClientManager.cs
2025-11-17 15:02:30 +07:00

134 lines
4.5 KiB
C#

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Packets;
using UnityEngine;
public class MqttClientManager
{
private readonly string mqttBroker;
private readonly int mqttPort;
private readonly string mqttUsername;
private readonly string mqttPassword;
private readonly string mqttClientId;
private readonly string orderTopic;
private readonly string instantActionsTopic;
private readonly ConcurrentQueue<string> jsonQueue;
private readonly ConcurrentQueue<string> instantActionQueue;
private IManagedMqttClient mqttClient;
public MqttClientManager(string broker, int port, string username, string password, string clientId,
string orderTopic, string instantActionsTopic, ConcurrentQueue<string> jsonQueue,
ConcurrentQueue<string> instantActionQueue)
{
mqttBroker = broker;
mqttPort = port;
mqttUsername = username;
mqttPassword = password;
mqttClientId = clientId;
this.orderTopic = orderTopic;
this.instantActionsTopic = instantActionsTopic;
this.jsonQueue = jsonQueue;
this.instantActionQueue = instantActionQueue;
}
public async Task InitializeAsync()
{
try
{
var mqttFactory = new MqttFactory();
mqttClient = mqttFactory.CreateManagedMqttClient();
var optionsBuilder = new MqttClientOptionsBuilder()
.WithTcpServer(mqttBroker, mqttPort)
.WithClientId(mqttClientId);
if (!string.IsNullOrEmpty(mqttUsername) && !string.IsNullOrEmpty(mqttPassword))
{
optionsBuilder.WithCredentials(mqttUsername, mqttPassword);
}
var managedOptions = new ManagedMqttClientOptionsBuilder()
.WithClientOptions(optionsBuilder.Build())
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
.Build();
mqttClient.ApplicationMessageReceivedAsync += async e =>
{
await Task.Run(() =>
{
string topic = e.ApplicationMessage.Topic;
string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty<byte>());
if (topic == orderTopic)
{
Debug.Log($"{payload}");
jsonQueue.Enqueue(payload);
}
else if (topic == instantActionsTopic)
{
Debug.Log($"Nhận được JSON từ chủ đề {instantActionsTopic}: {payload}");
instantActionQueue.Enqueue(payload);
}
});
};
mqttClient.ConnectedAsync += async e =>
{
try
{
await mqttClient.SubscribeAsync(new List<MqttTopicFilter>
{
new MqttTopicFilterBuilder().WithTopic(orderTopic).Build(),
new MqttTopicFilterBuilder().WithTopic(instantActionsTopic).Build()
});
}
catch (Exception ex)
{
Debug.LogError($"Lỗi khi kết nối hoặc đăng ký chủ đề: {ex.Message}");
}
};
mqttClient.DisconnectedAsync += async e =>
{
await Task.CompletedTask;
};
await mqttClient.StartAsync(managedOptions);
//Debug.Log("Đã khởi động MQTT client");
}
catch (Exception ex)
{
Debug.LogError($"Không thể khởi tạo MQTT client: {ex.Message}");
}
}
public async Task StopAsync()
{
if (mqttClient != null)
{
await mqttClient.StopAsync();
mqttClient.Dispose();
Debug.Log("Đã dừng và hủy MQTT client");
}
}
public async Task PublishAsync(string topic, string payload)
{
if (mqttClient == null || !mqttClient.IsConnected)
{
return;
}
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(Encoding.UTF8.GetBytes(payload))
.Build();
await mqttClient.EnqueueAsync(message);
}
}