using MQTTnet; using MQTTnet.Protocol; using RobotApp.Common.Shares; using RobotApp.VDA5050; using System.Security.Cryptography.X509Certificates; using System.Text; namespace RobotApp.Services; public class MQTTClient : IAsyncDisposable { private readonly MqttClientFactory MqttClientFactory; private MqttClientOptions MqttClientOptions; private readonly MqttClientSubscribeOptions MqttClientSubscribeOptions; private IMqttClient? MqttClient; private readonly Logger Logger; private readonly VDA5050Setting VDA5050Setting; private readonly string ClientId; private readonly SemaphoreSlim ReconnectionSemaphore = new(1, 1); private volatile bool IsDisposed; public event Action? OrderChanged; public event Action? InstanceActionsChanged; public bool IsConnected => !IsDisposed && MqttClient is not null && MqttClient.IsConnected; private string OrderTopic => $"{VDA5050Setting.TopicPrefix}/{VDA5050Setting.Manufacturer}/{ClientId}/{VDA5050Topic.ORDER.ToTopicString()}"; private string InstanceActionsTopic => $"{VDA5050Setting.TopicPrefix}/{VDA5050Setting.Manufacturer}/{ClientId}/{VDA5050Topic.INSTANTACTIONS.ToTopicString()}"; public MQTTClient(string clientId, VDA5050Setting setting, Logger logger) { VDA5050Setting = setting; ClientId = clientId; Logger = logger; MqttClientFactory = new MqttClientFactory(); MqttClientOptions = MqttClientFactory.CreateClientOptionsBuilder() .WithTcpServer(VDA5050Setting.HostServer, VDA5050Setting.Port) .WithClientId($"{ClientId}") .WithCleanSession(true) .Build(); MqttClientSubscribeOptions = MqttClientFactory.CreateSubscribeOptionsBuilder() .WithTopicFilter(f => f.WithTopic(OrderTopic).WithAtLeastOnceQoS().WithExactlyOnceQoS().WithAtMostOnceQoS()) .WithTopicFilter(f => f.WithTopic(InstanceActionsTopic).WithAtLeastOnceQoS().WithExactlyOnceQoS().WithAtMostOnceQoS()) .Build(); } private async Task OnDisconnected(MqttClientDisconnectedEventArgs args) { if (IsDisposed || !args.ClientWasConnected) return; if (!await ReconnectionSemaphore.WaitAsync(0)) { Logger.Info("Reconnection đã đang được thực hiện bởi thread khác"); return; } try { Logger.Warning("Mất kết nối tới broker, đang cố gắng kết nối lại..."); await CleanupCurrentClient(); await ReconnectWithRetry(); } catch (Exception ex) { Logger.Error($"Lỗi trong quá trình reconnection: {ex.Message}"); } finally { ReconnectionSemaphore.Release(); } } private async Task CleanupCurrentClient() { if (MqttClient is not null) { try { MqttClient.DisconnectedAsync -= OnDisconnected; MqttClient.ApplicationMessageReceivedAsync -= OnMessageReceived; if (MqttClient.IsConnected) { await MqttClient.DisconnectAsync(); } } catch (Exception ex) { Logger.Warning($"Lỗi khi cleanup client: {ex.Message}"); } finally { MqttClient.Dispose(); MqttClient = null; } } } private async Task ReconnectWithRetry() { const int maxRetries = 5; const int retryDelayMs = 3000; for (int attempt = 1; attempt <= maxRetries && !IsDisposed; attempt++) { try { Logger.Info($"Thử reconnect lần {attempt}/{maxRetries}"); using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30)); await ConnectAsync(cts.Token); if (IsConnected) { await SubscribeAsync(cts.Token); Logger.Info("Reconnection thành công"); return; } } catch (OperationCanceledException) { Logger.Warning($"Reconnect attempt {attempt} bị timeout"); } catch (Exception ex) { Logger.Error($"Reconnect attempt {attempt} thất bại: {ex.Message}"); } if (attempt < maxRetries && !IsDisposed) { await Task.Delay(retryDelayMs * attempt); } } Logger.Error("Không thể reconnect sau tất cả các attempts"); } public async Task ConnectAsync(CancellationToken cancellationToken = default) { if (!IsDisposed) { BuildMqttClientOptions(VDA5050Setting.EnablePassword, VDA5050Setting.EnableTls); await CleanupCurrentClient(); MqttClient = MqttClientFactory.CreateMqttClient(); //MqttClient.ApplicationMessageReceivedAsync -= OnMessageReceived; MqttClient.ApplicationMessageReceivedAsync += OnMessageReceived; //MqttClient.DisconnectedAsync -= OnDisconnected; MqttClient.DisconnectedAsync += OnDisconnected; while (!cancellationToken.IsCancellationRequested) { try { var connection = await MqttClient.ConnectAsync(MqttClientOptions, cancellationToken); if (connection.ResultCode != MqttClientConnectResultCode.Success || !MqttClient.IsConnected) Logger.Warning($"Không thể kết nối tới broker do: {connection.ReasonString}"); else { Logger.Info($"Kết nối tới broker {VDA5050Setting.HostServer} thành công"); break; } } catch (Exception ex) { Logger.Error($"Lỗi khi tạo MQTT client: {ex.Message}"); } await Task.Delay(3000, cancellationToken); } } else throw new ObjectDisposedException(nameof(MQTTClient)); } private bool ValidateCertificates(MqttClientCertificateValidationEventArgs arg) { string certificatesPath = "MqttCertificates"; string caFilePath = "ca"; if (!string.IsNullOrEmpty(VDA5050Setting.CAFile)) { if (Directory.Exists(certificatesPath)) { var caFolder = Path.Combine(certificatesPath, caFilePath); if (Directory.Exists(caFolder)) { var caLocal = Path.Combine(caFolder, VDA5050Setting.CAFile); if (File.Exists(caLocal)) { var caCert = X509CertificateLoader.LoadCertificateFromFile(caLocal); arg.Chain.ChainPolicy.ExtraStore.Add(caCert); arg.Chain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck; arg.Chain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority; var isValid = arg.Chain.Build((X509Certificate2)arg.Certificate); if (isValid) { Console.WriteLine("Broker CERTIFICATE VALID"); } else { Console.WriteLine("Broker CERTIFICATE INVALID"); foreach (var status in arg.Chain.ChainStatus) Console.WriteLine($" -> Chain error: {status.Status} - {status.StatusInformation}"); } return isValid; } } } } return true; } private void BuildMqttClientOptions(bool enablePassword, bool enableTls) { var builder = MqttClientFactory.CreateClientOptionsBuilder() .WithTcpServer(VDA5050Setting.HostServer, VDA5050Setting.Port) .WithClientId(ClientId) .WithCleanSession(true); if (enablePassword) { builder = builder.WithCredentials(VDA5050Setting.UserName, VDA5050Setting.Password); } if (enableTls) { var tlsOptions = new MqttClientTlsOptionsBuilder() .UseTls(true) .WithSslProtocols(System.Security.Authentication.SslProtocols.Tls12 | System.Security.Authentication.SslProtocols.Tls13) //.WithCertificateValidationHandler(ValidateCertificates) .WithClientCertificatesProvider(new MQTTClientCertificatesProvider(VDA5050Setting.CerFile, VDA5050Setting.KeyFile)) .Build(); builder = builder.WithTlsOptions(tlsOptions); } MqttClientOptions = builder.Build(); } private Task OnMessageReceived(MqttApplicationMessageReceivedEventArgs args) { try { Logger.Info($"Has new message: {args.ApplicationMessage.Topic}"); if (IsDisposed) return Task.CompletedTask; var stringData = Encoding.UTF8.GetString(args.ApplicationMessage.Payload); //VDA5050Topic topic = EnumExtensions.ToTopic(args.ApplicationMessage.Topic); if (args.ApplicationMessage.Topic == OrderTopic) OrderChanged?.Invoke(stringData); else if (args.ApplicationMessage.Topic == InstanceActionsTopic) InstanceActionsChanged?.Invoke(stringData); } catch (Exception ex) { Logger.Error($"Lỗi khi xử lý message: {ex.Message}"); } return Task.CompletedTask; } public async Task SubscribeAsync(CancellationToken cancellationToken = default) { //if (!IsDisposed) //{ if (MqttClient is null) throw new Exception("Kết nối tới broker chưa được khởi tạo nhưng đã yêu cầu subscribe"); if (!MqttClient.IsConnected) throw new Exception("Kết nối tới broker chưa thành công nhưng đã yêu cầu subscribe"); while (!cancellationToken.IsCancellationRequested) { try { var response = await MqttClient.SubscribeAsync(MqttClientSubscribeOptions, cancellationToken); bool isSuccess = true; foreach (var item in response.Items) { if (item.ResultCode == MqttClientSubscribeResultCode.GrantedQoS0 || item.ResultCode == MqttClientSubscribeResultCode.GrantedQoS1 || item.ResultCode == MqttClientSubscribeResultCode.GrantedQoS2) { Logger.Info($"Subscribe thành công cho topic: {item.TopicFilter.Topic} với QoS: {item.ResultCode}"); } else { Logger.Warning($"Subscribe thất bại cho topic: {item.TopicFilter.Topic}. Lý do: {response.ReasonString}"); isSuccess = false; break; } } if (isSuccess) break; } catch (Exception ex) { Logger.Error($"Lỗi khi subscribe: {ex.Message}"); } if (!cancellationToken.IsCancellationRequested && !IsDisposed) { await Task.Delay(3000, cancellationToken); } } //} //else throw new ObjectDisposedException(nameof(MQTTClient)); } public async Task PublishAsync(string topic, string data) { if (IsDisposed) return new(false, "Client đã được disposed"); var repeat = VDA5050Setting.PublishRepeat; while (repeat-- > 0 && !IsDisposed) { try { var applicationMessage = MqttClientFactory.CreateApplicationMessageBuilder() .WithTopic(topic) .WithPayload(data) .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce) .Build(); if (MqttClient is null || !IsConnected) return new(false, "Chưa có kết nối tới broker"); var publish = await MqttClient.PublishAsync(applicationMessage); if (!publish.IsSuccess) continue; return new(true); } catch (Exception ex) { Logger.Error($"Lỗi khi publish MQTT: {ex.Message}"); } } return new(false, "Không thể publish tới broker"); } public async ValueTask DisposeAsync() { if (IsDisposed) return; IsDisposed = true; await ReconnectionSemaphore.WaitAsync(); try { await CleanupCurrentClient(); } finally { ReconnectionSemaphore.Dispose(); GC.SuppressFinalize(this); } } }