134 lines
4.5 KiB
C#
134 lines
4.5 KiB
C#
using System;
|
|
using System.Collections.Concurrent;
|
|
using System.Collections.Generic;
|
|
using System.Text;
|
|
using System.Threading.Tasks;
|
|
using MQTTnet;
|
|
using MQTTnet.Client;
|
|
using MQTTnet.Extensions.ManagedClient;
|
|
using MQTTnet.Packets;
|
|
using UnityEngine;
|
|
|
|
public class MqttClientManager
|
|
{
|
|
private readonly string mqttBroker;
|
|
private readonly int mqttPort;
|
|
private readonly string mqttUsername;
|
|
private readonly string mqttPassword;
|
|
private readonly string mqttClientId;
|
|
private readonly string orderTopic;
|
|
private readonly string instantActionsTopic;
|
|
private readonly ConcurrentQueue<string> jsonQueue;
|
|
private readonly ConcurrentQueue<string> instantActionQueue;
|
|
private IManagedMqttClient mqttClient;
|
|
|
|
public MqttClientManager(string broker, int port, string username, string password, string clientId,
|
|
string orderTopic, string instantActionsTopic, ConcurrentQueue<string> jsonQueue,
|
|
ConcurrentQueue<string> instantActionQueue)
|
|
{
|
|
mqttBroker = broker;
|
|
mqttPort = port;
|
|
mqttUsername = username;
|
|
mqttPassword = password;
|
|
mqttClientId = clientId;
|
|
this.orderTopic = orderTopic;
|
|
this.instantActionsTopic = instantActionsTopic;
|
|
this.jsonQueue = jsonQueue;
|
|
this.instantActionQueue = instantActionQueue;
|
|
}
|
|
|
|
public async Task InitializeAsync()
|
|
{
|
|
try
|
|
{
|
|
var mqttFactory = new MqttFactory();
|
|
mqttClient = mqttFactory.CreateManagedMqttClient();
|
|
|
|
var optionsBuilder = new MqttClientOptionsBuilder()
|
|
.WithTcpServer(mqttBroker, mqttPort)
|
|
.WithClientId(mqttClientId);
|
|
if (!string.IsNullOrEmpty(mqttUsername) && !string.IsNullOrEmpty(mqttPassword))
|
|
{
|
|
optionsBuilder.WithCredentials(mqttUsername, mqttPassword);
|
|
}
|
|
|
|
var managedOptions = new ManagedMqttClientOptionsBuilder()
|
|
.WithClientOptions(optionsBuilder.Build())
|
|
.WithAutoReconnectDelay(TimeSpan.FromSeconds(5))
|
|
.Build();
|
|
|
|
mqttClient.ApplicationMessageReceivedAsync += async e =>
|
|
{
|
|
await Task.Run(() =>
|
|
{
|
|
string topic = e.ApplicationMessage.Topic;
|
|
string payload = Encoding.UTF8.GetString(e.ApplicationMessage.Payload ?? Array.Empty<byte>());
|
|
if (topic == orderTopic)
|
|
{
|
|
Debug.Log($"{payload}");
|
|
jsonQueue.Enqueue(payload);
|
|
}
|
|
else if (topic == instantActionsTopic)
|
|
{
|
|
Debug.Log($"Nhận được JSON từ chủ đề {instantActionsTopic}: {payload}");
|
|
instantActionQueue.Enqueue(payload);
|
|
}
|
|
});
|
|
};
|
|
|
|
mqttClient.ConnectedAsync += async e =>
|
|
{
|
|
try
|
|
{
|
|
await mqttClient.SubscribeAsync(new List<MqttTopicFilter>
|
|
{
|
|
new MqttTopicFilterBuilder().WithTopic(orderTopic).Build(),
|
|
new MqttTopicFilterBuilder().WithTopic(instantActionsTopic).Build()
|
|
});
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Debug.LogError($"Lỗi khi kết nối hoặc đăng ký chủ đề: {ex.Message}");
|
|
}
|
|
};
|
|
|
|
mqttClient.DisconnectedAsync += async e =>
|
|
{
|
|
await Task.CompletedTask;
|
|
};
|
|
|
|
await mqttClient.StartAsync(managedOptions);
|
|
//Debug.Log("Đã khởi động MQTT client");
|
|
}
|
|
catch (Exception ex)
|
|
{
|
|
Debug.LogError($"Không thể khởi tạo MQTT client: {ex.Message}");
|
|
}
|
|
}
|
|
|
|
public async Task StopAsync()
|
|
{
|
|
if (mqttClient != null)
|
|
{
|
|
await mqttClient.StopAsync();
|
|
mqttClient.Dispose();
|
|
Debug.Log("Đã dừng và hủy MQTT client");
|
|
}
|
|
}
|
|
|
|
|
|
public async Task PublishAsync(string topic, string payload)
|
|
{
|
|
if (mqttClient == null || !mqttClient.IsConnected)
|
|
{
|
|
return;
|
|
}
|
|
|
|
var message = new MqttApplicationMessageBuilder()
|
|
.WithTopic(topic)
|
|
.WithPayload(Encoding.UTF8.GetBytes(payload))
|
|
.Build();
|
|
|
|
await mqttClient.EnqueueAsync(message);
|
|
}
|
|
} |