RobotApp/RobotApp/Services/MQTTClient.cs
Đăng Nguyễn 0d97684f70 update
2025-09-15 17:39:02 +07:00

162 lines
6.6 KiB
C#

using MQTTnet;
using MQTTnet.Protocol;
using RobotApp.Common.Shares;
using RobotApp.VDA5050;
using System.Text;
namespace RobotApp.Services;
public class MQTTClient : IAsyncDisposable
{
private readonly MqttClientFactory MqttClientFactory;
private readonly MqttClientOptions MqttClientOptions;
private readonly MqttClientSubscribeOptions MqttClientSubscribeOptions;
private IMqttClient? MqttClient;
private readonly Logger<MQTTClient> Logger;
private readonly VDA5050Setting VDA5050Setting;
private bool IsReconnecing;
public event Action<string>? OrderChanged;
public event Action<string>? InstanceActionsChanged;
public bool IsConnected => !IsReconnecing && MqttClient is not null && MqttClient.IsConnected;
public MQTTClient(string clientId, VDA5050Setting setting, Logger<MQTTClient> logger)
{
VDA5050Setting = setting;
Logger = logger;
MqttClientFactory = new MqttClientFactory();
MqttClient = MqttClientFactory.CreateMqttClient();
MqttClientOptions = MqttClientFactory.CreateClientOptionsBuilder()
.WithTcpServer(setting.HostServer, setting.Port)
.WithCredentials(setting.UserName, setting.Password)
.WithClientId(clientId)
.WithCleanSession(true)
.Build();
MqttClientSubscribeOptions = MqttClientFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(f => f.WithTopic(VDA5050Topic.ORDER.ToTopicString()))
.WithTopicFilter(f => f.WithTopic(VDA5050Topic.INSTANTACTIONS.ToTopicString()))
.Build();
MqttClient.DisconnectedAsync += async delegate (MqttClientDisconnectedEventArgs args)
{
if (args.ClientWasConnected && !IsReconnecing)
{
IsReconnecing = true;
Logger.Warning("Mất kết nối tới broker, đang cố gắng kết nối lại...");
if (MqttClient.IsConnected) await MqttClient.DisconnectAsync();
MqttClient.Dispose();
await ConnectAsync();
await SubscribeAsync();
IsReconnecing = false;
}
};
}
public async Task ConnectAsync(CancellationToken cancellationToken = default)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
MqttClient ??= MqttClientFactory.CreateMqttClient();
var connection = await MqttClient.ConnectAsync(MqttClientOptions, cancellationToken);
if (connection.ResultCode != MqttClientConnectResultCode.Success || !MqttClient.IsConnected)
Logger.Warning($"Không thể kết nối tới broker do: {connection.ReasonString}");
else
{
Logger.Info("Kết nối tới broker thành công");
break;
}
}
catch (Exception ex)
{
Logger.Error($"Lỗi khi tạo MQTT client: {ex.Message}");
}
await Task.Delay(3000, cancellationToken);
}
}
public async Task SubscribeAsync(CancellationToken cancellationToken = default)
{
if (MqttClient is null) throw new Exception("Kết nối tới broker chưa được khởi tạo nhưng đã yêu cầu subscribe");
if(!MqttClient.IsConnected) throw new Exception("Kết nối tới broker chưa thành công nhưng đã yêu cầu subscribe");
MqttClient.ApplicationMessageReceivedAsync += delegate (MqttApplicationMessageReceivedEventArgs args)
{
var stringData = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
VDA5050Topic topic = EnumExtensions.ToTopic(args.ApplicationMessage.Topic);
if (topic == VDA5050Topic.ORDER) OrderChanged?.Invoke(stringData);
else if (topic == VDA5050Topic.INSTANTACTIONS) InstanceActionsChanged?.Invoke(stringData);
return Task.CompletedTask;
};
while (!cancellationToken.IsCancellationRequested)
{
try
{
var response = await MqttClient.SubscribeAsync(MqttClientSubscribeOptions, cancellationToken);
bool isSuccess = true;
foreach (var item in response.Items)
{
if (item.ResultCode == MqttClientSubscribeResultCode.GrantedQoS0 ||
item.ResultCode == MqttClientSubscribeResultCode.GrantedQoS1 ||
item.ResultCode == MqttClientSubscribeResultCode.GrantedQoS2)
{
Logger.Info($"Subscribe thành công cho topic: {item.TopicFilter.Topic} với QoS: {item.ResultCode}");
}
else
{
Logger.Warning($"Subscribe thất bại cho topic: {item.TopicFilter.Topic}. Lý do: {response.ReasonString}");
isSuccess = false;
break;
}
}
if (isSuccess) break;
}
catch (Exception ex)
{
Logger.Error($"Lỗi khi subscribe: {ex.Message}");
}
await Task.Delay(3000, cancellationToken);
}
}
public async Task<MessageResult> PublishAsync(string topic, string data)
{
var repeat = VDA5050Setting.PublishRepeat;
while (repeat-- > 0)
{
try
{
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload(data)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
if (MqttClient is null || !IsConnected) return new(false, "Chưa có kết nối tới broker");
var publish = await MqttClient.PublishAsync(applicationMessage);
if (!publish.IsSuccess) continue;
return new(true);
}
catch (Exception ex)
{
Logger.Error($"Lỗi khi publish MQTT: {ex.Message}");
}
}
return new(false, "Không thể publish tới broker");
}
public async ValueTask DisposeAsync()
{
if (MqttClient is not null)
{
if (MqttClient.IsConnected) await MqttClient.DisconnectAsync();
MqttClient.Dispose();
MqttClient = null;
}
GC.SuppressFinalize(this);
}
}