120 lines
5.0 KiB
C#
120 lines
5.0 KiB
C#
using System.Globalization;
using System.Text.Json;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using RobotNet.RobotManager.Data;
using RobotNet.RobotShares.VDA5050;
using RobotNet.RobotShares.VDA5050.Connection;
|
|
|
|
namespace RobotNet.RobotManager.Services;
|
|
|
|
/// <summary>
/// Hosted service that runs an embedded MQTT broker configured from the
/// "VDA5050Setting" configuration section. Connecting clients are validated
/// against the robot database and the configured credentials, and every
/// client connect/disconnect is announced on the "connection" topic.
/// </summary>
public class MqttBroker : IHostedService
{
    /// <summary>Topic on which ONLINE/OFFLINE connection messages are injected.</summary>
    private const string ConnectionTopic = "connection";

    /// <summary>Timestamp layout of connection messages; the trailing 'Z' denotes UTC.</summary>
    private const string TimestampFormat = "yyyy-MM-ddTHH:mm:ss.fffZ";

    /// <summary>Pause between two attempts to start the broker.</summary>
    private static readonly TimeSpan StartRetryDelay = TimeSpan.FromMilliseconds(1000);

    private readonly VDA5050Setting VDA5050Setting = new();

    /// <summary>Raised with the client id after a client has connected.</summary>
    public event Action<string>? NewClientConnected;

    /// <summary>Raised with the client id after a client has disconnected.</summary>
    public event Action<string>? NewClientDisconnected;

    /// <summary>Header id of the most recently built connection message.</summary>
    public uint ConnectionHeaderId = 0;

    // Only assigned once the server has started successfully, so a non-null
    // value always refers to a running server that StopAsync may stop.
    private MqttServer? MqttServer;
    private readonly IServiceProvider ServiceProvider;

    // Optional: resolved from the container when logging is registered.
    private readonly ILogger<MqttBroker>? Logger;

    /// <summary>
    /// Creates the broker service and binds the "VDA5050Setting" configuration section.
    /// </summary>
    /// <param name="serviceProvider">Root provider used to create scopes for database access.</param>
    /// <param name="configuration">Application configuration containing "VDA5050Setting".</param>
    public MqttBroker(IServiceProvider serviceProvider, IConfiguration configuration)
    {
        configuration.Bind("VDA5050Setting", VDA5050Setting);
        ServiceProvider = serviceProvider;
        Logger = serviceProvider.GetService<ILogger<MqttBroker>>();
    }

    /// <summary>
    /// Decides whether a connecting client is accepted.
    /// </summary>
    /// <param name="arg">Connection data supplied by the MQTT server.</param>
    /// <returns>
    /// <see cref="MqttConnectReasonCode.ClientIdentifierNotValid"/> when the client id is
    /// empty, is neither "FleetManager" nor "OpenACS", and matches no robot id in the database;
    /// <see cref="MqttConnectReasonCode.BadUserNameOrPassword"/> when the credentials differ
    /// from the configured ones; otherwise <see cref="MqttConnectReasonCode.Success"/>.
    /// </returns>
    public MqttConnectReasonCode ValidatingConnection(ValidatingConnectionEventArgs arg)
    {
        var clientId = arg.ClientId;
        if (string.IsNullOrEmpty(clientId))
        {
            return MqttConnectReasonCode.ClientIdentifierNotValid;
        }

        // The fixed client ids are checked first so they never cost a database query.
        var isWellKnownClient = clientId == "FleetManager" || clientId == "OpenACS";
        if (!isWellKnownClient)
        {
            using var scope = ServiceProvider.CreateScope();
            var robotDb = scope.ServiceProvider.GetRequiredService<RobotEditorDbContext>();
            var normalizedClientId = clientId.ToLowerInvariant();
            if (!robotDb.Robots.Any(robot => robot.RobotId.ToString().ToLower() == normalizedClientId))
            {
                return MqttConnectReasonCode.ClientIdentifierNotValid;
            }
        }

        // Static string.Equals tolerates null, which is what a client sending
        // no credentials would produce; instance Equals would throw instead.
        var credentialsMatch =
            string.Equals(arg.UserName, VDA5050Setting.UserName, StringComparison.Ordinal)
            && string.Equals(arg.Password, VDA5050Setting.Password, StringComparison.Ordinal);

        return credentialsMatch
            ? MqttConnectReasonCode.Success
            : MqttConnectReasonCode.BadUserNameOrPassword;
    }

    /// <summary>
    /// Starts the broker when it is enabled in the settings. A failed start is
    /// logged and retried after a short delay until it succeeds or
    /// <paramref name="cancellationToken"/> is cancelled.
    /// </summary>
    /// <param name="cancellationToken">Cancels the retry loop.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        if (!VDA5050Setting.ServerEnable)
        {
            return;
        }

        while (!cancellationToken.IsCancellationRequested)
        {
            MqttServer? server = null;
            try
            {
                server = CreateServer();
                await server.StartAsync();
                MqttServer = server;
                return;
            }
            catch (Exception ex)
            {
                Logger?.LogError(ex, "Starting the MQTT broker failed; retrying in {Delay}.", StartRetryDelay);

                // Release the failed instance so retries do not accumulate servers and handlers.
                server?.Dispose();
                await Task.Delay(StartRetryDelay, cancellationToken);
            }
        }
    }

    /// <summary>
    /// Stops and disposes the broker. Does nothing when the broker is disabled
    /// or was never started.
    /// </summary>
    /// <param name="cancellationToken">Not used; the server's stop call takes no token here.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        var server = MqttServer;
        if (server is null)
        {
            return;
        }

        MqttServer = null;
        try
        {
            await server.StopAsync();
        }
        finally
        {
            server.Dispose();
        }
    }

    /// <summary>Builds a server from the bound settings and wires up its event handlers.</summary>
    private MqttServer CreateServer()
    {
        var options = new MqttServerOptionsBuilder()
            .WithDefaultEndpoint()
            .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(VDA5050Setting.ConnectionTimeoutSeconds))
            .WithTcpKeepAliveInterval(VDA5050Setting.KeepAliveInterval)
            .WithConnectionBacklog(VDA5050Setting.ConnectionBacklog)
            .Build();

        var server = new MqttServerFactory().CreateMqttServer(options);

        server.ValidatingConnectionAsync += e =>
        {
            // The reason code is only overwritten on rejection.
            var result = ValidatingConnection(e);
            if (result != MqttConnectReasonCode.Success)
            {
                e.ReasonCode = result;
            }

            return Task.CompletedTask;
        };

        server.ClientConnectedAsync += async e =>
        {
            await PublishConnectionStateAsync(server, e.ClientId, ConnectionState.ONLINE.ToString());
            NewClientConnected?.Invoke(e.ClientId);
        };

        server.ClientDisconnectedAsync += async e =>
        {
            await PublishConnectionStateAsync(server, e.ClientId, ConnectionState.OFFLINE.ToString());
            NewClientDisconnected?.Invoke(e.ClientId);
        };

        return server;
    }

    /// <summary>Injects a connection message for <paramref name="clientId"/> on the "connection" topic.</summary>
    private async Task PublishConnectionStateAsync(MqttServer server, string clientId, string connectionState)
    {
        // A fresh message per event: connect and disconnect handlers can run
        // concurrently for different clients, so nothing mutable is shared.
        var connection = new ConnectionMsg()
        {
            Manufacturer = VDA5050Setting.Manufacturer,
            Version = VDA5050Setting.Version,
            HeaderId = Interlocked.Increment(ref ConnectionHeaderId),
            SerialNumber = clientId,
            // UTC to match the 'Z' suffix; invariant culture keeps ':' as the separator.
            Timestamp = DateTime.UtcNow.ToString(TimestampFormat, CultureInfo.InvariantCulture),
            ConnectionState = connectionState,
        };

        var applicationMessage = new MqttApplicationMessageBuilder()
            .WithTopic(ConnectionTopic)
            .WithPayload(JsonSerializer.Serialize(connection))
            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
            .Build();

        // Awaited so that a failed injection is observed instead of silently dropped.
        await server.InjectApplicationMessage(new(applicationMessage));
    }
}
|