diff --git a/packages/app/src/components/session/session-status-panel.tsx b/packages/app/src/components/session/session-status-panel.tsx index 7bf4c25c7..28da73a7e 100644 --- a/packages/app/src/components/session/session-status-panel.tsx +++ b/packages/app/src/components/session/session-status-panel.tsx @@ -16,12 +16,22 @@ export function SessionStatusPanel(props: { shown: Accessor }) { const messages = sync.data.message[params.id] ?? [] return messages.flatMap((message) => sync.data.part[message.id] ?? []) }) - const backend = createMemo(() => (params.id ? globalSync.data.session_todo[params.id] : undefined)) - const backendClearActivePartsAt = createMemo(() => (params.id ? globalSync.data.session_todo_clear[params.id] : undefined)) + const canonical = createMemo(() => (params.id ? globalSync.data.session_todo[params.id] : undefined)) + const isAuthoritativelyInvalidated = createMemo(() => + params.id ? globalSync.todoHydrate.isAuthoritativelyInvalidated(params.id) : false, + ) + const isPending = createMemo(() => + params.id && sync.directory ? globalSync.todoHydrate.isPending(sync.directory, params.id) : false, + ) return (
- +
) diff --git a/packages/app/src/components/session/session-status-summary.test.ts b/packages/app/src/components/session/session-status-summary.test.ts index a2fe9a4bf..0a0a93ea8 100644 --- a/packages/app/src/components/session/session-status-summary.test.ts +++ b/packages/app/src/components/session/session-status-summary.test.ts @@ -37,4 +37,9 @@ describe("session-status-summary ยท row contract", () => { expect(SOURCE).toMatch(/status === "completed"\s*\|\|\s*props\.todo\.status === "cancelled"/) expect(SOURCE).toContain("line-through text-fg-weak") }) + + test("does not render the empty progress fallback while todo hydrate is pending", () => { + expect(SOURCE).toContain("selectSessionTodoDataSnapshot") + expect(SOURCE).toContain('snapshot().phase !== "pending"') + }) }) diff --git a/packages/app/src/components/session/session-status-summary.tsx b/packages/app/src/components/session/session-status-summary.tsx index a44031efc..48aba89fe 100644 --- a/packages/app/src/components/session/session-status-summary.tsx +++ b/packages/app/src/components/session/session-status-summary.tsx @@ -1,11 +1,11 @@ import { For, Show, createMemo, type Accessor, type JSX } from "solid-js" import { TodoStatusMarker } from "@opencode-ai/ui/todo-status-marker" import type { Part } from "@opencode-ai/sdk/v2" -import type { Todo } from "@opencode-ai/sdk/v2/client" import { useLanguage } from "@/context/language" import { extractSources } from "@/pages/session/session-status-extractors" -import { selectSessionTodos } from "@/pages/session/session-todos" +import { selectSessionTodoDataSnapshot } from "@/pages/session/session-todos" import type { SessionTodoItem } from "@/pages/session/todos/todo-model" +import type { CanonicalTodoSnapshot } from "@/pages/session/todos/todo-source" function Section(props: { title: string; children: JSX.Element }) { return ( @@ -51,29 +51,36 @@ function SourceRow(props: { url: string }) { } export function SessionStatusSummary(props: { - backend?: Accessor - backendClearActivePartsAt?: Accessor + canonical?: Accessor + isAuthoritativelyInvalidated?: Accessor + isPending?: Accessor parts: Accessor }) { const language = useLanguage() - const todos = createMemo(() => - selectSessionTodos({ - backend: props.backend?.(), - backendClearActivePartsAt: props.backendClearActivePartsAt?.(), - parts: props.parts(), + const snapshot = createMemo(() => + selectSessionTodoDataSnapshot({ + primary: { + canonical: props.canonical?.(), + isAuthoritativelyInvalidated: props.isAuthoritativelyInvalidated?.() ?? false, + isPending: props.isPending?.() ?? false, + parts: props.parts(), + }, }), ) + const todos = createMemo(() => snapshot().items) const sources = createMemo(() => extractSources(props.parts())) return (
-
- 0} fallback={}> -
- {(todo) => } -
-
-
+ +
+ 0} fallback={}> +
+ {(todo) => } +
+
+
+
0} fallback={}> diff --git a/packages/app/src/context/global-sync.test.ts b/packages/app/src/context/global-sync.test.ts index f1382685d..f40bb2cb3 100644 --- a/packages/app/src/context/global-sync.test.ts +++ b/packages/app/src/context/global-sync.test.ts @@ -1,27 +1,66 @@ import { describe, expect, test } from "bun:test" import type { Todo } from "@opencode-ai/sdk/v2/client" -import { nextSessionTodoClearFlag } from "./global-sync" +import { createStore } from "solid-js/store" +import { canAcceptSessionTodo, setSessionTodoSnapshot, type GlobalStore, type SessionTodoSnapshot } from "./global-sync" import { canDisposeDirectory, pickDirectoriesToEvict } from "./global-sync/eviction" import { estimateRootSessionTotal, loadRootSessionsWithFallback } from "./global-sync/session-load" -describe("nextSessionTodoClearFlag", () => { +const globalStoreFixture = (): GlobalStore => ({ + ready: true, + path: { state: "", config: "", worktree: "", directory: "", home: "" }, + project: [], + session_todo: {}, + provider: { all: [], connected: [], default: {} }, + provider_auth: {}, + config: {}, + reload: undefined, +}) + +describe("canAcceptSessionTodo", () => { const todo = { id: "todo_1", content: "work", status: "in_progress", priority: "medium" } as Todo + const snapshot = (revision: number): SessionTodoSnapshot => ({ revision, todos: [todo] }) - test("marks live empty backend updates as active-parts clears", () => { - expect(nextSessionTodoClearFlag(undefined, [], { clearActiveParts: true }, 10)).toBe(10) + test("accepts the first canonical snapshot", () => { + expect(canAcceptSessionTodo(undefined, snapshot(0))).toBe(true) }) - test("preserves existing live clear flag across ordinary empty backend refreshes", () => { - expect(nextSessionTodoClearFlag(10, [])).toBe(10) + test("rejects stale or equal revisions", () => { + expect(canAcceptSessionTodo(snapshot(2), snapshot(1))).toBe(false) + expect(canAcceptSessionTodo(snapshot(2), snapshot(2))).toBe(false) }) - test("does not create a clear flag for ordinary empty backend refreshes", () => { - expect(nextSessionTodoClearFlag(undefined, [])).toBeUndefined() + test("accepts newer revisions including authoritative empty snapshots", () => { + expect(canAcceptSessionTodo(snapshot(2), { revision: 3, todos: [] })).toBe(true) + }) +}) + +describe("setSessionTodoSnapshot", () => { + test("creates the canonical todo entry on first write", () => { + const [store, setStore] = createStore(globalStoreFixture()) + const todo = { id: "todo_1", content: "work", status: "in_progress", priority: "medium" } as Todo + + setSessionTodoSnapshot(setStore, "ses_1", undefined, { + revision: 1, + todos: [todo], + }) + + expect(store.session_todo.ses_1).toEqual({ revision: 1, todos: [todo] }) }) - test("clears the flag on non-empty backend updates and cleanup", () => { - expect(nextSessionTodoClearFlag(10, [todo])).toBeUndefined() - expect(nextSessionTodoClearFlag(10, undefined)).toBeUndefined() + test("updates todos through a keyed array path before writing revision", () => { + const calls: unknown[][] = [] + const setStore = ((...input: unknown[]) => { + calls.push(input) + return input.at(-1) + }) as Parameters[0] + const todo = { id: "todo_1", content: "work", status: "in_progress", priority: "medium" } as Todo + const current = { revision: 1, todos: [] } + + setSessionTodoSnapshot(setStore, "ses_1", current, { revision: 2, todos: [todo] }) + + expect(calls).toHaveLength(2) + expect(calls[0].slice(0, 3)).toEqual(["session_todo", "ses_1", "todos"]) + expect(calls[1]).toEqual(["session_todo", "ses_1", "revision", 2]) }) }) diff --git a/packages/app/src/context/global-sync.tsx b/packages/app/src/context/global-sync.tsx index 6804af3d0..72b80bcdc 100644 --- a/packages/app/src/context/global-sync.tsx +++ b/packages/app/src/context/global-sync.tsx @@ -6,11 +6,12 @@ import type { ProviderAuthResponse, ProviderListResponse, Todo, + TodoSnapshot, } from "@opencode-ai/sdk/v2/client" import { showToast } from "@opencode-ai/ui/toast" import { getFilename } from "@opencode-ai/util/path" import { createContext, getOwner, onCleanup, onMount, type ParentProps, untrack, useContext } from "solid-js" -import { createStore, produce, reconcile } from "solid-js/store" +import { createStore, produce, reconcile, type SetStoreFunction } from "solid-js/store" import { useLanguage } from "@/context/language" import { Persist, persisted } from "@/utils/persist" import { clientActionHeaders } from "@/utils/server" @@ -31,20 +32,42 @@ import { estimateRootSessionTotal, loadRootSessionsWithFallback } from "./global import { trimSessions } from "./global-sync/session-trim" import type { ProjectMeta } from "./global-sync/types" import { SESSION_RECENT_LIMIT } from "./global-sync/types" +import { createTodoHydrateCoordinator } from "./global-sync/todo-hydrate-coordinator" import { sanitizeProject } from "./global-sync/utils" import { formatServerError } from "@/utils/server-errors" import { queryOptions, useQueryClient } from "@tanstack/solid-query" -type GlobalStore = { +export type SessionTodoSnapshot = TodoSnapshot + +export function canAcceptSessionTodo( + current: SessionTodoSnapshot | undefined, + incoming: SessionTodoSnapshot, +): boolean { + return current === undefined || incoming.revision > current.revision +} + +export function setSessionTodoSnapshot( + setStore: SetStoreFunction, + sessionID: string, + current: SessionTodoSnapshot | undefined, + incoming: SessionTodoSnapshot, +) { + if (current === undefined) { + setStore("session_todo", sessionID, incoming) + return + } + + setStore("session_todo", sessionID, "todos", reconcile(incoming.todos, { key: "id" })) + setStore("session_todo", sessionID, "revision", incoming.revision) +} + +export type GlobalStore = { ready: boolean error?: InitError path: Path project: Project[] session_todo: { - [sessionID: string]: Todo[] - } - session_todo_clear: { - [sessionID: string]: number + [sessionID: string]: SessionTodoSnapshot } provider: ProviderListResponse provider_auth: ProviderAuthResponse @@ -54,18 +77,6 @@ type GlobalStore = { const inactiveQueryFn = async () => null -export function nextSessionTodoClearFlag( - previous: number | undefined, - todos: Todo[] | undefined, - options?: { clearActiveParts?: boolean }, - now = Date.now(), -) { - if (!todos) return undefined - if (todos.length > 0) return undefined - if (options?.clearActiveParts === true) return now - return previous -} - export const loadSessionsQuery = (directory: string) => queryOptions({ queryKey: [directory, "loadSessions"], queryFn: inactiveQueryFn, enabled: false }) @@ -80,6 +91,7 @@ function createGlobalSync() { const sessionLoads = new Map>() const sessionMeta = new Map() const blockerTerminals = createBlockerTerminalCache() + const todoHydrate = createTodoHydrateCoordinator() const [projectCache, setProjectCache, projectInit] = persisted( Persist.global("globalSync.project", ["globalSync.project.v1"]), @@ -91,7 +103,6 @@ function createGlobalSync() { path: { state: "", config: "", worktree: "", directory: "", home: "" }, project: projectCache.value, session_todo: {}, - session_todo_clear: {}, provider: { all: [], connected: [], default: {} }, provider_auth: {}, config: {}, @@ -162,41 +173,37 @@ function createGlobalSync() { }) } - const setSessionTodo = ( - sessionID: string, - todos: Todo[] | undefined, - options?: { clearActiveParts?: boolean }, - ) => { + const acceptSessionTodo = (sessionID: string, incoming: SessionTodoSnapshot): boolean => { + if (!sessionID) return false + const current = globalStore.session_todo[sessionID] + if (!canAcceptSessionTodo(current, incoming)) return false + setSessionTodoSnapshot(setGlobalStore, sessionID, current, incoming) + return true + } + + const clearSessionTodoAuthoritative = (sessionID: string) => { if (!sessionID) return - if (!todos) { - setGlobalStore( - "session_todo", - produce((draft) => { - delete draft[sessionID] - }), - ) - setGlobalStore( - "session_todo_clear", - produce((draft) => { - delete draft[sessionID] - }), - ) - return - } - setGlobalStore("session_todo", sessionID, reconcile(todos, { key: "id" })) - const clearFlag = nextSessionTodoClearFlag(globalStore.session_todo_clear[sessionID], todos, options) - if (clearFlag !== undefined) { - setGlobalStore("session_todo_clear", sessionID, clearFlag) - return - } setGlobalStore( - "session_todo_clear", + "session_todo", produce((draft) => { delete draft[sessionID] }), ) } + const setSessionTodo = ( + sessionID: string, + value: Todo[] | SessionTodoSnapshot | undefined, + ) => { + if (!sessionID) return + if (!value) { + clearSessionTodoAuthoritative(sessionID) + return + } + const snapshot = Array.isArray(value) ? { revision: 0, todos: value } : value + acceptSessionTodo(sessionID, snapshot) + } + const paused = () => untrack(() => globalStore.reload) !== undefined const queue = createRefreshQueue({ @@ -216,6 +223,7 @@ function createGlobalSync() { queue.clear(directory) sessionMeta.delete(directory) blockerTerminals.clearDirectory(directory) + todoHydrate.clearDirectory(directory) sdkCache.delete(directory) clearProviderRev(directory) clearSessionPrefetchDirectory(directory) @@ -247,8 +255,10 @@ function createGlobalSync() { permission: store.permission, }) if (next.length !== store.session.length) { + cleanupDroppedSessionCaches(store, setStore, next, { + onDropSession: (sessionID) => todoHydrate.invalidate(directory, sessionID), + }) setStore("session", reconcile(next, { key: "id" })) - cleanupDroppedSessionCaches(store, setStore, next, setSessionTodo) } children.unpin(directory) return @@ -283,8 +293,10 @@ function createGlobalSync() { limited: x.limited, }), ) + cleanupDroppedSessionCaches(store, setStore, sessions, { + onDropSession: (sessionID) => todoHydrate.invalidate(directory, sessionID), + }) setStore("session", reconcile(sessions, { key: "id" })) - cleanupDroppedSessionCaches(store, setStore, sessions, setSessionTodo) sessionMeta.set(directory, { limit }) }) .catch((err) => { @@ -361,12 +373,14 @@ function createGlobalSync() { setGlobalProject: setProjects, }) if (event.type === "server.connected") { + todoHydrate.markGlobalRecovery() for (const directory of Object.keys(children.children)) { queue.push(directory) } } if (event.type === "global.disposed") { if (recent) return + todoHydrate.markGlobalRecovery() for (const directory of Object.keys(children.children)) { queue.push(directory) } @@ -376,7 +390,13 @@ function createGlobalSync() { const existing = children.children[directory] if (!existing) { - applyDetachedDirectoryEvent({ event, setSessionTodo }) + applyDetachedDirectoryEvent({ + directory, + event, + acceptSessionTodo, + clearSessionTodoAuthoritative, + todoHydrate, + }) return } children.mark(directory) @@ -387,7 +407,9 @@ function createGlobalSync() { store, setStore, push: queue.push, - setSessionTodo, + acceptSessionTodo, + clearSessionTodoAuthoritative, + todoHydrate, blockerTerminals, vcsCache: children.vcsCache.get(directory), loadLsp: () => { @@ -530,7 +552,10 @@ function createGlobalSync() { project: projectApi, todo: { set: setSessionTodo, + accept: acceptSessionTodo, + clearAuthoritative: clearSessionTodoAuthoritative, }, + todoHydrate, } } diff --git a/packages/app/src/context/global-sync/bootstrap.ts b/packages/app/src/context/global-sync/bootstrap.ts index d284b9973..39f445593 100644 --- a/packages/app/src/context/global-sync/bootstrap.ts +++ b/packages/app/src/context/global-sync/bootstrap.ts @@ -10,7 +10,7 @@ import type { ProviderAuthResponse, ProviderListResponse, Session, - Todo, + TodoSnapshot, } from "@opencode-ai/sdk/v2/client" import { showToast } from "@opencode-ai/ui/toast" import { getFilename } from "@opencode-ai/util/path" @@ -28,10 +28,7 @@ type GlobalStore = { path: Path project: Project[] session_todo: { - [sessionID: string]: Todo[] - } - session_todo_clear: { - [sessionID: string]: number + [sessionID: string]: TodoSnapshot } provider: ProviderListResponse provider_auth: ProviderAuthResponse diff --git a/packages/app/src/context/global-sync/event-reducer.test.ts b/packages/app/src/context/global-sync/event-reducer.test.ts index 688d1dad1..6c1257bf9 100644 --- a/packages/app/src/context/global-sync/event-reducer.test.ts +++ b/packages/app/src/context/global-sync/event-reducer.test.ts @@ -7,6 +7,7 @@ import type { Session, SessionDiffResponse, Todo, + TodoSnapshot, } from "@opencode-ai/sdk/v2/client" import { createStore } from "solid-js/store" import type { State } from "./types" @@ -48,6 +49,24 @@ const textPart = (id: string, sessionID: string, messageID: string) => text: id, }) as Part +const todoToolPart = (id: string, sessionID: string, messageID: string, metadata: unknown) => + ({ + id, + sessionID, + messageID, + type: "tool", + callID: `call_${id}`, + tool: "todowrite", + state: { + status: "completed", + input: {}, + output: "", + title: "", + metadata, + time: { start: 1, end: 1 }, + }, + }) as Part + const permissionRequest = (id: string, sessionID: string, title = id) => ({ id, @@ -140,69 +159,66 @@ describe("applyGlobalEvent", () => { describe("applyDirectoryEvent", () => { test("caches detached todo updates before a directory child store exists", () => { const todos: Todo[] = [{ id: "todo_1", content: "fresh todo", status: "in_progress", priority: "high" } as Todo] - const writes: Array<{ - sessionID: string - todos: Todo[] | undefined - options?: { clearActiveParts?: boolean } - }> = [] + const writes: Array<{ sessionID: string; snapshot: TodoSnapshot }> = [] const handled = applyDetachedDirectoryEvent({ - event: { type: "todo.updated", properties: { sessionID: "ses_fresh", todos } }, - setSessionTodo(sessionID, value, options) { - writes.push({ sessionID, todos: value, options }) + directory: "/tmp", + event: { type: "todo.updated", properties: { sessionID: "ses_fresh", revision: 2, todos } }, + acceptSessionTodo(sessionID, snapshot) { + writes.push({ sessionID, snapshot }) + return true }, + todoHydrate: { canAcceptLiveTodo: () => true }, }) expect(handled).toBe(true) - expect(writes).toEqual([{ sessionID: "ses_fresh", todos, options: undefined }]) + expect(writes).toEqual([{ sessionID: "ses_fresh", snapshot: { revision: 2, todos } }]) }) - test("marks detached empty todo updates as active-parts clears", () => { - const writes: Array<{ - sessionID: string - todos: Todo[] | undefined - options?: { clearActiveParts?: boolean } - }> = [] + test("rejects detached todo updates when live writes are fenced", () => { + const writes: TodoSnapshot[] = [] const handled = applyDetachedDirectoryEvent({ - event: { type: "todo.updated", properties: { sessionID: "ses_clear", todos: [] } }, - setSessionTodo(sessionID, value, options) { - writes.push({ sessionID, todos: value, options }) + directory: "/tmp", + event: { type: "todo.updated", properties: { sessionID: "ses_clear", revision: 3, todos: [] } }, + acceptSessionTodo(_sessionID, snapshot) { + writes.push(snapshot) + return true }, + todoHydrate: { canAcceptLiveTodo: () => false }, }) expect(handled).toBe(true) - expect(writes).toEqual([{ sessionID: "ses_clear", todos: [], options: { clearActiveParts: true } }]) + expect(writes).toEqual([]) }) - test("marks directory empty todo updates as active-parts clears", () => { + test("directory todo updates mirror only accepted snapshots", () => { const [store, setStore] = createStore(baseState()) - const writes: Array<{ - sessionID: string - todos: Todo[] | undefined - options?: { clearActiveParts?: boolean } - }> = [] + const writes: TodoSnapshot[] = [] applyDirectoryEvent({ - event: { type: "todo.updated", properties: { sessionID: "ses_clear", todos: [] } }, + event: { type: "todo.updated", properties: { sessionID: "ses_clear", revision: 4, todos: [] } }, store, setStore, push() {}, directory: "/tmp", loadLsp() {}, - setSessionTodo(sessionID, value, options) { - writes.push({ sessionID, todos: value, options }) + acceptSessionTodo(_sessionID, snapshot) { + writes.push(snapshot) + return true }, + todoHydrate: { canAcceptLiveTodo: () => true }, }) expect(store.todo.ses_clear).toEqual([]) - expect(writes).toEqual([{ sessionID: "ses_clear", todos: [], options: { clearActiveParts: true } }]) + expect(writes).toEqual([{ revision: 4, todos: [] }]) }) test("ignores detached events that need a directory child store", () => { const handled = applyDetachedDirectoryEvent({ + directory: "/tmp", event: { type: "message.updated", properties: { info: userMessage("msg_1", "ses_1") } }, - setSessionTodo() { + acceptSessionTodo() { throw new Error("should not write detached todo cache") }, }) @@ -212,8 +228,9 @@ describe("applyDirectoryEvent", () => { test("ignores malformed detached todo updates", () => { const handled = applyDetachedDirectoryEvent({ + directory: "/tmp", event: { type: "todo.updated" }, - setSessionTodo() { + acceptSessionTodo() { throw new Error("should not write detached todo cache") }, }) @@ -221,32 +238,37 @@ describe("applyDirectoryEvent", () => { expect(handled).toBe(false) }) - test("clears detached todo cache for deleted and archived sessions", () => { - const writes: Array<{ sessionID: string; todos: Todo[] | undefined }> = [] - const setSessionTodo = (sessionID: string, todos: Todo[] | undefined) => { - writes.push({ sessionID, todos }) + test("clears and invalidates detached todo cache for deleted and archived sessions", () => { + const clears: string[] = [] + const invalidated: string[] = [] + const clearSessionTodoAuthoritative = (sessionID: string) => { + clears.push(sessionID) } const deleted = applyDetachedDirectoryEvent({ + directory: "/tmp", event: { type: "session.deleted", properties: { info: rootSession({ id: "ses_deleted" }) } }, - setSessionTodo, + clearSessionTodoAuthoritative, + todoHydrate: { invalidateSession: (sessionID: string) => invalidated.push(sessionID) }, }) const archived = applyDetachedDirectoryEvent({ + directory: "/tmp", event: { type: "session.updated", properties: { info: rootSession({ id: "ses_archived", archived: 2 }) } }, - setSessionTodo, + clearSessionTodoAuthoritative, + todoHydrate: { invalidateSession: (sessionID: string) => invalidated.push(sessionID) }, }) const activeUpdate = applyDetachedDirectoryEvent({ + directory: "/tmp", event: { type: "session.updated", properties: { info: rootSession({ id: "ses_active" }) } }, - setSessionTodo, + clearSessionTodoAuthoritative, + todoHydrate: { invalidateSession: (sessionID: string) => invalidated.push(sessionID) }, }) expect(deleted).toBe(true) expect(archived).toBe(true) expect(activeUpdate).toBe(false) - expect(writes).toEqual([ - { sessionID: "ses_deleted", todos: undefined }, - { sessionID: "ses_archived", todos: undefined }, - ]) + expect(clears).toEqual(["ses_deleted", "ses_archived"]) + expect(invalidated).toEqual(["ses_deleted", "ses_archived"]) }) test("inserts root sessions in sorted order and updates sessionTotal", () => { @@ -385,10 +407,7 @@ describe("applyDirectoryEvent", () => { push() {}, directory: "/tmp", loadLsp() {}, - setSessionTodo(sessionID, value) { - if (value !== undefined) return - todos.push(sessionID) - }, + clearSessionTodoAuthoritative: (sessionID) => todos.push(sessionID), }) expect(store.session.map((x) => x.id)).toEqual([created.id, existing.id]) @@ -414,6 +433,21 @@ describe("applyDirectoryEvent", () => { expect(store.part.msg_1).toBeUndefined() }) + test("cleanupDroppedSessionCaches reports dropped sessions even without child caches", () => { + const forgotten: string[] = [] + const keep = rootSession({ id: "ses_keep" }) + const drop = rootSession({ id: "ses_drop" }) + const [store, setStore] = createStore(baseState({ session: [keep, drop] })) + + cleanupDroppedSessionCaches(store, setStore, [keep], { + onDropSession: (sessionID) => { + forgotten.push(sessionID) + }, + }) + + expect(forgotten).toEqual(["ses_drop"]) + }) + test("clears cached aggregate when turn changes are invalidated", () => { const [store, setStore] = createStore( baseState({ @@ -552,6 +586,35 @@ describe("applyDirectoryEvent", () => { expect(store.part[messageID]).toBeUndefined() }) + test("completed live todowrite metadata writes canonical todo before part storage returns", () => { + const sessionID = "ses_1" + const messageID = "msg_1" + const todos: Todo[] = [{ id: "todo_1", content: "from metadata", status: "in_progress", priority: "high" } as Todo] + const [store, setStore] = createStore(baseState()) + const writes: TodoSnapshot[] = [] + + applyDirectoryEvent({ + event: { + type: "message.part.updated", + properties: { part: todoToolPart("prt_1", sessionID, messageID, { revision: 5, todos }) }, + }, + store, + setStore, + push() {}, + directory: "/tmp", + loadLsp() {}, + acceptSessionTodo(_sessionID, snapshot) { + writes.push(snapshot) + return true + }, + todoHydrate: { canAcceptLiveTodo: () => true }, + }) + + expect(writes).toEqual([{ revision: 5, todos }]) + expect(store.todo[sessionID]).toEqual(todos) + expect(store.part[messageID]?.[0]?.id).toBe("prt_1") + }) + test("tracks permission request lifecycle", () => { const sessionID = "ses_1" const [store, setStore] = createStore( diff --git a/packages/app/src/context/global-sync/event-reducer.ts b/packages/app/src/context/global-sync/event-reducer.ts index 3e62e97bf..31fa5db3c 100644 --- a/packages/app/src/context/global-sync/event-reducer.ts +++ b/packages/app/src/context/global-sync/event-reducer.ts @@ -8,15 +8,52 @@ import type { Session, SessionStatus, Todo, + TodoSnapshot, } from "@opencode-ai/sdk/v2/client" import type { State, VcsCache } from "./types" import { trimSessions } from "./session-trim" import { dropSessionCaches } from "./session-cache" import { message as clean } from "@/utils/diffs" import type { createBlockerTerminalCache } from "./blocker-terminal-cache" +import type { TodoHydrateCoordinator } from "./todo-hydrate-coordinator" const SKIP_PARTS = new Set(["patch", "step-start", "step-finish"]) -type SetSessionTodo = (sessionID: string, todos: Todo[] | undefined, options?: { clearActiveParts?: boolean }) => void +type AcceptSessionTodo = (sessionID: string, snapshot: TodoSnapshot) => boolean +type ClearSessionTodoAuthoritative = (sessionID: string) => void +type TodoHydrateBoundary = Partial> + +function todoSnapshotFromProperties(properties: unknown): { sessionID: string; snapshot: TodoSnapshot } | undefined { + if (!properties || typeof properties !== "object") return undefined + const props = properties as { sessionID?: unknown; revision?: unknown; todos?: unknown } + if (typeof props.sessionID !== "string") return undefined + if (typeof props.revision !== "number") return undefined + if (!Array.isArray(props.todos)) return undefined + return { sessionID: props.sessionID, snapshot: { revision: props.revision, todos: props.todos as Todo[] } } +} + +function todoSnapshotFromPart(part: Part): { sessionID: string; snapshot: TodoSnapshot } | undefined { + if (part.type !== "tool") return undefined + if (part.tool !== "todowrite") return undefined + if (part.state.status !== "completed") return undefined + if (!part.sessionID) return undefined + const metadata = part.state.metadata + if (!metadata || typeof metadata !== "object") return undefined + const snapshot = metadata as { revision?: unknown; todos?: unknown } + if (typeof snapshot.revision !== "number") return undefined + if (!Array.isArray(snapshot.todos)) return undefined + return { sessionID: part.sessionID, snapshot: { revision: snapshot.revision, todos: snapshot.todos as Todo[] } } +} + +function acceptLiveTodo(input: { + directory: string + sessionID: string + snapshot: TodoSnapshot + acceptSessionTodo?: AcceptSessionTodo + todoHydrate?: Pick +}) { + if (input.todoHydrate?.canAcceptLiveTodo?.(input.directory, input.sessionID) === false) return false + return input.acceptSessionTodo?.(input.sessionID, input.snapshot) ?? false +} export function applyGlobalEvent(input: { event: { type: string; properties?: unknown } @@ -43,9 +80,16 @@ export function applyGlobalEvent(input: { }) } -function cleanupSessionCaches(setStore: SetStoreFunction, sessionID: string, setSessionTodo?: SetSessionTodo) { +function cleanupSessionCaches(input: { + setStore: SetStoreFunction + sessionID: string + clearSessionTodoAuthoritative?: ClearSessionTodoAuthoritative + todoHydrate?: Pick +}) { + const { setStore, sessionID } = input if (!sessionID) return - setSessionTodo?.(sessionID, undefined) + input.clearSessionTodoAuthoritative?.(sessionID) + input.todoHydrate?.invalidateSession?.(sessionID) setStore( produce((draft) => { dropSessionCaches(draft, [sessionID]) @@ -57,10 +101,13 @@ export function cleanupDroppedSessionCaches( store: Store, setStore: SetStoreFunction, next: Session[], - setSessionTodo?: SetSessionTodo, + options?: { onDropSession?: (sessionID: string) => void }, ) { const keep = new Set(next.map((item) => item.id)) + const dropped = store.session.map((session) => session.id).filter((sessionID) => !keep.has(sessionID)) + for (const sessionID of dropped) options?.onDropSession?.(sessionID) const stale = [ + ...dropped, ...Object.keys(store.message), ...Object.keys(store.turn_change_aggregate), ...Object.keys(store.todo), @@ -71,9 +118,6 @@ export function cleanupDroppedSessionCaches( .filter((sessionID): sessionID is string => !!sessionID), ].filter((sessionID, index, list) => !keep.has(sessionID) && list.indexOf(sessionID) === index) if (stale.length === 0) return - for (const sessionID of stale) { - setSessionTodo?.(sessionID, undefined) - } setStore( produce((draft) => { dropSessionCaches(draft, stale) @@ -82,31 +126,38 @@ export function cleanupDroppedSessionCaches( } export function applyDetachedDirectoryEvent(input: { + directory: string event: { type: string; properties?: unknown } - setSessionTodo?: SetSessionTodo + acceptSessionTodo?: AcceptSessionTodo + clearSessionTodoAuthoritative?: ClearSessionTodoAuthoritative + todoHydrate?: TodoHydrateBoundary }) { if (!input.event.properties || typeof input.event.properties !== "object") return false switch (input.event.type) { case "todo.updated": { - const props = input.event.properties as { sessionID?: string; todos?: Todo[] } - if (!props.sessionID || !Array.isArray(props.todos)) return false - input.setSessionTodo?.( - props.sessionID, - props.todos, - props.todos.length === 0 ? { clearActiveParts: true } : undefined, - ) + const todo = todoSnapshotFromProperties(input.event.properties) + if (!todo) return false + acceptLiveTodo({ + directory: input.directory, + sessionID: todo.sessionID, + snapshot: todo.snapshot, + acceptSessionTodo: input.acceptSessionTodo, + todoHydrate: input.todoHydrate, + }) return true } case "session.deleted": { const info = (input.event.properties as { info?: Session }).info if (!info?.id) return false - input.setSessionTodo?.(info.id, undefined) + input.clearSessionTodoAuthoritative?.(info.id) + input.todoHydrate?.invalidateSession?.(info.id) return true } case "session.updated": { const info = (input.event.properties as { info?: Session }).info if (!info?.id || !info.time?.archived) return false - input.setSessionTodo?.(info.id, undefined) + input.clearSessionTodoAuthoritative?.(info.id) + input.todoHydrate?.invalidateSession?.(info.id) return true } default: @@ -122,7 +173,9 @@ export function applyDirectoryEvent(input: { directory: string loadLsp: () => void vcsCache?: VcsCache - setSessionTodo?: SetSessionTodo + acceptSessionTodo?: AcceptSessionTodo + clearSessionTodoAuthoritative?: ClearSessionTodoAuthoritative + todoHydrate?: TodoHydrateBoundary blockerTerminals?: ReturnType }) { const event = input.event @@ -156,7 +209,12 @@ export function applyDirectoryEvent(input: { }), ) } - cleanupSessionCaches(input.setStore, info.id, input.setSessionTodo) + cleanupSessionCaches({ + setStore: input.setStore, + sessionID: info.id, + clearSessionTodoAuthoritative: input.clearSessionTodoAuthoritative, + todoHydrate: input.todoHydrate, + }) if (info.parentID) break input.setStore("sessionTotal", (value) => Math.max(0, value - 1)) break @@ -181,7 +239,12 @@ export function applyDirectoryEvent(input: { }), ) } - cleanupSessionCaches(input.setStore, info.id, input.setSessionTodo) + cleanupSessionCaches({ + setStore: input.setStore, + sessionID: info.id, + clearSessionTodoAuthoritative: input.clearSessionTodoAuthoritative, + todoHydrate: input.todoHydrate, + }) if (info.parentID) break input.setStore("sessionTotal", (value) => Math.max(0, value - 1)) break @@ -192,13 +255,16 @@ export function applyDirectoryEvent(input: { break } case "todo.updated": { - const props = event.properties as { sessionID: string; todos: Todo[] } - input.setStore("todo", props.sessionID, reconcile(props.todos, { key: "id" })) - input.setSessionTodo?.( - props.sessionID, - props.todos, - props.todos.length === 0 ? { clearActiveParts: true } : undefined, - ) + const todo = todoSnapshotFromProperties(event.properties) + if (!todo) break + const accepted = acceptLiveTodo({ + directory: input.directory, + sessionID: todo.sessionID, + snapshot: todo.snapshot, + acceptSessionTodo: input.acceptSessionTodo, + todoHydrate: input.todoHydrate, + }) + if (accepted) input.setStore("todo", todo.sessionID, reconcile(todo.snapshot.todos, { key: "id" })) break } case "session.status": { @@ -244,6 +310,17 @@ export function applyDirectoryEvent(input: { case "message.part.updated": { const part = (event.properties as { part: Part }).part if (SKIP_PARTS.has(part.type)) break + const todo = todoSnapshotFromPart(part) + if (todo) { + const accepted = acceptLiveTodo({ + directory: input.directory, + sessionID: todo.sessionID, + snapshot: todo.snapshot, + acceptSessionTodo: input.acceptSessionTodo, + todoHydrate: input.todoHydrate, + }) + if (accepted) input.setStore("todo", todo.sessionID, reconcile(todo.snapshot.todos, { key: "id" })) + } const parts = input.store.part[part.messageID] if (!parts) { input.setStore("part", part.messageID, [part]) diff --git a/packages/app/src/context/global-sync/todo-hydrate-coordinator.test.ts b/packages/app/src/context/global-sync/todo-hydrate-coordinator.test.ts new file mode 100644 index 000000000..42fd4e2cd --- /dev/null +++ b/packages/app/src/context/global-sync/todo-hydrate-coordinator.test.ts @@ -0,0 +1,203 @@ +import { describe, expect, test } from "bun:test" +import { createRoot } from "solid-js" +import { createTodoHydrateCoordinator } from "./todo-hydrate-coordinator" + +describe("createTodoHydrateCoordinator", () => { + test("tracks sessions and returns local eviction requests", () => { + createRoot((dispose) => { + const coordinator = createTodoHydrateCoordinator({ sessionLimit: 2, directoryLimit: 2 }) + + expect(coordinator.touch("dir-a", "ses_1")).toEqual([]) + expect(coordinator.touch("dir-a", "ses_2")).toEqual([]) + expect(coordinator.touch("dir-a", "ses_3")).toEqual([{ directory: "dir-a", sessionIDs: ["ses_1"] }]) + expect(coordinator.has("dir-a", "ses_1")).toBe(false) + expect(coordinator.has("dir-a", "ses_3")).toBe(true) + + dispose() + }) + }) + + test("keeps scheduled hydrates pending until the current token completes", () => { + createRoot((dispose) => { + const coordinator = createTodoHydrateCoordinator() + + coordinator.touch("dir-a", "ses_1") + coordinator.scheduleHydrate("dir-a", "ses_1", "visible") + expect(coordinator.isPending("dir-a", "ses_1")).toBe(true) + + const token = coordinator.beginHydrate("dir-a", "ses_1", "visible") + expect(coordinator.isCurrent(token)).toBe(true) + + coordinator.completeHydrate(token, { + cacheAccepted: true, + recoveryValidated: false, + liveWritesReopened: true, + }) + expect(coordinator.isPending("dir-a", "ses_1")).toBe(false) + + dispose() + }) + }) + + test("invalidates stale hydrate tokens", () => { + createRoot((dispose) => { + const coordinator = createTodoHydrateCoordinator() + + coordinator.touch("dir-a", "ses_1") + const stale = coordinator.beginHydrate("dir-a", "ses_1", "visible") + const current = coordinator.beginHydrate("dir-a", "ses_1", "recovery") + + expect(coordinator.isCurrent(stale)).toBe(false) + expect(coordinator.isCurrent(current)).toBe(true) + + dispose() + }) + }) + + test("cancels scheduled hydrate without evicting the tracked session", () => { + createRoot((dispose) => { + const coordinator = createTodoHydrateCoordinator() + + coordinator.touch("dir-a", "ses_1") + coordinator.scheduleHydrate("dir-a", "ses_1", "visible") + coordinator.cancelHydrate("dir-a", "ses_1") + + expect(coordinator.isPending("dir-a", "ses_1")).toBe(false) + expect(coordinator.has("dir-a", "ses_1")).toBe(true) + + dispose() + }) + }) + + test("records recovery validation against the token target epoch", () => { + createRoot((dispose) => { + const coordinator = createTodoHydrateCoordinator() + + coordinator.touch("dir-a", "ses_1") + const epoch1 = coordinator.markGlobalRecovery() + const token = coordinator.beginHydrate("dir-a", "ses_1", "recovery") + const epoch2 = coordinator.markGlobalRecovery() + + coordinator.completeHydrate(token, { + cacheAccepted: false, + recoveryValidated: true, + liveWritesReopened: true, + }) + + expect(epoch1).toBe(1) + expect(epoch2).toBe(2) + expect(coordinator.validatedRecoveryEpoch("dir-a", "ses_1")).toBe(1) + expect(coordinator.recoveryEpoch()).toBe(2) + + dispose() + }) + }) + + test("authoritative invalidation closes live writes until a current hydrate reopens them", () => { + createRoot((dispose) => { + const coordinator = createTodoHydrateCoordinator() + + coordinator.touch("dir-a", "ses_1") + const token = coordinator.beginHydrate("dir-a", "ses_1", "visible") + coordinator.invalidateSession("ses_1") + + expect(coordinator.isCurrent(token)).toBe(false) + expect(coordinator.canAcceptLiveTodo("dir-a", "ses_1")).toBe(false) + expect(coordinator.isAuthoritativelyInvalidated("ses_1")).toBe(true) + + coordinator.touch("dir-a", "ses_1") + const reopened = coordinator.beginHydrate("dir-a", "ses_1", "visible") + coordinator.completeHydrate(reopened, { + cacheAccepted: true, + recoveryValidated: false, + liveWritesReopened: true, + }) + + expect(coordinator.canAcceptLiveTodo("dir-a", "ses_1")).toBe(true) + expect(coordinator.isAuthoritativelyInvalidated("ses_1")).toBe(false) + + dispose() + }) + }) + + test("authoritative invalidation removes tracked sessions from every directory", () => { + createRoot((dispose) => { + const coordinator = createTodoHydrateCoordinator() + + coordinator.touch("dir-a", "ses_1") + coordinator.touch("dir-b", "ses_1") + coordinator.scheduleHydrate("dir-a", "ses_1", "visible") + coordinator.scheduleHydrate("dir-b", "ses_1", "busy") + + coordinator.invalidateSession("ses_1") + + expect(coordinator.has("dir-a", "ses_1")).toBe(false) + expect(coordinator.has("dir-b", "ses_1")).toBe(false) + expect(coordinator.isPending("dir-a", "ses_1")).toBe(false) + expect(coordinator.isPending("dir-b", "ses_1")).toBe(false) + expect(coordinator.isAuthoritativelyInvalidated("ses_1")).toBe(true) + + dispose() + }) + }) + + test("local session eviction clears pending and recovery state", () => { + createRoot((dispose) => { + const coordinator = createTodoHydrateCoordinator({ sessionLimit: 1 }) + + coordinator.touch("dir-a", "ses_1") + coordinator.markGlobalRecovery() + const token = coordinator.beginHydrate("dir-a", "ses_1", "recovery") + coordinator.completeHydrate(token, { + cacheAccepted: true, + recoveryValidated: true, + liveWritesReopened: true, + }) + coordinator.scheduleHydrate("dir-a", "ses_1", "visible") + + expect(coordinator.touch("dir-a", "ses_2")).toEqual([{ directory: "dir-a", sessionIDs: ["ses_1"] }]) + expect(coordinator.has("dir-a", "ses_1")).toBe(false) + expect(coordinator.isPending("dir-a", "ses_1")).toBe(false) + expect(coordinator.validatedRecoveryEpoch("dir-a", "ses_1")).toBe(0) + + dispose() + }) + }) + + test("forgetting a session clears recovery state", () => { + createRoot((dispose) => { + const coordinator = createTodoHydrateCoordinator() + + coordinator.touch("dir-a", "ses_1") + coordinator.markGlobalRecovery() + const token = coordinator.beginHydrate("dir-a", "ses_1", "recovery") + coordinator.completeHydrate(token, { + cacheAccepted: true, + recoveryValidated: true, + liveWritesReopened: true, + }) + + coordinator.invalidate("dir-a", "ses_1") + + expect(coordinator.has("dir-a", "ses_1")).toBe(false) + expect(coordinator.isPending("dir-a", "ses_1")).toBe(false) + expect(coordinator.validatedRecoveryEpoch("dir-a", "ses_1")).toBe(0) + + dispose() + }) + }) + + test("authoritative invalidation clears pending-only sessions", () => { + createRoot((dispose) => { + const coordinator = createTodoHydrateCoordinator() + + coordinator.scheduleHydrate("dir-a", "ses_1", "visible") + coordinator.invalidateSession("ses_1") + + expect(coordinator.isPending("dir-a", "ses_1")).toBe(false) + expect(coordinator.isAuthoritativelyInvalidated("ses_1")).toBe(true) + + dispose() + }) + }) +}) diff --git a/packages/app/src/context/global-sync/todo-hydrate-coordinator.ts b/packages/app/src/context/global-sync/todo-hydrate-coordinator.ts new file mode 100644 index 000000000..8b792520e --- /dev/null +++ b/packages/app/src/context/global-sync/todo-hydrate-coordinator.ts @@ -0,0 +1,224 @@ +import { createStore, produce } from "solid-js/store" +import { SESSION_CACHE_LIMIT, pickSessionCacheEvictions } from "./session-cache" +import { MAX_DIR_STORES } from "./types" + +export type TodoHydrateReason = "visible" | "busy" | "recovery" + +export type TodoHydrateToken = { + directory: string + sessionID: string + epoch: number + reason: TodoHydrateReason + targetRecoveryEpoch?: number +} + +export type SessionRequestEviction = { + directory: string + sessionIDs: string[] +} + +export type TodoHydrateCoordinator = ReturnType + +const keyFor = (directory: string, sessionID: string) => `${directory}\0${sessionID}` + +const splitKey = (key: string) => { + const index = key.indexOf("\0") + if (index < 0) return undefined + return { directory: key.slice(0, index), sessionID: key.slice(index + 1) } +} + +export function createTodoHydrateCoordinator(options?: { sessionLimit?: number; directoryLimit?: number }) { + const sessionLimit = options?.sessionLimit ?? SESSION_CACHE_LIMIT + const directoryLimit = options?.directoryLimit ?? MAX_DIR_STORES + const seen = new Map>() + const tokenEpoch = new Map() + let nextTokenEpoch = 0 + const [state, setState] = createStore({ + pending: {} as Record, + invalidated: {} as Record, + recoveryEpoch: 0, + validatedRecovery: {} as Record, + }) + + const bumpToken = (directory: string, sessionID: string) => { + const key = keyFor(directory, sessionID) + const next = nextTokenEpoch + 1 + nextTokenEpoch = next + tokenEpoch.set(key, next) + return next + } + + const clearPending = (directory: string, sessionID: string) => { + const key = keyFor(directory, sessionID) + setState( + "pending", + produce((draft) => { + delete draft[key] + }), + ) + } + + const clearSessionState = (directory: string, sessionID: string) => { + const key = keyFor(directory, sessionID) + seen.get(directory)?.delete(sessionID) + tokenEpoch.delete(key) + setState( + produce((draft) => { + delete draft.pending[key] + delete draft.validatedRecovery[key] + }), + ) + } + + const removeDirectoryState = (directory: string) => { + const sessionIDs = new Set(seen.get(directory) ?? []) + for (const key of Object.keys(state.pending)) { + const parsed = splitKey(key) + if (parsed?.directory === directory) sessionIDs.add(parsed.sessionID) + } + for (const key of Object.keys(state.validatedRecovery)) { + const parsed = splitKey(key) + if (parsed?.directory === directory) sessionIDs.add(parsed.sessionID) + } + for (const key of Array.from(tokenEpoch.keys())) { + const parsed = splitKey(key) + if (parsed?.directory === directory) sessionIDs.add(parsed.sessionID) + } + for (const sessionID of sessionIDs) clearSessionState(directory, sessionID) + seen.delete(directory) + } + + const touch = (directory: string, sessionID: string): SessionRequestEviction[] => { + if (!directory || !sessionID) return [] + let tracked = seen.get(directory) + if (tracked) { + seen.delete(directory) + seen.set(directory, tracked) + } else { + tracked = new Set() + seen.set(directory, tracked) + } + + const evictions: SessionRequestEviction[] = [] + const local = pickSessionCacheEvictions({ seen: tracked, keep: sessionID, limit: sessionLimit }) + if (local.length > 0) { + for (const evictedSessionID of local) clearSessionState(directory, evictedSessionID) + evictions.push({ directory, sessionIDs: local }) + } + + while (seen.size > directoryLimit) { + const staleDirectory = seen.keys().next().value + if (!staleDirectory) break + const sessionIDs = [...(seen.get(staleDirectory) ?? [])] + seen.delete(staleDirectory) + removeDirectoryState(staleDirectory) + if (sessionIDs.length > 0) evictions.push({ directory: staleDirectory, sessionIDs }) + } + + return evictions + } + + const has = (directory: string, sessionID: string) => seen.get(directory)?.has(sessionID) ?? false + + const scheduleHydrate = (directory: string, sessionID: string, reason: TodoHydrateReason) => { + if (!directory || !sessionID) return + setState("pending", keyFor(directory, sessionID), reason) + } + + const beginHydrate = (directory: string, sessionID: string, reason: TodoHydrateReason): TodoHydrateToken => { + scheduleHydrate(directory, sessionID, reason) + return { + directory, + sessionID, + reason, + epoch: bumpToken(directory, sessionID), + targetRecoveryEpoch: reason === "recovery" ? state.recoveryEpoch : undefined, + } + } + + const isCurrent = (token: TodoHydrateToken) => { + if (!has(token.directory, token.sessionID)) return false + return tokenEpoch.get(keyFor(token.directory, token.sessionID)) === token.epoch + } + + const completeHydrate = ( + token: TodoHydrateToken, + outcome: { cacheAccepted: boolean; recoveryValidated: boolean; liveWritesReopened: boolean }, + ) => { + if (!isCurrent(token)) return + clearPending(token.directory, token.sessionID) + if (token.reason === "recovery" && outcome.recoveryValidated && token.targetRecoveryEpoch !== undefined) { + setState("validatedRecovery", keyFor(token.directory, token.sessionID), token.targetRecoveryEpoch) + } + if (outcome.liveWritesReopened) { + setState( + "invalidated", + produce((draft) => { + delete draft[token.sessionID] + }), + ) + } + } + + const forgetSession = (directory: string, sessionID: string) => { + clearSessionState(directory, sessionID) + } + + const invalidateSession = (sessionID: string) => { + if (!sessionID) return + setState("invalidated", sessionID, true) + const directories = new Set() + for (const [directory, sessions] of seen) { + if (sessions.has(sessionID)) directories.add(directory) + } + for (const key of Object.keys(state.pending)) { + const parsed = splitKey(key) + if (parsed?.sessionID === sessionID) directories.add(parsed.directory) + } + for (const key of Object.keys(state.validatedRecovery)) { + const parsed = splitKey(key) + if (parsed?.sessionID === sessionID) directories.add(parsed.directory) + } + for (const key of Array.from(tokenEpoch.keys())) { + const parsed = splitKey(key) + if (parsed?.sessionID === sessionID) directories.add(parsed.directory) + } + for (const directory of directories) { + clearSessionState(directory, sessionID) + } + } + + const clearDirectory = (directory: string) => { + seen.delete(directory) + removeDirectoryState(directory) + } + + const markGlobalRecovery = () => { + const next = state.recoveryEpoch + 1 + setState("recoveryEpoch", next) + return next + } + + return { + touch, + has, + scheduleHydrate, + beginHydrate, + isCurrent, + completeHydrate, + cancelHydrate(directory: string, sessionID: string) { + bumpToken(directory, sessionID) + clearPending(directory, sessionID) + }, + isPending: (directory: string, sessionID: string) => state.pending[keyFor(directory, sessionID)] !== undefined, + isAuthoritativelyInvalidated: (sessionID: string) => state.invalidated[sessionID] === true, + canAcceptLiveTodo: (_directory: string, sessionID: string) => state.invalidated[sessionID] !== true, + invalidate: forgetSession, + invalidateSession, + clearDirectory, + markGlobalRecovery, + recoveryEpoch: () => state.recoveryEpoch, + validatedRecoveryEpoch: (directory: string, sessionID: string) => + state.validatedRecovery[keyFor(directory, sessionID)] ?? 0, + } +} diff --git a/packages/app/src/context/sync.tsx b/packages/app/src/context/sync.tsx index 2b9cde746..f3189a69e 100644 --- a/packages/app/src/context/sync.tsx +++ b/packages/app/src/context/sync.tsx @@ -12,7 +12,8 @@ import { import { useGlobalSync } from "./global-sync" import { useSDK } from "./sdk" import type { Message, Part } from "@opencode-ai/sdk/v2/client" -import { SESSION_CACHE_LIMIT, dropSessionCaches, pickSessionCacheEvictions } from "./global-sync/session-cache" +import { dropSessionCaches } from "./global-sync/session-cache" +import type { TodoHydrateReason } from "./global-sync/todo-hydrate-coordinator" import { diffs as list, message as clean } from "@/utils/diffs" const SKIP_PARTS = new Set(["patch", "step-start", "step-finish"]) @@ -35,6 +36,14 @@ const keyFor = (directory: string, id: string) => `${directory}\n${id}` const cmp = (a: string, b: string) => (a < b ? -1 : a > b ? 1 : 0) +function isNotFoundError(error: unknown) { + if (!error || typeof error !== "object") return false + const value = error as { name?: unknown; status?: unknown; statusCode?: unknown; response?: { status?: unknown } } + if (value.name === "NotFoundError") return true + if (value.status === 404 || value.statusCode === 404) return true + return value.response?.status === 404 +} + function merge(a: readonly T[], b: readonly T[]) { const map = new Map(a.map((item) => [item.id, item] as const)) for (const item of b) map.set(item.id, item) @@ -257,8 +266,6 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ const inflightDiff = new Map>() const inflightTodo = new Map>() const optimistic = new Map>() - const maxDirs = 30 - const seen = new Map>() const [meta, setMeta] = createStore({ limit: {} as Record, cursor: {} as Record, @@ -300,26 +307,6 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ ...(optimistic.get(keyFor(directory, sessionID))?.values() ?? []), ] - const seenFor = (directory: string) => { - const existing = seen.get(directory) - if (existing) { - seen.delete(directory) - seen.set(directory, existing) - return existing - } - const created = new Set() - seen.set(directory, created) - while (seen.size > maxDirs) { - const first = seen.keys().next().value - if (!first) break - const stale = [...(seen.get(first) ?? [])] - seen.delete(first) - const [, setStore] = globalSync.child(first, { bootstrap: false }) - evict(first, setStore, stale) - } - return created - } - const clearMeta = (directory: string, sessionIDs: string[]) => { if (sessionIDs.length === 0) return for (const sessionID of sessionIDs) { @@ -342,7 +329,6 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ if (sessionIDs.length === 0) return clearSessionPrefetch(directory, sessionIDs) for (const sessionID of sessionIDs) { - globalSync.todo.set(sessionID, undefined) clearOptimistic(directory, sessionID) } setStore( @@ -354,12 +340,15 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ } const touch = (directory: string, setStore: Setter, sessionID: string) => { - const stale = pickSessionCacheEvictions({ - seen: seenFor(directory), - keep: sessionID, - limit: SESSION_CACHE_LIMIT, - }) - evict(directory, setStore, stale) + const evictions = globalSync.todoHydrate.touch(directory, sessionID) + for (const item of evictions) { + if (item.directory === directory) { + evict(directory, setStore, item.sessionIDs) + continue + } + const [, staleSetStore] = globalSync.child(item.directory, { bootstrap: false }) + evict(item.directory, staleSetStore, item.sessionIDs) + } } const fetchMessages = async (input: { @@ -383,7 +372,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ } } - const tracked = (directory: string, sessionID: string) => seen.get(directory)?.has(sessionID) ?? false + const tracked = (directory: string, sessionID: string) => globalSync.todoHydrate.has(directory, sessionID) const loadMessages = async (input: { directory: string @@ -599,33 +588,56 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ }), ) }, - async todo(sessionID: string, opts?: { force?: boolean }) { + async todo(sessionID: string, opts?: { force?: boolean; reason?: TodoHydrateReason }) { const directory = sdk.directory const client = sdk.client const [store, setStore] = globalSync.child(directory) touch(directory, setStore, sessionID) - const existing = store.todo[sessionID] const cached = globalSync.data.session_todo[sessionID] - if (existing !== undefined) { - if (cached === undefined) { - globalSync.todo.set(sessionID, existing) - } - if (!opts?.force) return - } - if (cached !== undefined) { - setStore("todo", sessionID, reconcile(cached, { key: "id" })) + setStore("todo", sessionID, reconcile(cached.todos, { key: "id" })) + if (!opts?.force && opts?.reason !== "recovery") return } const key = keyFor(directory, sessionID) - return runInflight(inflightTodo, key, () => - retry(() => client.session.todo({ sessionID })).then((todo) => { - if (!tracked(directory, sessionID)) return - const list = todo.data ?? [] - setStore("todo", sessionID, reconcile(list, { key: "id" })) - globalSync.todo.set(sessionID, list) - }), - ) + const reason = opts?.reason ?? (opts?.force ? "busy" : "visible") + const inflightKey = `${key}\n${opts?.force || reason === "recovery" ? "force" : "normal"}` + return runInflight(inflightTodo, inflightKey, async () => { + const token = globalSync.todoHydrate.beginHydrate(directory, sessionID, reason) + try { + const todo = await retry(() => client.session.todo({ sessionID })) + if (!globalSync.todoHydrate.isCurrent(token)) return + const snapshot = todo.data + if (!snapshot) { + globalSync.todoHydrate.completeHydrate(token, { + cacheAccepted: false, + recoveryValidated: false, + liveWritesReopened: false, + }) + return + } + const accepted = globalSync.todo.accept(sessionID, snapshot) + const current = globalSync.data.session_todo[sessionID] + if (current) setStore("todo", sessionID, reconcile(current.todos, { key: "id" })) + globalSync.todoHydrate.completeHydrate(token, { + cacheAccepted: accepted || current?.revision === snapshot.revision, + recoveryValidated: true, + liveWritesReopened: true, + }) + } catch (err) { + if (!isNotFoundError(err)) throw err + if (!globalSync.todoHydrate.isCurrent(token)) return + globalSync.todo.clearAuthoritative(sessionID) + globalSync.todoHydrate.invalidateSession(sessionID) + clearSessionPrefetch(directory, [sessionID]) + setStore( + produce((draft) => { + dropSessionCaches(draft, [sessionID]) + }), + ) + clearMeta(directory, [sessionID]) + } + }) }, history: { more(sessionID: string) { @@ -665,7 +677,7 @@ export const { use: useSync, provider: SyncProvider } = createSimpleContext({ }, evict(sessionID: string, directory = sdk.directory) { const [, setStore] = globalSync.child(directory) - seenFor(directory).delete(sessionID) + globalSync.todoHydrate.invalidate(directory, sessionID) evict(directory, setStore, [sessionID]) }, fetch: async (count = 10) => { diff --git a/packages/app/src/pages/layout.tsx b/packages/app/src/pages/layout.tsx index 324765d74..a9d604339 100644 --- a/packages/app/src/pages/layout.tsx +++ b/packages/app/src/pages/layout.tsx @@ -963,9 +963,6 @@ export default function Layout(props: ParentProps) { if (stale.length > 0) { clearSessionPrefetch(directory, stale) - for (const id of stale) { - globalSync.todo.set(id, undefined) - } } const current = store.message[sessionID] ?? [] diff --git a/packages/app/src/pages/session.tsx b/packages/app/src/pages/session.tsx index 20bfa7b2d..9b26e8ea3 100644 --- a/packages/app/src/pages/session.tsx +++ b/packages/app/src/pages/session.tsx @@ -211,7 +211,12 @@ export default function Page() { statusType: (id) => sync.data.session_status[id]?.type, blocked: composer.blocked, hasMessageCache: (id) => sync.data.message[id] !== undefined, - hasTodoCache: (id) => sync.data.todo[id] !== undefined || globalSync.data.session_todo[id] !== undefined, + hasTodoCache: (id) => globalSync.data.session_todo[id] !== undefined, + isTodoInvalidated: globalSync.todoHydrate.isAuthoritativelyInvalidated, + scheduleTodoHydrate: globalSync.todoHydrate.scheduleHydrate, + cancelTodoHydrate: globalSync.todoHydrate.cancelHydrate, + recoveryEpoch: globalSync.todoHydrate.recoveryEpoch, + validatedRecoveryEpoch: globalSync.todoHydrate.validatedRecoveryEpoch, syncSession: (id, options) => sync.session.sync(id, options), syncTodo: (id, options) => sync.session.todo(id, options), emitRendererDiagnostic, diff --git a/packages/app/src/pages/session/session-todos.test.ts b/packages/app/src/pages/session/session-todos.test.ts index e22ff55d0..d28061ad7 100644 --- a/packages/app/src/pages/session/session-todos.test.ts +++ b/packages/app/src/pages/session/session-todos.test.ts @@ -40,35 +40,37 @@ const backendTodo = (content: string, status: Todo["status"] = "pending"): Todo priority: "medium", }) +const canonical = (todos: Todo[]) => ({ revision: 1, todos }) + describe("selectSessionTodos", () => { - test("prefers message-derived todos over lagging backend todos", () => { + test("uses backend todos over render-only parts placeholders", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("from parts", "in_progress")] } }))] - expect(selectSessionTodos({ backend: [backendTodo("from backend", "pending")], parts })).toEqual([ - todo("from parts", "in_progress"), + expect(selectSessionTodos({ canonical: canonical([backendTodo("from backend", "pending")]), parts })).toEqual([ + backendTodo("from backend", "pending"), ]) }) test("uses backend terminal todos when matching message-derived todos are stale active", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("task A", "in_progress")] } }))] - expect(selectSessionTodos({ backend: [backendTodo("task A", "completed")], parts })).toEqual([ + expect(selectSessionTodos({ canonical: canonical([backendTodo("task A", "completed")]), parts })).toEqual([ backendTodo("task A", "completed"), ]) }) - test("keeps message-derived active todos when terminal backend todos do not match", () => { + test("uses terminal backend even when parts describe a different active todo", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("new task", "in_progress")] } }))] - expect(selectSessionTodos({ backend: [backendTodo("old task", "completed")], parts })).toEqual([ - todo("new task", "in_progress"), + expect(selectSessionTodos({ canonical: canonical([backendTodo("old task", "completed")]), parts })).toEqual([ + backendTodo("old task", "completed"), ]) }) test("returns completed-only historical parts for status summary display", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("done from parts", "completed")] } }))] - expect(selectSessionTodos({ backend: [], parts })).toEqual([todo("done from parts", "completed")]) + expect(selectSessionTodos({ canonical: undefined, parts })).toEqual([todo("done from parts", "completed")]) }) test("falls back to latest todowrite parts when backend todos are unknown", () => { @@ -77,16 +79,16 @@ describe("selectSessionTodos", () => { toolPart("todowrite", completedState({ input: { todos: [todo("new", "in_progress")] } })), ] - expect(selectSessionTodos({ backend: undefined, parts })).toEqual([todo("new", "in_progress")]) + expect(selectSessionTodos({ canonical: undefined, parts })).toEqual([todo("new", "in_progress")]) }) - test("returns empty when known backend todos clear stale active parts", () => { + test("returns empty when backend snapshot is empty", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("old", "in_progress")] } }))] - expect(selectSessionTodos({ backend: [], backendClearActivePartsAt: 1, parts })).toEqual([]) + expect(selectSessionTodos({ canonical: canonical([]), parts })).toEqual([]) }) - test("keeps active parts created after a live empty backend clear", () => { + test("does not use part timestamps to override an empty backend snapshot", () => { const parts = [ toolPart( "todowrite", @@ -94,15 +96,13 @@ describe("selectSessionTodos", () => { ), ] - expect(selectSessionTodos({ backend: [], backendClearActivePartsAt: 1, parts })).toEqual([ - todo("new", "in_progress"), - ]) + expect(selectSessionTodos({ canonical: canonical([]), parts })).toEqual([]) }) - test("keeps active parts over ordinary empty backend cache", () => { + test("returns empty when ordinary backend cache is empty", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("new", "in_progress")] } }))] - expect(selectSessionTodos({ backend: [], parts })).toEqual([todo("new", "in_progress")]) + expect(selectSessionTodos({ canonical: canonical([]), parts })).toEqual([]) }) test("falls back to a secondary session source when the primary source is empty", () => { @@ -110,7 +110,7 @@ describe("selectSessionTodos", () => { toolPart("todowrite", completedState({ input: { todos: [todo("route todo", "in_progress")] } })), ] - expect(selectSessionTodos({ backend: [], parts: [], fallback: { parts: fallbackParts } })).toEqual([ + expect(selectSessionTodos({ parts: [], fallback: { parts: fallbackParts } })).toEqual([ todo("route todo", "in_progress"), ]) }) diff --git a/packages/app/src/pages/session/todos/todo-model.test.ts b/packages/app/src/pages/session/todos/todo-model.test.ts index f47348ab3..1c66c8db3 100644 --- a/packages/app/src/pages/session/todos/todo-model.test.ts +++ b/packages/app/src/pages/session/todos/todo-model.test.ts @@ -37,14 +37,12 @@ describe("todoSnapshot", () => { sessionID: "ses_1", source: "primary-parts", items: [todo("working", "in_progress", "high", "todo_1")], - sourceUpdatedAt: 10, }), ).toEqual({ sessionID: "ses_1", source: "primary-parts", items: [todo("working", "in_progress", "high", "todo_1")], phase: "active", - sourceUpdatedAt: 10, }) }) }) diff --git a/packages/app/src/pages/session/todos/todo-model.ts b/packages/app/src/pages/session/todos/todo-model.ts index 2102f658e..c63b4add1 100644 --- a/packages/app/src/pages/session/todos/todo-model.ts +++ b/packages/app/src/pages/session/todos/todo-model.ts @@ -1,8 +1,15 @@ import type { Todo } from "@opencode-ai/sdk/v2/client" -export type TodoPhase = "empty" | "active" | "terminal" +export type TodoPhase = "pending" | "empty" | "active" | "terminal" -export type TodoSourceKind = "primary-backend" | "primary-parts" | "fallback-backend" | "fallback-parts" | "none" +export type TodoSourceKind = + | "primary-backend" + | "primary-parts" + | "fallback-backend" + | "fallback-parts" + | "pending" + | "invalidated" + | "none" export type SessionTodoItem = Pick & Partial> @@ -11,7 +18,6 @@ export type TodoSnapshot = { source: TodoSourceKind items: SessionTodoItem[] phase: TodoPhase - sourceUpdatedAt?: number } export function isTerminalTodo(todo: Pick): boolean { @@ -27,14 +33,12 @@ export function todoSnapshot(input: { sessionID?: string source: TodoSourceKind items: SessionTodoItem[] - sourceUpdatedAt?: number + phase?: TodoPhase }): TodoSnapshot { - const phase = todoPhase(input.items) return { sessionID: input.sessionID, source: input.source, items: input.items, - phase, - sourceUpdatedAt: input.sourceUpdatedAt, + phase: input.phase ?? todoPhase(input.items), } } diff --git a/packages/app/src/pages/session/todos/todo-source.test.ts b/packages/app/src/pages/session/todos/todo-source.test.ts index 0d820c002..670693428 100644 --- a/packages/app/src/pages/session/todos/todo-source.test.ts +++ b/packages/app/src/pages/session/todos/todo-source.test.ts @@ -32,110 +32,64 @@ const todo = (content: string, status: Todo["status"] = "pending"): Todo => ({ priority: "medium", }) as Todo +const canonical = (todos: Todo[]) => ({ revision: 1, todos }) + describe("selectSessionTodoDataSnapshot", () => { test("returns only status summary fields", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("status only", "in_progress")] } }))] - expect(Object.keys(selectSessionTodoDataSnapshot({ primary: { backend: undefined, parts } })).sort()).toEqual( - ["items", "phase", "sessionID", "source", "sourceUpdatedAt"].sort(), + expect(Object.keys(selectSessionTodoDataSnapshot({ primary: { canonical: undefined, parts } })).sort()).toEqual( + ["items", "phase", "sessionID", "source"].sort(), ) }) - test("returns completed-only parts for status summary display", () => { - const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("done from parts", "completed")] } }))] - - expect(selectSessionTodoDataSnapshot({ primary: { backend: [], parts } })).toMatchObject({ - source: "primary-parts", - items: [todo("done from parts", "completed")], - phase: "terminal", - }) - }) - - test("uses matching backend terminal todos over stale active parts", () => { - const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("task A", "in_progress")] } }))] - - expect( - selectSessionTodoDataSnapshot({ - primary: { backend: [todo("task A", "completed")], parts }, - }), - ).toMatchObject({ - source: "primary-backend", - items: [todo("task A", "completed")], - phase: "terminal", - }) - }) - - test("prefers active primary message-derived todos over lagging backend todos", () => { + test("uses transcript parts only while backend snapshot is absent", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("from parts", "in_progress")] } }))] - expect( - selectSessionTodoDataSnapshot({ primary: { backend: [todo("from backend", "pending")], parts } }), - ).toMatchObject({ + + expect(selectSessionTodoDataSnapshot({ primary: { canonical: undefined, parts } })).toMatchObject({ source: "primary-parts", items: [todo("from parts", "in_progress")], phase: "active", }) }) - test("prefers terminal primary parts over lagging backend todos", () => { - const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("done from parts", "completed")] } }))] - - expect( - selectSessionTodoDataSnapshot({ primary: { backend: [todo("from backend", "pending")], parts } }), - ).toMatchObject({ - source: "primary-parts", - items: [todo("done from parts", "completed")], - phase: "terminal", - }) - }) - - test("backend terminal updates override stale active parts", () => { - // Scenario: LLM called todowrite once marking task as in_progress. - // Later, backend received a todo.updated event marking it completed. - // Backend terminal state should take precedence over stale active parts. - const parts = [ - toolPart("todowrite", completedState({ input: { todos: [todo("task A", "in_progress")] } })), - ] + test("uses backend todos over render-only parts placeholders", () => { + const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("from parts", "in_progress")] } }))] expect( - selectSessionTodoDataSnapshot({ - primary: { backend: [todo("task A", "completed")], parts }, - }), + selectSessionTodoDataSnapshot({ primary: { canonical: canonical([todo("from backend", "pending")]), parts } }), ).toMatchObject({ source: "primary-backend", - items: [todo("task A", "completed")], - phase: "terminal", + items: [todo("from backend", "pending")], + phase: "active", }) }) - test("keeps active parts when terminal backend describes a different todo", () => { + test("uses terminal backend even when parts describe a different active todo", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("new task", "in_progress")] } }))] expect( selectSessionTodoDataSnapshot({ - primary: { backend: [todo("old task", "completed")], parts }, + primary: { canonical: canonical([todo("old task", "completed")]), parts }, }), ).toMatchObject({ - source: "primary-parts", - items: [todo("new task", "in_progress")], - phase: "active", + source: "primary-backend", + items: [todo("old task", "completed")], + phase: "terminal", }) }) - test("uses known empty backend over stale active parts", () => { + test("uses known empty backend over active parts placeholders", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("cleared task", "in_progress")] } }))] - expect( - selectSessionTodoDataSnapshot({ - primary: { backend: [], backendClearActivePartsAt: 1, parts }, - }), - ).toMatchObject({ + expect(selectSessionTodoDataSnapshot({ primary: { canonical: canonical([]), parts } })).toMatchObject({ source: "primary-backend", items: [], phase: "empty", }) }) - test("keeps active parts created after a live empty backend clear", () => { + test("does not use part timestamps to override an empty backend snapshot", () => { const parts = [ toolPart( "todowrite", @@ -143,125 +97,84 @@ describe("selectSessionTodoDataSnapshot", () => { ), ] - expect( - selectSessionTodoDataSnapshot({ - primary: { backend: [], backendClearActivePartsAt: 1, parts }, - }), - ).toMatchObject({ - source: "primary-parts", - items: [todo("new task", "in_progress")], - phase: "active", - }) - }) - - test("keeps active parts over ordinary empty backend cache", () => { - const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("new task", "in_progress")] } }))] - - expect( - selectSessionTodoDataSnapshot({ - primary: { backend: [], parts }, - }), - ).toMatchObject({ - source: "primary-parts", - items: [todo("new task", "in_progress")], - phase: "active", - }) - }) - - test("does not reopen completed-only historical parts over an empty backend", () => { - const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("done from parts", "completed")] } }))] - - expect(selectSessionTodoDataSnapshot({ primary: { backend: [], parts } })).toMatchObject({ - source: "primary-parts", - items: [todo("done from parts", "completed")], - phase: "terminal", + expect(selectSessionTodoDataSnapshot({ primary: { canonical: canonical([]), parts } })).toMatchObject({ + source: "primary-backend", + items: [], + phase: "empty", }) }) - test("uses active fallback parts when primary sources are empty", () => { + test("falls back to active fallback parts when primary sources are empty", () => { const fallbackParts = [ toolPart("todowrite", completedState({ input: { todos: [todo("route todo", "in_progress")] } })), ] expect( selectSessionTodoDataSnapshot({ - primary: { backend: [], parts: [] }, + primary: { parts: [] }, fallback: { parts: fallbackParts }, }), ).toMatchObject({ source: "fallback-parts", items: [todo("route todo", "in_progress")] }) }) - test("uses fallback backend when no active fallback parts exist", () => { - expect( - selectSessionTodoDataSnapshot({ - primary: { backend: [], parts: [] }, - fallback: { backend: [todo("fallback backend", "pending")], parts: [] }, - }), - ).toMatchObject({ source: "fallback-backend", items: [todo("fallback backend", "pending")] }) - }) - - test("uses matching fallback backend terminal todos over stale fallback active parts", () => { + test("uses known empty fallback backend over fallback active parts placeholders", () => { const fallbackParts = [ - toolPart("todowrite", completedState({ input: { todos: [todo("fallback task", "in_progress")] } })), + toolPart("todowrite", completedState({ input: { todos: [todo("fallback cleared", "in_progress")] } })), ] expect( selectSessionTodoDataSnapshot({ - primary: { backend: [], parts: [] }, - fallback: { backend: [todo("fallback task", "completed")], parts: fallbackParts }, + primary: { parts: [] }, + fallback: { canonical: canonical([]), parts: fallbackParts }, }), ).toMatchObject({ source: "fallback-backend", - items: [todo("fallback task", "completed")], - phase: "terminal", + items: [], + phase: "empty", }) }) - test("uses known empty fallback backend over stale fallback active parts", () => { + test("keeps present empty canonical snapshots authoritative over fallback parts", () => { const fallbackParts = [ - toolPart("todowrite", completedState({ input: { todos: [todo("fallback cleared", "in_progress")] } })), + toolPart("todowrite", completedState({ input: { todos: [todo("route todo", "in_progress")] } })), ] expect( selectSessionTodoDataSnapshot({ - primary: { backend: [], parts: [] }, - fallback: { backend: [], backendClearActivePartsAt: 1, parts: fallbackParts }, + primary: { canonical: canonical([]), parts: [] }, + fallback: { parts: fallbackParts }, }), ).toMatchObject({ - source: "fallback-backend", + source: "primary-backend", items: [], phase: "empty", }) }) - test("keeps fallback active parts over ordinary empty fallback backend cache", () => { - const fallbackParts = [ - toolPart("todowrite", completedState({ input: { todos: [todo("fallback active", "in_progress")] } })), - ] + test("does not show historical parts after authoritative invalidation", () => { + const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("old task", "in_progress")] } }))] expect( selectSessionTodoDataSnapshot({ - primary: { backend: [], parts: [] }, - fallback: { backend: [], parts: fallbackParts }, + primary: { parts, isAuthoritativelyInvalidated: true }, }), ).toMatchObject({ - source: "fallback-parts", - items: [todo("fallback active", "in_progress")], - phase: "active", + source: "invalidated", + items: [], + phase: "empty", }) }) - test("keeps primary terminal backend ahead of fallback active parts", () => { - const fallbackParts = [ - toolPart("todowrite", completedState({ input: { todos: [todo("fallback active", "in_progress")] } })), - ] - + test("returns an explicit pending snapshot while canonical hydrate is pending", () => { expect( selectSessionTodoDataSnapshot({ - primary: { backend: [todo("primary done", "completed")], parts: [] }, - fallback: { backend: [], parts: fallbackParts }, + primary: { parts: [], isPending: true }, }), - ).toMatchObject({ source: "primary-backend", items: [todo("primary done", "completed")], phase: "terminal" }) + ).toMatchObject({ + source: "pending", + items: [], + phase: "pending", + }) }) }) @@ -269,55 +182,20 @@ describe("selectSessionTodos", () => { test("keeps the existing items-only wrapper", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("from parts", "in_progress")] } }))] - expect(selectSessionTodos({ backend: undefined, parts })).toEqual([ - todo("from parts", "in_progress"), - ]) + expect(selectSessionTodos({ canonical: undefined, parts })).toEqual([todo("from parts", "in_progress")]) }) - test("returns backend terminal todos when matching parts are stale active", () => { + test("returns backend terminal todos when parts are stale active placeholders", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("task A", "in_progress")] } }))] - expect(selectSessionTodos({ backend: [todo("task A", "completed")], parts })).toEqual([ + expect(selectSessionTodos({ canonical: canonical([todo("task A", "completed")]), parts })).toEqual([ todo("task A", "completed"), ]) }) - test("returns fallback backend terminal todos when matching fallback parts are stale active", () => { - const fallbackParts = [ - toolPart("todowrite", completedState({ input: { todos: [todo("fallback task", "in_progress")] } })), - ] - - expect( - selectSessionTodos({ - backend: [], - parts: [], - fallback: { backend: [todo("fallback task", "completed")], parts: fallbackParts }, - }), - ).toEqual([todo("fallback task", "completed")]) - }) - - test("returns empty todos when known empty backend clears stale active parts", () => { + test("returns empty todos when backend snapshot is empty", () => { const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("cleared task", "in_progress")] } }))] - expect(selectSessionTodos({ backend: [], backendClearActivePartsAt: 1, parts })).toEqual([]) - }) - - test("returns active parts created after a live empty backend clear", () => { - const parts = [ - toolPart( - "todowrite", - completedState({ input: { todos: [todo("new task", "in_progress")] }, time: { start: 2, end: 2 } }), - ), - ] - - expect(selectSessionTodos({ backend: [], backendClearActivePartsAt: 1, parts })).toEqual([ - todo("new task", "in_progress"), - ]) - }) - - test("returns active parts when ordinary empty backend cache is older", () => { - const parts = [toolPart("todowrite", completedState({ input: { todos: [todo("new task", "in_progress")] } }))] - - expect(selectSessionTodos({ backend: [], parts })).toEqual([todo("new task", "in_progress")]) + expect(selectSessionTodos({ canonical: canonical([]), parts })).toEqual([]) }) }) diff --git a/packages/app/src/pages/session/todos/todo-source.ts b/packages/app/src/pages/session/todos/todo-source.ts index 11dea5500..64768ed1d 100644 --- a/packages/app/src/pages/session/todos/todo-source.ts +++ b/packages/app/src/pages/session/todos/todo-source.ts @@ -1,11 +1,17 @@ import type { Part, Todo } from "@opencode-ai/sdk/v2" -import { extractTodos, TOOL_TODOWRITE } from "@/pages/session/session-status-extractors" -import { todoPhase, todoSnapshot, type SessionTodoItem, type TodoSnapshot, type TodoSourceKind } from "./todo-model" +import { extractTodos } from "@/pages/session/session-status-extractors" +import { todoSnapshot, type SessionTodoItem, type TodoSnapshot, type TodoSourceKind } from "./todo-model" + +export type CanonicalTodoSnapshot = { + revision: number + todos: Todo[] +} export type SessionTodoSource = { sessionID?: string - backend?: Todo[] - backendClearActivePartsAt?: number + canonical?: CanonicalTodoSnapshot + isAuthoritativelyInvalidated?: boolean + isPending?: boolean parts: Part[] } @@ -16,72 +22,47 @@ export type SelectSessionTodosInput = { const partTodos = (parts: Part[]) => extractTodos(parts) -const latestTodoWriteTime = (parts: Part[]) => { - let latest: number | undefined - for (const part of parts) { - if (part.type !== "tool") continue - if (part.tool !== TOOL_TODOWRITE) continue - if (part.state.status !== "completed") continue - const time = part.state.time - const value = typeof time.end === "number" ? time.end : typeof time.start === "number" ? time.start : undefined - if (value === undefined) continue - latest = latest === undefined ? value : Math.max(latest, value) - } - return latest -} - -const sameTodoList = (backend: SessionTodoItem[], parts: SessionTodoItem[]) => { - if (backend.length !== parts.length) return false - return parts.every((part, index) => { - const fromBackend = backend[index] - if (!fromBackend) return false - if (part.id && fromBackend.id) return part.id === fromBackend.id - return part.content === fromBackend.content - }) -} - const sourceTodoSnapshot = ( input: SessionTodoSource, source: { backend: TodoSourceKind; parts: TodoSourceKind }, ): TodoSnapshot | undefined => { - const sourceParts = partTodos(input.parts) - const sourceBackend = input.backend ?? [] + if (input.isAuthoritativelyInvalidated) { + return todoSnapshot({ + sessionID: input.sessionID, + source: "invalidated", + items: [], + }) + } - if (sourceParts.length > 0 && sourceBackend.length > 0) { - const partsPhase = todoPhase(sourceParts) - const backendPhase = todoPhase(sourceBackend) - if (backendPhase === "terminal" && partsPhase === "active" && sameTodoList(sourceBackend, sourceParts)) { - return todoSnapshot({ - sessionID: input.sessionID, - source: source.backend, - items: sourceBackend, - }) - } + if (input.canonical !== undefined) { + return todoSnapshot({ + sessionID: input.sessionID, + source: source.backend, + items: input.canonical.todos, + }) } + if (input.isPending) { + return todoSnapshot({ + sessionID: input.sessionID, + source: "pending", + items: [], + phase: "pending", + }) + } + + const sourceParts = partTodos(input.parts) if (sourceParts.length > 0) { - const phase = todoPhase(sourceParts) - if (input.backendClearActivePartsAt !== undefined && sourceBackend.length === 0 && phase === "active") { - const partsTime = latestTodoWriteTime(input.parts) - if (partsTime === undefined || partsTime <= input.backendClearActivePartsAt) { - return todoSnapshot({ sessionID: input.sessionID, source: source.backend, items: [] }) - } - } return todoSnapshot({ sessionID: input.sessionID, source: source.parts, items: sourceParts, - sourceUpdatedAt: latestTodoWriteTime(input.parts), }) } - - if (sourceBackend.length > 0) { - return todoSnapshot({ sessionID: input.sessionID, source: source.backend, items: sourceBackend }) - } } -// Data snapshots are for status displays and should preserve the latest todo -// list even when it is terminal. +// Data snapshots are for status displays. Transcript parts are render-only +// placeholders until the canonical backend snapshot is present. export function selectSessionTodoDataSnapshot(input: SelectSessionTodosInput): TodoSnapshot { const primary = sourceTodoSnapshot(input.primary, { backend: "primary-backend", parts: "primary-parts" }) if (primary) return primary diff --git a/packages/app/src/pages/session/use-session-refresh-effects.test.ts b/packages/app/src/pages/session/use-session-refresh-effects.test.ts new file mode 100644 index 000000000..130182c9b --- /dev/null +++ b/packages/app/src/pages/session/use-session-refresh-effects.test.ts @@ -0,0 +1,135 @@ +import { describe, test } from "bun:test" +import { runBrowserCheck } from "@/testing/browser-subprocess" + +const browserCheck = String.raw` +import { createRoot, createSignal } from "solid-js" +import { useSessionRefreshEffects } from "./src/pages/session/use-session-refresh-effects.ts" + +const assert = (condition, message) => { + if (!condition) throw new Error(message) +} + +const installAnimationFrameQueue = () => { + let nextID = 1 + const frames = new Map() + + globalThis.requestAnimationFrame = (callback) => { + const id = nextID++ + frames.set(id, callback) + return id + } + + globalThis.cancelAnimationFrame = (id) => { + frames.delete(id) + } +} + +const installTimerQueue = () => { + let nextID = 1 + const timers = new Map() + + window.setTimeout = (callback) => { + const id = nextID++ + timers.set(id, callback) + return id + } + + window.clearTimeout = (id) => { + timers.delete(id) + } +} + +const mountRefreshEffects = ({ hasTodoCache, recoveryEpoch = () => 0, validatedRecoveryEpoch = () => 0 }) => { + const scheduledTodos = [] + const canceledTodos = [] + const syncedTodos = [] + + const dispose = createRoot((dispose) => { + const [directory] = createSignal("dir-a") + const [sessionID] = createSignal("ses_initial") + + useSessionRefreshEffects({ + directory, + routeSessionID: sessionID, + timelineSessionID: sessionID, + statusType: () => "idle", + blocked: () => false, + hasMessageCache: () => true, + hasTodoCache, + isTodoInvalidated: () => false, + scheduleTodoHydrate: (directory, sessionID, reason) => { + scheduledTodos.push({ directory, sessionID, reason }) + }, + cancelTodoHydrate: (directory, sessionID) => { + canceledTodos.push({ directory, sessionID }) + }, + recoveryEpoch, + validatedRecoveryEpoch, + syncSession: () => {}, + syncTodo: (sessionID, options) => { + syncedTodos.push({ sessionID, options }) + }, + emitRendererDiagnostic: () => {}, + }) + + return dispose + }) + + return { dispose, scheduledTodos, canceledTodos, syncedTodos } +} + +{ + installAnimationFrameQueue() + installTimerQueue() + const { dispose, scheduledTodos, syncedTodos } = mountRefreshEffects({ hasTodoCache: () => false }) + + await Promise.resolve() + assert(scheduledTodos.length === 1, "initial visible todo should schedule hydrate before the frame boundary") + assert(scheduledTodos[0].directory === "dir-a", "initial todo schedule should use the current directory") + assert(scheduledTodos[0].sessionID === "ses_initial", "initial todo schedule should use the visible session") + assert(scheduledTodos[0].reason === "visible", "initial absent-cache todo schedule should use the visible reason") + assert(syncedTodos.length === 0, "initial todo hydrate should not start before the frame boundary") + dispose() +} + +{ + installAnimationFrameQueue() + installTimerQueue() + const { dispose, scheduledTodos } = mountRefreshEffects({ hasTodoCache: () => true }) + + await Promise.resolve() + assert(scheduledTodos.length === 0, "initial cached idle todo should not schedule a redundant hydrate") + dispose() +} + +{ + installAnimationFrameQueue() + installTimerQueue() + const [recoveryEpoch, setRecoveryEpoch] = createSignal(1) + const [validatedRecoveryEpoch, setValidatedRecoveryEpoch] = createSignal(0) + const { dispose, scheduledTodos, canceledTodos } = mountRefreshEffects({ + hasTodoCache: () => true, + recoveryEpoch, + validatedRecoveryEpoch, + }) + + await Promise.resolve() + assert(scheduledTodos.length === 1, "stale cached todo should schedule recovery hydrate") + assert(scheduledTodos[0].reason === "recovery", "stale cached todo schedule should use recovery reason") + + setValidatedRecoveryEpoch(1) + await Promise.resolve() + assert(canceledTodos.length === 1, "validated recovery change should cancel the stale recovery hydrate") + + setRecoveryEpoch(2) + await Promise.resolve() + assert(scheduledTodos.length === 2, "new recovery epoch should schedule another recovery hydrate") + dispose() +} +` + +describe("useSessionRefreshEffects", () => { + test("schedules initial visible todo hydrate before the frame boundary", () => { + runBrowserCheck(browserCheck) + }) +}) diff --git a/packages/app/src/pages/session/use-session-refresh-effects.ts b/packages/app/src/pages/session/use-session-refresh-effects.ts index 7385108c7..01b8c649b 100644 --- a/packages/app/src/pages/session/use-session-refresh-effects.ts +++ b/packages/app/src/pages/session/use-session-refresh-effects.ts @@ -1,7 +1,13 @@ import { createEffect, on, onCleanup, untrack } from "solid-js" import { getSessionPrefetch, SESSION_PREFETCH_TTL } from "@/context/global-sync/session-prefetch" +import type { TodoHydrateReason } from "@/context/global-sync/todo-hydrate-coordinator" import type { RendererDiagnosticInput } from "@/context/platform" +type TodoSyncOptions = { + force?: boolean + reason?: TodoHydrateReason +} + export function useSessionRefreshEffects(input: { directory: () => string routeSessionID: () => string | undefined @@ -10,14 +16,20 @@ export function useSessionRefreshEffects(input: { blocked: () => boolean hasMessageCache: (sessionID: string) => boolean hasTodoCache: (sessionID: string) => boolean + isTodoInvalidated?: (sessionID: string) => boolean + scheduleTodoHydrate?: (directory: string, sessionID: string, reason: TodoHydrateReason) => void + cancelTodoHydrate?: (directory: string, sessionID: string) => void + recoveryEpoch?: () => number + validatedRecoveryEpoch?: (directory: string, sessionID: string) => number syncSession: (sessionID: string, options?: { force?: boolean }) => void | Promise - syncTodo: (sessionID: string, options?: { force?: boolean }) => void | Promise + syncTodo: (sessionID: string, options?: TodoSyncOptions) => void | Promise emitRendererDiagnostic?: (event: RendererDiagnosticInput) => void | Promise }) { let refreshFrame: number | undefined let refreshTimer: number | undefined let todoFrame: number | undefined let todoTimer: number | undefined + let scheduledTodo: { directory: string; sessionID: string } | undefined const emitRefresh = (event: RendererDiagnosticInput) => { try { @@ -66,7 +78,17 @@ export function useSessionRefreshEffects(input: { }) } - const syncTodoWithDiagnostics = (id: string, options: { force?: boolean } | undefined, cachePresent: boolean) => { + const cancelScheduledTodo = () => { + if (todoFrame !== undefined) cancelAnimationFrame(todoFrame) + if (todoTimer !== undefined) window.clearTimeout(todoTimer) + todoFrame = undefined + todoTimer = undefined + const pending = scheduledTodo + scheduledTodo = undefined + if (pending) input.cancelTodoHydrate?.(pending.directory, pending.sessionID) + } + + const syncTodoWithDiagnostics = (id: string, options: TodoSyncOptions | undefined, cachePresent: boolean) => { const startedAt = performance.now() const routeSessionID = input.routeSessionID() const phase = options?.force ? "todo_force" : "todo" @@ -143,36 +165,56 @@ export function useSessionRefreshEffects(input: { on( () => { const id = input.timelineSessionID() - return [input.directory(), id, id ? (input.statusType(id) ?? "idle") : "idle", id ? input.blocked() : false] as const + const dir = input.directory() + return [ + dir, + id, + id ? (input.statusType(id) ?? "idle") : "idle", + id ? input.blocked() : false, + input.recoveryEpoch?.() ?? 0, + id ? (input.validatedRecoveryEpoch?.(dir, id) ?? 0) : 0, + ] as const }, - ([dir, id, status, blocked]) => { - if (todoFrame !== undefined) cancelAnimationFrame(todoFrame) - if (todoTimer !== undefined) window.clearTimeout(todoTimer) - todoFrame = undefined - todoTimer = undefined + ([dir, id, status, blocked, recoveryEpoch, recoveryValidated]) => { + cancelScheduledTodo() if (!id) return - if (status === "idle" && !blocked) return + if (input.isTodoInvalidated?.(id)) return const cached = untrack(() => input.hasTodoCache(id)) + const recoveryDue = cached && recoveryEpoch > recoveryValidated + const busy = status !== "idle" || blocked + const reason: TodoHydrateReason | undefined = recoveryDue ? "recovery" : busy ? "busy" : cached ? undefined : "visible" + if (!reason) return + + input.scheduleTodoHydrate?.(dir, id, reason) + scheduledTodo = { directory: dir, sessionID: id } todoFrame = requestAnimationFrame(() => { todoFrame = undefined todoTimer = window.setTimeout(() => { todoTimer = undefined if (input.directory() !== dir || input.timelineSessionID() !== id) return + scheduledTodo = undefined untrack(() => { - syncTodoWithDiagnostics(id, cached ? { force: true } : undefined, cached) + if (input.isTodoInvalidated?.(id)) { + input.cancelTodoHydrate?.(dir, id) + return + } + const currentCached = input.hasTodoCache(id) + if (reason === "visible" && currentCached) { + input.cancelTodoHydrate?.(dir, id) + return + } + syncTodoWithDiagnostics(id, { force: recoveryDue || (busy && currentCached), reason }, cached) }) }, 0) }) }, - { defer: true }, ), ) onCleanup(() => { if (refreshFrame !== undefined) cancelAnimationFrame(refreshFrame) if (refreshTimer !== undefined) window.clearTimeout(refreshTimer) - if (todoFrame !== undefined) cancelAnimationFrame(todoFrame) - if (todoTimer !== undefined) window.clearTimeout(todoTimer) + cancelScheduledTodo() }) } diff --git a/packages/opencode/migration/20260523120000_session_todo_revision/migration.sql b/packages/opencode/migration/20260523120000_session_todo_revision/migration.sql new file mode 100644 index 000000000..209e2d9d5 --- /dev/null +++ b/packages/opencode/migration/20260523120000_session_todo_revision/migration.sql @@ -0,0 +1,14 @@ +CREATE TABLE `session_todo_revision` ( + `session_id` text PRIMARY KEY NOT NULL, + `revision` integer DEFAULT 0 NOT NULL, + CONSTRAINT `fk_session_todo_revision_session_id_session_id_fk` FOREIGN KEY (`session_id`) REFERENCES `session`(`id`) ON DELETE CASCADE +); +--> statement-breakpoint +INSERT INTO `session_todo_revision` (`session_id`, `revision`) +SELECT DISTINCT `session_id`, 1 +FROM `todo` +WHERE NOT EXISTS ( + SELECT 1 + FROM `session_todo_revision` + WHERE `session_todo_revision`.`session_id` = `todo`.`session_id` +); diff --git a/packages/opencode/src/server/instance/session.ts b/packages/opencode/src/server/instance/session.ts index fe9e3dfc3..7c8df281f 100644 --- a/packages/opencode/src/server/instance/session.ts +++ b/packages/opencode/src/server/instance/session.ts @@ -239,7 +239,7 @@ export const SessionRoutes = lazy(() => description: "Todo list", content: { "application/json": { - schema: resolver(Todo.Info.array()), + schema: resolver(Todo.Snapshot), }, }, }, diff --git a/packages/opencode/src/session/session.sql.ts b/packages/opencode/src/session/session.sql.ts index 049b6b571..59e202a06 100644 --- a/packages/opencode/src/session/session.sql.ts +++ b/packages/opencode/src/session/session.sql.ts @@ -112,6 +112,14 @@ export const TodoTable = sqliteTable( ], ) +export const SessionTodoRevisionTable = sqliteTable("session_todo_revision", { + session_id: text() + .$type() + .primaryKey() + .references(() => SessionTable.id, { onDelete: "cascade" }), + revision: integer().notNull().default(0), +}) + export const SessionEntryTable = sqliteTable( "session_entry", { diff --git a/packages/opencode/src/session/todo.ts b/packages/opencode/src/session/todo.ts index 7c66a15e2..7610dd59d 100644 --- a/packages/opencode/src/session/todo.ts +++ b/packages/opencode/src/session/todo.ts @@ -4,8 +4,8 @@ import { SessionID, TodoID as TodoIDSchema } from "./schema" import type { TodoID as TodoIDType } from "./schema" import { Effect, Layer, Context } from "effect" import z from "zod" -import { Database, eq, asc } from "../storage/db" -import { TodoTable } from "./session.sql" +import { Database, eq, asc, sql, NotFoundError } from "../storage/db" +import { SessionTable, SessionTodoRevisionTable, TodoTable } from "./session.sql" export const TodoID = TodoIDSchema export type TodoID = TodoIDType @@ -23,19 +23,28 @@ export const Info = Input.extend({ }).meta({ ref: "Todo" }) export type Info = z.infer +export const Snapshot = z + .object({ + revision: z.number().int().nonnegative(), + todos: z.array(Info), + }) + .meta({ ref: "TodoSnapshot" }) +export type Snapshot = z.infer + export const Event = { Updated: BusEvent.define( "todo.updated", z.object({ sessionID: SessionID.zod, + revision: z.number().int().nonnegative(), todos: z.array(Info), }), ), } export interface Interface { - readonly update: (input: { sessionID: SessionID; todos: Input[] }) => Effect.Effect - readonly get: (sessionID: SessionID) => Effect.Effect + readonly update: (input: { sessionID: SessionID; todos: Input[] }) => Effect.Effect + readonly get: (sessionID: SessionID) => Effect.Effect } export class Service extends Context.Service()("@opencode/SessionTodo") {} @@ -87,44 +96,86 @@ export const layer = Layer.effect( Effect.gen(function* () { const bus = yield* Bus.Service + const readSnapshotFromDb = (db: Database.TxOrDb, sessionID: SessionID, options?: { activeOnly?: boolean }) => { + const session = db + .select({ id: SessionTable.id, archived: SessionTable.time_archived }) + .from(SessionTable) + .where(eq(SessionTable.id, sessionID)) + .get() + if (!session || (options?.activeOnly && session.archived !== null)) { + throw new NotFoundError({ message: `Session not found: ${sessionID}` }) + } + const revision = db + .select({ revision: SessionTodoRevisionTable.revision }) + .from(SessionTodoRevisionTable) + .where(eq(SessionTodoRevisionTable.session_id, sessionID)) + .get() + const rows = db + .select() + .from(TodoTable) + .where(eq(TodoTable.session_id, sessionID)) + .orderBy(asc(TodoTable.position)) + .all() + const todos = rows.map((row) => ({ + id: row.id, + content: row.content, + status: row.status, + priority: row.priority, + })) + if (revision) return { revision: revision.revision, todos } + return { revision: todos.length > 0 ? 1 : 0, todos } + } + + const readSnapshot = (sessionID: SessionID, options?: { activeOnly?: boolean }) => + Effect.sync(() => Database.transaction((db) => readSnapshotFromDb(db, sessionID, options))) + const update = Effect.fn("Todo.update")(function* (input: { sessionID: SessionID; todos: Input[] }) { - const previous = yield* get(input.sessionID) - const resolved = resolveTodoIDs(previous, input.todos) - - yield* Effect.sync(() => - Database.transaction((db) => { - db.delete(TodoTable).where(eq(TodoTable.session_id, input.sessionID)).run() - if (resolved.length === 0) return - db.insert(TodoTable) - .values( - resolved.map((todo, position) => ({ - id: todo.id, + const snapshot = yield* Effect.sync(() => + Database.transaction( + (db) => { + const previous = readSnapshotFromDb(db, input.sessionID, { activeOnly: true }) + const resolved = resolveTodoIDs(previous.todos, input.todos) + + db.delete(TodoTable).where(eq(TodoTable.session_id, input.sessionID)).run() + if (resolved.length > 0) { + db.insert(TodoTable) + .values( + resolved.map((todo, position) => ({ + id: todo.id, + session_id: input.sessionID, + content: todo.content, + status: todo.status, + priority: todo.priority, + position, + })), + ) + .run() + } + const row = db + .insert(SessionTodoRevisionTable) + .values({ session_id: input.sessionID, - content: todo.content, - status: todo.status, - priority: todo.priority, - position, - })), - ) - .run() - }), + revision: Math.max(previous.revision + 1, 1), + }) + .onConflictDoUpdate({ + target: SessionTodoRevisionTable.session_id, + set: { + revision: sql`${SessionTodoRevisionTable.revision} + 1`, + }, + }) + .returning({ revision: SessionTodoRevisionTable.revision }) + .get() + return { revision: row.revision, todos: resolved } + }, + { behavior: "immediate" }, + ), ) - yield* bus.publish(Event.Updated, { sessionID: input.sessionID, todos: resolved }) - return resolved + yield* bus.publish(Event.Updated, { sessionID: input.sessionID, ...snapshot }) + return snapshot }) const get = Effect.fn("Todo.get")(function* (sessionID: SessionID) { - const rows = yield* Effect.sync(() => - Database.use((db) => - db.select().from(TodoTable).where(eq(TodoTable.session_id, sessionID)).orderBy(asc(TodoTable.position)).all(), - ), - ) - return rows.map((row) => ({ - id: row.id, - content: row.content, - status: row.status, - priority: row.priority, - })) + return yield* readSnapshot(sessionID) }) return Service.of({ update, get }) diff --git a/packages/opencode/src/storage/json-migration.ts b/packages/opencode/src/storage/json-migration.ts index 4c14842d8..24ce25042 100644 --- a/packages/opencode/src/storage/json-migration.ts +++ b/packages/opencode/src/storage/json-migration.ts @@ -3,7 +3,14 @@ import type { NodeSQLiteDatabase } from "drizzle-orm/node-sqlite" import { Global } from "../global" import { Log } from "@opencode-ai/core/util/log" import { ProjectTable } from "../project/project.sql" -import { SessionTable, MessageTable, PartTable, TodoTable, PermissionTable } from "../session/session.sql" +import { + SessionTable, + MessageTable, + PartTable, + TodoTable, + SessionTodoRevisionTable, + PermissionTable, +} from "../session/session.sql" import { SessionShareTable } from "../share/share.sql" import path from "path" import { existsSync } from "fs" @@ -322,6 +329,7 @@ export namespace JsonMigration { const end = Math.min(i + batchSize, todoFiles.length) const batch = await read(todoFiles, i, end) const values = [] as any[] + const revisionValues = [] as any[] for (let j = 0; j < batch.length; j++) { const data = batch[j] if (!data) continue @@ -334,6 +342,7 @@ export namespace JsonMigration { errs.push(`todo not an array: ${todoFiles[i + j]}`) continue } + let validCount = 0 for (let position = 0; position < data.length; position++) { const todo = data[position] if (!todo?.content || !todo?.status || !todo?.priority) continue @@ -351,9 +360,12 @@ export namespace JsonMigration { time_created: now, time_updated: now, }) + validCount++ } + if (validCount > 0) revisionValues.push({ session_id: sessionID, revision: 1 }) } stats.todos += insert(values, TodoTable, "todo") + insert(revisionValues, SessionTodoRevisionTable, "todo revision") step("todos", end - i) } log.info("migrated todos", { count: stats.todos }) diff --git a/packages/opencode/src/storage/schema.ts b/packages/opencode/src/storage/schema.ts index 0c12cee62..b07a70a50 100644 --- a/packages/opencode/src/storage/schema.ts +++ b/packages/opencode/src/storage/schema.ts @@ -1,5 +1,12 @@ export { AccountTable, AccountStateTable, ControlAccountTable } from "../account/account.sql" export { ProjectTable } from "../project/project.sql" -export { SessionTable, MessageTable, PartTable, TodoTable, PermissionTable } from "../session/session.sql" +export { + SessionTable, + MessageTable, + PartTable, + TodoTable, + SessionTodoRevisionTable, + PermissionTable, +} from "../session/session.sql" export { SessionShareTable } from "../share/share.sql" export { WorkspaceTable } from "../control-plane/workspace.sql" diff --git a/packages/opencode/src/tool/todo.ts b/packages/opencode/src/tool/todo.ts index d3646d497..4d6535cfc 100644 --- a/packages/opencode/src/tool/todo.ts +++ b/packages/opencode/src/tool/todo.ts @@ -23,6 +23,7 @@ export const Parameters = Schema.Struct({ }) type Metadata = { + revision: number todos: Todo.Info[] } @@ -43,7 +44,7 @@ export const TodoWriteTool = Tool.define ({ ...todo, @@ -52,11 +53,9 @@ export const TodoWriteTool = Tool.define x.status !== "completed").length} todos`, - output: JSON.stringify(todos, null, 2), - metadata: { - todos, - }, + title: `${snapshot.todos.filter((x) => x.status !== "completed").length} todos`, + output: JSON.stringify(snapshot.todos, null, 2), + metadata: snapshot, } }), } satisfies Tool.DefWithoutID diff --git a/packages/opencode/test/session/todo.test.ts b/packages/opencode/test/session/todo.test.ts index 606f7aef8..e5f06515e 100644 --- a/packages/opencode/test/session/todo.test.ts +++ b/packages/opencode/test/session/todo.test.ts @@ -1,8 +1,10 @@ -import { describe, expect, test } from "bun:test" +import { describe, expect, spyOn, test } from "bun:test" import { Effect } from "effect" import { Instance } from "../../src/project/instance" import { Session } from "../../src/session" import { Todo } from "../../src/session/todo" +import { SessionID } from "../../src/session/schema" +import { Database } from "../../src/storage/db" import { tmpdir } from "../fixture/fixture" const todo = (content: string, id?: Todo.TodoID): Todo.Input => ({ @@ -54,7 +56,7 @@ describe("resolveTodoIDs", () => { }) describe("Todo service", () => { - test("update returns ids and get persists them", async () => { + test("update returns a revisioned snapshot and get persists it", async () => { await using tmp = await tmpdir({ git: true }) await Instance.provide({ @@ -73,7 +75,8 @@ describe("Todo service", () => { Todo.Service.use((svc) => svc.get(session.id)).pipe(Effect.provide(Todo.defaultLayer)), ) - expect(first[0].id).toStartWith("todo_") + expect(first.revision).toBe(1) + expect(first.todos[0].id).toStartWith("todo_") expect(stored).toEqual(first) await Session.remove(session.id) @@ -100,7 +103,7 @@ describe("Todo service", () => { Todo.Service.use((svc) => svc.update({ sessionID: session.id, - todos: [{ id: first[0].id, content: "A", status: "completed", priority: "medium" }], + todos: [{ id: first.todos[0].id, content: "A", status: "completed", priority: "medium" }], }), ).pipe(Effect.provide(Todo.defaultLayer)), ) @@ -108,11 +111,144 @@ describe("Todo service", () => { Todo.Service.use((svc) => svc.get(session.id)).pipe(Effect.provide(Todo.defaultLayer)), ) - expect(second[0].id).toBe(first[0].id) - expect(stored).toEqual([{ ...second[0], status: "completed" }]) + expect(second.revision).toBe(2) + expect(second.todos[0].id).toBe(first.todos[0].id) + expect(stored).toEqual({ revision: 2, todos: [{ ...second.todos[0], status: "completed" }] }) await Session.remove(session.id) }, }) }) + + test("empty clear bumps revision and stays authoritative", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({ title: "todo clear" }) + await Effect.runPromise( + Todo.Service.use((svc) => + svc.update({ + sessionID: session.id, + todos: [{ content: "A", status: "pending", priority: "medium" }], + }), + ).pipe(Effect.provide(Todo.defaultLayer)), + ) + const cleared = await Effect.runPromise( + Todo.Service.use((svc) => svc.update({ sessionID: session.id, todos: [] })).pipe( + Effect.provide(Todo.defaultLayer), + ), + ) + const stored = await Effect.runPromise( + Todo.Service.use((svc) => svc.get(session.id)).pipe(Effect.provide(Todo.defaultLayer)), + ) + + expect(cleared).toEqual({ revision: 2, todos: [] }) + expect(stored).toEqual(cleared) + + await Session.remove(session.id) + }, + }) + }) + + test("get returns rev0 empty only for known sessions that never had todos", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({ title: "todo empty" }) + const stored = await Effect.runPromise( + Todo.Service.use((svc) => svc.get(session.id)).pipe(Effect.provide(Todo.defaultLayer)), + ) + + expect(stored).toEqual({ revision: 0, todos: [] }) + + await Session.remove(session.id) + }, + }) + }) + + test("get reads archived sessions while update rejects them", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({ title: "todo archived" }) + const snapshot = await Effect.runPromise( + Todo.Service.use((svc) => + svc.update({ + sessionID: session.id, + todos: [{ content: "A", status: "pending", priority: "medium" }], + }), + ).pipe(Effect.provide(Todo.defaultLayer)), + ) + await Session.setArchived({ sessionID: session.id, time: Date.now() }) + + await expect( + Effect.runPromise(Todo.Service.use((svc) => svc.get(session.id)).pipe(Effect.provide(Todo.defaultLayer))), + ).resolves.toEqual(snapshot) + await expect( + Effect.runPromise( + Todo.Service.use((svc) => svc.update({ sessionID: session.id, todos: [] })).pipe( + Effect.provide(Todo.defaultLayer), + ), + ), + ).rejects.toThrow("NotFoundError") + }, + }) + }) + + test("update checks the active session and writes todos in one transaction", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + const session = await Session.create({ title: "todo transaction boundary" }) + const originalTransaction = Database.transaction + let transactionCount = 0 + const transaction = spyOn(Database, "transaction").mockImplementation( + ((callback, options) => { + transactionCount += 1 + return originalTransaction(callback, options) + }) as typeof Database.transaction, + ) + + try { + await Effect.runPromise( + Todo.Service.use((svc) => + svc.update({ + sessionID: session.id, + todos: [{ content: "A", status: "pending", priority: "medium" }], + }), + ).pipe(Effect.provide(Todo.defaultLayer)), + ) + } finally { + transaction.mockRestore() + } + + expect(transactionCount).toBe(1) + + await Session.remove(session.id) + }, + }) + }) + + test("get rejects unknown sessions", async () => { + await using tmp = await tmpdir({ git: true }) + + await Instance.provide({ + directory: tmp.path, + fn: async () => { + await expect( + Effect.runPromise( + Todo.Service.use((svc) => svc.get(SessionID.make("ses_missing"))).pipe(Effect.provide(Todo.defaultLayer)), + ), + ).rejects.toThrow("NotFoundError") + }, + }) + }) }) diff --git a/packages/opencode/test/storage/json-migration.test.ts b/packages/opencode/test/storage/json-migration.test.ts index 78419049d..0af43937c 100644 --- a/packages/opencode/test/storage/json-migration.test.ts +++ b/packages/opencode/test/storage/json-migration.test.ts @@ -9,7 +9,14 @@ import { JsonMigration } from "../../src/storage/json-migration" import { Global } from "../../src/global" import { ProjectTable } from "../../src/project/project.sql" import { ProjectID } from "../../src/project/schema" -import { SessionTable, MessageTable, PartTable, TodoTable, PermissionTable } from "../../src/session/session.sql" +import { + SessionTable, + MessageTable, + PartTable, + TodoTable, + SessionTodoRevisionTable, + PermissionTable, +} from "../../src/session/session.sql" import { SessionShareTable } from "../../src/share/share.sql" import { SessionID, MessageID, PartID, TodoID } from "../../src/session/schema" @@ -510,6 +517,7 @@ describe("JSON to SQLite migration", () => { expect(stats?.todos).toBe(2) const todos = db.select().from(TodoTable).orderBy(TodoTable.position).all() + const revisions = db.select().from(SessionTodoRevisionTable).all() expect(todos.length).toBe(2) expect(todos[0].id).toBe(TodoID.ascending("todo_1")) expect(todos[0].content).toBe("First todo") @@ -519,6 +527,24 @@ describe("JSON to SQLite migration", () => { expect(todos[1].id).toBe(TodoID.ascending("todo_2")) expect(todos[1].content).toBe("Second todo") expect(todos[1].position).toBe(1) + expect(revisions).toEqual([{ session_id: SessionID.make("ses_test456def"), revision: 1 }]) + }) + + test("does not create a todo revision for empty migrated todo files", async () => { + await writeProject(storageDir, { + id: "proj_test123abc", + worktree: "/", + time: { created: Date.now(), updated: Date.now() }, + sandboxes: [], + }) + await writeSession(storageDir, "proj_test123abc", { ...fixtures.session }) + await Bun.write(path.join(storageDir, "todo", "ses_test456def.json"), JSON.stringify([])) + + const stats = await JsonMigration.run(db) + + expect(stats?.todos).toBe(0) + expect(db.select().from(TodoTable).all()).toEqual([]) + expect(db.select().from(SessionTodoRevisionTable).all()).toEqual([]) }) test("replaces duplicate legacy todo ids during migration", async () => { diff --git a/packages/sdk/js/src/gen/types.gen.ts b/packages/sdk/js/src/gen/types.gen.ts index 220ad0fe3..6a15f1af8 100644 --- a/packages/sdk/js/src/gen/types.gen.ts +++ b/packages/sdk/js/src/gen/types.gen.ts @@ -512,10 +512,16 @@ export type Todo = { id: string } +export type TodoSnapshot = { + revision: number + todos: Array +} + export type EventTodoUpdated = { type: "todo.updated" properties: { sessionID: string + revision: number todos: Array } } @@ -2039,7 +2045,7 @@ export type SessionTodoResponses = { /** * Todo list */ - 200: Array + 200: TodoSnapshot } export type SessionTodoResponse = SessionTodoResponses[keyof SessionTodoResponses] diff --git a/packages/sdk/js/src/v2/gen/types.gen.ts b/packages/sdk/js/src/v2/gen/types.gen.ts index 864e0c683..1ae3a2089 100644 --- a/packages/sdk/js/src/v2/gen/types.gen.ts +++ b/packages/sdk/js/src/v2/gen/types.gen.ts @@ -324,6 +324,7 @@ export type EventTodoUpdated = { type: "todo.updated" properties: { sessionID: string + revision: number todos: Array } } @@ -1997,6 +1998,11 @@ export type McpResource = { client: string } +export type TodoSnapshot = { + revision: number + todos: Array +} + export type SessionArtifact = { file: string kind: "added" | "modified" @@ -3632,7 +3638,7 @@ export type SessionTodoResponses = { /** * Todo list */ - 200: Array + 200: TodoSnapshot } export type SessionTodoResponse = SessionTodoResponses[keyof SessionTodoResponses] diff --git a/packages/sdk/openapi.json b/packages/sdk/openapi.json index 5747e6769..99c9350ef 100644 --- a/packages/sdk/openapi.json +++ b/packages/sdk/openapi.json @@ -2766,10 +2766,7 @@ "content": { "application/json": { "schema": { - "type": "array", - "items": { - "$ref": "#/components/schemas/Todo" - } + "$ref": "#/components/schemas/TodoSnapshot" } } } @@ -7933,6 +7930,24 @@ "priority" ] }, + "TodoSnapshot": { + "type": "object", + "properties": { + "revision": { + "type": "number" + }, + "todos": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Todo" + } + } + }, + "required": [ + "revision", + "todos" + ] + }, "Event.todo.updated": { "type": "object", "properties": { @@ -7947,6 +7962,9 @@ "type": "string", "pattern": "^ses.*" }, + "revision": { + "type": "number" + }, "todos": { "type": "array", "items": { @@ -7956,6 +7974,7 @@ }, "required": [ "sessionID", + "revision", "todos" ] }