RobotNet/RobotNet.ScriptManager/Connections/ModbusTcpClient.cs
2025-10-15 15:15:53 +07:00

489 lines
20 KiB
C#

using RobotNet.Script;
using System.Buffers.Binary;
using System.Net.Sockets;
namespace RobotNet.ScriptManager.Connections;
public sealed class ModbusTcpClient(string host, int port = 502, byte unitId = 1) : IModbusTcpClient
{
/// <summary>Host name or address passed to the TCP connect call.</summary>
public string Host { get; } = host;
/// <summary>TCP port of the Modbus server (constructor default: 502).</summary>
public int Port { get; } = port;
/// <summary>Unit identifier written into every request header and required to match in every response header.</summary>
public byte UnitId { get; } = unitId;
/// <summary>
/// Per-request time limit. Copied to the stream's ReadTimeout/WriteTimeout on connect and
/// used as the time limit for each heartbeat read.
/// </summary>
public TimeSpan RequestTimeout { get; set; } = TimeSpan.FromSeconds(2);
/// <summary>Time budget for the TCP connect attempt made by <see cref="ConnectAsync"/>.</summary>
public TimeSpan ConnectTimeout { get; set; } = TimeSpan.FromSeconds(3);
/// <summary>
/// Number of additional attempts a request makes after a transient failure
/// (socket, I/O or timeout error). Retries only happen when <see cref="AutoReconnect"/> is true.
/// </summary>
public int MaxRetries { get; set; } = 2; // per request
/// <summary>When true, a transient request failure or a failed heartbeat triggers a close-and-reconnect.</summary>
public bool AutoReconnect { get; set; } = true;
/// <summary>When true, connecting makes a best-effort attempt to configure TCP keep-alive on the socket.</summary>
public bool KeepAlive { get; set; } = true; // enable TCP keepalive
/// <summary>
/// Delay between heartbeat reads. The heartbeat loop is only started when this value is
/// greater than zero at the time of connecting.
/// </summary>
public TimeSpan HeartbeatInterval { get; set; } = TimeSpan.FromSeconds(10);
/// <summary>Holding-register address read (one register) by each heartbeat.</summary>
public ushort HeartbeatRegisterAddress { get; set; } = 0; // dummy read address for heartbeat
// Current TCP client; null while disconnected.
private TcpClient? _client;
// Stream of the current connection; null while disconnected.
private NetworkStream? _stream;
// Single-permit semaphore held for the duration of one request/response exchange.
private readonly SemaphoreSlim _sendLock = new(1, 1);
// Incremented once per built frame; truncated to 16 bits to form the MBAP transaction id.
private int _transactionId = 0;
// Cancelled when the connection is closed; its token is handed to the heartbeat loop.
private CancellationTokenSource? _cts;
// Background heartbeat loop of the current connection, if one was started.
private Task? _heartbeatTask;
#region Connect/Dispose
/// <summary>
/// Closes any existing connection, opens a new TCP connection to <see cref="Host"/>:<see cref="Port"/>
/// and, when <see cref="HeartbeatInterval"/> is positive, starts the background heartbeat loop.
/// </summary>
/// <param name="ct">
/// Cancels the connect attempt. It does not govern the lifetime of the established connection;
/// the connection and its heartbeat live until the client is closed or disposed.
/// </param>
/// <exception cref="TimeoutException">The connection was not established within <see cref="ConnectTimeout"/>.</exception>
/// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled while connecting.</exception>
/// <exception cref="SocketException">The TCP connection could not be established.</exception>
public async Task ConnectAsync(CancellationToken ct = default)
{
    await CloseAsync().ConfigureAwait(false);

    // The lifetime source is deliberately not linked to ct: callers reach this method with
    // short-lived per-request tokens (via the lazy connect in the send path), and linking would
    // let a later cancellation of such a token silently stop the heartbeat.
    var lifetime = new CancellationTokenSource();
    var client = new TcpClient
    {
        NoDelay = true,
        LingerState = new LingerOption(true, 0)
    };
    NetworkStream stream;
    try
    {
        using (var connectCts = CancellationTokenSource.CreateLinkedTokenSource(ct))
        {
            if (ConnectTimeout > TimeSpan.Zero)
            {
                connectCts.CancelAfter(ConnectTimeout);
            }
            try
            {
                // The token-aware overload is what actually enforces ConnectTimeout.
                await client.ConnectAsync(Host, Port, connectCts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException) when (!ct.IsCancellationRequested)
            {
                // Only the timeout fired: surface it as a timeout, not as a caller cancellation.
                throw new TimeoutException($"Connecting to {Host}:{Port} timed out after {ConnectTimeout}.");
            }
        }

        if (KeepAlive)
        {
            try
            {
                SetKeepAlive(client.Client, true, keepAliveTimeMs: 15_000, keepAliveIntervalMs: 5_000);
            }
            catch { /* best-effort */ }
        }

        stream = client.GetStream();
        stream.ReadTimeout = (int)RequestTimeout.TotalMilliseconds;
        stream.WriteTimeout = (int)RequestTimeout.TotalMilliseconds;
    }
    catch
    {
        // Nothing has been published yet; release what was created for this attempt.
        client.Dispose();
        lifetime.Dispose();
        throw;
    }

    // Publish the fully initialised connection in one go.
    _client = client;
    _stream = stream;
    _cts = lifetime;

    if (HeartbeatInterval > TimeSpan.Zero)
    {
        var heartbeatToken = lifetime.Token;
        _heartbeatTask = Task.Run(() => HeartbeatLoopAsync(heartbeatToken), CancellationToken.None);
    }
}
/// <summary>True while a stream is attached and the TCP client reports itself as connected.</summary>
public bool IsConnected
{
    get
    {
        if (_stream is null)
        {
            return false;
        }
        return _client is { Connected: true };
    }
}
/// <summary>
/// Closes the connection (stopping the heartbeat), then disposes the send semaphore and
/// suppresses finalization.
/// </summary>
public async ValueTask DisposeAsync()
{
    Task closing = CloseAsync();
    await closing.ConfigureAwait(false);

    _sendLock.Dispose();
    GC.SuppressFinalize(this);
}
/// <summary>Synchronous counterpart of <see cref="DisposeAsync"/>; blocks until the asynchronous disposal has finished.</summary>
public void Dispose()
{
    ValueTask pending = DisposeAsync();
    pending.AsTask().GetAwaiter().GetResult();
}
/// <summary>
/// Tears down the current connection: cancels the lifetime token, gives the heartbeat loop up to
/// 50 ms to finish, then disposes the stream, the client and the token source. Every step is
/// best-effort and never throws.
/// </summary>
private async Task CloseAsync()
{
    // Detach the current resources before the first await so that a connection published by a
    // concurrent connect while we wait is not nulled out or disposed by this call.
    var cts = _cts;
    var heartbeat = _heartbeatTask;
    var stream = _stream;
    var client = _client;
    _cts = null;
    _heartbeatTask = null;
    _stream = null;
    _client = null;

    try { cts?.Cancel(); } catch { /* best-effort */ }

    if (heartbeat is not null)
    {
        // ConfigureAwait(false) matters here: the synchronous Dispose() blocks on this method,
        // so resuming on a captured synchronization context would deadlock.
        try { await Task.WhenAny(heartbeat, Task.Delay(50)).ConfigureAwait(false); } catch { /* best-effort */ }
    }

    try { stream?.Dispose(); } catch { /* best-effort */ }
    try { client?.Close(); } catch { /* best-effort */ }
    try { cts?.Dispose(); } catch { /* best-effort */ }
}
#endregion
#region Public Modbus API
// Coils & Discretes
/// <summary>Reads <paramref name="count"/> coils starting at <paramref name="startAddress"/> using function code 0x01.</summary>
/// <returns>One boolean per requested coil, in address order.</returns>
public Task<bool[]> ReadCoilsAsync(ushort startAddress, ushort count, CancellationToken ct = default)
{
    const byte ReadCoilsFunction = 0x01;
    return ReadBitsAsync(ReadCoilsFunction, startAddress, count, ct);
}
/// <summary>Reads <paramref name="count"/> discrete inputs starting at <paramref name="startAddress"/> using function code 0x02.</summary>
/// <returns>One boolean per requested input, in address order.</returns>
public Task<bool[]> ReadDiscreteInputsAsync(ushort startAddress, ushort count, CancellationToken ct = default)
{
    const byte ReadDiscreteInputsFunction = 0x02;
    return ReadBitsAsync(ReadDiscreteInputsFunction, startAddress, count, ct);
}
// Registers
/// <summary>Reads <paramref name="count"/> holding registers starting at <paramref name="startAddress"/> using function code 0x03.</summary>
/// <returns>The register values, in address order.</returns>
public Task<ushort[]> ReadHoldingRegistersAsync(ushort startAddress, ushort count, CancellationToken ct = default)
{
    const byte ReadHoldingRegistersFunction = 0x03;
    return ReadRegistersAsync(ReadHoldingRegistersFunction, startAddress, count, ct);
}
/// <summary>Reads <paramref name="count"/> input registers starting at <paramref name="startAddress"/> using function code 0x04.</summary>
/// <returns>The register values, in address order.</returns>
public Task<ushort[]> ReadInputRegistersAsync(ushort startAddress, ushort count, CancellationToken ct = default)
{
    const byte ReadInputRegistersFunction = 0x04;
    return ReadRegistersAsync(ReadInputRegistersFunction, startAddress, count, ct);
}
/// <summary>
/// Writes one coil using function code 0x05. <c>true</c> is sent as 0xFF00 and <c>false</c> as 0x0000.
/// </summary>
public Task WriteSingleCoilAsync(ushort address, bool value, CancellationToken ct = default)
{
    ushort encoded;
    if (value)
    {
        encoded = 0xFF00;
    }
    else
    {
        encoded = 0x0000;
    }
    return WriteSingleAsync(0x05, address, encoded, ct);
}
/// <summary>Writes <paramref name="value"/> to the register at <paramref name="address"/> using function code 0x06.</summary>
public Task WriteSingleRegisterAsync(ushort address, ushort value, CancellationToken ct = default)
{
    const byte WriteSingleRegisterFunction = 0x06;
    return WriteSingleAsync(WriteSingleRegisterFunction, address, value, ct);
}
/// <summary>Writes consecutive coils starting at <paramref name="startAddress"/>; forwards to the core implementation.</summary>
public Task WriteMultipleCoilsAsync(ushort startAddress, bool[] values, CancellationToken ct = default)
{
    return WriteMultipleCoilsCoreAsync(startAddress, values, ct);
}
/// <summary>Writes consecutive registers starting at <paramref name="startAddress"/>; forwards to the core implementation.</summary>
public Task WriteMultipleRegistersAsync(ushort startAddress, ushort[] values, CancellationToken ct = default)
{
    return WriteMultipleRegistersCoreAsync(startAddress, values, ct);
}
// FC 22: Mask Write Register
/// <summary>
/// Sends a Mask Write Register request (function code 0x16) carrying the register address,
/// the AND mask and the OR mask, and expects a six-byte echo in return.
/// </summary>
public Task MaskWriteRegisterAsync(ushort address, ushort andMask, ushort orMask, CancellationToken ct = default)
{
    // Request payload: address, AND mask, OR mask; each a big-endian 16-bit value.
    int Encode(Span<byte> payload)
    {
        BinaryPrimitives.WriteUInt16BigEndian(payload.Slice(0, 2), address);
        BinaryPrimitives.WriteUInt16BigEndian(payload.Slice(2, 2), andMask);
        BinaryPrimitives.WriteUInt16BigEndian(payload.Slice(4, 2), orMask);
        return 6;
    }

    return SendExpectEchoAsync(0x16, Encode, expectedEchoLength: 6, ct);
}
// FC 23: Read/Write Multiple Registers
/// <summary>
/// Sends a Read/Write Multiple Registers request (function code 0x17): writes
/// <paramref name="writeValues"/> starting at <paramref name="writeStart"/> and reads
/// <paramref name="readCount"/> registers starting at <paramref name="readStart"/>.
/// </summary>
/// <returns>The registers read, in address order.</returns>
/// <exception cref="ArgumentNullException"><paramref name="writeValues"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="readCount"/> is outside 1..125 or <paramref name="writeValues"/> holds
/// fewer than 1 or more than 121 values (the protocol limits for this function).
/// </exception>
/// <exception cref="ModbusException">The response byte count does not match the request.</exception>
public async Task<ushort[]> ReadWriteMultipleRegistersAsync(
    ushort readStart, ushort readCount,
    ushort writeStart, IReadOnlyList<ushort> writeValues,
    CancellationToken ct = default)
{
    ArgumentNullException.ThrowIfNull(writeValues);
    if (readCount is 0 or > 125) throw new ArgumentOutOfRangeException(nameof(readCount));
    // Above 121 values the one-byte byte-count field would no longer describe the payload.
    if (writeValues.Count is 0 or > 121) throw new ArgumentOutOfRangeException(nameof(writeValues));

    int writeCount = writeValues.Count;
    return await SendAsync(0x17, readCount * 2, span =>
    {
        BinaryPrimitives.WriteUInt16BigEndian(span[..2], readStart);
        BinaryPrimitives.WriteUInt16BigEndian(span.Slice(2, 2), readCount);
        BinaryPrimitives.WriteUInt16BigEndian(span.Slice(4, 2), writeStart);
        BinaryPrimitives.WriteUInt16BigEndian(span.Slice(6, 2), (ushort)writeCount);
        span[8] = (byte)(writeCount * 2);
        int pos = 9;
        for (int i = 0; i < writeCount; i++)
        {
            BinaryPrimitives.WriteUInt16BigEndian(span.Slice(pos, 2), writeValues[i]);
            pos += 2;
        }
        return pos; // payload length
    }, parse: resp =>
    {
        // Response payload: byte count followed by that many bytes of big-endian registers.
        if (resp.IsEmpty) throw new ModbusException("Invalid byte count in response");
        int byteCount = resp[0];
        if (byteCount != readCount * 2 || resp.Length != byteCount + 1)
            throw new ModbusException("Invalid byte count in response");
        var result = new ushort[readCount];
        for (int i = 0; i < result.Length; i++)
            result[i] = BinaryPrimitives.ReadUInt16BigEndian(resp.Slice(1 + i * 2, 2));
        return result;
    }, ct).ConfigureAwait(false);
}
// FC 43/14: Read Device Identification (basic)
/// <summary>
/// Sends a Read Device Identification request (function code 0x2B, MEI type 0x0E) starting at
/// object id 0 and returns the objects contained in the response, keyed by object id and
/// decoded as ASCII.
/// </summary>
/// <param name="category">Value placed in the request's category byte (default 0x01).</param>
/// <param name="ct">Cancels the request.</param>
/// <remarks>
/// Only a single response is processed: the moreFollows and nextObjectId fields are skipped, so
/// objects that a device spreads over several responses are not retrieved.
/// </remarks>
/// <exception cref="ModbusException">The response is too short, has the wrong MEI type, or an object overruns the response.</exception>
public async Task<Dictionary<byte, string>> ReadDeviceIdentificationAsync(byte category = 0x01 /* Basic */ , CancellationToken ct = default)
{
    return await SendAsync(0x2B, 0, span =>
    {
        span[0] = 0x0E; // MEI type
        span[1] = 0x01; // Read Device ID
        span[2] = category; // category
        span[3] = 0x00; // object id
        return 4;
    }, parse: resp =>
    {
        // resp: [MEI, ReadDevId, conformity, moreFollows, nextObjectId, numObjects, objects...]
        const int HeaderLength = 6;
        if (resp.Length < HeaderLength) throw new ModbusException("Invalid Device ID response");
        if (resp[0] != 0x0E) throw new ModbusException("Invalid MEI type");

        int num = resp[5];
        int pos = HeaderLength;
        var dict = new Dictionary<byte, string>(num);
        for (int i = 0; i < num; i++)
        {
            // Each object is [id, length, value...]; verify the two-byte prefix is present
            // before indexing so a truncated frame is reported as a protocol error.
            if (pos + 2 > resp.Length) throw new ModbusException("Invalid object length");
            byte id = resp[pos++];
            int len = resp[pos++];
            if (pos + len > resp.Length) throw new ModbusException("Invalid object length");
            // Decode straight from the span; no per-object copy of the whole response.
            dict[id] = System.Text.Encoding.ASCII.GetString(resp.Slice(pos, len));
            pos += len;
        }
        return dict;
    }, ct).ConfigureAwait(false);
}
#endregion
#region Core Send Helpers
/// <summary>
/// Shared implementation of the bit-read functions: requests <paramref name="count"/> bits
/// starting at <paramref name="startAddress"/> and unpacks the response, least-significant
/// bit first within each byte.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is 0 or greater than 2000.</exception>
/// <exception cref="ModbusException">The response byte count is inconsistent or too small for <paramref name="count"/> bits.</exception>
private async Task<bool[]> ReadBitsAsync(byte function, ushort startAddress, ushort count, CancellationToken ct)
{
    if (count is 0 or > 2000) throw new ArgumentOutOfRangeException(nameof(count));
    int requiredBytes = (count + 7) / 8;
    return await SendAsync(function, expectedLength: requiredBytes + 1, writer: span =>
    {
        BinaryPrimitives.WriteUInt16BigEndian(span[..2], startAddress);
        BinaryPrimitives.WriteUInt16BigEndian(span.Slice(2, 2), count);
        return 4;
    }, parse: resp =>
    {
        // Response payload: byte count followed by the packed bits. Check that it is present,
        // self-consistent and large enough before indexing into it.
        if (resp.IsEmpty || resp[0] != resp.Length - 1 || resp[0] < requiredBytes)
            throw new ModbusException("Unexpected byte count");
        var result = new bool[count];
        for (int i = 0; i < result.Length; i++)
        {
            int b = resp[1 + (i / 8)];
            result[i] = ((b >> (i % 8)) & 0x01) == 1;
        }
        return result;
    }, ct).ConfigureAwait(false);
}
/// <summary>
/// Shared implementation of the register-read functions: requests <paramref name="count"/>
/// registers starting at <paramref name="startAddress"/> and decodes the big-endian values.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is 0 or greater than 125.</exception>
/// <exception cref="ModbusException">The response byte count does not equal twice <paramref name="count"/>.</exception>
private async Task<ushort[]> ReadRegistersAsync(byte function, ushort startAddress, ushort count, CancellationToken ct)
{
    if (count is 0 or > 125) throw new ArgumentOutOfRangeException(nameof(count));
    return await SendAsync(function, expectedLength: count * 2 + 1, writer: span =>
    {
        BinaryPrimitives.WriteUInt16BigEndian(span[..2], startAddress);
        BinaryPrimitives.WriteUInt16BigEndian(span.Slice(2, 2), count);
        return 4;
    }, parse: resp =>
    {
        // An empty payload must be rejected before reading the byte-count byte.
        if (resp.IsEmpty || resp[0] != count * 2 || resp.Length != resp[0] + 1)
            throw new ModbusException("Unexpected byte count");
        var result = new ushort[count];
        for (int i = 0; i < result.Length; i++)
            result[i] = BinaryPrimitives.ReadUInt16BigEndian(resp.Slice(1 + i * 2, 2));
        return result;
    }, ct).ConfigureAwait(false);
}
/// <summary>
/// Shared implementation of the single-value writes: sends address and value as two big-endian
/// 16-bit fields and expects a four-byte echo.
/// </summary>
private Task WriteSingleAsync(byte function, ushort address, ushort value, CancellationToken ct)
{
    int Encode(Span<byte> payload)
    {
        BinaryPrimitives.WriteUInt16BigEndian(payload.Slice(0, 2), address);
        BinaryPrimitives.WriteUInt16BigEndian(payload.Slice(2, 2), value);
        return 4;
    }

    return SendExpectEchoAsync(function, Encode, expectedEchoLength: 4, ct);
}
/// <summary>
/// Sends a Write Multiple Coils request (function code 0x0F). Coils are packed eight per byte,
/// least-significant bit first, and a four-byte echo is expected.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="values"/> is null, empty, or longer than 1968.</exception>
private Task WriteMultipleCoilsCoreAsync(ushort startAddress, bool[] values, CancellationToken ct)
{
    if (values is null || values.Length is 0 or > 1968)
        throw new ArgumentOutOfRangeException(nameof(values));

    int packedLength = (values.Length + 7) / 8;

    // Payload: start address, coil count, byte count, packed coil bytes.
    int Encode(Span<byte> payload)
    {
        BinaryPrimitives.WriteUInt16BigEndian(payload.Slice(0, 2), startAddress);
        BinaryPrimitives.WriteUInt16BigEndian(payload.Slice(2, 2), (ushort)values.Length);
        payload[4] = (byte)packedLength;

        Span<byte> packed = payload.Slice(5, packedLength);
        packed.Clear(); // every packed byte is fully defined, whatever the buffer held before
        for (int index = 0; index < values.Length; index++)
        {
            if (values[index])
            {
                packed[index >> 3] |= (byte)(1 << (index & 7));
            }
        }
        return 5 + packedLength;
    }

    return SendExpectEchoAsync(0x0F, Encode, expectedEchoLength: 4, ct);
}
/// <summary>
/// Sends a Write Multiple Registers request (function code 0x10) with the values encoded as
/// big-endian 16-bit words, and expects a four-byte echo.
/// </summary>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="values"/> is null, empty, or longer than 123.</exception>
private Task WriteMultipleRegistersCoreAsync(ushort startAddress, ushort[] values, CancellationToken ct)
{
    if (values is null || values.Length is 0 or > 123)
        throw new ArgumentOutOfRangeException(nameof(values));

    // Payload: start address, register count, byte count, register words.
    int Encode(Span<byte> payload)
    {
        BinaryPrimitives.WriteUInt16BigEndian(payload.Slice(0, 2), startAddress);
        BinaryPrimitives.WriteUInt16BigEndian(payload.Slice(2, 2), (ushort)values.Length);
        payload[4] = (byte)(values.Length * 2);

        int offset = 5;
        foreach (ushort register in values)
        {
            BinaryPrimitives.WriteUInt16BigEndian(payload.Slice(offset, 2), register);
            offset += 2;
        }
        return offset;
    }

    return SendExpectEchoAsync(0x10, Encode, expectedEchoLength: 4, ct);
}
/// <summary>
/// Sends a request whose response is only checked for its payload length; the payload content
/// itself is not compared with the request.
/// </summary>
/// <exception cref="ModbusException">The response payload is not exactly <paramref name="expectedEchoLength"/> bytes long.</exception>
private async Task SendExpectEchoAsync(byte function, Func<Span<byte>, int> writer, int expectedEchoLength, CancellationToken ct)
{
    // The parse step exists only to validate the length; its return value is a dummy.
    int CheckLength(ReadOnlySpan<byte> echo)
    {
        if (echo.Length == expectedEchoLength)
        {
            return 0;
        }
        throw new ModbusException("Unexpected echo length");
    }

    await SendAsync<int>(function, expectedEchoLength, writer, CheckLength, ct).ConfigureAwait(false);
}
/// <summary>
/// Core request/response exchange. Connects lazily, serialises the exchange through the send
/// semaphore, bounds it by <see cref="RequestTimeout"/>, validates the response and hands the
/// response payload (everything after the function code) to <paramref name="parse"/>.
/// </summary>
/// <param name="function">Function code placed in the request and required in the response.</param>
/// <param name="expectedLength">Advisory expected payload length; not enforced because some functions have variable-length responses.</param>
/// <param name="writer">Writes the request payload into the supplied buffer and returns the number of bytes written.</param>
/// <param name="parse">Converts the response payload into the result.</param>
/// <param name="ct">Caller cancellation.</param>
/// <remarks>
/// Transient failures (socket, I/O, timeout) are retried up to <see cref="MaxRetries"/> times,
/// reconnecting before each retry, but only when <see cref="AutoReconnect"/> is true.
/// </remarks>
/// <exception cref="TimeoutException">No matching response arrived within <see cref="RequestTimeout"/>.</exception>
/// <exception cref="ModbusException">The device returned an exception response or a malformed frame.</exception>
private async Task<T> SendAsync<T>(byte function, int expectedLength, Func<Span<byte>, int> writer, Func<ReadOnlySpan<byte>, T> parse, CancellationToken ct)
{
    int attempts = 0;
    while (true)
    {
        attempts++;
        try
        {
            await EnsureConnectedAsync(ct).ConfigureAwait(false);
            await _sendLock.WaitAsync(ct).ConfigureAwait(false);
            try
            {
                // The stream's Read/WriteTimeout only affect synchronous I/O, so the request
                // timeout has to be enforced through a cancellation token.
                using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
                if (RequestTimeout > TimeSpan.Zero)
                {
                    timeoutCts.CancelAfter(RequestTimeout);
                }

                byte[] body;
                try
                {
                    body = await ExchangeFrameAsync(function, writer, timeoutCts.Token).ConfigureAwait(false);
                }
                catch (OperationCanceledException) when (!ct.IsCancellationRequested)
                {
                    // The caller did not cancel, so the timeout fired. TimeoutException is
                    // classified as transient and therefore takes the retry path below.
                    throw new TimeoutException($"Modbus request (FC={function:X2}) timed out after {RequestTimeout}.");
                }

                byte fc = body[0];
                if ((fc & 0x80) != 0)
                {
                    // Exception response: function code with the high bit set, then the code.
                    if (body.Length < 2) throw new ModbusException("Truncated exception response");
                    byte code = body[1];
                    throw new ModbusException($"Exception (FC={(function):X2}): {code}", (ModbusExceptionCode)code);
                }
                if (fc != function) throw new ModbusException("Mismatched function in response");
                return parse(body.AsSpan(1));
            }
            finally
            {
                _sendLock.Release();
            }
        }
        catch (Exception ex) when (attempts <= MaxRetries && IsTransient(ex))
        {
            if (AutoReconnect)
            {
                await ReconnectAsync(ct).ConfigureAwait(false);
                continue; // retry
            }
            throw;
        }
    }
}

/// <summary>
/// Writes one request frame and reads frames until the one carrying the same transaction id
/// arrives; returns that frame's PDU (function code followed by the payload).
/// </summary>
private async Task<byte[]> ExchangeFrameAsync(byte function, Func<Span<byte>, int> writer, CancellationToken token)
{
    var stream = _stream ?? throw new IOException("Not connected");

    var reqPayload = new byte[260]; // generous for payload
    int payloadLen = writer(reqPayload);
    var pdu = new byte[payloadLen + 1];
    pdu[0] = function;
    Buffer.BlockCopy(reqPayload, 0, pdu, 1, payloadLen);
    byte[] adu = BuildMbap(UnitId, pdu);
    ushort sentId = BinaryPrimitives.ReadUInt16BigEndian(adu.AsSpan(0, 2));

    await stream.WriteAsync(adu, token).ConfigureAwait(false);

    while (true)
    {
        // Read MBAP (7 bytes), then body
        byte[] mbap = await ReadExactAsync(7, token).ConfigureAwait(false);
        ushort transId = BinaryPrimitives.ReadUInt16BigEndian(mbap.AsSpan(0, 2));
        ushort proto = BinaryPrimitives.ReadUInt16BigEndian(mbap.AsSpan(2, 2));
        ushort len = BinaryPrimitives.ReadUInt16BigEndian(mbap.AsSpan(4, 2));
        byte unit = mbap[6];
        if (proto != 0 || unit != UnitId) throw new ModbusException("Invalid MBAP header");
        if (len < 2) throw new ModbusException("Invalid length");
        byte[] body = await ReadExactAsync(len - 1, token).ConfigureAwait(false); // len includes unitId
        if (transId == sentId)
        {
            return body;
        }
        // Response to an earlier request (e.g. one abandoned by a cancelled caller): drop it and
        // keep reading. The loop is bounded by the request timeout carried in the token.
    }
}
/// <summary>Connects when no usable connection exists; otherwise does nothing.</summary>
private async Task EnsureConnectedAsync(CancellationToken ct)
{
    if (!IsConnected)
    {
        await ConnectAsync(ct).ConfigureAwait(false);
    }
}
/// <summary>
/// Drops the current connection, waits 100 ms, then connects again.
/// </summary>
/// <param name="ct">Cancels the pause and the connect attempt.</param>
private async Task ReconnectAsync(CancellationToken ct)
{
    // ConfigureAwait(false) like every other await in this class, so that callers blocking
    // synchronously under a synchronization context cannot deadlock on this continuation.
    try { await CloseAsync().ConfigureAwait(false); } catch { /* best-effort */ }
    await Task.Delay(100, ct).ConfigureAwait(false);
    await ConnectAsync(ct).ConfigureAwait(false);
}
/// <summary>
/// Builds a complete frame: a 7-byte MBAP header followed by <paramref name="pdu"/>.
/// Each call advances the transaction counter by one.
/// </summary>
/// <returns>A new array holding header and PDU.</returns>
private byte[] BuildMbap(byte unitId, byte[] pdu)
{
    // Header layout: transaction id (2), protocol id (2, always 0), length (2), unit id (1).
    // The length field counts the unit id byte plus the PDU.
    const int HeaderSize = 7;

    int counter = Interlocked.Increment(ref _transactionId);
    ushort transactionId = unchecked((ushort)counter);

    var frame = new byte[HeaderSize + pdu.Length];
    Span<byte> header = frame.AsSpan(0, HeaderSize);
    BinaryPrimitives.WriteUInt16BigEndian(header.Slice(0, 2), transactionId);
    BinaryPrimitives.WriteUInt16BigEndian(header.Slice(2, 2), 0);
    BinaryPrimitives.WriteUInt16BigEndian(header.Slice(4, 2), (ushort)(pdu.Length + 1));
    header[6] = unitId;

    pdu.AsSpan().CopyTo(frame.AsSpan(HeaderSize));
    return frame;
}
/// <summary>Reads exactly <paramref name="length"/> bytes from the connection stream.</summary>
/// <returns>A new array of <paramref name="length"/> bytes.</returns>
/// <exception cref="IOException">There is no open stream, or the remote side closed the connection before all bytes arrived.</exception>
private async Task<byte[]> ReadExactAsync(int length, CancellationToken ct)
{
    // Capture the stream once: the field can be nulled by a concurrent close, which would
    // otherwise show up as a NullReferenceException instead of a transient I/O error.
    var stream = _stream ?? throw new IOException("Not connected");

    byte[] buf = new byte[length];
    int read = 0;
    while (read < length)
    {
        int n = await stream.ReadAsync(buf.AsMemory(read, length - read), ct).ConfigureAwait(false);
        if (n <= 0) throw new IOException("Remote closed the connection");
        read += n;
    }
    return buf;
}
/// <summary>
/// Classifies an exception as transient, i.e. eligible for reconnect-and-retry: socket errors,
/// I/O errors and timeouts.
/// </summary>
private static bool IsTransient(Exception ex)
{
    return ex switch
    {
        SocketException => true,
        IOException => true,
        TimeoutException => true,
        _ => false,
    };
}
/// <summary>
/// Background loop: every <see cref="HeartbeatInterval"/> it reads one holding register at
/// <see cref="HeartbeatRegisterAddress"/> (bounded by <see cref="RequestTimeout"/>) to verify
/// that the link is alive, until <paramref name="ct"/> is cancelled.
/// </summary>
/// <param name="ct">Lifetime token of the connection this loop belongs to.</param>
private async Task HeartbeatLoopAsync(CancellationToken ct)
{
    while (!ct.IsCancellationRequested)
    {
        try
        {
            await Task.Delay(HeartbeatInterval, ct).ConfigureAwait(false);
            if (ct.IsCancellationRequested) break;
            if (IsConnected)
            {
                // best-effort heartbeat: read one register
                using var timeoutCts = new CancellationTokenSource(RequestTimeout);
                using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, timeoutCts.Token);
                await ReadHoldingRegistersAsync(HeartbeatRegisterAddress, 1, linked.Token).ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            break;
        }
        catch (ModbusException ex) when (ex.Code is not null)
        {
            // The device answered with a Modbus exception response (for example because the
            // heartbeat register does not exist). An answer proves the link is alive, so the
            // connection must not be torn down for it.
        }
        catch
        {
            if (AutoReconnect)
            {
                // NOTE(review): ReconnectAsync starts by closing the connection, and closing
                // cancels the connection's lifetime token. If that is this loop's ct, the
                // delay inside ReconnectAsync is cancelled (swallowed here) and the loop ends;
                // the link is then re-established by the next request. Confirm that this
                // lazy-reconnect behaviour is intended.
                try { await ReconnectAsync(ct).ConfigureAwait(false); } catch { }
            }
        }
    }
}
/// <summary>
/// Placeholder invoked when the connect timeout token fires. It performs no action: a
/// <see cref="Task"/> offers no way to cancel it from outside.
/// </summary>
private static void SafeCancel(Task _)
{
    // Intentionally empty.
}
/// <summary>
/// Best-effort configuration of TCP keep-alive on <paramref name="socket"/> using the portable
/// socket options (the previous SIO_KEEPALIVE_VALS control code is only supported on Windows).
/// </summary>
/// <param name="socket">Socket to configure.</param>
/// <param name="on">Whether keep-alive probing is enabled.</param>
/// <param name="keepAliveTimeMs">Idle time before the first probe, in milliseconds (applied with one-second granularity, minimum 1 s).</param>
/// <param name="keepAliveIntervalMs">Interval between probes, in milliseconds (applied with one-second granularity, minimum 1 s).</param>
private static void SetKeepAlive(Socket socket, bool on, int keepAliveTimeMs, int keepAliveIntervalMs)
{
    try
    {
        socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, on);
        if (!on) return;

        // These two options take whole seconds.
        socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, Math.Max(1, keepAliveTimeMs / 1000));
        socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, Math.Max(1, keepAliveIntervalMs / 1000));
    }
    catch (Exception ex) when (ex is SocketException or PlatformNotSupportedException or ObjectDisposedException)
    {
        // Keep-alive tuning is optional; the connection works without it.
    }
}
#endregion
}
/// <summary>
/// Codes a device can return in the second byte of a Modbus exception response.
/// </summary>
public enum ModbusExceptionCode : byte
{
    /// <summary>Illegal Function.</summary>
    IllegalFunction = 0x01,

    /// <summary>Illegal Data Address.</summary>
    IllegalDataAddress = 0x02,

    /// <summary>Illegal Data Value.</summary>
    IllegalDataValue = 0x03,

    /// <summary>Slave Device Failure.</summary>
    SlaveDeviceFailure = 0x04,

    /// <summary>Acknowledge.</summary>
    Acknowledge = 0x05,

    /// <summary>Slave Device Busy.</summary>
    SlaveDeviceBusy = 0x06,

    /// <summary>Memory Parity Error.</summary>
    MemoryParityError = 0x08,

    /// <summary>Gateway Path Unavailable.</summary>
    GatewayPathUnavailable = 0x0A,

    /// <summary>Gateway Target Failed To Respond.</summary>
    GatewayTargetFailedToRespond = 0x0B,
}
/// <summary>
/// Error raised for Modbus protocol problems: either a malformed response or an exception
/// response returned by the device.
/// </summary>
public sealed class ModbusException : Exception
{
    /// <summary>Creates the exception.</summary>
    /// <param name="message">Human-readable description.</param>
    /// <param name="code">Exception code reported by the device, or null when the error did not come from an exception response.</param>
    public ModbusException(string message, ModbusExceptionCode? code = null)
        : base(message)
    {
        Code = code;
    }

    /// <summary>Exception code reported by the device, if any.</summary>
    public ModbusExceptionCode? Code { get; }

    /// <summary>Returns the display name of a raw exception code, or an "Unknown Exception" text for unlisted values.</summary>
    public static string DescribeException(byte code)
    {
        switch (code)
        {
            case 0x01: return "Illegal Function";
            case 0x02: return "Illegal Data Address";
            case 0x03: return "Illegal Data Value";
            case 0x04: return "Slave Device Failure";
            case 0x05: return "Acknowledge";
            case 0x06: return "Slave Device Busy";
            case 0x08: return "Memory Parity Error";
            case 0x0A: return "Gateway Path Unavailable";
            case 0x0B: return "Gateway Target Failed To Respond";
            default: return $"Unknown Exception 0x{code:X2}";
        }
    }
}