RobotApp/RobotApp/Services/MQTTClient.cs
Đăng Nguyễn e4e135e35f update
2025-12-22 21:28:57 +07:00

340 lines
13 KiB
C#

using MQTTnet;
using MQTTnet.Protocol;
using RobotApp.Common.Shares;
using RobotApp.VDA5050;
using System.Security.Cryptography.X509Certificates;
using System.Text;
namespace RobotApp.Services;
public class MQTTClient : IAsyncDisposable
{
// Factory used to create the client instance, the option builders and outgoing application messages.
private readonly MqttClientFactory MqttClientFactory;
// Connection options; built in the constructor and rebuilt by BuildMqttClientOptions on every ConnectAsync call.
private MqttClientOptions MqttClientOptions;
// Subscription options built once in the constructor; they cover OrderTopic and InstanceActionsTopic.
private readonly MqttClientSubscribeOptions MqttClientSubscribeOptions;
// Current client instance; null until ConnectAsync creates one and again after CleanupCurrentClient runs.
private IMqttClient? MqttClient;
private readonly Logger<MQTTClient> Logger;
private readonly VDA5050Setting VDA5050Setting;
// Used both as the MQTT client id and as a segment of the subscribed topics.
private readonly string ClientId;
// Single-permit semaphore: held by OnDisconnected while it reconnects and acquired by DisposeAsync before cleanup.
private readonly SemaphoreSlim ReconnectionSemaphore = new(1, 1);
// Set to true by DisposeAsync; checked by the connect/reconnect/publish loops to stop early.
private volatile bool IsDisposed;
// Raised by OnMessageReceived with the UTF-8 decoded payload of a message received on OrderTopic.
public event Action<string>? OrderChanged;
// Raised by OnMessageReceived with the UTF-8 decoded payload of a message received on InstanceActionsTopic.
public event Action<string>? InstanceActionsChanged;
// True only when this wrapper is not disposed, a client instance exists and that instance reports a live connection.
public bool IsConnected => !IsDisposed && MqttClient is not null && MqttClient.IsConnected;
// Topic layout: {TopicPrefix}/{Manufacturer}/{ClientId}/{topic name}.
private string OrderTopic => $"{VDA5050Setting.TopicPrefix}/{VDA5050Setting.Manufacturer}/{ClientId}/{VDA5050Topic.ORDER.ToTopicString()}";
private string InstanceActionsTopic => $"{VDA5050Setting.TopicPrefix}/{VDA5050Setting.Manufacturer}/{ClientId}/{VDA5050Topic.INSTANTACTIONS.ToTopicString()}";
/// <summary>
/// Creates a client wrapper for one robot identity and prepares the default connection
/// and subscription options. No network activity happens here; call
/// <see cref="ConnectAsync"/> and then <see cref="SubscribeAsync"/>.
/// </summary>
/// <param name="clientId">MQTT client id; also used as a segment of the subscribed topics.</param>
/// <param name="setting">Broker address, credentials, TLS and topic settings.</param>
/// <param name="logger">Logger used for all diagnostics of this client.</param>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public MQTTClient(string clientId, VDA5050Setting setting, Logger<MQTTClient> logger)
{
    ArgumentNullException.ThrowIfNull(clientId);
    ArgumentNullException.ThrowIfNull(setting);
    ArgumentNullException.ThrowIfNull(logger);

    VDA5050Setting = setting;
    ClientId = clientId;
    Logger = logger;
    MqttClientFactory = new MqttClientFactory();

    MqttClientOptions = MqttClientFactory.CreateClientOptionsBuilder()
        .WithTcpServer(VDA5050Setting.HostServer, VDA5050Setting.Port)
        .WithClientId(ClientId)
        .WithCleanSession(true)
        .Build();

    // QoS 0 (at most once) is requested explicitly. The QoS setters of the topic filter
    // builder overwrite one another, so chaining several of them leaves only the last
    // call in effect; a single call states the effective level unambiguously.
    MqttClientSubscribeOptions = MqttClientFactory.CreateSubscribeOptionsBuilder()
        .WithTopicFilter(f => f.WithTopic(OrderTopic).WithAtMostOnceQoS())
        .WithTopicFilter(f => f.WithTopic(InstanceActionsTopic).WithAtMostOnceQoS())
        .Build();
}
/// <summary>
/// Handler for the client's disconnected event. When an established connection is lost it
/// tears the current client down and runs the reconnect routine, allowing only one
/// reconnection at a time.
/// </summary>
/// <param name="args">Disconnect details; events for clients that were never connected are ignored.</param>
private async Task OnDisconnected(MqttClientDisconnectedEventArgs args)
{
    if (IsDisposed) return;
    if (!args.ClientWasConnected) return;

    // Zero timeout: never queue behind a reconnection that is already running.
    var lockTaken = await ReconnectionSemaphore.WaitAsync(0);
    if (!lockTaken)
    {
        Logger.Info("Reconnection đã đang được thực hiện bởi thread khác");
        return;
    }

    try
    {
        Logger.Warning("Mất kết nối tới broker, đang cố gắng kết nối lại...");
        await CleanupCurrentClient();
        await ReconnectWithRetry();
    }
    catch (Exception error)
    {
        Logger.Error($"Lỗi trong quá trình reconnection: {error.Message}");
    }
    finally
    {
        ReconnectionSemaphore.Release();
    }
}
/// <summary>
/// Detaches the event handlers from the current client, disconnects it if it is still
/// connected and disposes it. Does nothing when there is no current client.
/// </summary>
/// <remarks>
/// The instance is taken out of the <c>MqttClient</c> field atomically before any await.
/// This method is reachable from ConnectAsync, OnDisconnected and DisposeAsync, so two
/// overlapping calls must not both operate on (and dispose) the same instance, and the
/// field must not be dereferenced after another caller has already cleared it.
/// </remarks>
private async Task CleanupCurrentClient()
{
    var client = Interlocked.Exchange(ref MqttClient, null);
    if (client is null) return;

    try
    {
        client.DisconnectedAsync -= OnDisconnected;
        client.ApplicationMessageReceivedAsync -= OnMessageReceived;
        if (client.IsConnected)
        {
            await client.DisconnectAsync();
        }
    }
    catch (Exception ex)
    {
        // Best effort: a failed graceful disconnect must not prevent disposal below.
        Logger.Warning($"Lỗi khi cleanup client: {ex.Message}");
    }
    finally
    {
        client.Dispose();
    }
}
/// <summary>
/// Tries to reconnect and resubscribe up to five times. Each attempt gets a 30 second
/// budget, and the pause between attempts grows linearly (3 s, 6 s, 9 s, ...).
/// Stops early once the client is disposed.
/// </summary>
private async Task ReconnectWithRetry()
{
    const int attemptLimit = 5;
    const int baseDelayMs = 3000;

    var attempt = 1;
    while (attempt <= attemptLimit && !IsDisposed)
    {
        try
        {
            Logger.Info($"Thử reconnect lần {attempt}/{attemptLimit}");
            // One shared time budget for connecting and subscribing in this attempt.
            using var attemptTimeout = new CancellationTokenSource(TimeSpan.FromSeconds(30));
            await ConnectAsync(attemptTimeout.Token);
            if (IsConnected)
            {
                await SubscribeAsync(attemptTimeout.Token);
                Logger.Info("Reconnection thành công");
                return;
            }
        }
        catch (OperationCanceledException)
        {
            Logger.Warning($"Reconnect attempt {attempt} bị timeout");
        }
        catch (Exception error)
        {
            Logger.Error($"Reconnect attempt {attempt} thất bại: {error.Message}");
        }

        // No pause after the final attempt or once disposed.
        var isLastAttempt = attempt >= attemptLimit;
        if (!isLastAttempt && !IsDisposed)
        {
            await Task.Delay(baseDelayMs * attempt);
        }

        attempt++;
    }

    Logger.Error("Không thể reconnect sau tất cả các attempts");
}
/// <summary>
/// Rebuilds the connection options, replaces any existing client with a fresh one and
/// keeps trying to connect (pausing 3 seconds between attempts) until the connection
/// succeeds, the token is cancelled, or this object is disposed.
/// </summary>
/// <param name="cancellationToken">Stops the retry loop when cancelled.</param>
/// <remarks>
/// The method returns without throwing when the loop ends because of cancellation or
/// disposal; callers should check <see cref="IsConnected"/> afterwards.
/// </remarks>
/// <exception cref="ObjectDisposedException">This object was already disposed when the call started.</exception>
public async Task ConnectAsync(CancellationToken cancellationToken = default)
{
    if (IsDisposed) throw new ObjectDisposedException(nameof(MQTTClient));

    BuildMqttClientOptions(VDA5050Setting.EnablePassword, VDA5050Setting.EnableTls);
    await CleanupCurrentClient();

    // Work on a local reference: the field can be cleared by a concurrent cleanup
    // while this loop is awaiting, which would otherwise surface as a NullReferenceException.
    var client = MqttClientFactory.CreateMqttClient();
    client.ApplicationMessageReceivedAsync += OnMessageReceived;
    client.DisconnectedAsync += OnDisconnected;
    MqttClient = client;

    // Stop retrying as soon as this instance is no longer the current client
    // (it was cleaned up or superseded), since it has been disposed by then.
    while (!cancellationToken.IsCancellationRequested && !IsDisposed && ReferenceEquals(MqttClient, client))
    {
        try
        {
            var connection = await client.ConnectAsync(MqttClientOptions, cancellationToken);
            if (connection.ResultCode == MqttClientConnectResultCode.Success && client.IsConnected)
            {
                Logger.Info($"Kết nối tới broker {VDA5050Setting.HostServer} thành công");
                break;
            }

            Logger.Warning($"Không thể kết nối tới broker do: {connection.ReasonString}");
        }
        catch (Exception ex)
        {
            Logger.Error($"Lỗi khi tạo MQTT client: {ex.Message}");
        }

        try
        {
            await Task.Delay(3000, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            // Cancellation during the pause ends the retry loop quietly.
            break;
        }
    }
}
/// <summary>
/// TLS server-certificate validation callback. When a CA file is configured, the broker
/// certificate must chain to that CA (loaded from <c>MqttCertificates/ca/{CAFile}</c>,
/// relative to the current directory).
/// </summary>
/// <param name="arg">Validation context supplying the presented certificate and its chain.</param>
/// <returns>
/// <c>true</c> to accept the broker certificate; <c>false</c> to reject it and abort the handshake.
/// </returns>
/// <remarks>
/// The configured CA is installed as the only trusted root (custom root trust). Merely
/// adding it as an extra certificate while allowing unknown authorities would make the
/// chain build succeed for any issuer, so the CA would not actually be enforced.
/// A configured CA file that is missing or unreadable rejects the certificate rather
/// than silently disabling validation. Revocation is not checked, and host-name
/// mismatches reported in the validation context are not evaluated here.
/// </remarks>
private bool ValidateCertificates(MqttClientCertificateValidationEventArgs arg)
{
    if (string.IsNullOrEmpty(VDA5050Setting.CAFile))
    {
        // NOTE(review): with no CA configured every server certificate is accepted
        // (existing behaviour, kept for compatibility). This is insecure on untrusted
        // networks; consider requiring a CA file or honouring the platform's policy errors.
        return true;
    }

    var caLocal = Path.Combine("MqttCertificates", "ca", VDA5050Setting.CAFile);
    if (!File.Exists(caLocal))
    {
        Logger.Error($"Không tìm thấy CA file '{caLocal}', từ chối certificate của broker");
        return false;
    }

    if (arg.Certificate is not X509Certificate2 serverCertificate)
    {
        Logger.Error("Certificate của broker không hợp lệ");
        return false;
    }

    try
    {
        var caCert = X509CertificateLoader.LoadCertificateFromFile(caLocal);
        var policy = arg.Chain.ChainPolicy;
        policy.TrustMode = X509ChainTrustMode.CustomRootTrust;
        policy.CustomTrustStore.Add(caCert);
        policy.RevocationMode = X509RevocationMode.NoCheck;
        return arg.Chain.Build(serverCertificate);
    }
    catch (Exception ex)
    {
        Logger.Error($"Lỗi khi kiểm tra certificate của broker: {ex.Message}");
        return false;
    }
}
/// <summary>
/// Rebuilds <c>MqttClientOptions</c> from the current settings: TCP endpoint, client id and
/// clean session always; credentials and TLS only when requested.
/// </summary>
/// <param name="enablePassword">Add the configured user name and password to the options.</param>
/// <param name="enableTls">Enable TLS 1.2/1.3 with the custom validation handler and client certificates.</param>
private void BuildMqttClientOptions(bool enablePassword, bool enableTls)
{
    var optionsBuilder = MqttClientFactory.CreateClientOptionsBuilder()
        .WithTcpServer(VDA5050Setting.HostServer, VDA5050Setting.Port)
        .WithClientId(ClientId)
        .WithCleanSession(true);

    if (enablePassword)
    {
        optionsBuilder = optionsBuilder.WithCredentials(VDA5050Setting.UserName, VDA5050Setting.Password);
    }

    if (enableTls)
    {
        var allowedProtocols = System.Security.Authentication.SslProtocols.Tls12
            | System.Security.Authentication.SslProtocols.Tls13;
        var certificatesProvider = new MQTTClientCertificatesProvider(VDA5050Setting.CerFile, VDA5050Setting.KeyFile);

        var tlsBuilder = new MqttClientTlsOptionsBuilder();
        tlsBuilder = tlsBuilder.UseTls(true);
        tlsBuilder = tlsBuilder.WithSslProtocols(allowedProtocols);
        tlsBuilder = tlsBuilder.WithCertificateValidationHandler(ValidateCertificates);
        tlsBuilder = tlsBuilder.WithClientCertificatesProvider(certificatesProvider);

        optionsBuilder = optionsBuilder.WithTlsOptions(tlsBuilder.Build());
    }

    MqttClientOptions = optionsBuilder.Build();
}
/// <summary>
/// Handler for incoming messages. Logs the topic, decodes the payload as UTF-8 and raises
/// <c>OrderChanged</c> or <c>InstanceActionsChanged</c> depending on the topic. Messages on
/// other topics, and messages arriving after disposal, are ignored.
/// </summary>
/// <param name="args">Event data carrying the received application message.</param>
/// <returns>An already completed task; all work is done synchronously.</returns>
private Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs args)
{
    try
    {
        var message = args.ApplicationMessage;
        Logger.Info($"Has new message: {message.Topic}");

        if (!IsDisposed)
        {
            var payloadText = Encoding.UTF8.GetString(message.Payload);
            var incomingTopic = message.Topic;

            if (incomingTopic == OrderTopic)
            {
                OrderChanged?.Invoke(payloadText);
            }
            else if (incomingTopic == InstanceActionsTopic)
            {
                InstanceActionsChanged?.Invoke(payloadText);
            }
        }
    }
    catch (Exception error)
    {
        // Subscriber or decoding failures are logged here rather than thrown from the handler.
        Logger.Error($"Lỗi khi xử lý message: {error.Message}");
    }

    return Task.CompletedTask;
}
/// <summary>
/// Subscribes to the order and instant-actions topics, retrying every 3 seconds until the
/// broker grants every topic filter.
/// </summary>
/// <param name="cancellationToken">Aborts the retry loop.</param>
/// <remarks>
/// A normal return means all filters were granted, or that this object was disposed while
/// retrying. Cancellation always surfaces as <see cref="OperationCanceledException"/>, so a
/// caller can never mistake a cancelled attempt for a successful subscription. The loop
/// also ends on disposal instead of retrying against a disposed client without any pause.
/// </remarks>
/// <exception cref="InvalidOperationException">No client exists yet, or the client is not connected.</exception>
/// <exception cref="OperationCanceledException">The token was cancelled before all filters were granted.</exception>
public async Task SubscribeAsync(CancellationToken cancellationToken = default)
{
    // Local reference: the field may be cleared by a concurrent cleanup while awaiting.
    var client = MqttClient;
    if (client is null) throw new InvalidOperationException("Kết nối tới broker chưa được khởi tạo nhưng đã yêu cầu subscribe");
    if (!client.IsConnected) throw new InvalidOperationException("Kết nối tới broker chưa thành công nhưng đã yêu cầu subscribe");

    while (!IsDisposed)
    {
        cancellationToken.ThrowIfCancellationRequested();
        try
        {
            var response = await client.SubscribeAsync(MqttClientSubscribeOptions, cancellationToken);
            bool isSuccess = true;
            foreach (var item in response.Items)
            {
                if (item.ResultCode == MqttClientSubscribeResultCode.GrantedQoS0 ||
                    item.ResultCode == MqttClientSubscribeResultCode.GrantedQoS1 ||
                    item.ResultCode == MqttClientSubscribeResultCode.GrantedQoS2)
                {
                    Logger.Info($"Subscribe thành công cho topic: {item.TopicFilter.Topic} với QoS: {item.ResultCode}");
                }
                else
                {
                    Logger.Warning($"Subscribe thất bại cho topic: {item.TopicFilter.Topic}. Lý do: {response.ReasonString}");
                    isSuccess = false;
                    break;
                }
            }

            if (isSuccess) return;
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // Let the caller observe the cancellation instead of logging it as a failure.
            throw;
        }
        catch (Exception ex)
        {
            Logger.Error($"Lỗi khi subscribe: {ex.Message}");
        }

        await Task.Delay(3000, cancellationToken);
    }
}
/// <summary>
/// Publishes <paramref name="data"/> to <paramref name="topic"/> with QoS 1 (at least once),
/// making up to <c>PublishRepeat</c> attempts (always at least one).
/// </summary>
/// <param name="topic">Destination topic.</param>
/// <param name="data">Payload text.</param>
/// <returns>
/// A successful result once the broker accepts the message; otherwise a failed result
/// whose message says whether the client was disposed, not connected, or every attempt failed.
/// Failures are reported through the result and the log, not thrown.
/// </returns>
public async Task<MessageResult> PublishAsync(string topic, string data)
{
    if (IsDisposed) return new(false, "Client đã được disposed");

    // A repeat count below 1 would otherwise mean the message is never sent at all.
    var repeat = VDA5050Setting.PublishRepeat;
    if (repeat < 1) repeat = 1;

    // The message does not change between attempts, so build it once.
    MqttApplicationMessage applicationMessage;
    try
    {
        applicationMessage = MqttClientFactory.CreateApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(data)
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .Build();
    }
    catch (Exception ex)
    {
        Logger.Error($"Lỗi khi publish MQTT: {ex.Message}");
        return new(false, "Không thể publish tới broker");
    }

    while (repeat-- > 0 && !IsDisposed)
    {
        try
        {
            // Local reference: the field may be cleared by a concurrent cleanup between
            // the connectivity check and the publish call.
            var client = MqttClient;
            if (client is null || IsDisposed || !client.IsConnected) return new(false, "Chưa có kết nối tới broker");

            var publish = await client.PublishAsync(applicationMessage);
            if (publish.IsSuccess) return new(true);

            Logger.Warning($"Publish tới topic {topic} thất bại: {publish.ReasonCode}");
        }
        catch (Exception ex)
        {
            Logger.Error($"Lỗi khi publish MQTT: {ex.Message}");
        }
    }

    return new(false, "Không thể publish tới broker");
}
/// <summary>
/// Marks the client as disposed, waits for any reconnection in progress to release the
/// semaphore, then tears down the current client and disposes the semaphore.
/// Subsequent calls return immediately.
/// </summary>
public async ValueTask DisposeAsync()
{
    if (!IsDisposed)
    {
        // Set the flag first so running connect/reconnect loops stop on their next check.
        IsDisposed = true;
        await ReconnectionSemaphore.WaitAsync();
        try
        {
            await CleanupCurrentClient();
        }
        finally
        {
            ReconnectionSemaphore.Dispose();
            GC.SuppressFinalize(this);
        }
    }
}
}