RobotNet/RobotNet.RobotManager/Services/MqttBroker.cs
2025-10-15 15:15:53 +07:00

120 lines
5.0 KiB
C#

using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using RobotNet.RobotManager.Data;
using RobotNet.RobotShares.VDA5050;
using RobotNet.RobotShares.VDA5050.Connection;
using System.Text.Json;
namespace RobotNet.RobotManager.Services;
public class MqttBroker : IHostedService
{
private readonly VDA5050Setting VDA5050Setting = new();
public event Action<string>? NewClientConnected;
public event Action<string>? NewClientDisconnected;
public uint ConnectionHeaderId = 0;
private MqttServer? MqttServer;
private readonly IServiceProvider ServiceProvider;
public MqttBroker(IServiceProvider serviceProvider, IConfiguration configuration)
{
configuration.Bind("VDA5050Setting", VDA5050Setting);
ServiceProvider = serviceProvider;
}
public MqttConnectReasonCode ValidatingConnection(ValidatingConnectionEventArgs arg)
{
using var scope = ServiceProvider.CreateScope();
var RobotDb = scope.ServiceProvider.GetRequiredService<RobotEditorDbContext>();
if (!RobotDb.Robots.Any(robot => arg.ClientId.ToLower() == robot.RobotId.ToString().ToLower()) && arg.ClientId != "FleetManager" && arg.ClientId != "OpenACS")
return MqttConnectReasonCode.ClientIdentifierNotValid;
if (!arg.UserName.Equals(VDA5050Setting.UserName, StringComparison.Ordinal) || !arg.Password.Equals(VDA5050Setting.Password, StringComparison.Ordinal))
return MqttConnectReasonCode.BadUserNameOrPassword;
return MqttConnectReasonCode.Success;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
if (!VDA5050Setting.ServerEnable) return;
while (!cancellationToken.IsCancellationRequested)
{
try
{
var mqttFactory = new MqttServerFactory();
var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint()
.WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(VDA5050Setting.ConnectionTimeoutSeconds))
.WithTcpKeepAliveInterval(VDA5050Setting.KeepAliveInterval)
.WithConnectionBacklog(VDA5050Setting.ConnectionBacklog)
.Build();
MqttServer = mqttFactory.CreateMqttServer(mqttServerOptions);
MqttServer.ValidatingConnectionAsync += e =>
{
var validate = ValidatingConnection(e);
if (validate != MqttConnectReasonCode.Success) e.ReasonCode = validate;
return Task.CompletedTask;
};
var connection = new ConnectionMsg()
{
Manufacturer = VDA5050Setting.Manufacturer,
Version = VDA5050Setting.Version,
};
MqttServer.ClientConnectedAsync += e =>
{
connection.HeaderId = ++ConnectionHeaderId;
connection.SerialNumber = e.ClientId;
connection.Timestamp = DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss.fffZ");
connection.ConnectionState = ConnectionState.ONLINE.ToString();
var payload = JsonSerializer.Serialize(connection);
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("connection")
.WithPayload(payload)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
MqttServer.InjectApplicationMessage(new(applicationMessage));
NewClientConnected?.Invoke(e.ClientId);
return Task.CompletedTask;
};
MqttServer.ClientDisconnectedAsync += e =>
{
connection.HeaderId = ++ConnectionHeaderId;
connection.SerialNumber = e.ClientId;
connection.Timestamp = DateTime.Now.ToString("yyyy-MM-ddTHH:mm:ss.fffZ");
connection.ConnectionState = ConnectionState.OFFLINE.ToString();
var payload = JsonSerializer.Serialize(connection);
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("connection")
.WithPayload(payload)
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
MqttServer.InjectApplicationMessage(new(applicationMessage));
NewClientDisconnected?.Invoke(e.ClientId);
return Task.CompletedTask;
};
await MqttServer.StartAsync();
return;
}
catch
{
await Task.Delay(1000, cancellationToken);
continue;
}
}
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await MqttServer.StopAsync();
}
}