# Asynchronous Context Management (Dataflow Architecture)

## The Problem

Context management today is an emergency response. When a chat session hits the
maximum token limit (`maxTokens`), the system halts the user's request,
synchronously runs expensive compression pipelines (masking tools, summarizing
text with LLMs), and only proceeds when the token count falls below the limit.
This introduces unacceptable latency and forces trade-offs between speed and
data fidelity.

## The Vision: Eager Subconscious Compute

Instead of a reactive, synchronous pipeline, Context Management should be an
**asynchronous dataflow graph**.

Because we know old memory will _eventually_ need to be degraded or garbage
collected, we should utilize the agent's idle time (while the user is reading or
typing) to proactively compute "degraded variants" of episodes before there is
any context pressure.

### The Three Phases of Memory Lifecycle

#### 1. The Eager Compute Phase (Background / Continuous Streaming)

Context pressure doesn't wait for an episode to finish. If a user pastes a
100k-token file, the budget explodes instantly. Therefore, the dataflow graph is
fed continuously.

- Whenever `AgentChatHistory` emits a `PUSH` event, the new `Content` is mapped
  into the active, "open" `Episode` (e.g., as a `USER_PROMPT` trigger or a
  `TOOL_EXECUTION` step) and broadcast immediately.
- Processors (e.g., `SemanticCompressor`, `StateSnapshot`) listen as
  **background workers**, eagerly computing degraded variants on partial
  episodes. For instance, `SemanticCompressionProcessor` can summarize a
  massive 100k `USER_PROMPT` the millisecond it arrives, without waiting for
  the model to reply, and attach the result to the IR graph as
  `Episode#1.trigger.variants.summary`.
- **Result:** This costs the user zero latency. The agent is
  "dreaming/consolidating" granular memory chunks in the background, even
  during long-running "mono-episodes."

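The eager path can be sketched with an in-process event bus. The `Episode` and
`Trigger` shapes and the truncating "summarizer" below are illustrative
assumptions, not the real IR; the event names follow the
`IR_CHUNK_RECEIVED`/`VARIANT_READY` vocabulary used later in this document.

```typescript
import { EventEmitter } from "node:events";

// Illustrative stand-ins for the real IR types; only `variants` matters here.
interface Trigger {
  kind: "USER_PROMPT" | "TOOL_EXECUTION";
  text: string;
  variants: Record<string, string>;
}
interface Episode {
  id: number;
  trigger: Trigger;
}

const bus = new EventEmitter();
const episodes: Episode[] = [];

// Background worker (stand-in for SemanticCompressionProcessor): reacts to
// every IR_CHUNK_RECEIVED event and eagerly attaches a degraded variant.
// The slice() is a placeholder for a real LLM summarization call.
bus.on("IR_CHUNK_RECEIVED", (ep: Episode) => {
  ep.trigger.variants["summary"] = ep.trigger.text.slice(0, 32) + "...";
  bus.emit("VARIANT_READY", ep.id, "summary");
});

// Simulated PUSH from AgentChatHistory: a huge prompt arrives and is
// broadcast immediately, before the model has replied.
const ep: Episode = {
  id: 1,
  trigger: { kind: "USER_PROMPT", text: "x".repeat(100_000), variants: {} },
};
episodes.push(ep);
bus.emit("IR_CHUNK_RECEIVED", ep);
```

Note that the handler here runs in-line because Node's `EventEmitter`
dispatches synchronously; a real processor would `await` an LLM call, so the
variant would land later and `VARIANT_READY` is what signals readiness.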
#### 2. Opportunistic Replacement (`retainedTokens` Threshold)

When the active context window crosses the "ideal" size (e.g., 65k tokens):

- The system identifies the oldest episodes that have fallen outside the
  `retained` window.
- It checks whether they have pre-computed variants (e.g., a `summary` or
  `masked` variant).
- If so, it instantly and silently swaps the raw episode for the degraded
  variant.
- **Result:** The context gently decays over time, completely avoiding hard
  limits for as long as possible, with zero latency cost.

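A minimal sketch of the swap, assuming per-episode token counts and a fixed
`retainedTokens` of 65k; the `Ep` shape and the `opportunisticReplace` helper
are hypothetical names, not existing APIs:

```typescript
// Hypothetical episode shape: token counts for the raw form and for any
// pre-computed degraded variants.
interface Ep {
  id: number;
  tokens: number;
  variants: Partial<Record<"summary" | "masked", { tokens: number }>>;
  degraded?: "summary" | "masked";
}

const RETAINED_TOKENS = 65_000; // the "ideal" window size

// Walk oldest-first, swapping raw episodes for pre-computed variants until
// the history fits under the ideal size again. Never blocks: episodes with
// no ready variant are simply skipped.
function opportunisticReplace(history: Ep[]): void {
  let total = history.reduce((sum, e) => sum + e.tokens, 0);
  for (const ep of history) {
    if (total <= RETAINED_TOKENS) break;
    const kind = ep.variants.summary
      ? "summary"
      : ep.variants.masked
        ? "masked"
        : undefined;
    if (!kind) continue; // not pre-computed yet; skip silently
    const variant = ep.variants[kind]!;
    total -= ep.tokens - variant.tokens;
    ep.tokens = variant.tokens;
    ep.degraded = kind;
  }
}

// Oldest episode has a ready summary; the newer one does not.
const history: Ep[] = [
  { id: 1, tokens: 50_000, variants: { summary: { tokens: 1_000 } } },
  { id: 2, tokens: 40_000, variants: {} },
];
opportunisticReplace(history);
```

The key design property is the early `break`: the walk stops the moment the
window is back under the ideal size, so recent episodes stay raw.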
#### 3. The Pressure Barrier (`maxTokens` Hard Limit)

When the active context window crosses the absolute hard limit (e.g., 150k
tokens)—perhaps because the user pasted a massive file and the background
workers couldn't keep up—the system hits a **Synchronous Barrier**.

At this barrier, the `ContextManager` checks the user's configured
`ContextPressureStrategy` to decide how to unblock the request:

- **Strategy A: `truncate` (The Baseline)**
  - _Behavior:_ Instantly drop the oldest episodes until under `maxTokens`.
  - _Tradeoff:_ Maximum speed, maximum data loss.
- **Strategy B: `incrementalGc` (Progressive)**
  - _Behavior:_ Look for any pre-computed summaries/masks. If none exist,
    synchronously block to compute _just enough_ summaries to survive the
    current turn.
  - _Tradeoff:_ Medium speed, medium data retention.
- **Strategy C: `compress` (State Snapshot)**
  - _Behavior:_ Identify the oldest N episodes causing the overflow. If their
    N-to-1 World State Snapshot isn't ready yet, **block the user's request**
    and force the `StateSnapshotProcessor` to generate it synchronously. Once
    generated, replace the N episodes with the 1 snapshot and proceed.
  - _Tradeoff:_ Maximum latency, maximum data retention/fidelity.

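The three strategies reduce to a dispatch at the barrier. The constants (150k
limit, assumed summary and snapshot sizes) and the `resolvePressure` helper are
illustrative; a real implementation performs the blocking compute where the
comments indicate rather than assuming fixed sizes:

```typescript
type ContextPressureStrategy = "truncate" | "incrementalGc" | "compress";

interface EpisodeLike {
  tokens: number;
  summaryTokens?: number; // size of a pre-computed summary variant, if any
}

const MAX_TOKENS = 150_000; // absolute hard limit
const FALLBACK_SUMMARY_TOKENS = 500; // assumed size of a just-in-time summary
const SNAPSHOT_TOKENS = 3_000; // assumed size of an N-to-1 World State Snapshot

// Returns the actions the barrier takes on the oldest episodes, in order,
// until the projected history fits under maxTokens again.
function resolvePressure(
  history: EpisodeLike[],
  strategy: ContextPressureStrategy,
): string[] {
  const actions: string[] = [];
  let total = history.reduce((sum, e) => sum + e.tokens, 0);
  let snapshotCreated = false;
  for (const ep of history) {
    if (total <= MAX_TOKENS) break;
    switch (strategy) {
      case "truncate": // fastest: just drop the episode
        actions.push("drop");
        total -= ep.tokens;
        break;
      case "incrementalGc": // prefer ready variants; block only when forced
        if (ep.summaryTokens !== undefined) {
          actions.push("swap-precomputed");
          total -= ep.tokens - ep.summaryTokens;
        } else {
          actions.push("summarize-blocking"); // synchronous LLM call here
          total -= ep.tokens - FALLBACK_SUMMARY_TOKENS;
        }
        break;
      case "compress": // fold N episodes into one snapshot (may block)
        actions.push("fold-into-snapshot");
        total -= ep.tokens;
        if (!snapshotCreated) {
          total += SNAPSHOT_TOKENS; // the single replacement snapshot
          snapshotCreated = true;
        }
        break;
    }
  }
  return actions;
}

// 180k tokens against a 150k limit; only the oldest episode has a summary.
const overloaded: EpisodeLike[] = [
  { tokens: 100_000, summaryTokens: 2_000 },
  { tokens: 60_000 },
  { tokens: 20_000 },
];
```

With this history, each strategy resolves the overflow by touching only the
single oldest episode; they differ in how much of it survives.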
## Architectural Changes Required

1. **Episode Variants:** Update the `Episode` IR type to support a `variants`
   dictionary.
2. **Event Bus:** Create an internal `EventEmitter` in `ContextManager` to
   dispatch granular `IR_CHUNK_RECEIVED` events (tied to the `PUSH` events of
   `AgentChatHistory`).
3. **Processor Interface:** Change `ContextProcessor` from a synchronous
   `process(episodes[])` function to an asynchronous worker that listens to the
   event bus, updates the `variants` dictionary, and emits `VARIANT_READY`
   events.
4. **Projection Logic:** Update `projectCompressedHistory()` to act as the
   Pressure Barrier, reading the user's strategy and either applying ready
   variants, waiting for variants, or truncating.
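Taken together, items 1-3 suggest interfaces along these lines. The exact
event payloads and the `MaskingProcessor` example are assumptions for
illustration, not the final design:

```typescript
import { EventEmitter } from "node:events";

type VariantKind = "summary" | "masked" | "snapshot";

// Item 1: the Episode IR gains a variants dictionary.
interface Episode {
  id: number;
  content: string;
  variants: Partial<Record<VariantKind, string>>;
}

// Item 3: processors become asynchronous workers that subscribe to the
// internal bus (item 2) instead of exposing a synchronous process().
interface ContextProcessor {
  attach(bus: EventEmitter): void;
}

// Toy processor: masks tool output the moment a chunk is broadcast.
class MaskingProcessor implements ContextProcessor {
  attach(bus: EventEmitter): void {
    bus.on("IR_CHUNK_RECEIVED", (ep: Episode) => {
      ep.variants.masked = "[tool output masked]"; // placeholder masking
      bus.emit("VARIANT_READY", ep.id, "masked");
    });
  }
}

const bus = new EventEmitter();
new MaskingProcessor().attach(bus);

const episode: Episode = { id: 7, content: "raw tool output", variants: {} };
bus.emit("IR_CHUNK_RECEIVED", episode);
```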