RobotNet/RobotNet.ScriptManager/Services/ScriptMissionManager.cs
2025-10-15 15:15:53 +07:00

411 lines
14 KiB
C#

using Microsoft.CodeAnalysis.CSharp.Scripting;
using Microsoft.CodeAnalysis.Scripting;
using RobotNet.Script;
using RobotNet.Script.Shares;
using RobotNet.ScriptManager.Data;
using RobotNet.ScriptManager.Helpers;
using RobotNet.ScriptManager.Models;
using System.Collections.Concurrent;
using System.Diagnostics;
namespace RobotNet.ScriptManager.Services;
public class ScriptMissionManager(ScriptGlobalsManager globalsManager, IServiceScopeFactory scopeFactory)
{
public ProcessorState State { get; private set; } = ProcessorState.Idle;
private readonly Dictionary<string, ScriptMissionData> MissionDatas = [];
private readonly Dictionary<string, ScriptRunner<IAsyncEnumerable<MissionState>>> Runners = [];
private readonly ConcurrentDictionary<Guid, ScriptMission> Missions = [];
private readonly ConcurrentQueue<ScriptMission> idleMissions = [];
private readonly ConcurrentQueue<ScriptMission> runningMissions = [];
public void Reset()
{
if (State != ProcessorState.Idle && State != ProcessorState.Ready)
{
throw new InvalidOperationException("Cannot reset missions while the processor is running.");
}
MissionDatas.Clear();
Runners.Clear();
foreach (var mission in Missions.Values)
{
mission.Dispose();
}
Missions.Clear();
foreach (var mission in idleMissions)
{
mission.Dispose();
}
idleMissions.Clear();
foreach (var mission in runningMissions)
{
mission.Dispose();
}
runningMissions.Clear();
GC.Collect();
State = ProcessorState.Idle;
}
public void LoadMissions(IEnumerable<ScriptMissionData> missionDatas)
{
if (State != ProcessorState.Idle && State != ProcessorState.Ready)
{
throw new InvalidOperationException("Cannot load missions while the processor is running.");
}
MissionDatas.Clear();
Runners.Clear();
runningMissions.Clear();
idleMissions.Clear();
runningMissions.Clear();
foreach (var mission in missionDatas)
{
MissionDatas.Add(mission.Name, mission);
var script = CSharpScript.Create<IAsyncEnumerable<MissionState>>(mission.Script, ScriptConfiguration.ScriptOptions, globalsType: typeof(ScriptMissionGlobals));
Runners.Add(mission.Name, script.CreateDelegate());
}
State = ProcessorState.Ready;
}
public IEnumerable<ScriptMissionDto> GetMissionDatas() =>
[.. MissionDatas.Values.Select(m => new ScriptMissionDto(m.Name, m.Parameters.Select(p => new ScriptMissionParameterDto(p.Name, p.Type.FullName ?? p.Type.Name, p.DefaultValue?.ToString())), m.Code))];
public bool ContainsMissionName(string name) => Runners.ContainsKey(name);
public void Create(Guid id, string name, IDictionary<string, string> parameterStrings)
{
if (!MissionDatas.TryGetValue(name, out var missionData)) throw new ArgumentException($"Mission data for '{name}' not found.");
var cts = CancellationTokenSource.CreateLinkedTokenSource(internalCts.Token);
var parameters = new ConcurrentDictionary<string, object?>();
bool hasCancellationToken = false;
foreach (var param in missionData.Parameters)
{
if (param.Type == typeof(CancellationToken))
{
if (hasCancellationToken)
{
throw new ArgumentException($"Mission '{name}' already has a CancellationToken parameter defined.");
}
hasCancellationToken = true;
parameters.TryAdd(param.Name, cts.Token); // Use the internal CancellationTokenSource for the mission
continue;
}
if (!parameterStrings.TryGetValue(param.Name, out string? valueStr)) throw new ArgumentException($"Parameter '{param.Name}' not found in provided parameters.");
if (CSharpSyntaxHelper.ResolveValueFromString(valueStr, param.Type, out var value) && value != null)
{
parameters.TryAdd(param.Name, value);
}
else
{
throw new ArgumentException($"Invalid value for parameter '{param.Name}': {valueStr}");
}
}
Create(id, name, parameters, cts);
}
public void Create(Guid id, string name, object[] parameters)
{
if (!MissionDatas.TryGetValue(name, out var missionData)) throw new ArgumentException($"Mission data for '{name}' not found.");
if (parameters.Length != missionData.Parameters.Count())
{
var count = missionData.Parameters.Count(p => p.Type == typeof(CancellationToken));
if (count == 1)
{
if (parameters.Length != missionData.Parameters.Count() - count)
{
throw new ArgumentException($"Mission '{name}' expects {missionData.Parameters.Count()} parameters, but received {parameters.Length} without CancellationToken.");
}
}
else if (count != 0)
{
throw new ArgumentException($"Mission '{name}' just have one CancellationToken, but received {parameters.Length}.");
}
}
var inputParameters = new ConcurrentDictionary<string, object?>();
bool hasCancellationToken = false;
var cts = CancellationTokenSource.CreateLinkedTokenSource(internalCts.Token);
int index = 0;
foreach (var param in missionData.Parameters)
{
if (param.Type == typeof(CancellationToken))
{
if (hasCancellationToken)
{
throw new ArgumentException($"Mission '{name}' already has a CancellationToken parameter defined.");
}
hasCancellationToken = true;
inputParameters.TryAdd(param.Name, cts.Token); // Use the internal CancellationTokenSource for the mission
continue;
}
inputParameters.TryAdd(param.Name, parameters[index]);
index++;
}
Create(id, name, inputParameters, cts);
}
public void Create(Guid id, string name, ConcurrentDictionary<string, object?> parameters, CancellationTokenSource cts)
{
if (!Runners.TryGetValue(name, out var runner)) throw new ArgumentException($"Mission '{name}' not found.");
var robotnet = globalsManager.GetRobotNetMission(id);
var mission = new ScriptMission(id, name, runner, new ScriptMissionGlobals(globalsManager.Globals, robotnet, parameters), cts);
Missions.TryAdd(id, mission);
idleMissions.Enqueue(mission);
}
private CancellationTokenSource internalCts = new();
private Thread? thread;
public void Start(CancellationToken cancellationToken = default)
{
Stop(); // Ensure previous thread is stopped before starting a new one
ResetMissionDb();
internalCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var token = internalCts.Token;
thread = new Thread(() =>
{
State = ProcessorState.Running;
while (!token.IsCancellationRequested || !runningMissions.IsEmpty)
{
var stopwatch = Stopwatch.StartNew();
MiningIdleMissionHandle();
MiningRunningMissionHandle();
stopwatch.Stop();
int elapsed = (int)stopwatch.ElapsedMilliseconds;
int remaining = 1000 - elapsed;
// If execution time exceeds ProcessTime, add another cycle
if (elapsed > 900)
{
remaining += 1000;
}
if (remaining > 0)
{
try
{
Thread.Sleep(remaining);
}
catch (ThreadInterruptedException)
{
break;
}
}
}
State = ProcessorState.Ready;
})
{
IsBackground = true,
Priority = ThreadPriority.Highest,
};
thread.Start();
}
public bool Pause(Guid id)
{
if (Missions.TryGetValue(id, out var mission))
{
return mission.Pause();
}
return false;
}
public bool Resume(Guid id)
{
if (Missions.TryGetValue(id, out var mission))
{
return mission.Resume();
}
return false;
}
public bool Cancel(Guid id, string reason)
{
if (Missions.TryGetValue(id, out var mission))
{
return mission.Cancel(reason);
}
return false; // Mission not found or not running
}
public void Stop()
{
if (!idleMissions.IsEmpty || !runningMissions.IsEmpty)
{
var listWaitHandles = new List<WaitHandle>();
while (idleMissions.TryDequeue(out var mission))
{
mission.Cancel("Cancel by script mission manager is stoped");
listWaitHandles.Add(mission.WaitHandle);
}
while (runningMissions.TryDequeue(out var mission))
{
mission.Cancel("Cancel by script mission manager is stoped");
listWaitHandles.Add(mission.WaitHandle);
}
WaitHandle.WaitAll([.. listWaitHandles]);
}
if (!internalCts.IsCancellationRequested)
{
internalCts.Cancel();
}
if (thread != null && thread.IsAlive)
{
thread.Interrupt();
thread.Join();
}
internalCts.Dispose();
thread = null;
}
private void RemoveMission(ScriptMission mission)
{
mission.Dispose();
Missions.TryRemove(mission.Id, out _);
}
public void ResetMissionDb()
{
using var scope = scopeFactory.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<ScriptManagerDbContext>();
var missions = dbContext.InstanceMissions.Where(m => m.Status == MissionStatus.Running
|| m.Status == MissionStatus.Pausing
|| m.Status == MissionStatus.Paused
|| m.Status == MissionStatus.Canceling
|| m.Status == MissionStatus.Resuming).ToList();
foreach (var mission in missions)
{
mission.Log += $"{Environment.NewLine}{DateTime.UtcNow}: Mission Manager start, but instance mission has state {mission.Status}";
mission.Status = MissionStatus.Error;
}
dbContext.SaveChanges();
}
private void MiningIdleMissionHandle()
{
using var scope = scopeFactory.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<ScriptManagerDbContext>();
int count = idleMissions.Count;
bool hasChanges = false;
for (int i = 0; i < count; i++)
{
if (!idleMissions.TryDequeue(out var mission)) break;
var dbMission = dbContext.InstanceMissions.Find(mission.Id);
if (dbMission == null)
{
RemoveMission(mission);
continue; // Skip if mission not found in database
}
if (mission.Status == MissionStatus.Idle)
{
mission.Start();
runningMissions.Enqueue(mission);
dbMission.Status = mission.Status;
}
else
{
RemoveMission(mission);
if (mission.Status == MissionStatus.Canceled)
{
dbMission.Status = MissionStatus.Canceled;
dbMission.Log += $"{Environment.NewLine}{mission.GetLog()}";
}
else
{
dbMission.Status = MissionStatus.Error;
dbMission.Log += $"{Environment.NewLine}{mission.GetLog()}{Environment.NewLine}{DateTime.UtcNow}: Mission is not in idle state. [{mission.Status}]";
}
hasChanges = true;
}
}
if (hasChanges)
{
dbContext.SaveChanges();
}
}
private void MiningRunningMissionHandle()
{
using var scope = scopeFactory.CreateScope();
var dbContext = scope.ServiceProvider.GetRequiredService<ScriptManagerDbContext>();
bool hasChanges = false;
int count = runningMissions.Count;
for (int i = 0; i < count; i++)
{
if (!runningMissions.TryDequeue(out var mission)) break;
var dbMission = dbContext.InstanceMissions.Find(mission.Id);
if (dbMission == null)
{
RemoveMission(mission);
continue; // Skip if mission not found in database
}
switch (mission.Status)
{
case MissionStatus.Running:
case MissionStatus.Paused:
case MissionStatus.Canceling:
case MissionStatus.Resuming:
case MissionStatus.Pausing:
if (dbMission.Status != mission.Status)
{
dbMission.Status = mission.Status;
hasChanges = true;
}
runningMissions.Enqueue(mission);
break;
case MissionStatus.Completed:
case MissionStatus.Canceled:
case MissionStatus.Error:
dbMission.Status = mission.Status;
dbMission.Log += $"{Environment.NewLine}{mission.GetLog()}";
dbMission.StopedAt = DateTime.UtcNow;
hasChanges = true;
RemoveMission(mission);
break; // Handle these statuses in their respective methods
default:
dbMission.Status = MissionStatus.Error;
dbMission.Log += $"{Environment.NewLine} Wrong mission status on running: {mission.Status}";
dbMission.Log += $"{Environment.NewLine}{mission.GetLog()}";
hasChanges = true;
RemoveMission(mission);
continue; // Skip unknown statuses
}
}
if (hasChanges)
{
dbContext.SaveChanges();
}
}
}