From f053c9ad0dea32d6c0a73af91f73a98a0eec58ed Mon Sep 17 00:00:00 2001 From: SepComet <2428390463@qq.com> Date: Thu, 26 Mar 2026 12:01:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=95=E5=85=A5=20openspec=EF=BC=8C=E5=AE=8C?= =?UTF-8?q?=E6=88=90=E4=BA=8C=E9=98=B6=E6=AE=B5=20Kcp=20=E8=BF=81=E7=A7=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Assets/Scripts/Network/Network.Runtime.asmdef | 2 +- .../Network/NetworkTransport/KcpTransport.cs | 532 ++++++++++++++++++ .../NetworkTransport/KcpTransport.cs.meta | 11 + Assets/Scripts/NetworkManager.cs | 2 +- .../EditMode/Network/KcpTransportTests.cs | 265 +++++++++ .../Network/KcpTransportTests.cs.meta | 11 + .../.openspec.yaml | 2 + .../design.md | 84 +++ .../proposal.md | 26 + .../specs/kcp-transport/spec.md | 45 ++ .../tasks.md | 16 + openspec/specs/kcp-transport/spec.md | 49 ++ 12 files changed, 1043 insertions(+), 2 deletions(-) create mode 100644 Assets/Scripts/Network/NetworkTransport/KcpTransport.cs create mode 100644 Assets/Scripts/Network/NetworkTransport/KcpTransport.cs.meta create mode 100644 Assets/Tests/EditMode/Network/KcpTransportTests.cs create mode 100644 Assets/Tests/EditMode/Network/KcpTransportTests.cs.meta create mode 100644 openspec/changes/archive/2026-03-26-introduce-kcp-transport/.openspec.yaml create mode 100644 openspec/changes/archive/2026-03-26-introduce-kcp-transport/design.md create mode 100644 openspec/changes/archive/2026-03-26-introduce-kcp-transport/proposal.md create mode 100644 openspec/changes/archive/2026-03-26-introduce-kcp-transport/specs/kcp-transport/spec.md create mode 100644 openspec/changes/archive/2026-03-26-introduce-kcp-transport/tasks.md create mode 100644 openspec/specs/kcp-transport/spec.md diff --git a/Assets/Scripts/Network/Network.Runtime.asmdef b/Assets/Scripts/Network/Network.Runtime.asmdef index f245663..2405807 100644 --- a/Assets/Scripts/Network/Network.Runtime.asmdef +++ b/Assets/Scripts/Network/Network.Runtime.asmdef @@ -4,7 +4,7 @@ "references": [], "includePlatforms": [], "excludePlatforms": [], - "allowUnsafeCode": false, + "allowUnsafeCode": true, "overrideReferences": false, "precompiledReferences": [], "autoReferenced": true, diff --git a/Assets/Scripts/Network/NetworkTransport/KcpTransport.cs b/Assets/Scripts/Network/NetworkTransport/KcpTransport.cs new file mode 100644 index 0000000..362a137 --- /dev/null +++ b/Assets/Scripts/Network/NetworkTransport/KcpTransport.cs @@ -0,0 +1,532 @@ +using System; +using System.Collections.Concurrent; +using System.Net; +using System.Net.Sockets; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using kcp; + +namespace Network.NetworkTransport +{ + public class KcpTransport : ITransport + { + private const uint DefaultConv = 1; + private const int DefaultNoDelay = 1; + private const int DefaultInterval = 10; + private const int DefaultResend = 2; + private const int DefaultNc = 1; + private const int DefaultSendWindow = 128; + private const int DefaultReceiveWindow = 128; + private const int DefaultMtu = 1200; + private static readonly TimeSpan UpdateLoopDelay = TimeSpan.FromMilliseconds(DefaultInterval); + + private readonly UdpClient _client; + private readonly bool _isServer; + private readonly IPEndPoint _defaultRemoteEndPoint; + private readonly uint _defaultConv; + private readonly ConcurrentDictionary _sessions = new(); + private readonly object _socketSendLock = new(); + + private CancellationTokenSource _cancellationTokenSource; + private Task _receiveTask = Task.CompletedTask; + private Task _updateTask = Task.CompletedTask; + private volatile bool _isRunning; + + public event Action OnReceive; + + internal int ActiveSessionCount => _sessions.Count; + + public KcpTransport(int listenPort, uint conv = DefaultConv) + { + _client = new UdpClient(listenPort); + _isServer = true; + _defaultConv = conv; + + Console.WriteLine($"[KcpTransport] 服务端模式,监听端口: {listenPort}"); + } + + public KcpTransport(string serverIp, int serverPort, uint conv = DefaultConv) + { + if (string.IsNullOrWhiteSpace(serverIp)) + { + throw new ArgumentException("Server IP is required.", nameof(serverIp)); + } + + _client = new UdpClient(0); + _defaultRemoteEndPoint = new IPEndPoint(IPAddress.Parse(serverIp), serverPort); + _defaultConv = conv; + + Console.WriteLine($"[KcpTransport] 客户端模式,目标: {_defaultRemoteEndPoint}, conv={conv}"); + } + + public Task StartAsync() + { + if (_isRunning) + { + return Task.CompletedTask; + } + + _sessions.Clear(); + _cancellationTokenSource = new CancellationTokenSource(); + _isRunning = true; + + if (!_isServer) + { + GetOrCreateSession(_defaultRemoteEndPoint, _defaultConv); + } + + _receiveTask = ReceiveLoopAsync(_cancellationTokenSource.Token); + _updateTask = UpdateLoopAsync(_cancellationTokenSource.Token); + + Console.WriteLine("[KcpTransport] 传输层启动"); + return Task.CompletedTask; + } + + public void Stop() + { + if (!_isRunning) + { + return; + } + + _isRunning = false; + _cancellationTokenSource?.Cancel(); + _client.Close(); + WaitForBackgroundTasks(); + DisposeAllSessions(); + _cancellationTokenSource?.Dispose(); + _cancellationTokenSource = null; + + Console.WriteLine("[KcpTransport] 传输层停止"); + } + + public void Send(byte[] data) + { + if (_defaultRemoteEndPoint == null) + { + throw new InvalidOperationException("Default remote endpoint is not configured."); + } + + SendTo(data, _defaultRemoteEndPoint); + } + + public void SendTo(byte[] data, IPEndPoint target) + { + if (data == null) + { + throw new ArgumentNullException(nameof(data)); + } + + if (target == null) + { + throw new ArgumentNullException(nameof(target)); + } + + EnsureRunning(); + + if (!_isServer && !target.Equals(_defaultRemoteEndPoint)) + { + throw new InvalidOperationException("Client mode only supports the configured default remote endpoint."); + } + + var session = GetOrCreateSession(target, _defaultConv); + session.Send(data); + } + + public void SendToAll(byte[] data) + { + if (data == null) + { + throw new ArgumentNullException(nameof(data)); + } + + EnsureRunning(); + + if (!_isServer) + { + throw new InvalidOperationException("SendToAll is only supported in server mode."); + } + + foreach (var session in _sessions.Values) + { + session.Send(data); + } + } + + private async Task ReceiveLoopAsync(CancellationToken cancellationToken) + { + while (_isRunning && !cancellationToken.IsCancellationRequested) + { + try + { + var result = await _client.ReceiveAsync(); + var session = GetOrCreateSession(result.RemoteEndPoint, ResolveConv(result.Buffer)); + session.Input(result.Buffer); + DrainReceivedMessages(session); + } + catch (ObjectDisposedException) when (!_isRunning || cancellationToken.IsCancellationRequested) + { + return; + } + catch (SocketException) when (!_isRunning || cancellationToken.IsCancellationRequested) + { + return; + } + catch (Exception exception) + { + Console.WriteLine($"[KcpTransport] 接收错误:{exception.Message}"); + } + } + } + + private async Task UpdateLoopAsync(CancellationToken cancellationToken) + { + while (_isRunning && !cancellationToken.IsCancellationRequested) + { + var current = GetCurrentTimeMilliseconds(); + + foreach (var session in _sessions.Values) + { + session.UpdateIfDue(current); + DrainReceivedMessages(session); + } + + try + { + await Task.Delay(UpdateLoopDelay, cancellationToken); + } + catch (OperationCanceledException) + { + return; + } + } + } + + private void DrainReceivedMessages(KcpSession session) + { + while (session.TryReceive(out var payload)) + { + OnReceive?.Invoke(payload, session.RemoteEndPoint); + } + } + + private KcpSession GetOrCreateSession(IPEndPoint remoteEndPoint, uint conv) + { + var normalizedEndPoint = NormalizeEndPoint(remoteEndPoint); + var key = normalizedEndPoint.ToString(); + + return _sessions.GetOrAdd(key, _ => new KcpSession(this, normalizedEndPoint, conv)); + } + + private IPEndPoint NormalizeEndPoint(IPEndPoint remoteEndPoint) + { + if (remoteEndPoint == null) + { + throw new ArgumentNullException(nameof(remoteEndPoint)); + } + + if (_isServer) + { + return new IPEndPoint(remoteEndPoint.Address, remoteEndPoint.Port); + } + + if (_defaultRemoteEndPoint == null) + { + throw new InvalidOperationException("Default remote endpoint is not configured."); + } + + return new IPEndPoint(_defaultRemoteEndPoint.Address, _defaultRemoteEndPoint.Port); + } + + private unsafe uint ResolveConv(byte[] datagram) + { + if (datagram == null || datagram.Length < sizeof(uint)) + { + return _defaultConv; + } + + fixed (byte* buffer = datagram) + { + return KCP.ikcp_getconv(buffer); + } + } + + private static uint GetCurrentTimeMilliseconds() + { + return unchecked((uint)Environment.TickCount); + } + + private void EnsureRunning() + { + if (!_isRunning) + { + throw new InvalidOperationException("Transport has not been started."); + } + } + + private void WaitForBackgroundTasks() + { + try + { + Task.WaitAll(new[] { _receiveTask, _updateTask }, TimeSpan.FromSeconds(1)); + } + catch (AggregateException exception) + { + foreach (var innerException in exception.InnerExceptions) + { + Console.WriteLine($"[KcpTransport] 停止等待错误:{innerException.Message}"); + } + } + } + + private void DisposeAllSessions() + { + foreach (var pair in _sessions) + { + pair.Value.Dispose(); + } + + _sessions.Clear(); + } + + private unsafe int SendDatagram(byte* buffer, int length, IPEndPoint remoteEndPoint) + { + if (!_isRunning) + { + return -1; + } + + var datagram = new byte[length]; + Marshal.Copy((IntPtr)buffer, datagram, 0, length); + + try + { + lock (_socketSendLock) + { + return _client.Send(datagram, datagram.Length, remoteEndPoint); + } + } + catch (ObjectDisposedException) when (!_isRunning) + { + return -1; + } + catch (SocketException exception) + { + Console.WriteLine($"[KcpTransport] 发送错误:{exception.Message}"); + return -1; + } + } + + private unsafe sealed class KcpSession : IDisposable + { + private readonly KcpTransport _owner; + private readonly object _gate = new(); + private readonly GCHandle _handle; + + private IKCPCB* _kcp; + private bool _disposed; + private uint _nextUpdateAt; + + public KcpSession(KcpTransport owner, IPEndPoint remoteEndPoint, uint conv) + { + _owner = owner ?? throw new ArgumentNullException(nameof(owner)); + RemoteEndPoint = remoteEndPoint ?? throw new ArgumentNullException(nameof(remoteEndPoint)); + Conv = conv; + LastActivityUtc = DateTime.UtcNow; + + _handle = GCHandle.Alloc(this); + _kcp = KCP.ikcp_create(conv, (void*)GCHandle.ToIntPtr(_handle)); + KCP.ikcp_setoutput(_kcp, &OutputCallback); + KCP.ikcp_nodelay(_kcp, DefaultNoDelay, DefaultInterval, DefaultResend, DefaultNc); + KCP.ikcp_wndsize(_kcp, DefaultSendWindow, DefaultReceiveWindow); + KCP.ikcp_setmtu(_kcp, DefaultMtu); + + _nextUpdateAt = GetCurrentTimeMilliseconds(); + } + + public uint Conv { get; } + + public IPEndPoint RemoteEndPoint { get; } + + public DateTime LastActivityUtc { get; private set; } + + public void Send(byte[] payload) + { + if (payload == null) + { + throw new ArgumentNullException(nameof(payload)); + } + + lock (_gate) + { + ThrowIfDisposed(); + + if (payload.Length == 0) + { + return; + } + + fixed (byte* buffer = payload) + { + var result = KCP.ikcp_send(_kcp, buffer, payload.Length); + if (result < 0) + { + throw new InvalidOperationException($"KCP send failed with error code {result}."); + } + } + + LastActivityUtc = DateTime.UtcNow; + UpdateNoLock(GetCurrentTimeMilliseconds()); + } + } + + public void Input(byte[] datagram) + { + if (datagram == null) + { + throw new ArgumentNullException(nameof(datagram)); + } + + if (datagram.Length == 0) + { + return; + } + + lock (_gate) + { + ThrowIfDisposed(); + + fixed (byte* buffer = datagram) + { + var result = KCP.ikcp_input(_kcp, buffer, datagram.Length); + if (result < 0) + { + Console.WriteLine($"[KcpTransport] KCP input failed for {RemoteEndPoint}: {result}"); + return; + } + } + + LastActivityUtc = DateTime.UtcNow; + UpdateNoLock(GetCurrentTimeMilliseconds()); + } + } + + public bool TryReceive(out byte[] payload) + { + lock (_gate) + { + if (_disposed) + { + payload = null; + return false; + } + + var size = KCP.ikcp_peeksize(_kcp); + if (size <= 0) + { + payload = null; + return false; + } + + payload = new byte[size]; + + fixed (byte* buffer = payload) + { + var result = KCP.ikcp_recv(_kcp, buffer, payload.Length); + if (result < 0) + { + payload = null; + return false; + } + + if (result != payload.Length) + { + Array.Resize(ref payload, result); + } + } + + LastActivityUtc = DateTime.UtcNow; + return true; + } + } + + public void UpdateIfDue(uint current) + { + lock (_gate) + { + if (_disposed) + { + return; + } + + if (KCP._itimediff(current, _nextUpdateAt) < 0) + { + return; + } + + UpdateNoLock(current); + } + } + + public void Dispose() + { + lock (_gate) + { + if (_disposed) + { + return; + } + + _disposed = true; + + if (_kcp != null) + { + KCP.ikcp_release(_kcp); + _kcp = null; + } + + if (_handle.IsAllocated) + { + _handle.Free(); + } + } + } + + private void UpdateNoLock(uint current) + { + KCP.ikcp_update(_kcp, current); + _nextUpdateAt = KCP.ikcp_check(_kcp, current); + } + + private void ThrowIfDisposed() + { + if (_disposed || _kcp == null) + { + throw new ObjectDisposedException(nameof(KcpSession)); + } + } + + private int SendRaw(byte* buffer, int length) + { + return _owner.SendDatagram(buffer, length, RemoteEndPoint); + } + + private static int OutputCallback(byte* buffer, int length, IKCPCB* kcp, void* user) + { + if (user == null) + { + return -1; + } + + var handle = GCHandle.FromIntPtr((IntPtr)user); + if (handle.Target is not KcpSession session) + { + return -1; + } + + return session.SendRaw(buffer, length); + } + } + } +} diff --git a/Assets/Scripts/Network/NetworkTransport/KcpTransport.cs.meta b/Assets/Scripts/Network/NetworkTransport/KcpTransport.cs.meta new file mode 100644 index 0000000..3c123c7 --- /dev/null +++ b/Assets/Scripts/Network/NetworkTransport/KcpTransport.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: c6f3f71ba3954cc08d5fe8f9442f4f93 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/Assets/Scripts/NetworkManager.cs b/Assets/Scripts/NetworkManager.cs index c084d81..b50ec97 100644 --- a/Assets/Scripts/NetworkManager.cs +++ b/Assets/Scripts/NetworkManager.cs @@ -23,7 +23,7 @@ public class NetworkManager : MonoBehaviour private IEnumerator InitNetwork() { - _transport = new ReliableUdpTransport("127.0.0.1", 8080); + _transport = new KcpTransport("127.0.0.1", 8080); var startTask = _transport.StartAsync(); yield return new WaitUntil(() => startTask.IsCompleted); diff --git a/Assets/Tests/EditMode/Network/KcpTransportTests.cs b/Assets/Tests/EditMode/Network/KcpTransportTests.cs new file mode 100644 index 0000000..f9a3b54 --- /dev/null +++ b/Assets/Tests/EditMode/Network/KcpTransportTests.cs @@ -0,0 +1,265 @@ +using System; +using System.Collections; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Reflection; +using System.Runtime.ExceptionServices; +using System.Threading.Tasks; +using Network.NetworkTransport; +using NUnit.Framework; +using UnityEngine.TestTools; + +namespace Tests.EditMode.Network +{ + public class KcpTransportTests + { + private const int DefaultTimeoutMs = 5000; + + [UnityTest] + public IEnumerator ClientMode_SendRoutesThroughDefaultSession_AndDeliversCompletePayload() + { + return RunAsync(ClientMode_SendRoutesThroughDefaultSession_AndDeliversCompletePayloadAsync); + } + + private static async Task ClientMode_SendRoutesThroughDefaultSession_AndDeliversCompletePayloadAsync() + { + var listenPort = GetAvailableUdpPort(); + var server = new KcpTransport(listenPort); + var client = new KcpTransport(IPAddress.Loopback.ToString(), listenPort); + var payload = CreatePayload(4096, seed: 11); + var receiveCount = 0; + var receivedTask = new TaskCompletionSource<(byte[] Payload, IPEndPoint Sender)>( + TaskCreationOptions.RunContinuationsAsynchronously); + + server.OnReceive += (data, sender) => + { + receiveCount++; + receivedTask.TrySetResult((data, sender)); + }; + + try + { + await server.StartAsync(); + await client.StartAsync(); + + Assert.That(GetActiveSessionCount(client), Is.EqualTo(1)); + + client.Send(payload); + + var result = await WaitFor(receivedTask.Task, "Timed out waiting for server payload."); + await Task.Delay(200); + + Assert.That(result.Payload, Is.EqualTo(payload)); + Assert.That(result.Sender.Address, Is.EqualTo(IPAddress.Loopback)); + Assert.That(receiveCount, Is.EqualTo(1)); + } + finally + { + client.Stop(); + server.Stop(); + } + } + + [UnityTest] + public IEnumerator ServerMode_CreatesIndependentSessions_PerRemoteEndpoint() + { + return RunAsync(ServerMode_CreatesIndependentSessions_PerRemoteEndpointAsync); + } + + private static async Task ServerMode_CreatesIndependentSessions_PerRemoteEndpointAsync() + { + var listenPort = GetAvailableUdpPort(); + var server = new KcpTransport(listenPort); + var clientA = new KcpTransport(IPAddress.Loopback.ToString(), listenPort); + var clientB = new KcpTransport(IPAddress.Loopback.ToString(), listenPort); + var received = new ConcurrentQueue<(byte[] Payload, IPEndPoint Sender)>(); + var allMessagesTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + server.OnReceive += (data, sender) => + { + received.Enqueue((data, sender)); + if (received.Count >= 2) + { + allMessagesTask.TrySetResult(true); + } + }; + + try + { + await server.StartAsync(); + await clientA.StartAsync(); + await clientB.StartAsync(); + + clientA.Send(CreatePayload(128, seed: 21)); + clientB.Send(CreatePayload(256, seed: 37)); + + await WaitFor(allMessagesTask.Task, "Timed out waiting for messages from both clients."); + + Assert.That(GetActiveSessionCount(server), Is.EqualTo(2)); + + var senders = received.Select(item => item.Sender.ToString()).Distinct().ToArray(); + Assert.That(senders, Has.Length.EqualTo(2)); + + var payloadLengths = received.Select(item => item.Payload.Length).OrderBy(length => length).ToArray(); + Assert.That(payloadLengths, Is.EqualTo(new[] { 128, 256 })); + } + finally + { + clientA.Stop(); + clientB.Stop(); + server.Stop(); + } + } + + [UnityTest] + public IEnumerator SendToAll_BroadcastsToEveryActiveSession() + { + return RunAsync(SendToAll_BroadcastsToEveryActiveSessionAsync); + } + + private static async Task SendToAll_BroadcastsToEveryActiveSessionAsync() + { + var listenPort = GetAvailableUdpPort(); + var server = new KcpTransport(listenPort); + var clientA = new KcpTransport(IPAddress.Loopback.ToString(), listenPort); + var clientB = new KcpTransport(IPAddress.Loopback.ToString(), listenPort); + var broadcastPayload = CreatePayload(2048, seed: 53); + var clientAReceivedTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + var clientBReceivedTask = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + + clientA.OnReceive += (data, sender) => clientAReceivedTask.TrySetResult(data); + clientB.OnReceive += (data, sender) => clientBReceivedTask.TrySetResult(data); + + try + { + await server.StartAsync(); + await clientA.StartAsync(); + await clientB.StartAsync(); + + clientA.Send(CreatePayload(64, seed: 61)); + clientB.Send(CreatePayload(64, seed: 79)); + + await WaitUntilAsync(() => GetActiveSessionCount(server) == 2, "Timed out waiting for server sessions."); + + server.SendToAll(broadcastPayload); + + var clientAResult = await WaitFor(clientAReceivedTask.Task, "Timed out waiting for broadcast on client A."); + var clientBResult = await WaitFor(clientBReceivedTask.Task, "Timed out waiting for broadcast on client B."); + + Assert.That(clientAResult, Is.EqualTo(broadcastPayload)); + Assert.That(clientBResult, Is.EqualTo(broadcastPayload)); + } + finally + { + clientA.Stop(); + clientB.Stop(); + server.Stop(); + } + } + + [UnityTest] + public IEnumerator Stop_ClearsAllActiveSessions() + { + return RunAsync(Stop_ClearsAllActiveSessionsAsync); + } + + private static async Task Stop_ClearsAllActiveSessionsAsync() + { + var listenPort = GetAvailableUdpPort(); + var transport = new KcpTransport(IPAddress.Loopback.ToString(), listenPort); + + await transport.StartAsync(); + Assert.That(GetActiveSessionCount(transport), Is.EqualTo(1)); + + transport.Stop(); + + Assert.That(GetActiveSessionCount(transport), Is.EqualTo(0)); + Assert.DoesNotThrow(() => transport.Stop()); + } + + private static async Task WaitFor(Task task, string failureMessage) + { + var completedTask = await Task.WhenAny(task, Task.Delay(DefaultTimeoutMs)); + if (completedTask != task) + { + Assert.Fail(failureMessage); + } + + return await task; + } + + private static async Task WaitUntilAsync(Func condition, string failureMessage) + { + var deadline = DateTime.UtcNow.AddMilliseconds(DefaultTimeoutMs); + + while (DateTime.UtcNow < deadline) + { + if (condition()) + { + return; + } + + await Task.Delay(25); + } + + Assert.Fail(failureMessage); + } + + private static IEnumerator RunAsync(Func asyncAction) + { + var task = asyncAction(); + + while (!task.IsCompleted) + { + yield return null; + } + + if (task.IsFaulted) + { + var exception = task.Exception?.InnerExceptions.Count == 1 + ? task.Exception.InnerExceptions[0] + : task.Exception; + ExceptionDispatchInfo.Capture(exception ?? new Exception("Async test failed.")).Throw(); + } + + if (task.IsCanceled) + { + Assert.Fail("Async test was canceled."); + } + } + + private static byte[] CreatePayload(int length, int seed) + { + var bytes = new byte[length]; + for (var index = 0; index < bytes.Length; index++) + { + bytes[index] = (byte)((index + seed) % byte.MaxValue); + } + + return bytes; + } + + private static int GetAvailableUdpPort() + { + using var client = new UdpClient(0); + return ((IPEndPoint)client.Client.LocalEndPoint).Port; + } + + private static int GetActiveSessionCount(KcpTransport transport) + { + var property = typeof(KcpTransport).GetProperty( + "ActiveSessionCount", + BindingFlags.Instance | BindingFlags.NonPublic); + + if (property == null) + { + throw new MissingMemberException(typeof(KcpTransport).FullName, "ActiveSessionCount"); + } + + return (int)property.GetValue(transport); + } + } +} diff --git a/Assets/Tests/EditMode/Network/KcpTransportTests.cs.meta b/Assets/Tests/EditMode/Network/KcpTransportTests.cs.meta new file mode 100644 index 0000000..b0a5233 --- /dev/null +++ b/Assets/Tests/EditMode/Network/KcpTransportTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: 4f76fdb4a5874bcc9ea8398a4daa71c3 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/openspec/changes/archive/2026-03-26-introduce-kcp-transport/.openspec.yaml b/openspec/changes/archive/2026-03-26-introduce-kcp-transport/.openspec.yaml new file mode 100644 index 0000000..1e96444 --- /dev/null +++ b/openspec/changes/archive/2026-03-26-introduce-kcp-transport/.openspec.yaml @@ -0,0 +1,2 @@ +schema: spec-driven +created: 2026-03-26 diff --git a/openspec/changes/archive/2026-03-26-introduce-kcp-transport/design.md b/openspec/changes/archive/2026-03-26-introduce-kcp-transport/design.md new file mode 100644 index 0000000..a2ee8ab --- /dev/null +++ b/openspec/changes/archive/2026-03-26-introduce-kcp-transport/design.md @@ -0,0 +1,84 @@ +## Context + +阶段一已经把消息层收敛到 `ITransport`,当前 `MessageManager` 只依赖 `Send`、`SendTo`、`SendToAll`、`OnReceive`、`StartAsync` 和 `Stop`。运行时实现仍然是 `ReliableUdpTransport`,但它现在只是一个基于 `UdpClient` 的纯 UDP 收发器,并没有 KCP 会话、重传调度或完整消息重组能力。`NetworkManager` 仍然直接实例化这个实现,因此阶段二需要在不破坏上层消息封包逻辑的前提下,引入一个真正的 `KcpTransport`。 + +当前代码还存在两个约束。第一,`MessageManager` 收到 `OnReceive` 后会立刻解析 `Envelope` 并调用业务 handler,因此阶段二必须保证回调上来的已经是完整业务消息,而不是原始 UDP 数据报。第二,项目仓库里还没有可见的 KCP 传输实现代码,设计需要把 KCP 依赖和装配点描述清楚,同时避免把主线程派发、QoS 分流或旧类清理这些后续阶段内容混入本次改动。 + +## Goals / Non-Goals + +**Goals:** +- 新增独立的 `KcpTransport` 实现 `ITransport`,保持现有消息层接口和 `MessageManager` 使用方式不变。 +- 在客户端维护单一默认 `KcpSession`,在服务端按远端地址维护独立 `KcpSession`,使每个连接具备独立的 KCP 状态。 +- 通过后台接收循环和更新循环驱动 `Kcp.Input`、`Kcp.Update`/`Check`、`Kcp.Recv`,并只在收到完整业务消息后触发 `OnReceive`。 +- 将默认网络入口切换到 `KcpTransport`,并增加覆盖会话路由、完整消息交付和停止清理的编辑器测试。 + +**Non-Goals:** +- 删除 `ReliableUdpTransport`、`Packet.cs` 或阶段三里的旧可靠 UDP 清理工作。 +- 将消息处理切换到 Unity 主线程队列。 +- 为不同消息类型拆分 QoS 通道,或新增裸 UDP 的并行同步链路。 +- 引入完整的连接重连、登录态状态机或新的业务握手协议。 + +## Decisions + +### 1. 以新类 `KcpTransport` 落地,而不是继续修改 `ReliableUdpTransport` + +新增 `Assets/Scripts/Network/NetworkTransport/KcpTransport.cs`,保留 `ReliableUdpTransport` 作为阶段一兼容实现,避免“纯 UDP 兼容类”和“KCP 会话传输类”在同一文件里继续叠加职责。这样阶段二可以单独验证 KCP 行为,阶段三再决定是否完全移除旧实现。 + +备选方案是在 `ReliableUdpTransport` 上继续加入 KCP 分支,但那会让命名、日志和生命周期控制再次变得模糊,不利于后续清理。 + +### 2. 用内部 `KcpSession` 封装每个远端的 KCP 状态 + +新增内部会话对象,至少保存以下状态: +- `IPEndPoint RemoteEndPoint` +- `uint Conv` +- KCP 实例 +- `DateTime LastActivityUtc` +- 连接是否仍有效 + +客户端模式在构造时确定默认远端,并在 `StartAsync` 时创建默认会话;`Send(byte[])` 永远写入该默认会话。服务端模式按远端地址查找或创建会话,`SendTo(byte[], IPEndPoint)` 写入指定会话,`SendToAll(byte[])` 遍历当前有效会话逐个写入。为避免阶段二扩散到新的握手协议,本次使用统一配置的 `conv` 默认值;服务端仍以内网端点隔离不同连接,后续如需协商 `conv` 再通过新协议扩展。 + +备选方案是直接把 `Kcp` 对象散落在 `KcpTransport` 字典中而不包装会话对象,但这样会让活动时间、清理策略和发送入口分散在多个代码路径中。 + +### 3. 分离 UDP 接收循环和 KCP 更新循环 + +`KcpTransport` 维护两个后台任务: +- 接收循环:阻塞读取 `UdpClient.ReceiveAsync()`,按远端定位会话,调用 `session.Kcp.Input(...)`,随后持续从 `Kcp.Recv` 拉取完整业务消息,并以原始远端地址触发 `OnReceive`。 +- 更新循环:周期性遍历活动会话,依据 `Kcp.Check`/`Kcp.Update` 推进重传、确认和 flush,确保即使没有新的入站 UDP 包,KCP 仍能推进超时与重发。 + +这种设计满足 KCP 对周期驱动的要求,同时将“收到原始 UDP 包”和“交付完整业务消息”明确分成两个层次。备选方案是仅在发送或收包时调用 `Update`,但那会让空闲期的重发和 flush 依赖外部流量,增加延迟和丢包恢复风险。 + +### 4. 保持 `ITransport` 契约稳定,仅替换默认实现 + +阶段二不扩展 `ITransport` 接口,也不要求 `MessageManager` 感知 KCP。`NetworkManager` 的改动限制为把 `new ReliableUdpTransport(...)` 替换成 `new KcpTransport(...)`,其余封包与 handler 注册逻辑保持不变。这样可以让编辑器测试继续围绕既有接口编写,并把接口扩展留给未来真正需要 `OnConnected`/`OnDisconnected`/`OnError` 的阶段。 + +备选方案是同步在本次 change 中扩展连接事件接口,但这会带来更多上层重构,不属于阶段二的最小交付面。 + +### 5. 通过可控测试桩验证 KCP 完整消息交付 + +测试重点放在三个方面: +- 客户端默认会话的 `Send` 走向配置远端。 +- 服务端对多个远端维持独立会话,广播不会混淆会话状态。 +- `OnReceive` 只在完整业务消息从 `Kcp.Recv` 取出后触发。 + +如果仓库内已有可引用的 KCP 程序集,编辑器测试可直接驱动真实 `KcpTransport`;如果当前工程缺失程序集引用,则先补齐插件接入,再用回环端口或可替代的 KCP 适配层完成测试。 + +## Risks / Trade-offs + +- [KCP 程序集在仓库中不可见] → 在实现前确认插件来源与 asmdef 引用方式;若缺失,则先把依赖接入作为首个实现任务。 +- [统一 `conv` 简化了阶段二,但不覆盖未来协商场景] → 设计中保留 `KcpSession.Conv` 和会话键扩展点,后续可在不推翻 `KcpTransport` 的前提下加入协商协议。 +- [后台更新循环会引入额外线程与定时负担] → 将更新频率集中在 transport 内部,并确保 `Stop()` 能取消循环、关闭 socket、清理会话。 +- [消息层仍在网络线程里执行业务 handler] → 本次只保证交付的是完整消息,线程切换问题留待后续阶段专门处理。 + +## Migration Plan + +1. 接入或确认 KCP C# 依赖在 Unity 工程中可用。 +2. 新增 `KcpTransport` 与内部 `KcpSession`,完成客户端和服务端的会话创建、发送、接收与更新循环。 +3. 将 `NetworkManager` 默认实现切换到 `KcpTransport`,保持 `MessageManager` 和消息类型不变。 +4. 增加编辑器测试,覆盖默认发送、服务端会话路由、完整消息交付和停止清理。 +5. 如果集成验证失败,可临时切回 `ReliableUdpTransport` 入口,不影响 `ITransport` 和消息层接口。 + +## Open Questions + +- 项目最终使用哪一个 KCP C# 实现,以及它在 Unity 中的程序集引用方式是什么? +- 阶段二是否需要立即加入空闲会话超时回收,还是只在 `Stop()` 时统一清理即可? +- 服务端是否已经有固定的监听入口需要同步替换为 `KcpTransport`,还是当前变更只覆盖客户端入口与共享 transport 代码? diff --git a/openspec/changes/archive/2026-03-26-introduce-kcp-transport/proposal.md b/openspec/changes/archive/2026-03-26-introduce-kcp-transport/proposal.md new file mode 100644 index 0000000..01bd55e --- /dev/null +++ b/openspec/changes/archive/2026-03-26-introduce-kcp-transport/proposal.md @@ -0,0 +1,26 @@ +## Why + +阶段一已经把网络抽象收口到 `ITransport`,但当前运行时仍然只有一个名为 `ReliableUdpTransport` 的纯 UDP 实现,阶段二目标里要求的 KCP 可靠收发、会话维护和替换自定义可靠 UDP 结构还没有真正落地。继续在兼容类上堆补丁会再次混淆 UDP socket、会话状态和消息交付职责,因此需要单独提出 `KcpTransport` 变更,作为后续移除旧可靠 UDP 结构的前置条件。 + +## What Changes + +- 新增 `KcpTransport`,以 `UdpClient` 承载原始数据报,以 KCP 负责可靠、有序的消息交付,并完整实现现有 `ITransport` 接口。 +- 新增面向 KCP 的会话模型,用于管理客户端默认会话和服务端按远端地址维护的会话实例、活动时间与更新驱动。 +- 将网络入口装配从 `ReliableUdpTransport` 切换为 `KcpTransport`,保持 `MessageManager` 的发送与接收契约不变。 +- 为 KCP 传输与会话路由补充编辑器测试,验证完整业务消息能够经 `KcpTransport` 可靠收发。 + +## Capabilities + +### New Capabilities +- `kcp-transport`: 提供基于 UDP + KCP 的 `ITransport` 实现,支持客户端默认会话和服务端多会话场景下的完整消息收发。 + +### Modified Capabilities + +None. + +## Impact + +- 受影响代码:`Assets/Scripts/Network/NetworkTransport/`、`Assets/Scripts/NetworkManager.cs`、`Assets/Tests/EditMode/Network/` +- 受影响接口:`ITransport` 的调用方式保持不变,但默认运行时实现从 `ReliableUdpTransport` 切换到 `KcpTransport` +- 受影响依赖:需要项目中可用的 KCP C# 实现和对应程序集引用 +- 受影响系统:客户端登录、心跳、输入上行、状态下行等所有通过 `MessageManager` 封包的网络消息链路 diff --git a/openspec/changes/archive/2026-03-26-introduce-kcp-transport/specs/kcp-transport/spec.md b/openspec/changes/archive/2026-03-26-introduce-kcp-transport/specs/kcp-transport/spec.md new file mode 100644 index 0000000..26266bc --- /dev/null +++ b/openspec/changes/archive/2026-03-26-introduce-kcp-transport/specs/kcp-transport/spec.md @@ -0,0 +1,45 @@ +## ADDED Requirements + +### Requirement: Client mode uses a default KCP session +The transport SHALL support a client mode that is constructed with a default remote endpoint and creates exactly one default KCP session after `StartAsync`. Calls to `Send(byte[] data)` in client mode MUST encode the payload through that session and emit the resulting UDP datagrams to the configured remote endpoint. + +#### Scenario: Client sends through the default remote session +- **WHEN** the application starts a client-mode transport and calls `Send` with a business payload +- **THEN** the transport routes the payload through the default KCP session +- **THEN** the UDP socket sends the encoded datagrams to the configured remote endpoint + +### Requirement: Server mode isolates KCP session state per remote endpoint +The transport SHALL support a server mode that receives UDP datagrams from multiple remote endpoints and maintains independent KCP session state for each active remote endpoint. A datagram received from one endpoint MUST only be applied to that endpoint's session, and `SendToAll(byte[] data)` MUST encode and enqueue the payload once per active session. + +#### Scenario: Two remotes do not share KCP state +- **WHEN** a server-mode transport receives KCP traffic from two different remote endpoints +- **THEN** the transport creates or reuses separate KCP session state for each endpoint +- **THEN** payloads reconstructed for one endpoint are delivered with that endpoint as the sender + +#### Scenario: Broadcast writes through each active session +- **WHEN** the application calls `SendToAll` while the server transport has multiple active sessions +- **THEN** the transport encodes the payload for each active KCP session +- **THEN** the UDP socket sends datagrams to every active remote endpoint without collapsing them into a shared session + +### Requirement: OnReceive only dispatches complete KCP payloads +The transport SHALL invoke `OnReceive` only after a complete application payload has been reconstructed from `Kcp.Recv`. Raw UDP packets, partial KCP fragments, and transport-level acknowledgements MUST NOT be surfaced to the message layer. + +#### Scenario: Fragmented payload is withheld until complete +- **WHEN** a business message spans multiple UDP datagrams and only a subset of those datagrams has been processed +- **THEN** the transport does not invoke `OnReceive` + +#### Scenario: Reassembled payload is forwarded to the message layer +- **WHEN** the remaining datagrams for a fragmented KCP message are processed and `Kcp.Recv` yields a complete payload +- **THEN** the transport invokes `OnReceive` exactly once with the reconstructed payload and the originating remote endpoint + +### Requirement: Active sessions are driven until stop and cleaned up on shutdown +The transport SHALL continue driving KCP timers for every active session while it is running, so retransmissions, acknowledgements, and flushes can progress even when no new UDP datagrams arrive. Calling `Stop()` MUST stop the receive and update loops, release the UDP socket, and clear active session state. + +#### Scenario: Idle sessions still receive KCP timer updates +- **WHEN** the transport has an active session with pending KCP work but no new incoming UDP datagrams +- **THEN** the transport continues calling the KCP update path according to its internal schedule + +#### Scenario: Stop releases transport resources +- **WHEN** the application calls `Stop()` on a running transport +- **THEN** the transport stops receiving new UDP datagrams +- **THEN** the transport clears its active KCP session state before shutdown completes diff --git a/openspec/changes/archive/2026-03-26-introduce-kcp-transport/tasks.md b/openspec/changes/archive/2026-03-26-introduce-kcp-transport/tasks.md new file mode 100644 index 0000000..b5f9196 --- /dev/null +++ b/openspec/changes/archive/2026-03-26-introduce-kcp-transport/tasks.md @@ -0,0 +1,16 @@ +## 1. KCP transport scaffolding + +- [x] 1.1 确认并接入 Unity 工程可用的 KCP C# 依赖与程序集引用,保证 `Assets/Scripts/Network/NetworkTransport/` 可以实例化 KCP 对象 +- [x] 1.2 新增 `KcpTransport` 与内部 `KcpSession` 基础结构,补齐客户端默认远端、服务端监听模式和会话状态容器 + +## 2. Core transport implementation + +- [x] 2.1 实现客户端 `Send`、服务端 `SendTo`、`SendToAll` 的 KCP 编码路径,确保所有出站消息都通过对应会话发送 UDP 数据报 +- [x] 2.2 实现 UDP 接收循环,将入站数据报路由到正确的 `KcpSession.Input`,并在 `Kcp.Recv` 拿到完整业务消息后触发 `OnReceive` +- [x] 2.3 实现会话更新与关闭流程,包括周期性 `Kcp.Check`/`Kcp.Update` 驱动、活动状态刷新、`Stop()` 时的循环停止和资源清理 + +## 3. Integration and verification + +- [x] 3.1 将默认网络入口从 `ReliableUdpTransport` 切换到 `KcpTransport`,保持 `MessageManager` 和现有消息封包逻辑不变 +- [x] 3.2 为 `KcpTransport` 增加编辑器测试,覆盖默认会话发送、多远端会话隔离、完整消息交付和停止清理行为 +- [x] 3.3 运行相关网络编辑器测试并修正集成问题,确认阶段二 capability 达到 apply-ready 的实现标准 diff --git a/openspec/specs/kcp-transport/spec.md b/openspec/specs/kcp-transport/spec.md new file mode 100644 index 0000000..75e41f6 --- /dev/null +++ b/openspec/specs/kcp-transport/spec.md @@ -0,0 +1,49 @@ +# kcp-transport Specification + +## Purpose +TBD - created by archiving change introduce-kcp-transport. Update Purpose after archive. +## Requirements +### Requirement: Client mode uses a default KCP session +The transport SHALL support a client mode that is constructed with a default remote endpoint and creates exactly one default KCP session after `StartAsync`. Calls to `Send(byte[] data)` in client mode MUST encode the payload through that session and emit the resulting UDP datagrams to the configured remote endpoint. + +#### Scenario: Client sends through the default remote session +- **WHEN** the application starts a client-mode transport and calls `Send` with a business payload +- **THEN** the transport routes the payload through the default KCP session +- **THEN** the UDP socket sends the encoded datagrams to the configured remote endpoint + +### Requirement: Server mode isolates KCP session state per remote endpoint +The transport SHALL support a server mode that receives UDP datagrams from multiple remote endpoints and maintains independent KCP session state for each active remote endpoint. A datagram received from one endpoint MUST only be applied to that endpoint's session, and `SendToAll(byte[] data)` MUST encode and enqueue the payload once per active session. + +#### Scenario: Two remotes do not share KCP state +- **WHEN** a server-mode transport receives KCP traffic from two different remote endpoints +- **THEN** the transport creates or reuses separate KCP session state for each endpoint +- **THEN** payloads reconstructed for one endpoint are delivered with that endpoint as the sender + +#### Scenario: Broadcast writes through each active session +- **WHEN** the application calls `SendToAll` while the server transport has multiple active sessions +- **THEN** the transport encodes the payload for each active KCP session +- **THEN** the UDP socket sends datagrams to every active remote endpoint without collapsing them into a shared session + +### Requirement: OnReceive only dispatches complete KCP payloads +The transport SHALL invoke `OnReceive` only after a complete application payload has been reconstructed from `Kcp.Recv`. Raw UDP packets, partial KCP fragments, and transport-level acknowledgements MUST NOT be surfaced to the message layer. + +#### Scenario: Fragmented payload is withheld until complete +- **WHEN** a business message spans multiple UDP datagrams and only a subset of those datagrams has been processed +- **THEN** the transport does not invoke `OnReceive` + +#### Scenario: Reassembled payload is forwarded to the message layer +- **WHEN** the remaining datagrams for a fragmented KCP message are processed and `Kcp.Recv` yields a complete payload +- **THEN** the transport invokes `OnReceive` exactly once with the reconstructed payload and the originating remote endpoint + +### Requirement: Active sessions are driven until stop and cleaned up on shutdown +The transport SHALL continue driving KCP timers for every active session while it is running, so retransmissions, acknowledgements, and flushes can progress even when no new UDP datagrams arrive. Calling `Stop()` MUST stop the receive and update loops, release the UDP socket, and clear active session state. + +#### Scenario: Idle sessions still receive KCP timer updates +- **WHEN** the transport has an active session with pending KCP work but no new incoming UDP datagrams +- **THEN** the transport continues calling the KCP update path according to its internal schedule + +#### Scenario: Stop releases transport resources +- **WHEN** the application calls `Stop()` on a running transport +- **THEN** the transport stops receiving new UDP datagrams +- **THEN** the transport clears its active KCP session state before shutdown completes +