using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Packets;
using UnityEngine;

/// <summary>
/// Wraps a managed MQTT client: connects to a broker, subscribes to two configured
/// topics and forwards each received payload (decoded as UTF-8 text) into the
/// matching caller-supplied queue.
/// </summary>
public class MqttClientManager
{
    private readonly string mqttBroker;
    private readonly int mqttPort;
    private readonly string mqttUsername;
    private readonly string mqttPassword;
    private readonly string mqttClientId;
    private readonly string orderTopic;
    private readonly string instantActionsTopic;
    private readonly ConcurrentQueue<string> jsonQueue;
    private readonly ConcurrentQueue<string> instantActionQueue;

    // Null until InitializeAsync has run, and again after StopAsync.
    private IManagedMqttClient mqttClient;

    /// <summary>
    /// Stores the connection settings and the destination queues. No network
    /// activity happens here; call <see cref="InitializeAsync"/> to connect.
    /// </summary>
    /// <param name="broker">Host name or address of the MQTT broker.</param>
    /// <param name="port">TCP port of the MQTT broker.</param>
    /// <param name="username">User name; credentials are only sent when both user name and password are non-empty.</param>
    /// <param name="password">Password; see <paramref name="username"/>.</param>
    /// <param name="clientId">MQTT client identifier.</param>
    /// <param name="orderTopic">Topic whose payloads are enqueued into <paramref name="jsonQueue"/>.</param>
    /// <param name="instantActionsTopic">Topic whose payloads are enqueued into <paramref name="instantActionQueue"/>.</param>
    /// <param name="jsonQueue">Queue receiving payloads from <paramref name="orderTopic"/>.</param>
    /// <param name="instantActionQueue">Queue receiving payloads from <paramref name="instantActionsTopic"/>.</param>
    public MqttClientManager(
        string broker,
        int port,
        string username,
        string password,
        string clientId,
        string orderTopic,
        string instantActionsTopic,
        ConcurrentQueue<string> jsonQueue,
        ConcurrentQueue<string> instantActionQueue)
    {
        mqttBroker = broker;
        mqttPort = port;
        mqttUsername = username;
        mqttPassword = password;
        mqttClientId = clientId;
        this.orderTopic = orderTopic;
        this.instantActionsTopic = instantActionsTopic;
        this.jsonQueue = jsonQueue;
        this.instantActionQueue = instantActionQueue;
    }

    /// <summary>
    /// Creates the managed client, wires the event handlers and starts it with a
    /// 5 second auto-reconnect delay. Any exception raised during setup is logged
    /// via <c>Debug.LogError</c> and not rethrown.
    /// </summary>
    public async Task InitializeAsync()
    {
        try
        {
            var mqttFactory = new MqttFactory();
            var client = mqttFactory.CreateManagedMqttClient();
            mqttClient = client;

            var optionsBuilder = new MqttClientOptionsBuilder()
                .WithTcpServer(mqttBroker, mqttPort)
                .WithClientId(mqttClientId);

            if (!string.IsNullOrEmpty(mqttUsername) && !string.IsNullOrEmpty(mqttPassword))
            {
                optionsBuilder.WithCredentials(mqttUsername, mqttPassword);
            }

            var managedOptions = new ManagedMqttClientOptionsBuilder()
                .WithClientOptions(optionsBuilder.Build())
                .WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
                .Build();

            // The work per message is a log call plus a lock-free enqueue, so it is
            // done inline instead of paying for a thread-pool hop on every message.
            client.ApplicationMessageReceivedAsync += e =>
            {
                string topic = e.ApplicationMessage.Topic;
                string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty<byte>());

                if (topic == orderTopic)
                {
                    Debug.Log($"{payload}");
                    jsonQueue.Enqueue(payload);
                }
                else if (topic == instantActionsTopic)
                {
                    Debug.Log($"Nhận được JSON từ chủ đề {instantActionsTopic}: {payload}");
                    instantActionQueue.Enqueue(payload);
                }

                return Task.CompletedTask;
            };

            // Capture the local instance rather than the field so the handler keeps
            // working on the client that raised the event even if the field changes.
            client.ConnectedAsync += async e =>
            {
                try
                {
                    await client.SubscribeAsync(new List<MqttTopicFilter>
                    {
                        new MqttTopicFilterBuilder().WithTopic(orderTopic).Build(),
                        new MqttTopicFilterBuilder().WithTopic(instantActionsTopic).Build()
                    });
                }
                catch (Exception ex)
                {
                    Debug.LogError($"Lỗi khi kết nối hoặc đăng ký chủ đề: {ex.Message}");
                }
            };

            // Intentionally a no-op handler.
            client.DisconnectedAsync += e => Task.CompletedTask;

            await client.StartAsync(managedOptions);
        }
        catch (Exception ex)
        {
            Debug.LogError($"Không thể khởi tạo MQTT client: {ex.Message}");
        }
    }

    /// <summary>
    /// Stops and disposes the client if one exists. The client is disposed even
    /// when stopping throws, and the field is cleared so later calls to
    /// <see cref="PublishAsync"/> or <see cref="StopAsync"/> do not touch a
    /// disposed instance.
    /// </summary>
    public async Task StopAsync()
    {
        var client = mqttClient;
        if (client == null)
        {
            return;
        }

        mqttClient = null;

        try
        {
            await client.StopAsync();
        }
        finally
        {
            client.Dispose();
        }

        Debug.Log("Đã dừng và hủy MQTT client");
    }

    /// <summary>
    /// Enqueues a UTF-8 encoded message for publishing. Does nothing when the
    /// client has not been initialized or is not currently connected.
    /// </summary>
    /// <param name="topic">Topic to publish to.</param>
    /// <param name="payload">Message text; <c>null</c> is sent as an empty payload.</param>
    public async Task PublishAsync(string topic, string payload)
    {
        // Snapshot the field once so a concurrent StopAsync cannot null it between
        // the check and the use.
        var client = mqttClient;
        if (client == null || !client.IsConnected)
        {
            return;
        }

        var message = new MqttApplicationMessageBuilder()
            .WithTopic(topic)
            .WithPayload(Encoding.UTF8.GetBytes(payload ?? string.Empty))
            .Build();

        await client.EnqueueAsync(message);
    }
}