using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using RobotNet.RobotManager.Data;
using RobotNet.RobotShares.VDA5050;
using RobotNet.RobotShares.VDA5050.Connection;
using System.Globalization;
using System.Text.Json;

namespace RobotNet.RobotManager.Services;

/// <summary>
/// Hosted service that runs an embedded MQTT broker configured from the "VDA5050Setting"
/// configuration section. It validates connecting clients, publishes a message on the
/// "connection" topic whenever a client connects or disconnects, and raises
/// <see cref="NewClientConnected"/> / <see cref="NewClientDisconnected"/> with the client id.
/// </summary>
public class MqttBroker : IHostedService
{
    /// <summary>Topic on which connect/disconnect announcements are injected.</summary>
    private const string ConnectionTopic = "connection";

    /// <summary>Timestamp layout used in announcements; the trailing 'Z' is a literal, so the value must be UTC.</summary>
    private const string TimestampFormat = "yyyy-MM-ddTHH:mm:ss.fffZ";

    private readonly VDA5050Setting VDA5050Setting = new();

    /// <summary>Raised with the client id after a client has connected.</summary>
    public event Action<string>? NewClientConnected;

    /// <summary>Raised with the client id after a client has disconnected.</summary>
    public event Action<string>? NewClientDisconnected;

    /// <summary>Header id of the most recently published connection announcement.</summary>
    public uint ConnectionHeaderId = 0;

    private MqttServer? MqttServer;
    private readonly IServiceProvider ServiceProvider;

    // Resolved optionally so the constructor signature stays unchanged; null when no logging is registered.
    private readonly ILogger<MqttBroker>? Logger;

    /// <summary>Creates the broker service and binds the "VDA5050Setting" configuration section.</summary>
    /// <param name="serviceProvider">Root provider used to create a scope for each robot lookup.</param>
    /// <param name="configuration">Application configuration containing the "VDA5050Setting" section.</param>
    public MqttBroker(IServiceProvider serviceProvider, IConfiguration configuration)
    {
        configuration.Bind("VDA5050Setting", VDA5050Setting);
        ServiceProvider = serviceProvider;
        Logger = serviceProvider.GetService<ILogger<MqttBroker>>();
    }

    /// <summary>
    /// Decides whether a connecting client is accepted: the client id must be "FleetManager",
    /// "OpenACS" or match a stored robot id (case-insensitively), and the user name and password
    /// must equal the configured ones.
    /// </summary>
    /// <param name="arg">Connection request supplied by the MQTT server.</param>
    /// <returns>
    /// <see cref="MqttConnectReasonCode.Success"/> when accepted, otherwise
    /// <see cref="MqttConnectReasonCode.ClientIdentifierNotValid"/> or
    /// <see cref="MqttConnectReasonCode.BadUserNameOrPassword"/>.
    /// </returns>
    public MqttConnectReasonCode ValidatingConnection(ValidatingConnectionEventArgs arg)
    {
        if (string.IsNullOrEmpty(arg.ClientId) || !IsKnownClient(arg.ClientId))
        {
            return MqttConnectReasonCode.ClientIdentifierNotValid;
        }

        // Static string.Equals is null-safe: a client that sends no credentials is rejected
        // instead of causing a NullReferenceException.
        bool credentialsMatch =
            string.Equals(arg.UserName, VDA5050Setting.UserName, StringComparison.Ordinal) &&
            string.Equals(arg.Password, VDA5050Setting.Password, StringComparison.Ordinal);

        return credentialsMatch ? MqttConnectReasonCode.Success : MqttConnectReasonCode.BadUserNameOrPassword;
    }

    /// <summary>Starts the broker when enabled, retrying once per second until it starts or cancellation is requested.</summary>
    /// <param name="cancellationToken">Stops the retry loop; cancelling during the retry delay throws.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        if (!VDA5050Setting.ServerEnable)
        {
            return;
        }

        while (!cancellationToken.IsCancellationRequested)
        {
            MqttServer? server = null;
            try
            {
                server = CreateServer();
                await server.StartAsync();
                MqttServer = server;
                return;
            }
            catch (Exception ex)
            {
                Logger?.LogError(ex, "MQTT broker failed to start; retrying in one second.");

                // Each attempt builds a fresh server, so release the one that failed.
                server?.Dispose();
                await Task.Delay(1000, cancellationToken);
            }
        }
    }

    /// <summary>Stops and disposes the broker if one was started; does nothing otherwise.</summary>
    /// <param name="cancellationToken">Not used.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        var server = MqttServer;
        if (server is null)
        {
            // Server disabled in settings, or it never started successfully.
            return;
        }

        MqttServer = null;
        await server.StopAsync();
        server.Dispose();
    }

    /// <summary>Returns true for the two fixed client ids or when a stored robot id matches <paramref name="clientId"/>.</summary>
    private bool IsKnownClient(string clientId)
    {
        // Fixed ids are checked first so they never need a database round trip.
        if (clientId == "FleetManager" || clientId == "OpenACS")
        {
            return true;
        }

        using var scope = ServiceProvider.CreateScope();

        // NOTE(review): the generic type argument of GetRequiredService is missing in this file and is
        // left exactly as found; supply the type from RobotNet.RobotManager.Data that exposes Robots.
        var robotDb = scope.ServiceProvider.GetRequiredService();

        // ToLower() is kept inside the query on purpose: a StringComparison overload
        // might not be translatable by the query provider.
        return robotDb.Robots.Any(robot => clientId.ToLower() == robot.RobotId.ToString().ToLower());
    }

    /// <summary>Builds a server from the bound settings and attaches the validation and connection handlers.</summary>
    private MqttServer CreateServer()
    {
        var options = new MqttServerOptionsBuilder()
            .WithDefaultEndpoint()
            .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(VDA5050Setting.ConnectionTimeoutSeconds))
            .WithTcpKeepAliveInterval(VDA5050Setting.KeepAliveInterval)
            .WithConnectionBacklog(VDA5050Setting.ConnectionBacklog)
            .Build();

        var server = new MqttServerFactory().CreateMqttServer(options);

        server.ValidatingConnectionAsync += e =>
        {
            var result = ValidatingConnection(e);
            if (result != MqttConnectReasonCode.Success)
            {
                e.ReasonCode = result;
            }

            return Task.CompletedTask;
        };

        server.ClientConnectedAsync += async e =>
        {
            await AnnounceConnectionStateAsync(server, e.ClientId, ConnectionState.ONLINE);
            NewClientConnected?.Invoke(e.ClientId);
        };

        server.ClientDisconnectedAsync += async e =>
        {
            await AnnounceConnectionStateAsync(server, e.ClientId, ConnectionState.OFFLINE);
            NewClientDisconnected?.Invoke(e.ClientId);
        };

        return server;
    }

    /// <summary>
    /// Injects a connection message for <paramref name="clientId"/> on the "connection" topic.
    /// Failures are logged and not propagated, so subscribers are still notified afterwards.
    /// </summary>
    private async Task AnnounceConnectionStateAsync(MqttServer server, string clientId, ConnectionState state)
    {
        try
        {
            // A fresh message per event: handlers for different clients may run concurrently,
            // so a shared instance could be serialized half-updated.
            var message = new ConnectionMsg()
            {
                Manufacturer = VDA5050Setting.Manufacturer,
                Version = VDA5050Setting.Version,
                HeaderId = Interlocked.Increment(ref ConnectionHeaderId),
                SerialNumber = clientId,
                // UtcNow, because the format ends with a literal 'Z' (UTC designator).
                Timestamp = DateTime.UtcNow.ToString(TimestampFormat, CultureInfo.InvariantCulture),
                ConnectionState = state.ToString(),
            };

            var applicationMessage = new MqttApplicationMessageBuilder()
                .WithTopic(ConnectionTopic)
                .WithPayload(JsonSerializer.Serialize(message))
                .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                .Build();

            await server.InjectApplicationMessage(new(applicationMessage));
        }
        catch (Exception ex)
        {
            Logger?.LogError(ex, "Failed to publish {State} connection message for client {ClientId}.", state, clientId);
        }
    }
}