Introduce KCP and perform the basic migration

This commit is contained in:
SepComet 2026-03-26 11:25:58 +08:00
parent aeca7a346c
commit 815d5f1f24
24 changed files with 1466 additions and 483 deletions

View File

@ -0,0 +1,156 @@
---
name: openspec-apply-change
description: Implement tasks from an OpenSpec change. Use when the user wants to start implementing, continue implementation, or work through tasks.
license: MIT
compatibility: Requires openspec CLI.
metadata:
author: openspec
version: "1.0"
generatedBy: "1.2.0"
---
Implement tasks from an OpenSpec change.
**Input**: Optionally specify a change name. If omitted, check whether it can be inferred from conversation context. If the request is vague or ambiguous, you MUST prompt with the available changes.
**Steps**
1. **Select the change**
If a name is provided, use it. Otherwise:
- Infer from conversation context if the user mentioned a change
- Auto-select if only one active change exists
- If ambiguous, run `openspec list --json` to get available changes and use the **AskUserQuestion tool** to let the user select
Always announce: "Using change: <name>" and how to override (e.g., `/opsx:apply <other>`).
2. **Check status to understand the schema**
```bash
openspec status --change "<name>" --json
```
Parse the JSON to understand:
- `schemaName`: The workflow being used (e.g., "spec-driven")
- Which artifact contains the tasks (typically "tasks" for spec-driven, check status for others)
3. **Get apply instructions**
```bash
openspec instructions apply --change "<name>" --json
```
This returns:
- Context file paths (varies by schema - could be proposal/specs/design/tasks or spec/tests/implementation/docs)
- Progress (total, complete, remaining)
- Task list with status
- Dynamic instruction based on current state
**Handle states:**
- If `state: "blocked"` (missing artifacts): show message, suggest using openspec-continue-change
- If `state: "all_done"`: congratulate, suggest archive
- Otherwise: proceed to implementation
4. **Read context files**
Read the files listed in `contextFiles` from the apply instructions output.
The files depend on the schema being used:
- **spec-driven**: proposal, specs, design, tasks
- Other schemas: follow the contextFiles from CLI output
5. **Show current progress**
Display:
- Schema being used
- Progress: "N/M tasks complete"
- Remaining tasks overview
- Dynamic instruction from CLI
6. **Implement tasks (loop until done or blocked)**
For each pending task:
- Show which task is being worked on
- Make the code changes required
- Keep changes minimal and focused
- Mark the task complete in the tasks file: `- [ ]` → `- [x]`
- Continue to next task
**Pause if:**
- Task is unclear → ask for clarification
- Implementation reveals a design issue → suggest updating artifacts
- Error or blocker encountered → report and wait for guidance
- User interrupts
7. **On completion or pause, show status**
Display:
- Tasks completed this session
- Overall progress: "N/M tasks complete"
- If all done: suggest archive
- If paused: explain why and wait for guidance
**Output During Implementation**
```
## Implementing: <change-name> (schema: <schema-name>)
Working on task 3/7: <task description>
[...implementation happening...]
✓ Task complete
Working on task 4/7: <task description>
[...implementation happening...]
✓ Task complete
```
**Output On Completion**
```
## Implementation Complete
**Change:** <change-name>
**Schema:** <schema-name>
**Progress:** 7/7 tasks complete ✓
### Completed This Session
- [x] Task 1
- [x] Task 2
...
All tasks complete! Ready to archive this change.
```
**Output On Pause (Issue Encountered)**
```
## Implementation Paused
**Change:** <change-name>
**Schema:** <schema-name>
**Progress:** 4/7 tasks complete
### Issue Encountered
<description of the issue>
**Options:**
1. <option 1>
2. <option 2>
3. Other approach
What would you like to do?
```
**Guardrails**
- Keep going through tasks until done or blocked
- Always read context files before starting (from the apply instructions output)
- If task is ambiguous, pause and ask before implementing
- If implementation reveals issues, pause and suggest artifact updates
- Keep code changes minimal and scoped to each task
- Update task checkbox immediately after completing each task
- Pause on errors, blockers, or unclear requirements - don't guess
- Use contextFiles from CLI output, don't assume specific file names
**Fluid Workflow Integration**
This skill supports the "actions on a change" model:
- **Can be invoked anytime**: Before all artifacts are done (if tasks exist), after partial implementation, interleaved with other actions
- **Allows artifact updates**: If implementation reveals design issues, suggest updating artifacts - not phase-locked, work fluidly

View File

@ -0,0 +1,114 @@
---
name: openspec-archive-change
description: Archive a completed change in the experimental workflow. Use when the user wants to finalize and archive a change after implementation is complete.
license: MIT
compatibility: Requires openspec CLI.
metadata:
author: openspec
version: "1.0"
generatedBy: "1.2.0"
---
Archive a completed change in the experimental workflow.
**Input**: Optionally specify a change name. If omitted, check whether it can be inferred from conversation context. If the request is vague or ambiguous, you MUST prompt with the available changes.
**Steps**
1. **If no change name provided, prompt for selection**
Run `openspec list --json` to get available changes. Use the **AskUserQuestion tool** to let the user select.
Show only active changes (not already archived).
Include the schema used for each change if available.
**IMPORTANT**: Do NOT guess or auto-select a change. Always let the user choose.
2. **Check artifact completion status**
Run `openspec status --change "<name>" --json` to check artifact completion.
Parse the JSON to understand:
- `schemaName`: The workflow being used
- `artifacts`: List of artifacts with their status (`done` or other)
**If any artifacts are not `done`:**
- Display warning listing incomplete artifacts
- Use **AskUserQuestion tool** to confirm user wants to proceed
- Proceed if user confirms
3. **Check task completion status**
Read the tasks file (typically `tasks.md`) to check for incomplete tasks.
Count tasks marked with `- [ ]` (incomplete) vs `- [x]` (complete).
**If incomplete tasks found:**
- Display warning showing count of incomplete tasks
- Use **AskUserQuestion tool** to confirm user wants to proceed
- Proceed if user confirms
**If no tasks file exists:** Proceed without task-related warning.
4. **Assess delta spec sync state**
Check for delta specs at `openspec/changes/<name>/specs/`. If none exist, proceed without sync prompt.
**If delta specs exist:**
- Compare each delta spec with its corresponding main spec at `openspec/specs/<capability>/spec.md`
- Determine what changes would be applied (adds, modifications, removals, renames)
- Show a combined summary before prompting
**Prompt options:**
- If changes needed: "Sync now (recommended)", "Archive without syncing"
- If already synced: "Archive now", "Sync anyway", "Cancel"
If user chooses sync, use Task tool (subagent_type: "general-purpose", prompt: "Use Skill tool to invoke openspec-sync-specs for change '<name>'. Delta spec analysis: <include the analyzed delta spec summary>"). Proceed to archive regardless of choice.
5. **Perform the archive**
Create the archive directory if it doesn't exist:
```bash
mkdir -p openspec/changes/archive
```
Generate target name using current date: `YYYY-MM-DD-<change-name>`
**Check if target already exists:**
- If yes: Fail with error, suggest renaming existing archive or using different date
- If no: Move the change directory to archive
```bash
mv openspec/changes/<name> openspec/changes/archive/YYYY-MM-DD-<name>
```
6. **Display summary**
Show archive completion summary including:
- Change name
- Schema that was used
- Archive location
- Whether specs were synced (if applicable)
- Note about any warnings (incomplete artifacts/tasks)
**Output On Success**
```
## Archive Complete
**Change:** <change-name>
**Schema:** <schema-name>
**Archived to:** openspec/changes/archive/YYYY-MM-DD-<name>/
**Specs:** ✓ Synced to main specs (or "No delta specs" or "Sync skipped")
All artifacts complete. All tasks complete.
```
**Guardrails**
- Always prompt for change selection if not provided
- Use artifact graph (openspec status --json) for completion checking
- Don't block archive on warnings - just inform and confirm
- Preserve .openspec.yaml when moving to archive (it moves with the directory)
- Show clear summary of what happened
- If sync is requested, use openspec-sync-specs approach (agent-driven)
- If delta specs exist, always run the sync assessment and show the combined summary before prompting

View File

@ -0,0 +1,288 @@
---
name: openspec-explore
description: Enter explore mode - a thinking partner for exploring ideas, investigating problems, and clarifying requirements. Use when the user wants to think through something before or during a change.
license: MIT
compatibility: Requires openspec CLI.
metadata:
author: openspec
version: "1.0"
generatedBy: "1.2.0"
---
Enter explore mode. Think deeply. Visualize freely. Follow the conversation wherever it goes.
**IMPORTANT: Explore mode is for thinking, not implementing.** You may read files, search code, and investigate the codebase, but you must NEVER write code or implement features. If the user asks you to implement something, remind them to exit explore mode first and create a change proposal. You MAY create OpenSpec artifacts (proposals, designs, specs) if the user asks—that's capturing thinking, not implementing.
**This is a stance, not a workflow.** There are no fixed steps, no required sequence, no mandatory outputs. You're a thinking partner helping the user explore.
---
## The Stance
- **Curious, not prescriptive** - Ask questions that emerge naturally, don't follow a script
- **Open threads, not interrogations** - Surface multiple interesting directions and let the user follow what resonates. Don't funnel them through a single path of questions.
- **Visual** - Use ASCII diagrams liberally when they'd help clarify thinking
- **Adaptive** - Follow interesting threads, pivot when new information emerges
- **Patient** - Don't rush to conclusions, let the shape of the problem emerge
- **Grounded** - Explore the actual codebase when relevant, don't just theorize
---
## What You Might Do
Depending on what the user brings, you might:
**Explore the problem space**
- Ask clarifying questions that emerge from what they said
- Challenge assumptions
- Reframe the problem
- Find analogies
**Investigate the codebase**
- Map existing architecture relevant to the discussion
- Find integration points
- Identify patterns already in use
- Surface hidden complexity
**Compare options**
- Brainstorm multiple approaches
- Build comparison tables
- Sketch tradeoffs
- Recommend a path (if asked)
**Visualize**
```
┌─────────────────────────────────────────┐
│ Use ASCII diagrams liberally │
├─────────────────────────────────────────┤
│ │
│ ┌────────┐ ┌────────┐ │
│ │ State │────────▶│ State │ │
│ │ A │ │ B │ │
│ └────────┘ └────────┘ │
│ │
│ System diagrams, state machines, │
│ data flows, architecture sketches, │
│ dependency graphs, comparison tables │
│ │
└─────────────────────────────────────────┘
```
**Surface risks and unknowns**
- Identify what could go wrong
- Find gaps in understanding
- Suggest spikes or investigations
---
## OpenSpec Awareness
You have full context of the OpenSpec system. Use it naturally, don't force it.
### Check for context
At the start, quickly check what exists:
```bash
openspec list --json
```
This tells you:
- If there are active changes
- Their names, schemas, and status
- What the user might be working on
### When no change exists
Think freely. When insights crystallize, you might offer:
- "This feels solid enough to start a change. Want me to create a proposal?"
- Or keep exploring - no pressure to formalize
### When a change exists
If the user mentions a change or you detect one is relevant:
1. **Read existing artifacts for context**
- `openspec/changes/<name>/proposal.md`
- `openspec/changes/<name>/design.md`
- `openspec/changes/<name>/tasks.md`
- etc.
2. **Reference them naturally in conversation**
- "Your design mentions using Redis, but we just realized SQLite fits better..."
- "The proposal scopes this to premium users, but we're now thinking everyone..."
3. **Offer to capture when decisions are made**
| Insight Type | Where to Capture |
|--------------|------------------|
| New requirement discovered | `specs/<capability>/spec.md` |
| Requirement changed | `specs/<capability>/spec.md` |
| Design decision made | `design.md` |
| Scope changed | `proposal.md` |
| New work identified | `tasks.md` |
| Assumption invalidated | Relevant artifact |
Example offers:
- "That's a design decision. Capture it in design.md?"
- "This is a new requirement. Add it to specs?"
- "This changes scope. Update the proposal?"
4. **The user decides** - Offer and move on. Don't pressure. Don't auto-capture.
---
## What You Don't Have To Do
- Follow a script
- Ask the same questions every time
- Produce a specific artifact
- Reach a conclusion
- Stay on topic if a tangent is valuable
- Be brief (this is thinking time)
---
## Handling Different Entry Points
**User brings a vague idea:**
```
User: I'm thinking about adding real-time collaboration
You: Real-time collab is a big space. Let me think about this...
COLLABORATION SPECTRUM
════════════════════════════════════════════
Awareness Coordination Sync
│ │ │
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│Presence│ │Cursors │ │ CRDT │
│ "3 │ │ Multi │ │Conflict│
│online" │ │ select │ │ free │
└────────┘ └────────┘ └────────┘
│ │ │
trivial moderate complex
Where's your head at?
```
**User brings a specific problem:**
```
User: The auth system is a mess
You: [reads codebase]
┌─────────────────────────────────────────────┐
│ CURRENT AUTH FLOW │
└─────────────────────────────────────────────┘
┌─────────────┼─────────────┐
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Google │ │ GitHub │ │ Email │
│ OAuth │ │ OAuth │ │ Magic │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└─────────────┼─────────────┘
┌───────────┐
│ Session │
└─────┬─────┘
┌───────────┐
│ Perms │
└───────────┘
I see three tangles. Which one's burning?
```
**User is stuck mid-implementation:**
```
User: /opsx:explore add-auth-system
The OAuth integration is more complex than expected
You: [reads change artifacts]
You're on task 4: "Implement OAuth flow"
Let me trace what's involved...
[draws diagram, explores options, suggests paths]
Want to update the design to reflect this?
Or add a spike task to investigate?
```
**User wants to compare options:**
```
User: Should we use Postgres or SQLite?
You: Generic answer is boring. What's the context?
User: A CLI tool that tracks local dev environments
You: That changes everything.
┌─────────────────────────────────────────────────┐
│ CLI TOOL DATA STORAGE │
└─────────────────────────────────────────────────┘
Key constraints:
• No daemon running
• Must work offline
• Single user
SQLite Postgres
Deployment embedded ✓ needs server ✗
Offline yes ✓ no ✗
Single file yes ✓ no ✗
SQLite. Not even close.
Unless... is there a sync component?
```
---
## Ending Discovery
There's no required ending. Discovery might:
- **Flow into a proposal**: "Ready to start? I can create a change proposal."
- **Result in artifact updates**: "Updated design.md with these decisions"
- **Just provide clarity**: User has what they need, moves on
- **Continue later**: "We can pick this up anytime"
When it feels like things are crystallizing, you might summarize:
```
## What We Figured Out
**The problem**: [crystallized understanding]
**The approach**: [if one emerged]
**Open questions**: [if any remain]
**Next steps** (if ready):
- Create a change proposal
- Keep exploring: just keep talking
```
But this summary is optional. Sometimes the thinking IS the value.
---
## Guardrails
- **Don't implement** - Never write code or implement features. Creating OpenSpec artifacts is fine, writing application code is not.
- **Don't fake understanding** - If something is unclear, dig deeper
- **Don't rush** - Discovery is thinking time, not task time
- **Don't force structure** - Let patterns emerge naturally
- **Don't auto-capture** - Offer to save insights, don't just do it
- **Do visualize** - A good diagram is worth many paragraphs
- **Do explore the codebase** - Ground discussions in reality
- **Do question assumptions** - Including the user's and your own

View File

@ -0,0 +1,110 @@
---
name: openspec-propose
description: Propose a new change with all artifacts generated in one step. Use when the user wants to quickly describe what they want to build and get a complete proposal with design, specs, and tasks ready for implementation.
license: MIT
compatibility: Requires openspec CLI.
metadata:
author: openspec
version: "1.0"
generatedBy: "1.2.0"
---
Propose a new change - create the change and generate all artifacts in one step.
I'll create a change with artifacts:
- proposal.md (what & why)
- design.md (how)
- tasks.md (implementation steps)
When ready to implement, run /opsx:apply
---
**Input**: The user's request should include a change name (kebab-case) OR a description of what they want to build.
**Steps**
1. **If no clear input provided, ask what they want to build**
Use the **AskUserQuestion tool** (open-ended, no preset options) to ask:
> "What change do you want to work on? Describe what you want to build or fix."
From their description, derive a kebab-case name (e.g., "add user authentication" → `add-user-auth`).
**IMPORTANT**: Do NOT proceed without understanding what the user wants to build.
2. **Create the change directory**
```bash
openspec new change "<name>"
```
This creates a scaffolded change at `openspec/changes/<name>/` with `.openspec.yaml`.
3. **Get the artifact build order**
```bash
openspec status --change "<name>" --json
```
Parse the JSON to get:
- `applyRequires`: array of artifact IDs needed before implementation (e.g., `["tasks"]`)
- `artifacts`: list of all artifacts with their status and dependencies
4. **Create artifacts in sequence until apply-ready**
Use the **TodoWrite tool** to track progress through the artifacts.
Loop through artifacts in dependency order (artifacts with no pending dependencies first):
a. **For each artifact that is `ready` (dependencies satisfied)**:
- Get instructions:
```bash
openspec instructions <artifact-id> --change "<name>" --json
```
- The instructions JSON includes:
- `context`: Project background (constraints for you - do NOT include in output)
- `rules`: Artifact-specific rules (constraints for you - do NOT include in output)
- `template`: The structure to use for your output file
- `instruction`: Schema-specific guidance for this artifact type
- `outputPath`: Where to write the artifact
- `dependencies`: Completed artifacts to read for context
- Read any completed dependency files for context
- Create the artifact file using `template` as the structure
- Apply `context` and `rules` as constraints - but do NOT copy them into the file
- Show brief progress: "Created <artifact-id>"
b. **Continue until all `applyRequires` artifacts are complete**
- After creating each artifact, re-run `openspec status --change "<name>" --json`
- Check if every artifact ID in `applyRequires` has `status: "done"` in the artifacts array
- Stop when all `applyRequires` artifacts are done
c. **If an artifact requires user input** (unclear context):
- Use **AskUserQuestion tool** to clarify
- Then continue with creation
5. **Show final status**
```bash
openspec status --change "<name>"
```
**Output**
After completing all artifacts, summarize:
- Change name and location
- List of artifacts created with brief descriptions
- What's ready: "All artifacts created! Ready for implementation."
- Prompt: "Run `/opsx:apply` or ask me to implement to start working on the tasks."
**Artifact Creation Guidelines**
- Follow the `instruction` field from `openspec instructions` for each artifact type
- The schema defines what each artifact should contain - follow it
- Read dependency artifacts for context before creating new ones
- Use `template` as the structure for your output file - fill in its sections
- **IMPORTANT**: `context` and `rules` are constraints for YOU, not content for the file
- Do NOT copy `<context>`, `<rules>`, `<project_context>` blocks into the artifact
- These guide what you write, but should never appear in the output
**Guardrails**
- Create ALL artifacts needed for implementation (as defined by schema's `apply.requires`)
- Always read dependency artifacts before creating a new one
- If context is critically unclear, ask the user - but prefer making reasonable decisions to keep momentum
- If a change with that name already exists, ask if user wants to continue it or create a new one
- Verify each artifact file exists after writing before proceeding to next

Binary file not shown.

View File

@ -0,0 +1,33 @@
fileFormatVersion: 2
guid: 545b0ef2a81845344bf7962772adf4d4
PluginImporter:
externalObjects: {}
serializedVersion: 2
iconMap: {}
executionOrder: {}
defineConstraints: []
isPreloaded: 0
isOverridable: 0
isExplicitlyReferenced: 0
validateReferences: 1
platformData:
- first:
Any:
second:
enabled: 1
settings: {}
- first:
Editor: Editor
second:
enabled: 0
settings:
DefaultValueInitialized: true
- first:
Windows Store Apps: WindowsStoreApps
second:
enabled: 0
settings:
CPU: AnyCPU
userData:
assetBundleName:
assetBundleVariant:

View File

@ -0,0 +1,14 @@
{
"name": "Network.Runtime",
"rootNamespace": "Network",
"references": [],
"includePlatforms": [],
"excludePlatforms": [],
"allowUnsafeCode": false,
"overrideReferences": false,
"precompiledReferences": [],
"autoReferenced": true,
"defineConstraints": [],
"versionDefines": [],
"noEngineReferences": false
}

View File

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: d972d56d6b084684b5b0666f4856da75
AssemblyDefinitionImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -13,39 +13,53 @@ namespace Network.NetworkApplication
 private readonly ITransport transport;
 private readonly Dictionary<MessageType, Func<byte[], IPEndPoint, Task>> handlers =
-    new Dictionary<MessageType, Func<byte[], IPEndPoint, Task>>();
+    new();
 public MessageManager(ITransport transport)
 {
-    this.transport = transport;
+    this.transport = transport ?? throw new ArgumentNullException(nameof(transport));
     this.transport.OnReceive += OnTransportReceiveAsync;
 }
 public void RegisterHandler(MessageType type, IMessageHandler handler)
 {
-    handlers[type] = async (payload, sender) => { await handler.HandleAsync(payload, sender); };
+    if (handler == null)
+    {
+        throw new ArgumentNullException(nameof(handler));
+    }
+    handlers[type] = (payload, sender) => handler.HandleAsync(payload, sender);
     Console.WriteLine($"[MessageManager] 注册处理器:{type}");
 }
 public void RegisterHandler(MessageType type, Func<byte[], IPEndPoint, Task> handler)
 {
-    var han = new DelegateMessageHandler(handler);
+    if (handler == null)
+    {
+        throw new ArgumentNullException(nameof(handler));
+    }
     RegisterHandler(type, new DelegateMessageHandler(handler));
 }
 public void RegisterHandler(MessageType type, Action<byte[], IPEndPoint> handler)
 {
-    var han = new DelegateMessageHandler((msg, sender) => { handler(msg, sender); });
-    RegisterHandler(type, new DelegateMessageHandler((msg, sender) =>
-    {
-        handler(msg, sender);
-        return Task.CompletedTask;
-    }));
+    if (handler == null)
+    {
+        throw new ArgumentNullException(nameof(handler));
+    }
+    RegisterHandler(type, new DelegateMessageHandler(handler));
 }
 public void SendMessage<T>(T message, MessageType type, IPEndPoint target = null) where T : IMessage
 {
+    if (message == null)
+    {
+        throw new ArgumentNullException(nameof(message));
+    }
     var envelope = new Envelope()
     {
         Type = (int)type,
@ -66,6 +80,11 @@ namespace Network.NetworkApplication
 public void BroadcastMessage<T>(T message, MessageType type) where T : IMessage
 {
+    if (message == null)
+    {
+        throw new ArgumentNullException(nameof(message));
+    }
     Console.WriteLine($"[MessageManager] 广播消息:{type}");
     var envelope = new Envelope()
     {
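The core of MessageManager is one dictionary mapping message types to async handlers, with the interface and delegate overloads normalized into the same delegate shape. A standalone sketch of that pattern, assuming nothing beyond the BCL (the `MessageType` values and the `Dispatcher` name here are placeholders, not the project's types):

```csharp
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;

// Demo: register a handler, then dispatch a payload to it.
var dispatcher = new Dispatcher();
dispatcher.Register(MessageType.Ping, (payload, sender) =>
{
    Console.WriteLine($"Ping from {sender}, {payload.Length} bytes");
    return Task.CompletedTask;
});
await dispatcher.DispatchAsync(MessageType.Ping, new byte[] { 1 }, new IPEndPoint(IPAddress.Loopback, 9000));

// Placeholder enum; the real MessageType values live elsewhere in the project.
enum MessageType { Ping = 1 }

// Minimal stand-in for MessageManager's handler table.
class Dispatcher
{
    private readonly Dictionary<MessageType, Func<byte[], IPEndPoint, Task>> _handlers = new();

    public void Register(MessageType type, Func<byte[], IPEndPoint, Task> handler)
    {
        if (handler == null) throw new ArgumentNullException(nameof(handler));
        _handlers[type] = handler; // last registration for a type wins, as in MessageManager
    }

    public Task DispatchAsync(MessageType type, byte[] payload, IPEndPoint sender) =>
        _handlers.TryGetValue(type, out var h) ? h(payload, sender) : Task.CompletedTask;
}
```

Dispatching an unregistered type is a silent no-op here; the real class may log or throw instead.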

View File

@ -1,193 +0,0 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
namespace Network.NetworkTransport
{
public class ClientSession
{
private ITransport _transport;
private IPEndPoint _remote;
public long LastActivityTs { get; private set; }
public uint SendSequenceNumber { get; private set; } = 0;
private int _currentTicks = 0;
private int _nextSendTicks = 0;
private int _sendInterval = 10;
// Retransmission timeout: 5 s
private long _retransmitTicks = 5000;
// Queued by the upper layer, not yet sent
private readonly LinkedList<Packet> _sendQueue = new LinkedList<Packet>();
// Sent but not yet acknowledged
private readonly LinkedList<Packet> _sendBuffer = new LinkedList<Packet>();
// Received out of order, waiting for the gap to fill
private readonly LinkedList<Packet> _receiveBuffer = new LinkedList<Packet>();
// Received in order, ready to deliver to the upper layer
private readonly LinkedList<Packet> _receiveQueue = new LinkedList<Packet>();
private bool _hasReceived = false;
private uint _expectedAck = 0;
private readonly object _lockObj = new object();
// Sequence numbers already delivered (used by TryProcessReceiveSequence)
private readonly HashSet<uint> _receivedSequences = new HashSet<uint>();
public ClientSession(ITransport transport, IPEndPoint remote)
{
_transport = transport;
_remote = remote;
LastActivityTs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
public uint GetExpectedAck() => _expectedAck;
public void SetSendInterval(int interval) => _sendInterval = interval;
private uint GetNextSendSequence()
{
lock (_lockObj)
{
return SendSequenceNumber++;
}
}
public void Tick(int currentTicks)
{
_currentTicks = currentTicks;
if (_currentTicks >= _nextSendTicks)
{
_nextSendTicks = currentTicks + _sendInterval;
SendPacketInternal();
}
}
public void SendPacket(byte[] data)
{
_sendQueue.AddLast(Packet.CreateDataPacket(GetNextSendSequence(), data));
}
public List<Packet> ReceivePackets()
{
var list = new List<Packet>();
lock (_lockObj)
{
while (_receiveQueue.Count > 0)
{
var packet = _receiveQueue.First.Value;
list.Add(packet);
_receiveQueue.RemoveFirst();
}
}
return list;
}
private void SendPacketInternal()
{
if (_hasReceived)
{
    // ACKs carry the cumulative expected sequence and are not retransmitted,
    // so they are sent directly rather than buffered.
    var packet = Packet.CreateAckPacket(_expectedAck);
    var bytes = packet.ToBytes();
    _transport.SendTo(bytes, _remote);
    _hasReceived = false;
}
// Retransmit packets that were sent but not acknowledged within the timeout
foreach (var packet in _sendBuffer)
{
    if (_currentTicks - packet.Timestamp > _retransmitTicks)
    {
        var bytes = packet.ToBytes();
        _transport.SendTo(bytes, _remote);
    }
    else break;
}
while (_sendQueue.Count > 0)
{
    var packet = _sendQueue.First.Value;
    _sendQueue.RemoveFirst();
    _sendBuffer.AddLast(packet);
    var bytes = packet.ToBytes();
    _transport.SendTo(bytes, _remote);
}
}
public void ReceivePacketsInternal(Packet packet)
{
uint seq = packet.SequenceNumber;
// Did this packet arrive in order?
if (seq == _expectedAck)
{
    _receiveQueue.AddLast(packet);
    // Drain any buffered packets that now form a contiguous run
    while (_receiveBuffer.Count > 0)
    {
        var pendingPacket = _receiveBuffer.First.Value;
        if (pendingPacket.SequenceNumber != seq + 1) break;
        seq++;
        _receiveQueue.AddLast(pendingPacket);
        _receiveBuffer.RemoveFirst();
    }
    _expectedAck = seq + 1;
    _hasReceived = true;
}
// Otherwise insert the packet into the out-of-order buffer, keeping it sorted
else
{
    var node = _receiveBuffer.First;
    while (node != null && node.Value.SequenceNumber <= seq)
    {
        node = node.Next;
    }
    if (node != null) _receiveBuffer.AddBefore(node, packet);
    else _receiveBuffer.AddLast(packet);
}
}
public bool TryProcessReceiveSequence(uint sequenceNumber, out bool shouldDeliver)
{
lock (_lockObj)
{
LastActivityTs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (sequenceNumber == _expectedAck)
{
_expectedAck++;
_receivedSequences.Add(sequenceNumber);
shouldDeliver = true;
return true;
}
else if (sequenceNumber < _expectedAck)
{
shouldDeliver = false;
return _receivedSequences.Contains(sequenceNumber);
}
else
{
shouldDeliver = false;
return false;
}
}
}
}
}
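The receive side of the removed ClientSession implements cumulative in-order delivery: the expected sequence number is delivered immediately, out-of-order packets are parked, and the expected counter advances as gaps fill. The same logic can be sketched in isolation with a `SortedDictionary`; the `ReorderBuffer`/`Push` names are illustrative, not from the project:

```csharp
using System;
using System.Collections.Generic;

// Demo: packet 1 arrives early and is parked, then delivered once 0 fills the gap.
var buffer = new ReorderBuffer();
Console.WriteLine(buffer.Push(1, new byte[] { 11 }).Count); // 0 - parked
Console.WriteLine(buffer.Push(0, new byte[] { 10 }).Count); // 2 - delivers 0 then 1

// Illustrative reorder buffer: cumulative in-order delivery, as ClientSession did
// with its _receiveBuffer/_receiveQueue pair.
class ReorderBuffer
{
    private readonly SortedDictionary<uint, byte[]> _pending = new();

    // Next sequence number that can be delivered (ClientSession's _expectedAck).
    public uint ExpectedSeq { get; private set; }

    // Returns every payload that became deliverable, in order.
    public List<byte[]> Push(uint seq, byte[] payload)
    {
        var delivered = new List<byte[]>();
        if (seq < ExpectedSeq) return delivered; // duplicate of an already-delivered packet
        _pending[seq] = payload;                 // park it (covers the in-order case too)
        while (_pending.TryGetValue(ExpectedSeq, out var next))
        {
            delivered.Add(next);
            _pending.Remove(ExpectedSeq);
            ExpectedSeq++;
        }
        return delivered;
    }
}
```

`ExpectedSeq` doubles as the cumulative ACK value a sender would receive, which is exactly the role `_expectedAck` plays above.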

View File

@ -6,6 +6,7 @@ namespace Network.NetworkTransport
 {
     public interface ITransport
     {
+        void Send(byte[] data);
         void SendTo(byte[] data, IPEndPoint target);
         void SendToAll(byte[] data);
         event Action<byte[], IPEndPoint> OnReceive;
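With `Send(byte[])` added to the interface, a minimal in-memory loopback transport makes it possible to unit-test handlers without opening sockets. The interface below is restated from this diff; the `LoopbackTransport` class is a hypothetical test helper, not part of the commit:

```csharp
using System;
using System.Net;

// Demo: a handler wired to the loopback sees its own sends.
var transport = new LoopbackTransport();
transport.OnReceive += (data, from) => Console.WriteLine($"got {data.Length} bytes from {from}");
transport.Send(new byte[] { 1, 2, 3 });

// The interface as it stands after this commit (restated from the diff).
public interface ITransport
{
    void Send(byte[] data);
    void SendTo(byte[] data, IPEndPoint target);
    void SendToAll(byte[] data);
    event Action<byte[], IPEndPoint> OnReceive;
}

// Hypothetical test helper: every send is reflected straight back through OnReceive.
public class LoopbackTransport : ITransport
{
    private readonly IPEndPoint _self = new(IPAddress.Loopback, 0);
    public event Action<byte[], IPEndPoint> OnReceive;
    public void Send(byte[] data) => OnReceive?.Invoke(data, _self);
    public void SendTo(byte[] data, IPEndPoint target) => OnReceive?.Invoke(data, target);
    public void SendToAll(byte[] data) => OnReceive?.Invoke(data, _self);
}
```

Because delivery is synchronous and lossless, this stub exercises handler wiring only, not the reliability behavior a real transport would add.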

View File

@ -1,63 +0,0 @@
using System;
using System.Linq;
using UnityEngine;
namespace Network.NetworkTransport
{
public enum PacketType : byte
{
Data = 1,
Ack = 2,
}
public struct Packet
{
public PacketType Type;
public uint SequenceNumber;
public byte[] Data;
public long Timestamp;
public byte[] ToBytes()
{
var result = new byte[1 + 4 + 8 + Data.Length];
result[0] = (byte)Type;
BitConverter.GetBytes(SequenceNumber).CopyTo(result, 1);
BitConverter.GetBytes(Timestamp).CopyTo(result, 5);
Data.CopyTo(result, 13);
return result;
}
public static Packet FromBytes(byte[] data)
{
    return new Packet
    {
        Type = (PacketType)data[0],
        SequenceNumber = BitConverter.ToUInt32(data, 1),
        Timestamp = BitConverter.ToInt64(data, 5),
        // Payload starts after the 13-byte header (1 + 4 + 8), not at offset 5
        Data = new ArraySegment<byte>(data, 13, data.Length - 13).ToArray()
    };
}
public static Packet CreateDataPacket(uint seqNum, byte[] data)
{
return new Packet
{
Type = PacketType.Data,
SequenceNumber = seqNum,
Data = data,
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
}
public static Packet CreateAckPacket(uint seqNum)
{
return new Packet
{
Type = PacketType.Ack,
SequenceNumber = seqNum,
Data = Array.Empty<byte>(),
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
};
}
}
}
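The wire layout of the removed Packet struct is a fixed 13-byte header (type 1 byte, sequence 4, timestamp 8) followed by the payload. Note that `ToBytes` writes the payload at offset 13 while the file's `FromBytes` reads it from offset 5; the standalone round-trip sketch below (local `Serialize`/`Deserialize` helpers, not the project's code) uses 13 on both sides:

```csharp
using System;

// Round-trip the 13-byte header layout: type(1) | sequence(4) | timestamp(8) | payload.
var bytes = Serialize(1, 42u, 1700000000000L, new byte[] { 9, 8, 7 });
var (type, seq, ts, payload) = Deserialize(bytes);
Console.WriteLine($"{type} {seq} {ts} {payload.Length}"); // 1 42 1700000000000 3

static byte[] Serialize(byte type, uint seq, long timestamp, byte[] payload)
{
    var buf = new byte[13 + payload.Length];
    buf[0] = type;
    BitConverter.GetBytes(seq).CopyTo(buf, 1);       // offset 1, 4 bytes
    BitConverter.GetBytes(timestamp).CopyTo(buf, 5); // offset 5, 8 bytes
    payload.CopyTo(buf, 13);                         // payload starts at 13
    return buf;
}

static (byte type, uint seq, long timestamp, byte[] payload) Deserialize(byte[] buf)
{
    var payload = new byte[buf.Length - 13]; // read from offset 13, matching Serialize
    Array.Copy(buf, 13, payload, 0, payload.Length);
    return (buf[0], BitConverter.ToUInt32(buf, 1), BitConverter.ToInt64(buf, 5), payload);
}
```

`BitConverter` uses the host's byte order, so a format like this is only safe between machines of the same endianness; a portable protocol would fix the byte order explicitly.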

View File

@ -1,11 +0,0 @@
fileFormatVersion: 2
guid: b84a2cb7ffe3cd14180358559e526dbe
MonoImporter:
externalObjects: {}
serializedVersion: 2
defaultReferences: []
executionOrder: 0
icon: {instanceID: 0}
userData:
assetBundleName:
assetBundleVariant:

View File

@ -1,8 +1,7 @@
 using System;
-using System.Collections.Generic;
+using System.Collections.Concurrent;
 using System.Net;
 using System.Net.Sockets;
-using System.Threading;
 using System.Threading.Tasks;
 namespace Network.NetworkTransport
@ -13,93 +12,103 @@ namespace Network.NetworkTransport
 private readonly IPEndPoint _defaultRemoteEndPoint;
 private readonly bool _isServer;
-private readonly List<ClientSession> _sessions = new();
-private readonly Timer _retransmitTimer;
-private readonly Timer _cleanupTimer;
-// TODO: revisit the volatile keyword
+// Stage one keeps this class name for compatibility while collapsing it to plain UDP.
+private readonly ConcurrentDictionary<string, IPEndPoint> _knownRemoteEndPoints = new();
 private volatile bool _isRunning;
-// Configuration
-private const int RetransmitTimeoutMs = 1000;
-private const int SessionTimeoutMs = 30000;
-private const int MaxRetransmitAttempts = 5;
 public event Action<byte[], IPEndPoint> OnReceive;
-private Task _receiveTask;
+private Task _receiveTask = Task.CompletedTask;
-// Constructor: server mode
 public ReliableUdpTransport(int listenPort)
 {
     _client = new UdpClient(listenPort);
     _isServer = true;
-    _retransmitTimer = new Timer(CheckRetransmit, null, 100, 100);
-    _cleanupTimer = new Timer(CleanupSessions, null, 5000, 5000);
     Console.WriteLine($"[Transport] 服务端模式,监听端口: {listenPort}");
 }
-// Constructor: client mode
 public ReliableUdpTransport(string serverIP, int serverPort)
 {
     _client = new UdpClient(0);
     _defaultRemoteEndPoint = new IPEndPoint(IPAddress.Parse(serverIP), serverPort);
     _isServer = false;
-    _retransmitTimer = new Timer(CheckRetransmit, null, 100, 100);
-    _cleanupTimer = new Timer(CleanupSessions, null, 5000, 5000);
     Console.WriteLine($"[Transport] 客户端模式,目标: {_defaultRemoteEndPoint}");
 }
 public async Task StartAsync()
 {
-    _sessions.Clear();
+    if (_isRunning)
+    {
+        return;
+    }
+    _knownRemoteEndPoints.Clear();
     _isRunning = true;
     Console.WriteLine("[Transport] 传输层启动");
-    // Start receiving
     _receiveTask = ReceiveLoop();
-    await Task.Delay(100); // give the receive loop a moment to start
+    await Task.Yield();
 }
-public void Tick()
-{
-    foreach (var session in _sessions)
-    {
-        session.Tick(DateTime.UtcNow.Millisecond);
-    }
-}
 public void Stop()
 {
-    _isRunning = false;
-    _retransmitTimer.Dispose();
-    _cleanupTimer.Dispose();
-    _client.Close();
-    _sessions.Clear();
-    Console.WriteLine("[Transport] 传输层停止");
-}
-public async void SendTo(Packet packet, IPEndPoint target)
-{
     if (!_isRunning)
     {
         return;
     }
-    var bytes = packet.ToBytes();
-    await _client.SendAsync(bytes, bytes.Length, target);
-    Console.WriteLine($"[Transport] 发送数据包到 {target}");
+    _isRunning = false;
+    _client.Close();
+    _knownRemoteEndPoints.Clear();
+    Console.WriteLine("[Transport] 传输层停止");
 }
+public void Send(byte[] data)
{
if (_defaultRemoteEndPoint == null)
{
throw new InvalidOperationException("Default remote endpoint is not configured.");
}
SendTo(data, _defaultRemoteEndPoint);
}
public void SendTo(byte[] data, IPEndPoint target)
{
if (data == null)
{
throw new ArgumentNullException(nameof(data));
}
if (target == null)
{
throw new ArgumentNullException(nameof(target));
}
EnsureRunning();
RememberRemote(target);
_client.Send(data, data.Length, target);
Console.WriteLine($"[Transport] 发送数据到 {target}");
} }
public void SendToAll(byte[] data) public void SendToAll(byte[] data)
{ {
foreach (var session in _sessions) if (data == null)
{ {
session.SendPacket(data); throw new ArgumentNullException(nameof(data));
}
EnsureRunning();
if (!_isServer)
{
throw new InvalidOperationException("SendToAll is only supported in server mode.");
}
foreach (var remoteEndPoint in _knownRemoteEndPoints.Values)
{
_client.Send(data, data.Length, remoteEndPoint);
Console.WriteLine($"[Transport] 广播数据到 {remoteEndPoint}");
} }
} }
@ -110,20 +119,16 @@ namespace Network.NetworkTransport
try try
{ {
var result = await _client.ReceiveAsync(); var result = await _client.ReceiveAsync();
var packet = Packet.FromBytes(result.Buffer); RememberRemote(result.RemoteEndPoint);
OnReceive?.Invoke(result.Buffer, result.RemoteEndPoint);
if (packet.Type == PacketType.Data)
{
HandleDataPacket(packet, result.RemoteEndPoint);
} }
else if (packet.Type == PacketType.Ack) catch (ObjectDisposedException) when (!_isRunning)
{ {
HandleAckPacket(packet, result.RemoteEndPoint); return;
} }
} catch (SocketException) when (!_isRunning)
catch (ObjectDisposedException)
{ {
return; // 正常关闭 return;
} }
catch (Exception e) catch (Exception e)
{ {
@ -132,154 +137,22 @@ namespace Network.NetworkTransport
} }
} }
private void HandleDataPacket(Packet packet, IPEndPoint senderEndPoint) private void EnsureRunning()
{
var session = GetOrCreateSession(senderEndPoint);
Console.WriteLine(
$"[Transport] 收到数据包从{senderEndPoint} SeqNum={packet.SequenceNumber}, DataLen={packet.Data.Length}");
// 发送ACK
var ackPacket = Packet.CreateAckPacket(packet.SequenceNumber);
SendPacketTo(ackPacket, senderEndPoint);
Console.WriteLine($"[Transport] 发送ACK 到 {senderEndPoint} SeqNum={packet.SequenceNumber}");
// 检查是否应该交付
if (session.TryProcessReceiveSequence(packet.SequenceNumber, out bool shouldDeliver))
{
if (shouldDeliver)
{
OnReceive?.Invoke(packet.Data, senderEndPoint);
Console.WriteLine($"[Transport] 交付数据包从 {senderEndPoint} SeqNum={packet.SequenceNumber}");
}
else
{
Console.WriteLine($"[Transport] 重复包从 {senderEndPoint} SeqNum={packet.SequenceNumber},忽略");
}
}
else
{
// 乱序到达,暂存(简化处理:直接丢弃,依赖重传)
Console.WriteLine($"[Transport] 乱序包从 {senderEndPoint} SeqNum={packet.SequenceNumber},丢弃");
}
}
private void HandleAckPacket(Packet packet, IPEndPoint senderEndPoint)
{
var session = GetOrCreateSession(senderEndPoint);
Console.WriteLine($"[Transport] 收到ACK从 {senderEndPoint} SeqNum={packet.SequenceNumber}");
if (session.PendingAcks.TryRemove(packet.SequenceNumber, out _))
{
Console.WriteLine($"[Transport] 确认包到 {senderEndPoint} SeqNum={packet.SequenceNumber}");
}
}
private ClientSession GetOrCreateSession(IPEndPoint endPoint)
{
string key = endPoint.ToString();
return _sessions.GetOrAdd(key, _ =>
{
var session = new ClientSession(endPoint);
Console.WriteLine($"创建新会话:{endPoint}");
return session;
});
}
private void CheckRetransmit(object state)
{ {
if (!_isRunning) if (!_isRunning)
{
throw new InvalidOperationException("Transport has not been started.");
}
}
private void RememberRemote(IPEndPoint remoteEndPoint)
{
if (remoteEndPoint == null)
{ {
return; return;
} }
var now = DateTime.Now; _knownRemoteEndPoints[remoteEndPoint.ToString()] = remoteEndPoint;
var toRetransmit = new List<(IPEndPoint target, uint seqNum, Packet packet)>();
foreach (var sessionKvp in _sessions)
{
var session = sessionKvp.Value;
foreach (var ackKvp in session.PendingAcks)
{
var timeSinceLastSend = now - ackKvp.Value.sendTime;
if (timeSinceLastSend.TotalMilliseconds > RetransmitTimeoutMs)
{
toRetransmit.Add((session.EndPoint, ackKvp.Key, ackKvp.Value.packet));
}
}
}
foreach (var (target, seqNum, packet) in toRetransmit)
{
var session = GetOrCreateSession(target);
if (session.PendingAcks.ContainsKey(seqNum))
{
// 更新发送时间
session.PendingAcks[seqNum] = (packet, now);
SendPacketTo(packet, target);
Console.WriteLine($"[Transport] 重传包到 {target} SeqNum={seqNum}");
}
}
}
private void CleanupSessions(object state)
{
if (!_isRunning)
{
return;
}
var now = DateTime.Now;
var toRemove = new List<string>();
foreach (var sessionKvp in _sessions)
{
var session = sessionKvp.Value;
var timeSinceLastActivity = now - session.LastActivity;
if (timeSinceLastActivity.TotalMilliseconds > SessionTimeoutMs)
{
toRemove.Add(sessionKvp.Key);
}
}
foreach (string key in toRemove)
{
if (_sessions.TryRemove(key, out var session))
{
Console.WriteLine($"[Transport] 清理超时会话:{session.EndPoint}");
}
}
if (_isServer)
{
PrintSessionInfo();
}
}
private async void SendPacketTo(Packet packet, IPEndPoint endPoint)
{
try
{
var data = packet.ToBytes();
await _client.SendAsync(data, data.Length, endPoint);
}
catch (Exception e)
{
Console.WriteLine($"[Transport] 发送错误:{e.Message}");
}
}
private void PrintSessionInfo()
{
Console.WriteLine($"当前活跃会话数:{_sessions.Count}");
foreach (var sessionKvp in _sessions)
{
var session = sessionKvp.Value;
Console.WriteLine(
$" 会话:{session.EndPoint}发送SeqNum{session.SendSequenceNumber},期望接收:{session.GetExpectedAck()},待确认: {session.PendingAcks.Count}");
}
} }
} }
} }


@@ -9,7 +9,7 @@ using Vector3 = UnityEngine.Vector3;
 public class NetworkManager : MonoBehaviour
 {
     public static NetworkManager Instance;
-    private ReliableUdpTransport _transport;
+    private ITransport _transport;
     private MessageManager _messageManager;
     private IPEndPoint _serverPoint;
     private uint _sequence = 0;
@@ -24,12 +24,30 @@ public class NetworkManager : MonoBehaviour
     private IEnumerator InitNetwork()
     {
         _transport = new ReliableUdpTransport("127.0.0.1", 8080);
-        yield return _transport.StartAsync();
+
+        var startTask = _transport.StartAsync();
+        yield return new WaitUntil(() => startTask.IsCompleted);
+        if (startTask.IsFaulted)
+        {
+            Debug.LogException(startTask.Exception);
+            yield break;
+        }
+
         _messageManager = new MessageManager(_transport);
         RegisterHandler();
         StartCoroutine(Heartbeat());
     }

+    private void OnDestroy()
+    {
+        _transport?.Stop();
+        if (Instance == this)
+        {
+            Instance = null;
+        }
+    }
+
     private IEnumerator Heartbeat()
     {
         while (true)

Assets/Tests.meta Normal file

@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: c812daae3716423888606aadb6e01213
folderAsset: yes
DefaultImporter:
  externalObjects: {}
  userData:
  assetBundleName:
  assetBundleVariant:


@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: c69cefd398574792a8a7a400fef3492e
folderAsset: yes
DefaultImporter:
  externalObjects: {}
  userData:
  assetBundleName:
  assetBundleVariant:


@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: a2a986e9b1b74ce5a5698e5a81664204
folderAsset: yes
DefaultImporter:
  externalObjects: {}
  userData:
  assetBundleName:
  assetBundleVariant:


@ -0,0 +1,198 @@
using System;
using System.Net;
using System.Threading.Tasks;
using Google.Protobuf;
using Network.Defines;
using Network.NetworkApplication;
using Network.NetworkTransport;
using NUnit.Framework;

namespace Tests.EditMode.Network
{
    public class MessageManagerTests
    {
        private static readonly IPEndPoint Sender = new(IPAddress.Loopback, 8080);

        [Test]
        public void SendMessage_WithoutTarget_UsesDefaultSend()
        {
            var transport = new FakeTransport();
            var manager = new MessageManager(transport);
            var message = new Heartbeat();

            manager.SendMessage(message, MessageType.Heartbeat);

            Assert.That(transport.SendCallCount, Is.EqualTo(1));
            Assert.That(transport.SendToCallCount, Is.EqualTo(0));
            Assert.That(transport.LastSentData, Is.Not.Null);
            var envelope = Envelope.Parser.ParseFrom(transport.LastSentData);
            Assert.That(envelope.Type, Is.EqualTo((int)MessageType.Heartbeat));
            Assert.That(envelope.Payload.ToByteArray(), Is.EqualTo(message.ToByteArray()));
        }

        [Test]
        public void SendMessage_WithTarget_UsesExplicitSend()
        {
            var transport = new FakeTransport();
            var manager = new MessageManager(transport);
            var message = new LoginRequest
            {
                PlayerId = "player-1",
                Speed = 5
            };

            manager.SendMessage(message, MessageType.LoginRequest, Sender);

            Assert.That(transport.SendCallCount, Is.EqualTo(0));
            Assert.That(transport.SendToCallCount, Is.EqualTo(1));
            Assert.That(transport.LastSendTarget, Is.EqualTo(Sender));
            var envelope = Envelope.Parser.ParseFrom(transport.LastSendToData);
            Assert.That(envelope.Type, Is.EqualTo((int)MessageType.LoginRequest));
            Assert.That(envelope.Payload.ToByteArray(), Is.EqualTo(message.ToByteArray()));
        }

        [Test]
        public void BroadcastMessage_UsesBroadcastSend()
        {
            var transport = new FakeTransport();
            var manager = new MessageManager(transport);
            var message = new Heartbeat();

            manager.BroadcastMessage(message, MessageType.Heartbeat);

            Assert.That(transport.SendToAllCallCount, Is.EqualTo(1));
            Assert.That(transport.LastBroadcastData, Is.Not.Null);
            var envelope = Envelope.Parser.ParseFrom(transport.LastBroadcastData);
            Assert.That(envelope.Type, Is.EqualTo((int)MessageType.Heartbeat));
            Assert.That(envelope.Payload.ToByteArray(), Is.EqualTo(message.ToByteArray()));
        }

        [Test]
        public void Receive_ValidEnvelope_DispatchesRegisteredHandler()
        {
            var transport = new FakeTransport();
            var manager = new MessageManager(transport);
            var handled = false;
            IPEndPoint receivedSender = null;
            byte[] receivedPayload = null;
            var message = new Heartbeat();
            manager.RegisterHandler(MessageType.Heartbeat, (payload, sender) =>
            {
                handled = true;
                receivedSender = sender;
                receivedPayload = payload;
            });

            transport.EmitReceive(BuildEnvelope(MessageType.Heartbeat, message), Sender);

            Assert.That(handled, Is.True);
            Assert.That(receivedSender, Is.EqualTo(Sender));
            Assert.That(receivedPayload, Is.EqualTo(message.ToByteArray()));
        }

        [Test]
        public void Receive_UnregisteredMessage_DoesNotThrow()
        {
            var transport = new FakeTransport();
            var manager = new MessageManager(transport);

            Assert.DoesNotThrow(() =>
                transport.EmitReceive(BuildEnvelope(MessageType.Heartbeat, new Heartbeat()), Sender));
        }

        [Test]
        public void Receive_InvalidBytes_DoesNotBreakFollowingDispatch()
        {
            var transport = new FakeTransport();
            var manager = new MessageManager(transport);
            var handledCount = 0;
            manager.RegisterHandler(MessageType.Heartbeat, (payload, sender) =>
            {
                handledCount++;
            });

            Assert.DoesNotThrow(() => transport.EmitReceive(new byte[] { 0x01, 0x02, 0x03 }, Sender));
            transport.EmitReceive(BuildEnvelope(MessageType.Heartbeat, new Heartbeat()), Sender);

            Assert.That(handledCount, Is.EqualTo(1));
        }

        private static byte[] BuildEnvelope(MessageType type, IMessage payload)
        {
            return new Envelope
            {
                Type = (int)type,
                Payload = payload.ToByteString()
            }.ToByteArray();
        }

        private sealed class FakeTransport : ITransport
        {
            public byte[] LastSentData { get; private set; }
            public byte[] LastSendToData { get; private set; }
            public IPEndPoint LastSendTarget { get; private set; }
            public byte[] LastBroadcastData { get; private set; }
            public int SendCallCount { get; private set; }
            public int SendToCallCount { get; private set; }
            public int SendToAllCallCount { get; private set; }

            public event Action<byte[], IPEndPoint> OnReceive;

            public Task StartAsync()
            {
                return Task.CompletedTask;
            }

            public void Stop()
            {
            }

            public void Send(byte[] data)
            {
                SendCallCount++;
                LastSentData = Copy(data);
            }

            public void SendTo(byte[] data, IPEndPoint target)
            {
                SendToCallCount++;
                LastSendToData = Copy(data);
                LastSendTarget = target;
            }

            public void SendToAll(byte[] data)
            {
                SendToAllCallCount++;
                LastBroadcastData = Copy(data);
            }

            public void EmitReceive(byte[] data, IPEndPoint sender)
            {
                OnReceive?.Invoke(Copy(data), sender);
            }

            private static byte[] Copy(byte[] data)
            {
                if (data == null)
                {
                    return null;
                }
                var copy = new byte[data.Length];
                Array.Copy(data, copy, data.Length);
                return copy;
            }
        }
    }
}


@@ -1,5 +1,5 @@
 fileFormatVersion: 2
-guid: ec6c25bc42967db499742dfa355380b7
+guid: 191f9dd43ceb41ea86ea4aef4a120903
 MonoImporter:
   externalObjects: {}
   serializedVersion: 2


@ -0,0 +1,25 @@
{
  "name": "Network.EditMode.Tests",
  "rootNamespace": "Tests.EditMode.Network",
  "references": [
    "Network.Runtime",
    "UnityEngine.TestRunner",
    "UnityEditor.TestRunner"
  ],
  "includePlatforms": [
    "Editor"
  ],
  "excludePlatforms": [],
  "allowUnsafeCode": false,
  "overrideReferences": true,
  "precompiledReferences": [
    "nunit.framework.dll",
    "Google.Protobuf.dll"
  ],
  "autoReferenced": false,
  "defineConstraints": [
    "UNITY_INCLUDE_TESTS"
  ],
  "versionDefines": [],
  "noEngineReferences": false
}


@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: 808181acd19049b8bdc63d1498ce9b25
AssemblyDefinitionImporter:
  externalObjects: {}
  userData:
  assetBundleName:
  assetBundleVariant:

CodeX-TODO.md Normal file

@ -0,0 +1,343 @@
# KCP Network Layer Migration TODO

## Goal

Move the project's network foundation from a hand-rolled "reliable UDP + ACK/retransmit/sessions" stack to "KCP over UDP with an explicit transport/session/message/sync layering", eliminating overlapping responsibilities and laying the groundwork for later sync optimization, reconnection, and monitoring.

## Current State

- `Assets/Scripts/Network/NetworkTransport/ReliableUdpTransport.cs` already references `Kcp-CSharp.dll`, but the bulk of the logic is still custom reliable UDP.
- The transport layer still carries:
  - a custom `Packet`
  - custom ACKs
  - custom retransmission
  - custom session timeout cleanup
  - custom in-order delivery
- The `NetworkManager -> MessageManager -> ITransport` abstraction has not fully converged; the interface and implementations are inconsistent.
- The current business flow is not pure RPC. It includes:
  - login / logout
  - heartbeat / time synchronization
  - `PlayerInput` upstream
  - `PlayerState` downstream
  - local prediction / server correction
## Required Changes

### 1. Consolidate the transport interface

Unify the responsibilities of `ITransport` so upper layers cannot bypass the abstraction and call methods that do not exist.

The interface should include at least:

- `StartAsync()`
- `Stop()`
- `Connect(...)`, or an explicit default remote set in the client constructor
- `Send(byte[] data)`
- `SendTo(byte[] data, IPEndPoint target)`, needed only by the server or special cases
- `SendToAll(byte[] data)`, needed only for server broadcast
- `OnReceive`
- `OnConnected`
- `OnDisconnected`
- `OnError`

Problems to fix now:

- `MessageManager` calls `transport.Send(...)`, but `ITransport` does not declare that method.
- `ReliableUdpTransport`'s current `SendTo(Packet, IPEndPoint)` does not match the interface's `SendTo(byte[], IPEndPoint)`.
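Expressed in C#, the consolidated contract could look like the sketch below. Only the member names listed above are agreed so far; the event payload types are assumptions:

```csharp
using System;
using System.Net;
using System.Threading.Tasks;

// Sketch of the consolidated transport contract described above.
// The delegate signatures for the connection events are assumptions.
public interface ITransport
{
    Task StartAsync();
    void Stop();

    void Send(byte[] data);                      // client: default remote
    void SendTo(byte[] data, IPEndPoint target); // server / special cases
    void SendToAll(byte[] data);                 // server broadcast only

    event Action<byte[], IPEndPoint> OnReceive;
    event Action<IPEndPoint> OnConnected;
    event Action<IPEndPoint, string> OnDisconnected;
    event Action<Exception> OnError;
}
```

With this in place, `MessageManager` can hold an `ITransport` and never needs to downcast to a concrete transport.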
### 2. Replace the custom reliable UDP with KCP

Once KCP is in place, the project should no longer reimplement:

- ACK management
- retransmission scheduling
- send/receive sequence-number bookkeeping
- in-order delivery
- the sliding window

Therefore, delete or refactor:

- `Assets/Scripts/Network/NetworkTransport/Packet.cs`
- the custom seq/ack logic in `Assets/Scripts/Network/NetworkTransport/ClientSession.cs`
- in `Assets/Scripts/Network/NetworkTransport/ReliableUdpTransport.cs`:
  - `CheckRetransmit`
  - `HandleAckPacket`
  - the custom `PendingAcks`
  - the custom duplicate / out-of-order packet handling
  - the custom reliability timers
### 3. Rebuild the session layer

Under KCP, clearly distinguish:

- the UDP socket
- the KCP session
- business connection state

Suggested design:

- Client:
  - a single default remote
  - a single `KcpSession`
- Server:
  - multiple `KcpSession`s keyed by `IPEndPoint + conv`
  - support for session establishment, heartbeat timeout, and disconnect cleanup

The session layer must track at least:

- `conv`
- the remote address
- the last-active time
- the KCP instance
- the connection state
- the disconnect reason
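A minimal session object holding exactly this state could be sketched as follows; `TKcp` is a placeholder for the concrete class exposed by `Kcp-CSharp`, whose type name is not pinned down here:

```csharp
using System;
using System.Net;

// Skeleton for the session state listed above.
public enum SessionState { Connecting, Connected, Disconnected }

public sealed class KcpSession<TKcp>
{
    public uint Conv { get; }
    public IPEndPoint RemoteEndPoint { get; }
    public DateTime LastActiveUtc { get; private set; }
    public TKcp Kcp { get; }
    public SessionState State { get; private set; }
    public string DisconnectReason { get; private set; }

    public KcpSession(uint conv, IPEndPoint remote, TKcp kcp)
    {
        Conv = conv;
        RemoteEndPoint = remote;
        Kcp = kcp;
        State = SessionState.Connecting;
        LastActiveUtc = DateTime.UtcNow;
    }

    // Call on every inbound datagram so timeout cleanup has fresh data.
    public void Touch() => LastActiveUtc = DateTime.UtcNow;

    public void MarkConnected() => State = SessionState.Connected;

    public void MarkDisconnected(string reason)
    {
        State = SessionState.Disconnected;
        DisconnectReason = reason;
    }
}
```

The server side would keep these in a dictionary keyed by remote endpoint plus `conv`; the client holds exactly one.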
### 4. Decouple the network thread from the Unity main thread

Today `MessageManager.OnTransportReceiveAsync(...)` calls straight into business handlers, and those handlers go on to touch:

- `MasterManager`
- `Player`
- `GameObject`
- `UI`

None of that should run on the network receive thread.

Change the flow to:

1. The network thread receives a packet.
2. Parse only the minimum necessary information.
3. Enqueue it into a thread-safe queue.
4. Dispatch to the business layer from Unity's `Update()`.

Suggested additions:

- a `MainThreadDispatcher`
- or a `ConcurrentQueue<Action>`
- or a `ConcurrentQueue<ReceivedEnvelope>`
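The queue-based handoff in steps 1-4 can be sketched like this; `ReceivedEnvelope` and the dispatcher class are the hypothetical types suggested above, not existing code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Net;
using UnityEngine;

// Minimal holder for the "parse only the minimum" step.
public readonly struct ReceivedEnvelope
{
    public readonly byte[] Payload;
    public readonly IPEndPoint Sender;

    public ReceivedEnvelope(byte[] payload, IPEndPoint sender)
    {
        Payload = payload;
        Sender = sender;
    }
}

public sealed class MainThreadNetworkDispatcher : MonoBehaviour
{
    private readonly ConcurrentQueue<ReceivedEnvelope> _queue = new();

    public event Action<byte[], IPEndPoint> OnMessage;

    // Called from the network receive thread (steps 1-3).
    public void Enqueue(byte[] payload, IPEndPoint sender)
        => _queue.Enqueue(new ReceivedEnvelope(payload, sender));

    // Runs on the Unity main thread (step 4); safe to touch GameObjects here.
    private void Update()
    {
        while (_queue.TryDequeue(out var envelope))
        {
            OnMessage?.Invoke(envelope.Payload, envelope.Sender);
        }
    }
}
```

Draining the whole queue per frame keeps ordering; a per-frame budget can be added later if bursts cause frame spikes.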
### 5. Re-tier message QoS

All messages currently appear to share the same reliable-transport semantics, which is a poor fit for high-frequency sync.

Split into at least two classes:

- Strongly reliable messages
  - login
  - logout
  - room management
  - critical system commands
- High-frequency sync messages
  - `PlayerInput`
  - `PlayerState`
  - future snapshots, interpolated state, non-critical position updates

Two principles to keep in mind:

- If `PlayerState` stays on a reliable ordered stream, a stalled old packet amplifies latency.
- If every `PlayerInput` is sent strictly reliably, input can pile up.

The choice depends on gameplay:

- Option A: route everything through KCP first, finish the architectural consolidation, then optimize sync.
- Option B: command messages over KCP; sync messages over raw UDP / a separate lightweight channel.

Short term, start with Option A to consolidate, then split later.
### 6. Rework the connection lifecycle

Separate the "transport connection" from the "business login state".

Suggested lifecycle:

1. Create the UDP socket.
2. Initialize KCP.
3. Connect to the server / establish the default session.
4. Start the receive loop and the KCP update loop.
5. Send `LoginRequest`.
6. On `LoginResponse`, enter the logged-in state.
7. Start heartbeat and timeout detection.
8. Fire a disconnect callback on timeout or error.
9. Reconnect as needed.

Stop mixing logic like "we only learn the default server endpoint from the login response" into the connection process.
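One cheap way to enforce that separation is to model the two lifecycles as independent states, a sketch:

```csharp
// Transport-level state: does a live KCP session exist?
public enum TransportState
{
    Disconnected,
    Connecting,
    Connected
}

// Business-level state: has the server accepted our LoginRequest?
public enum LoginState
{
    LoggedOut,
    LoggingIn,
    LoggedIn
}
```

Reconnect logic then only manipulates `TransportState`, while the login flow owns `LoginState`; neither can silently imply the other.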
## Recommended Accompanying Changes

### 1. Separate clock sync from send rate

`MovementComponent` currently chases the server tick by adjusting `_sendInterval`, which couples:

- clock correction
- send rate
- sync stability

Instead:

- keep the input send rate fixed
- track the client/server tick offset separately
- use replay / reconcile during correction instead of relying on send-interval drift
### 2. Add a KCP parameter configuration entry point

Support configuring at least:

- `NoDelay`
- `Interval`
- `Resend`
- `NC`
- `SndWnd`
- `RcvWnd`
- `MTU`
- `DeadLink`

Suggested approach:

- add a `KcpTransportConfig`
- make the client and server configurable independently
- support configuration via the Inspector or a ScriptableObject
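A possible shape for that config as a ScriptableObject sketch; the defaults below follow the commonly cited KCP "fast mode" settings and are assumptions to tune per project:

```csharp
using UnityEngine;

// Sketch of the config container suggested above.
// Defaults are the commonly cited KCP fast-mode values, not project-tested.
[CreateAssetMenu(menuName = "Network/KcpTransportConfig")]
public sealed class KcpTransportConfig : ScriptableObject
{
    public int NoDelay = 1;   // 1 = nodelay mode on
    public int Interval = 10; // internal update interval, ms
    public int Resend = 2;    // fast retransmit after N duplicate ACKs
    public int NC = 1;        // 1 = disable congestion control
    public int SndWnd = 128;  // send window, in packets
    public int RcvWnd = 128;  // receive window, in packets
    public int Mtu = 1200;    // conservative value below typical path MTU
    public int DeadLink = 20; // retransmit limit before the link is declared dead
}
```

Keeping client and server configs as separate assets makes it easy to run, for example, a larger receive window on the server only.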
### 3. Add network observability metrics

Expose at least:

- RTT
- retransmit count
- pending send queue length
- pending receive queue length
- session count
- last-active time
- timeout / disconnect reason

These will be needed later to diagnose stutter, jitter, latency spikes, and failed reconnects.
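A per-session snapshot covering these metrics could be as simple as the following sketch (field sources depend on what the KCP binding exposes):

```csharp
using System;

// Immutable stats snapshot, safe to hand to a UI or logger.
public readonly struct KcpSessionStats
{
    public readonly int RttMs;
    public readonly int RetransmitCount;
    public readonly int SendQueueLength;
    public readonly int ReceiveQueueLength;
    public readonly DateTime LastActiveUtc;

    public KcpSessionStats(int rttMs, int retransmitCount,
        int sendQueueLength, int receiveQueueLength, DateTime lastActiveUtc)
    {
        RttMs = rttMs;
        RetransmitCount = retransmitCount;
        SendQueueLength = sendQueueLength;
        ReceiveQueueLength = receiveQueueLength;
        LastActiveUtc = lastActiveUtc;
    }
}
```

Sampling these once a second per session is usually enough to spot latency spikes and queue buildup without meaningful overhead.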
### 4. Define the server broadcast strategy

`SendToAll` currently just iterates over sessions. With KCP in place, decide:

- whether a broadcast writes to each session independently
- whether a slow connection may drag down the whole broadcast
- whether broadcast messages need per-type priorities
## Recommended Implementation Steps

### Stage 1: Consolidate the abstraction

1. Adjust `ITransport` to include the send and connection methods the upper layers actually need.
2. Make `MessageManager` depend only on what `ITransport` exposes, with no assumptions about implementation details.
3. Fix the current method-signature mismatches between `ReliableUdpTransport` and `ITransport`.
4. Get the network layer to a "consistent interface, swappable implementation" state.

Acceptance criteria:

- Upper layers no longer depend on extra methods of any concrete transport.
- The business layer does not care whether the bottom is UDP or KCP.
### Stage 2: Introduce `KcpTransport`

1. Build a new `KcpTransport`; do not keep patching the old `ReliableUdpTransport`.
2. Use `UdpClient` only for sending and receiving raw UDP datagrams.
3. Maintain one `KcpSession` per connection.
4. On UDP receive, hand the datagram to the matching KCP instance via `Input`.
5. Drive KCP's `Update` / `Check` periodically.
6. Only fire `OnReceive` after pulling a complete business message out of KCP's `Recv`.

Acceptance criteria:

- The transport layer no longer maintains custom ACK/retransmit logic.
- KCP handles the complete send/receive path.
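Steps 4-6 above can be sketched as a per-session drive loop. The `Input` / `Update` / `Recv` calls mirror the classic KCP C API; the exact signatures exposed by `Kcp-CSharp` may differ, and `_sessions` / `KcpSession` are the hypothetical containers from the session-layer section:

```csharp
// Step 4: raw UDP bytes go straight into the session's KCP instance.
private void OnUdpDatagram(KcpSession session, byte[] datagram)
{
    session.Kcp.Input(datagram);
    session.Touch();
}

// Steps 5-6: called on a timer (e.g. every Interval ms).
private void TickSessions(uint currentMs)
{
    foreach (var session in _sessions.Values)
    {
        // Drives KCP's internal retransmit/flush machinery.
        session.Kcp.Update(currentMs);

        // Only surface complete, ordered business messages.
        byte[] message;
        while ((message = session.Kcp.Recv()) != null)
        {
            OnReceive?.Invoke(message, session.RemoteEndPoint);
        }
    }
}
```

The key property is that `OnReceive` never sees partial datagrams; reassembly, ordering, and retransmission all stay inside KCP.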
### Stage 3: Remove the old reliable-UDP structures

1. Delete or deprecate `Packet.cs`.
2. Delete or deprecate the seq/ack caching and retransmission code in the old `ClientSession`.
3. In `ReliableUdpTransport`, delete:
   - the retransmit timer
   - the ack handler
   - the packet seq delivery logic
4. Keep only the session container and connection lifecycle management that is still needed.

Acceptance criteria:

- The project no longer contains two reliability mechanisms at once.
### Stage 4: Main-thread dispatch

1. Add a thread-safe receive queue.
2. The network thread is responsible only for:
   - receiving packets
   - KCP input/output
   - basic error handling
3. The Unity main thread is responsible for:
   - message dispatch
   - game-object mutation
   - UI updates

Acceptance criteria:

- Network messages never manipulate Unity objects from a non-main thread.
### Stage 5: Connection and heartbeat rework

1. Make "connected" and "logged in" two distinct states.
2. Heartbeat is responsible only for:
   - liveness detection
   - RTT / time synchronization
3. Session timeout and reconnection logic lives in the session manager, not in business message handlers.

Acceptance criteria:

- Disconnect, timeout, login failure, and reconnect are clearly distinguishable states.
### Stage 6: Sync strategy optimization

1. Re-evaluate whether `PlayerInput` must be strictly reliable.
2. Re-evaluate whether `PlayerState` should use a reliable ordered stream.
3. Tune client prediction, rollback, and correction.
4. Pull clock synchronization out of the `_sendInterval` drift.

Acceptance criteria:

- Under high-frequency sync, stalled old packets no longer cause visible position lag.
### Stage 7: Fill in monitoring and debugging tools

1. Log session state.
2. Emit RTT, send-queue, packet-loss, and retransmit metrics.
3. Provide a debug switch so production logging stays manageable.

Acceptance criteria:

- Network problems can be diagnosed from logs and metrics.
## Recommended New Modules

Add or rename:

- `Assets/Scripts/Network/NetworkTransport/KcpTransport.cs`
- `Assets/Scripts/Network/NetworkTransport/KcpSession.cs`
- `Assets/Scripts/Network/NetworkTransport/KcpTransportConfig.cs`
- `Assets/Scripts/Network/NetworkApplication/MainThreadNetworkDispatcher.cs`

Deprecate or refactor:

- `Assets/Scripts/Network/NetworkTransport/ReliableUdpTransport.cs`
- `Assets/Scripts/Network/NetworkTransport/ClientSession.cs`
- `Assets/Scripts/Network/NetworkTransport/Packet.cs`
## Acceptance Criteria

- Upper layers depend only on the unified `ITransport`.
- The transport layer no longer reimplements ACK / retransmission / ordering.
- Both client and server can establish KCP sessions normally.
- The login, heartbeat, input, and state-sync flows work end to end.
- Non-main threads never access Unity objects directly.
- Session timeout, disconnect, and reconnect have explicit states and logs.
- High-frequency movement sync remains usable under packet loss / jitter.
## Notes

- The least advisable move right now is to keep stacking KCP-related branches onto the existing `ReliableUdpTransport`; that keeps the custom reliable UDP and KCP responsibilities overlapping indefinitely.
- The right direction is: consolidate the abstraction first, then replace the old implementation with a new `KcpTransport`.

openspec/config.yaml Normal file

@ -0,0 +1,20 @@
schema: spec-driven
# Project context (optional)
# This is shown to AI when creating artifacts.
# Add your tech stack, conventions, style guides, domain knowledge, etc.
# Example:
# context: |
# Tech stack: TypeScript, React, Node.js
# We use conventional commits
# Domain: e-commerce platform
# Per-artifact rules (optional)
# Add custom rules for specific artifacts.
# Example:
# rules:
# proposal:
# - Keep proposals under 500 words
# - Always include a "Non-goals" section
# tasks:
# - Break tasks into chunks of max 2 hours