引入 openspec,完成二阶段 Kcp 迁移

This commit is contained in:
SepComet 2026-03-26 12:01:17 +08:00
parent 815d5f1f24
commit f053c9ad0d
12 changed files with 1043 additions and 2 deletions

View File

@ -4,7 +4,7 @@
"references": [], "references": [],
"includePlatforms": [], "includePlatforms": [],
"excludePlatforms": [], "excludePlatforms": [],
"allowUnsafeCode": false, "allowUnsafeCode": true,
"overrideReferences": false, "overrideReferences": false,
"precompiledReferences": [], "precompiledReferences": [],
"autoReferenced": true, "autoReferenced": true,

View File

@ -0,0 +1,532 @@
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using kcp;
namespace Network.NetworkTransport
{
public class KcpTransport : ITransport
{
private const uint DefaultConv = 1;
private const int DefaultNoDelay = 1;
private const int DefaultInterval = 10;
private const int DefaultResend = 2;
private const int DefaultNc = 1;
private const int DefaultSendWindow = 128;
private const int DefaultReceiveWindow = 128;
private const int DefaultMtu = 1200;
private static readonly TimeSpan UpdateLoopDelay = TimeSpan.FromMilliseconds(DefaultInterval);
private readonly UdpClient _client;
private readonly bool _isServer;
private readonly IPEndPoint _defaultRemoteEndPoint;
private readonly uint _defaultConv;
private readonly ConcurrentDictionary<string, KcpSession> _sessions = new();
private readonly object _socketSendLock = new();
private CancellationTokenSource _cancellationTokenSource;
private Task _receiveTask = Task.CompletedTask;
private Task _updateTask = Task.CompletedTask;
private volatile bool _isRunning;
public event Action<byte[], IPEndPoint> OnReceive;
internal int ActiveSessionCount => _sessions.Count;
public KcpTransport(int listenPort, uint conv = DefaultConv)
{
_client = new UdpClient(listenPort);
_isServer = true;
_defaultConv = conv;
Console.WriteLine($"[KcpTransport] 服务端模式,监听端口: {listenPort}");
}
public KcpTransport(string serverIp, int serverPort, uint conv = DefaultConv)
{
if (string.IsNullOrWhiteSpace(serverIp))
{
throw new ArgumentException("Server IP is required.", nameof(serverIp));
}
_client = new UdpClient(0);
_defaultRemoteEndPoint = new IPEndPoint(IPAddress.Parse(serverIp), serverPort);
_defaultConv = conv;
Console.WriteLine($"[KcpTransport] 客户端模式,目标: {_defaultRemoteEndPoint}, conv={conv}");
}
public Task StartAsync()
{
if (_isRunning)
{
return Task.CompletedTask;
}
_sessions.Clear();
_cancellationTokenSource = new CancellationTokenSource();
_isRunning = true;
if (!_isServer)
{
GetOrCreateSession(_defaultRemoteEndPoint, _defaultConv);
}
_receiveTask = ReceiveLoopAsync(_cancellationTokenSource.Token);
_updateTask = UpdateLoopAsync(_cancellationTokenSource.Token);
Console.WriteLine("[KcpTransport] 传输层启动");
return Task.CompletedTask;
}
public void Stop()
{
if (!_isRunning)
{
return;
}
_isRunning = false;
_cancellationTokenSource?.Cancel();
_client.Close();
WaitForBackgroundTasks();
DisposeAllSessions();
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
Console.WriteLine("[KcpTransport] 传输层停止");
}
public void Send(byte[] data)
{
if (_defaultRemoteEndPoint == null)
{
throw new InvalidOperationException("Default remote endpoint is not configured.");
}
SendTo(data, _defaultRemoteEndPoint);
}
public void SendTo(byte[] data, IPEndPoint target)
{
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}
if (target == null)
{
throw new ArgumentNullException(nameof(target));
}
EnsureRunning();
if (!_isServer && !target.Equals(_defaultRemoteEndPoint))
{
throw new InvalidOperationException("Client mode only supports the configured default remote endpoint.");
}
var session = GetOrCreateSession(target, _defaultConv);
session.Send(data);
}
public void SendToAll(byte[] data)
{
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}
EnsureRunning();
if (!_isServer)
{
throw new InvalidOperationException("SendToAll is only supported in server mode.");
}
foreach (var session in _sessions.Values)
{
session.Send(data);
}
}
private async Task ReceiveLoopAsync(CancellationToken cancellationToken)
{
while (_isRunning && !cancellationToken.IsCancellationRequested)
{
try
{
var result = await _client.ReceiveAsync();
var session = GetOrCreateSession(result.RemoteEndPoint, ResolveConv(result.Buffer));
session.Input(result.Buffer);
DrainReceivedMessages(session);
}
catch (ObjectDisposedException) when (!_isRunning || cancellationToken.IsCancellationRequested)
{
return;
}
catch (SocketException) when (!_isRunning || cancellationToken.IsCancellationRequested)
{
return;
}
catch (Exception exception)
{
Console.WriteLine($"[KcpTransport] 接收错误:{exception.Message}");
}
}
}
private async Task UpdateLoopAsync(CancellationToken cancellationToken)
{
while (_isRunning && !cancellationToken.IsCancellationRequested)
{
var current = GetCurrentTimeMilliseconds();
foreach (var session in _sessions.Values)
{
session.UpdateIfDue(current);
DrainReceivedMessages(session);
}
try
{
await Task.Delay(UpdateLoopDelay, cancellationToken);
}
catch (OperationCanceledException)
{
return;
}
}
}
private void DrainReceivedMessages(KcpSession session)
{
while (session.TryReceive(out var payload))
{
OnReceive?.Invoke(payload, session.RemoteEndPoint);
}
}
private KcpSession GetOrCreateSession(IPEndPoint remoteEndPoint, uint conv)
{
var normalizedEndPoint = NormalizeEndPoint(remoteEndPoint);
var key = normalizedEndPoint.ToString();
return _sessions.GetOrAdd(key, _ => new KcpSession(this, normalizedEndPoint, conv));
}
private IPEndPoint NormalizeEndPoint(IPEndPoint remoteEndPoint)
{
if (remoteEndPoint == null)
{
throw new ArgumentNullException(nameof(remoteEndPoint));
}
if (_isServer)
{
return new IPEndPoint(remoteEndPoint.Address, remoteEndPoint.Port);
}
if (_defaultRemoteEndPoint == null)
{
throw new InvalidOperationException("Default remote endpoint is not configured.");
}
return new IPEndPoint(_defaultRemoteEndPoint.Address, _defaultRemoteEndPoint.Port);
}
private unsafe uint ResolveConv(byte[] datagram)
{
if (datagram == null || datagram.Length < sizeof(uint))
{
return _defaultConv;
}
fixed (byte* buffer = datagram)
{
return KCP.ikcp_getconv(buffer);
}
}
private static uint GetCurrentTimeMilliseconds()
{
return unchecked((uint)Environment.TickCount);
}
private void EnsureRunning()
{
if (!_isRunning)
{
throw new InvalidOperationException("Transport has not been started.");
}
}
private void WaitForBackgroundTasks()
{
try
{
Task.WaitAll(new[] { _receiveTask, _updateTask }, TimeSpan.FromSeconds(1));
}
catch (AggregateException exception)
{
foreach (var innerException in exception.InnerExceptions)
{
Console.WriteLine($"[KcpTransport] 停止等待错误:{innerException.Message}");
}
}
}
private void DisposeAllSessions()
{
foreach (var pair in _sessions)
{
pair.Value.Dispose();
}
_sessions.Clear();
}
private unsafe int SendDatagram(byte* buffer, int length, IPEndPoint remoteEndPoint)
{
if (!_isRunning)
{
return -1;
}
var datagram = new byte[length];
Marshal.Copy((IntPtr)buffer, datagram, 0, length);
try
{
lock (_socketSendLock)
{
return _client.Send(datagram, datagram.Length, remoteEndPoint);
}
}
catch (ObjectDisposedException) when (!_isRunning)
{
return -1;
}
catch (SocketException exception)
{
Console.WriteLine($"[KcpTransport] 发送错误:{exception.Message}");
return -1;
}
}
private unsafe sealed class KcpSession : IDisposable
{
private readonly KcpTransport _owner;
private readonly object _gate = new();
private readonly GCHandle _handle;
private IKCPCB* _kcp;
private bool _disposed;
private uint _nextUpdateAt;
public KcpSession(KcpTransport owner, IPEndPoint remoteEndPoint, uint conv)
{
_owner = owner ?? throw new ArgumentNullException(nameof(owner));
RemoteEndPoint = remoteEndPoint ?? throw new ArgumentNullException(nameof(remoteEndPoint));
Conv = conv;
LastActivityUtc = DateTime.UtcNow;
_handle = GCHandle.Alloc(this);
_kcp = KCP.ikcp_create(conv, (void*)GCHandle.ToIntPtr(_handle));
KCP.ikcp_setoutput(_kcp, &OutputCallback);
KCP.ikcp_nodelay(_kcp, DefaultNoDelay, DefaultInterval, DefaultResend, DefaultNc);
KCP.ikcp_wndsize(_kcp, DefaultSendWindow, DefaultReceiveWindow);
KCP.ikcp_setmtu(_kcp, DefaultMtu);
_nextUpdateAt = GetCurrentTimeMilliseconds();
}
public uint Conv { get; }
public IPEndPoint RemoteEndPoint { get; }
public DateTime LastActivityUtc { get; private set; }
public void Send(byte[] payload)
{
if (payload == null)
{
throw new ArgumentNullException(nameof(payload));
}
lock (_gate)
{
ThrowIfDisposed();
if (payload.Length == 0)
{
return;
}
fixed (byte* buffer = payload)
{
var result = KCP.ikcp_send(_kcp, buffer, payload.Length);
if (result < 0)
{
throw new InvalidOperationException($"KCP send failed with error code {result}.");
}
}
LastActivityUtc = DateTime.UtcNow;
UpdateNoLock(GetCurrentTimeMilliseconds());
}
}
public void Input(byte[] datagram)
{
if (datagram == null)
{
throw new ArgumentNullException(nameof(datagram));
}
if (datagram.Length == 0)
{
return;
}
lock (_gate)
{
ThrowIfDisposed();
fixed (byte* buffer = datagram)
{
var result = KCP.ikcp_input(_kcp, buffer, datagram.Length);
if (result < 0)
{
Console.WriteLine($"[KcpTransport] KCP input failed for {RemoteEndPoint}: {result}");
return;
}
}
LastActivityUtc = DateTime.UtcNow;
UpdateNoLock(GetCurrentTimeMilliseconds());
}
}
public bool TryReceive(out byte[] payload)
{
lock (_gate)
{
if (_disposed)
{
payload = null;
return false;
}
var size = KCP.ikcp_peeksize(_kcp);
if (size <= 0)
{
payload = null;
return false;
}
payload = new byte[size];
fixed (byte* buffer = payload)
{
var result = KCP.ikcp_recv(_kcp, buffer, payload.Length);
if (result < 0)
{
payload = null;
return false;
}
if (result != payload.Length)
{
Array.Resize(ref payload, result);
}
}
LastActivityUtc = DateTime.UtcNow;
return true;
}
}
public void UpdateIfDue(uint current)
{
lock (_gate)
{
if (_disposed)
{
return;
}
if (KCP._itimediff(current, _nextUpdateAt) < 0)
{
return;
}
UpdateNoLock(current);
}
}
public void Dispose()
{
lock (_gate)
{
if (_disposed)
{
return;
}
_disposed = true;
if (_kcp != null)
{
KCP.ikcp_release(_kcp);
_kcp = null;
}
if (_handle.IsAllocated)
{
_handle.Free();
}
}
}
private void UpdateNoLock(uint current)
{
KCP.ikcp_update(_kcp, current);
_nextUpdateAt = KCP.ikcp_check(_kcp, current);
}
private void ThrowIfDisposed()
{
if (_disposed || _kcp == null)
{
throw new ObjectDisposedException(nameof(KcpSession));
}
}
private int SendRaw(byte* buffer, int length)
{
return _owner.SendDatagram(buffer, length, RemoteEndPoint);
}
private static int OutputCallback(byte* buffer, int length, IKCPCB* kcp, void* user)
{
if (user == null)
{
return -1;
}
var handle = GCHandle.FromIntPtr((IntPtr)user);
if (handle.Target is not KcpSession session)
{
return -1;
}
return session.SendRaw(buffer, length);
}
}
}
}

View File

@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: c6f3f71ba3954cc08d5fe8f9442f4f93
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -23,7 +23,7 @@ public class NetworkManager : MonoBehaviour
private IEnumerator InitNetwork() private IEnumerator InitNetwork()
{ {
_transport = new ReliableUdpTransport("127.0.0.1", 8080); _transport = new KcpTransport("127.0.0.1", 8080);
var startTask = _transport.StartAsync(); var startTask = _transport.StartAsync();
yield return new WaitUntil(() => startTask.IsCompleted); yield return new WaitUntil(() => startTask.IsCompleted);

View File

@ -0,0 +1,265 @@
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Reflection;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;
using Network.NetworkTransport;
using NUnit.Framework;
using UnityEngine.TestTools;
namespace Tests.EditMode.Network
{
public class KcpTransportTests
{
private const int DefaultTimeoutMs = 5000;
[UnityTest]
public IEnumerator ClientMode_SendRoutesThroughDefaultSession_AndDeliversCompletePayload()
{
return RunAsync(ClientMode_SendRoutesThroughDefaultSession_AndDeliversCompletePayloadAsync);
}
private static async Task ClientMode_SendRoutesThroughDefaultSession_AndDeliversCompletePayloadAsync()
{
var listenPort = GetAvailableUdpPort();
var server = new KcpTransport(listenPort);
var client = new KcpTransport(IPAddress.Loopback.ToString(), listenPort);
var payload = CreatePayload(4096, seed: 11);
var receiveCount = 0;
var receivedTask = new TaskCompletionSource<(byte[] Payload, IPEndPoint Sender)>(
TaskCreationOptions.RunContinuationsAsynchronously);
server.OnReceive += (data, sender) =>
{
receiveCount++;
receivedTask.TrySetResult((data, sender));
};
try
{
await server.StartAsync();
await client.StartAsync();
Assert.That(GetActiveSessionCount(client), Is.EqualTo(1));
client.Send(payload);
var result = await WaitFor(receivedTask.Task, "Timed out waiting for server payload.");
await Task.Delay(200);
Assert.That(result.Payload, Is.EqualTo(payload));
Assert.That(result.Sender.Address, Is.EqualTo(IPAddress.Loopback));
Assert.That(receiveCount, Is.EqualTo(1));
}
finally
{
client.Stop();
server.Stop();
}
}
[UnityTest]
public IEnumerator ServerMode_CreatesIndependentSessions_PerRemoteEndpoint()
{
return RunAsync(ServerMode_CreatesIndependentSessions_PerRemoteEndpointAsync);
}
private static async Task ServerMode_CreatesIndependentSessions_PerRemoteEndpointAsync()
{
var listenPort = GetAvailableUdpPort();
var server = new KcpTransport(listenPort);
var clientA = new KcpTransport(IPAddress.Loopback.ToString(), listenPort);
var clientB = new KcpTransport(IPAddress.Loopback.ToString(), listenPort);
var received = new ConcurrentQueue<(byte[] Payload, IPEndPoint Sender)>();
var allMessagesTask = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
server.OnReceive += (data, sender) =>
{
received.Enqueue((data, sender));
if (received.Count >= 2)
{
allMessagesTask.TrySetResult(true);
}
};
try
{
await server.StartAsync();
await clientA.StartAsync();
await clientB.StartAsync();
clientA.Send(CreatePayload(128, seed: 21));
clientB.Send(CreatePayload(256, seed: 37));
await WaitFor(allMessagesTask.Task, "Timed out waiting for messages from both clients.");
Assert.That(GetActiveSessionCount(server), Is.EqualTo(2));
var senders = received.Select(item => item.Sender.ToString()).Distinct().ToArray();
Assert.That(senders, Has.Length.EqualTo(2));
var payloadLengths = received.Select(item => item.Payload.Length).OrderBy(length => length).ToArray();
Assert.That(payloadLengths, Is.EqualTo(new[] { 128, 256 }));
}
finally
{
clientA.Stop();
clientB.Stop();
server.Stop();
}
}
[UnityTest]
public IEnumerator SendToAll_BroadcastsToEveryActiveSession()
{
return RunAsync(SendToAll_BroadcastsToEveryActiveSessionAsync);
}
private static async Task SendToAll_BroadcastsToEveryActiveSessionAsync()
{
var listenPort = GetAvailableUdpPort();
var server = new KcpTransport(listenPort);
var clientA = new KcpTransport(IPAddress.Loopback.ToString(), listenPort);
var clientB = new KcpTransport(IPAddress.Loopback.ToString(), listenPort);
var broadcastPayload = CreatePayload(2048, seed: 53);
var clientAReceivedTask = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
var clientBReceivedTask = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
clientA.OnReceive += (data, sender) => clientAReceivedTask.TrySetResult(data);
clientB.OnReceive += (data, sender) => clientBReceivedTask.TrySetResult(data);
try
{
await server.StartAsync();
await clientA.StartAsync();
await clientB.StartAsync();
clientA.Send(CreatePayload(64, seed: 61));
clientB.Send(CreatePayload(64, seed: 79));
await WaitUntilAsync(() => GetActiveSessionCount(server) == 2, "Timed out waiting for server sessions.");
server.SendToAll(broadcastPayload);
var clientAResult = await WaitFor(clientAReceivedTask.Task, "Timed out waiting for broadcast on client A.");
var clientBResult = await WaitFor(clientBReceivedTask.Task, "Timed out waiting for broadcast on client B.");
Assert.That(clientAResult, Is.EqualTo(broadcastPayload));
Assert.That(clientBResult, Is.EqualTo(broadcastPayload));
}
finally
{
clientA.Stop();
clientB.Stop();
server.Stop();
}
}
[UnityTest]
public IEnumerator Stop_ClearsAllActiveSessions()
{
return RunAsync(Stop_ClearsAllActiveSessionsAsync);
}
private static async Task Stop_ClearsAllActiveSessionsAsync()
{
var listenPort = GetAvailableUdpPort();
var transport = new KcpTransport(IPAddress.Loopback.ToString(), listenPort);
await transport.StartAsync();
Assert.That(GetActiveSessionCount(transport), Is.EqualTo(1));
transport.Stop();
Assert.That(GetActiveSessionCount(transport), Is.EqualTo(0));
Assert.DoesNotThrow(() => transport.Stop());
}
private static async Task<T> WaitFor<T>(Task<T> task, string failureMessage)
{
var completedTask = await Task.WhenAny(task, Task.Delay(DefaultTimeoutMs));
if (completedTask != task)
{
Assert.Fail(failureMessage);
}
return await task;
}
private static async Task WaitUntilAsync(Func<bool> condition, string failureMessage)
{
var deadline = DateTime.UtcNow.AddMilliseconds(DefaultTimeoutMs);
while (DateTime.UtcNow < deadline)
{
if (condition())
{
return;
}
await Task.Delay(25);
}
Assert.Fail(failureMessage);
}
private static IEnumerator RunAsync(Func<Task> asyncAction)
{
var task = asyncAction();
while (!task.IsCompleted)
{
yield return null;
}
if (task.IsFaulted)
{
var exception = task.Exception?.InnerExceptions.Count == 1
? task.Exception.InnerExceptions[0]
: task.Exception;
ExceptionDispatchInfo.Capture(exception ?? new Exception("Async test failed.")).Throw();
}
if (task.IsCanceled)
{
Assert.Fail("Async test was canceled.");
}
}
private static byte[] CreatePayload(int length, int seed)
{
var bytes = new byte[length];
for (var index = 0; index < bytes.Length; index++)
{
bytes[index] = (byte)((index + seed) % byte.MaxValue);
}
return bytes;
}
private static int GetAvailableUdpPort()
{
using var client = new UdpClient(0);
return ((IPEndPoint)client.Client.LocalEndPoint).Port;
}
private static int GetActiveSessionCount(KcpTransport transport)
{
var property = typeof(KcpTransport).GetProperty(
"ActiveSessionCount",
BindingFlags.Instance | BindingFlags.NonPublic);
if (property == null)
{
throw new MissingMemberException(typeof(KcpTransport).FullName, "ActiveSessionCount");
}
return (int)property.GetValue(transport);
}
}
}

View File

@ -0,0 +1,11 @@
fileFormatVersion: 2
guid: 4f76fdb4a5874bcc9ea8398a4daa71c3
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,2 @@
schema: spec-driven
created: 2026-03-26

View File

@ -0,0 +1,84 @@
## Context
阶段一已经把消息层收敛到 `ITransport`,当前 `MessageManager` 只依赖 `Send`、`SendTo`、`SendToAll`、`OnReceive`、`StartAsync` 和 `Stop`。运行时实现仍然是 `ReliableUdpTransport`,但它现在只是一个基于 `UdpClient` 的纯 UDP 收发器,并没有 KCP 会话、重传调度或完整消息重组能力。`NetworkManager` 仍然直接实例化这个实现,因此阶段二需要在不破坏上层消息封包逻辑的前提下,引入一个真正的 `KcpTransport`
当前代码还存在两个约束。第一,`MessageManager` 收到 `OnReceive` 后会立刻解析 `Envelope` 并调用业务 handler因此阶段二必须保证回调上来的已经是完整业务消息而不是原始 UDP 数据报。第二,项目仓库里还没有可见的 KCP 传输实现代码,设计需要把 KCP 依赖和装配点描述清楚同时避免把主线程派发、QoS 分流或旧类清理这些后续阶段内容混入本次改动。
## Goals / Non-Goals
**Goals:**
- 新增独立的 `KcpTransport` 实现 `ITransport`,保持现有消息层接口和 `MessageManager` 使用方式不变。
- 在客户端维护单一默认 `KcpSession`,在服务端按远端地址维护独立 `KcpSession`,使每个连接具备独立的 KCP 状态。
- 通过后台接收循环和更新循环驱动 `Kcp.Input`、`Kcp.Update`/`Check`、`Kcp.Recv`,并只在收到完整业务消息后触发 `OnReceive`
- 将默认网络入口切换到 `KcpTransport`,并增加覆盖会话路由、完整消息交付和停止清理的编辑器测试。
**Non-Goals:**
- 删除 `ReliableUdpTransport`、`Packet.cs` 或阶段三里的旧可靠 UDP 清理工作。
- 将消息处理切换到 Unity 主线程队列。
- 为不同消息类型拆分 QoS 通道,或新增裸 UDP 的并行同步链路。
- 引入完整的连接重连、登录态状态机或新的业务握手协议。
## Decisions
### 1. 以新类 `KcpTransport` 落地,而不是继续修改 `ReliableUdpTransport`
新增 `Assets/Scripts/Network/NetworkTransport/KcpTransport.cs`,保留 `ReliableUdpTransport` 作为阶段一兼容实现,避免“纯 UDP 兼容类”和“KCP 会话传输类”在同一文件里继续叠加职责。这样阶段二可以单独验证 KCP 行为,阶段三再决定是否完全移除旧实现。
备选方案是在 `ReliableUdpTransport` 上继续加入 KCP 分支,但那会让命名、日志和生命周期控制再次变得模糊,不利于后续清理。
### 2. 用内部 `KcpSession` 封装每个远端的 KCP 状态
新增内部会话对象,至少保存以下状态:
- `IPEndPoint RemoteEndPoint`
- `uint Conv`
- KCP 实例
- `DateTime LastActivityUtc`
- 连接是否仍有效
客户端模式在构造时确定默认远端,并在 `StartAsync` 时创建默认会话;`Send(byte[])` 永远写入该默认会话。服务端模式按远端地址查找或创建会话,`SendTo(byte[], IPEndPoint)` 写入指定会话,`SendToAll(byte[])` 遍历当前有效会话逐个写入。为避免阶段二扩散到新的握手协议,本次使用统一配置的 `conv` 默认值;服务端仍以内网端点隔离不同连接,后续如需协商 `conv` 再通过新协议扩展。
备选方案是直接把 `Kcp` 对象散落在 `KcpTransport` 字典中而不包装会话对象,但这样会让活动时间、清理策略和发送入口分散在多个代码路径中。
### 3. 分离 UDP 接收循环和 KCP 更新循环
`KcpTransport` 维护两个后台任务:
- 接收循环:阻塞读取 `UdpClient.ReceiveAsync()`,按远端定位会话,调用 `session.Kcp.Input(...)`,随后持续从 `Kcp.Recv` 拉取完整业务消息,并以原始远端地址触发 `OnReceive`
- 更新循环:周期性遍历活动会话,依据 `Kcp.Check`/`Kcp.Update` 推进重传、确认和 flush确保即使没有新的入站 UDP 包KCP 仍能推进超时与重发。
这种设计满足 KCP 对周期驱动的要求,同时将“收到原始 UDP 包”和“交付完整业务消息”明确分成两个层次。备选方案是仅在发送或收包时调用 `Update`,但那会让空闲期的重发和 flush 依赖外部流量,增加延迟和丢包恢复风险。
### 4. 保持 `ITransport` 契约稳定,仅替换默认实现
阶段二不扩展 `ITransport` 接口,也不要求 `MessageManager` 感知 KCP。`NetworkManager` 的改动限制为把 `new ReliableUdpTransport(...)` 替换成 `new KcpTransport(...)`,其余封包与 handler 注册逻辑保持不变。这样可以让编辑器测试继续围绕既有接口编写,并把接口扩展留给未来真正需要 `OnConnected`/`OnDisconnected`/`OnError` 的阶段。
备选方案是同步在本次 change 中扩展连接事件接口,但这会带来更多上层重构,不属于阶段二的最小交付面。
### 5. 通过可控测试桩验证 KCP 完整消息交付
测试重点放在三个方面:
- 客户端默认会话的 `Send` 走向配置远端。
- 服务端对多个远端维持独立会话,广播不会混淆会话状态。
- `OnReceive` 只在完整业务消息从 `Kcp.Recv` 取出后触发。
如果仓库内已有可引用的 KCP 程序集,编辑器测试可直接驱动真实 `KcpTransport`;如果当前工程缺失程序集引用,则先补齐插件接入,再用回环端口或可替代的 KCP 适配层完成测试。
## Risks / Trade-offs
- [KCP 程序集在仓库中不可见] → 在实现前确认插件来源与 asmdef 引用方式;若缺失,则先把依赖接入作为首个实现任务。
- [统一 `conv` 简化了阶段二,但不覆盖未来协商场景] → 设计中保留 `KcpSession.Conv` 和会话键扩展点,后续可在不推翻 `KcpTransport` 的前提下加入协商协议。
- [后台更新循环会引入额外线程与定时负担] → 将更新频率集中在 transport 内部,并确保 `Stop()` 能取消循环、关闭 socket、清理会话。
- [消息层仍在网络线程里执行业务 handler] → 本次只保证交付的是完整消息,线程切换问题留待后续阶段专门处理。
## Migration Plan
1. 接入或确认 KCP C# 依赖在 Unity 工程中可用。
2. 新增 `KcpTransport` 与内部 `KcpSession`,完成客户端和服务端的会话创建、发送、接收与更新循环。
3. 将 `NetworkManager` 默认实现切换到 `KcpTransport`,保持 `MessageManager` 和消息类型不变。
4. 增加编辑器测试,覆盖默认发送、服务端会话路由、完整消息交付和停止清理。
5. 如果集成验证失败,可临时切回 `ReliableUdpTransport` 入口,不影响 `ITransport` 和消息层接口。
## Open Questions
- 项目最终使用哪一个 KCP C# 实现,以及它在 Unity 中的程序集引用方式是什么?
- 阶段二是否需要立即加入空闲会话超时回收,还是只在 `Stop()` 时统一清理即可?
- 服务端是否已经有固定的监听入口需要同步替换为 `KcpTransport`,还是当前变更只覆盖客户端入口与共享 transport 代码?

View File

@ -0,0 +1,26 @@
## Why
阶段一已经把网络抽象收口到 `ITransport`,但当前运行时仍然只有一个名为 `ReliableUdpTransport` 的纯 UDP 实现,阶段二目标里要求的 KCP 可靠收发、会话维护和替换自定义可靠 UDP 结构还没有真正落地。继续在兼容类上堆补丁会再次混淆 UDP socket、会话状态和消息交付职责因此需要单独提出 `KcpTransport` 变更,作为后续移除旧可靠 UDP 结构的前置条件。
## What Changes
- 新增 `KcpTransport`,以 `UdpClient` 承载原始数据报,以 KCP 负责可靠、有序的消息交付,并完整实现现有 `ITransport` 接口。
- 新增面向 KCP 的会话模型,用于管理客户端默认会话和服务端按远端地址维护的会话实例、活动时间与更新驱动。
- 将网络入口装配从 `ReliableUdpTransport` 切换为 `KcpTransport`,保持 `MessageManager` 的发送与接收契约不变。
- 为 KCP 传输与会话路由补充编辑器测试,验证完整业务消息能够经 `KcpTransport` 可靠收发。
## Capabilities
### New Capabilities
- `kcp-transport`: 提供基于 UDP + KCP 的 `ITransport` 实现,支持客户端默认会话和服务端多会话场景下的完整消息收发。
### Modified Capabilities
None.
## Impact
- 受影响代码:`Assets/Scripts/Network/NetworkTransport/`、`Assets/Scripts/NetworkManager.cs`、`Assets/Tests/EditMode/Network/`
- 受影响接口:`ITransport` 的调用方式保持不变,但默认运行时实现从 `ReliableUdpTransport` 切换到 `KcpTransport`
- 受影响依赖:需要项目中可用的 KCP C# 实现和对应程序集引用
- 受影响系统:客户端登录、心跳、输入上行、状态下行等所有通过 `MessageManager` 封包的网络消息链路

View File

@ -0,0 +1,45 @@
## ADDED Requirements
### Requirement: Client mode uses a default KCP session
The transport SHALL support a client mode that is constructed with a default remote endpoint and creates exactly one default KCP session after `StartAsync`. Calls to `Send(byte[] data)` in client mode MUST encode the payload through that session and emit the resulting UDP datagrams to the configured remote endpoint.
#### Scenario: Client sends through the default remote session
- **WHEN** the application starts a client-mode transport and calls `Send` with a business payload
- **THEN** the transport routes the payload through the default KCP session
- **THEN** the UDP socket sends the encoded datagrams to the configured remote endpoint
### Requirement: Server mode isolates KCP session state per remote endpoint
The transport SHALL support a server mode that receives UDP datagrams from multiple remote endpoints and maintains independent KCP session state for each active remote endpoint. A datagram received from one endpoint MUST only be applied to that endpoint's session, and `SendToAll(byte[] data)` MUST encode and enqueue the payload once per active session.
#### Scenario: Two remotes do not share KCP state
- **WHEN** a server-mode transport receives KCP traffic from two different remote endpoints
- **THEN** the transport creates or reuses separate KCP session state for each endpoint
- **THEN** payloads reconstructed for one endpoint are delivered with that endpoint as the sender
#### Scenario: Broadcast writes through each active session
- **WHEN** the application calls `SendToAll` while the server transport has multiple active sessions
- **THEN** the transport encodes the payload for each active KCP session
- **THEN** the UDP socket sends datagrams to every active remote endpoint without collapsing them into a shared session
### Requirement: OnReceive only dispatches complete KCP payloads
The transport SHALL invoke `OnReceive` only after a complete application payload has been reconstructed from `Kcp.Recv`. Raw UDP packets, partial KCP fragments, and transport-level acknowledgements MUST NOT be surfaced to the message layer.
#### Scenario: Fragmented payload is withheld until complete
- **WHEN** a business message spans multiple UDP datagrams and only a subset of those datagrams has been processed
- **THEN** the transport does not invoke `OnReceive`
#### Scenario: Reassembled payload is forwarded to the message layer
- **WHEN** the remaining datagrams for a fragmented KCP message are processed and `Kcp.Recv` yields a complete payload
- **THEN** the transport invokes `OnReceive` exactly once with the reconstructed payload and the originating remote endpoint
### Requirement: Active sessions are driven until stop and cleaned up on shutdown
The transport SHALL continue driving KCP timers for every active session while it is running, so retransmissions, acknowledgements, and flushes can progress even when no new UDP datagrams arrive. Calling `Stop()` MUST stop the receive and update loops, release the UDP socket, and clear active session state.
#### Scenario: Idle sessions still receive KCP timer updates
- **WHEN** the transport has an active session with pending KCP work but no new incoming UDP datagrams
- **THEN** the transport continues calling the KCP update path according to its internal schedule
#### Scenario: Stop releases transport resources
- **WHEN** the application calls `Stop()` on a running transport
- **THEN** the transport stops receiving new UDP datagrams
- **THEN** the transport clears its active KCP session state before shutdown completes

View File

@ -0,0 +1,16 @@
## 1. KCP transport scaffolding
- [x] 1.1 确认并接入 Unity 工程可用的 KCP C# 依赖与程序集引用,保证 `Assets/Scripts/Network/NetworkTransport/` 可以实例化 KCP 对象
- [x] 1.2 新增 `KcpTransport` 与内部 `KcpSession` 基础结构,补齐客户端默认远端、服务端监听模式和会话状态容器
## 2. Core transport implementation
- [x] 2.1 实现客户端 `Send`、服务端 `SendTo`、`SendToAll` 的 KCP 编码路径,确保所有出站消息都通过对应会话发送 UDP 数据报
- [x] 2.2 实现 UDP 接收循环,将入站数据报路由到正确的 `KcpSession.Input`,并在 `Kcp.Recv` 拿到完整业务消息后触发 `OnReceive`
- [x] 2.3 实现会话更新与关闭流程,包括周期性 `Kcp.Check`/`Kcp.Update` 驱动、活动状态刷新、`Stop()` 时的循环停止和资源清理
## 3. Integration and verification
- [x] 3.1 将默认网络入口从 `ReliableUdpTransport` 切换到 `KcpTransport`,保持 `MessageManager` 和现有消息封包逻辑不变
- [x] 3.2 为 `KcpTransport` 增加编辑器测试,覆盖默认会话发送、多远端会话隔离、完整消息交付和停止清理行为
- [x] 3.3 运行相关网络编辑器测试并修正集成问题,确认阶段二 capability 达到 apply-ready 的实现标准

View File

@ -0,0 +1,49 @@
# kcp-transport Specification
## Purpose
TBD - created by archiving change introduce-kcp-transport. Update Purpose after archive.
## Requirements
### Requirement: Client mode uses a default KCP session
The transport SHALL support a client mode that is constructed with a default remote endpoint and creates exactly one default KCP session after `StartAsync`. Calls to `Send(byte[] data)` in client mode MUST encode the payload through that session and emit the resulting UDP datagrams to the configured remote endpoint.
#### Scenario: Client sends through the default remote session
- **WHEN** the application starts a client-mode transport and calls `Send` with a business payload
- **THEN** the transport routes the payload through the default KCP session
- **THEN** the UDP socket sends the encoded datagrams to the configured remote endpoint
### Requirement: Server mode isolates KCP session state per remote endpoint
The transport SHALL support a server mode that receives UDP datagrams from multiple remote endpoints and maintains independent KCP session state for each active remote endpoint. A datagram received from one endpoint MUST only be applied to that endpoint's session, and `SendToAll(byte[] data)` MUST encode and enqueue the payload once per active session.
#### Scenario: Two remotes do not share KCP state
- **WHEN** a server-mode transport receives KCP traffic from two different remote endpoints
- **THEN** the transport creates or reuses separate KCP session state for each endpoint
- **THEN** payloads reconstructed for one endpoint are delivered with that endpoint as the sender
#### Scenario: Broadcast writes through each active session
- **WHEN** the application calls `SendToAll` while the server transport has multiple active sessions
- **THEN** the transport encodes the payload for each active KCP session
- **THEN** the UDP socket sends datagrams to every active remote endpoint without collapsing them into a shared session
### Requirement: OnReceive only dispatches complete KCP payloads
The transport SHALL invoke `OnReceive` only after a complete application payload has been reconstructed from `Kcp.Recv`. Raw UDP packets, partial KCP fragments, and transport-level acknowledgements MUST NOT be surfaced to the message layer.
#### Scenario: Fragmented payload is withheld until complete
- **WHEN** a business message spans multiple UDP datagrams and only a subset of those datagrams has been processed
- **THEN** the transport does not invoke `OnReceive`
#### Scenario: Reassembled payload is forwarded to the message layer
- **WHEN** the remaining datagrams for a fragmented KCP message are processed and `Kcp.Recv` yields a complete payload
- **THEN** the transport invokes `OnReceive` exactly once with the reconstructed payload and the originating remote endpoint
### Requirement: Active sessions are driven until stop and cleaned up on shutdown
The transport SHALL continue driving KCP timers for every active session while it is running, so retransmissions, acknowledgements, and flushes can progress even when no new UDP datagrams arrive. Calling `Stop()` MUST stop the receive and update loops, release the UDP socket, and clear active session state.
#### Scenario: Idle sessions still receive KCP timer updates
- **WHEN** the transport has an active session with pending KCP work but no new incoming UDP datagrams
- **THEN** the transport continues calling the KCP update path according to its internal schedule
#### Scenario: Stop releases transport resources
- **WHEN** the application calls `Stop()` on a running transport
- **THEN** the transport stops receiving new UDP datagrams
- **THEN** the transport clears its active KCP session state before shutdown completes