feat(container-loader): add captureFullContainerState free function #27220
Open · markfields wants to merge 21 commits into microsoft:main from markfields:full-container-state
Changes from 10 commits (21 commits total):
f71f674 (anthony-murphy) feat(container-loader): add captureContainerPendingState free function
d7a03b1 (anthony-murphy) feat(container-loader): inline attachment blob contents in captureCon…
49df699 (anthony-murphy) feat(container-loader): make captureContainerPendingState reference-a…
220a846 (anthony-murphy) test(container-loader): unit-test captureReferencedContents helpers
ebe5f7f (anthony-murphy) refactor(container-loader): rename captureContainerPendingState to ca…
73d1df0 (anthony-murphy) style(local-server-tests): biome auto-format captureFullContainerStat…
8a748cc (anthony-murphy) fix(container-loader): address PR review feedback on captureFullConta…
8d89e8e (anthony-murphy) fix(container-loader): bound concurrency in captureGroupIdSnapshots too
2e5c3f7 (anthony-murphy) fix(container-loader): preserve `this` in captureGroupIdSnapshots, sk…
57a1a76 (markfields) Merge remote-tracking branch 'origin/main' into full-container-state
807f73f (markfields) build fixes
aa4ce9c (markfields) Revert support for loading groups
2e7f66c (markfields) TMP - working notes
81d04de (markfields) Review feedback items
a6bb56f (markfields) Revert mc addition
e4065af (markfields) changeset
1d5a51b (markfields) Merge branch 'main' into full-container-state
64c6cc4 (markfields) lint
7739827 (markfields) nit: mitigate theoretical V8 scale limitation
cdda9ba (markfields) Fix utf-8 encoding bug and some nits
1a4bcfa (markfields) formatting
packages/loader/container-loader/src/captureReferencedContents.ts (365 additions, 0 deletions)
```typescript
/*!
 * Copyright (c) Microsoft Corporation and contributors. All rights reserved.
 * Licensed under the MIT License.
 */

import { bufferToString } from "@fluid-internal/client-utils";
import type {
	IDocumentStorageService,
	ISnapshot,
	ISnapshotTree,
} from "@fluidframework/driver-definitions/internal";
import { readAndParse } from "@fluidframework/driver-utils/internal";

import type { ISerializableBlobContents } from "./containerStorageAdapter.js";
import type { SerializedSnapshotInfo } from "./serializedStateManager.js";
import { getDocumentAttributes } from "./utils.js";

// The following names are defined authoritatively in container-runtime and
// runtime-definitions. They are duplicated here to avoid a loader → runtime
// layering dependency. Keep in sync with:
// packages/runtime/container-runtime/src/blobManager/blobManagerSnapSum.ts
// packages/runtime/container-runtime/src/blobManager/blobManager.ts
// packages/runtime/runtime-definitions/src/garbageCollectionDefinitions.ts
const blobsTreeName = ".blobs";
const redirectTableBlobName = ".redirectTable";
const blobManagerBasePath = "_blobs";
const gcTreeKey = "gc";
const gcBlobPrefix = "__gc";
const gcTombstoneBlobKey = "__tombstones";
const gcDeletedBlobKey = "__deletedNodes";
```
```typescript
interface IGcNodeData {
	outboundRoutes: string[];
	unreferencedTimestampMs?: number;
}

interface IGcState {
	gcNodes: { [id: string]: IGcNodeData };
}

/**
 * The parsed subset of the `gc` subtree that drives reachability decisions.
 */
export interface IGcSnapshotData {
	gcState: IGcState | undefined;
	tombstones: string[] | undefined;
	deletedNodes: string[] | undefined;
}

/** Reader that returns a blob's contents for a given storage id. */
type BlobReader = (id: string) => Promise<ArrayBufferLike>;
```
```typescript
/**
 * Upper bound on concurrent `readBlob` calls. Driver/service back-pressure is
 * real for large documents, and unbounded `Promise.all` can trigger throttling
 * or spike memory. The value is a pragmatic middle ground — high enough to
 * keep a typical driver's request pipeline full, low enough to avoid storms.
 */
const maxReadConcurrency = 32;

/**
 * Upper bound on concurrent whole-snapshot fetches (for loading groups).
 * Each `getSnapshot` call pulls a full tree's worth of metadata, so the limit
 * is deliberately lower than the per-blob limit.
 */
const maxSnapshotFetchConcurrency = 4;

/**
 * Runs `fn` over `items` with at most `limit` promises in flight. Preserves
 * input order on output (not that any caller depends on it today).
 */
async function mapWithConcurrency<T, R>(
	items: readonly T[],
	limit: number,
	fn: (item: T) => Promise<R>,
): Promise<R[]> {
	const results: R[] = Array.from({ length: items.length });
	let cursor = 0;
	const workerCount = Math.min(limit, items.length);
	const workers = Array.from({ length: workerCount }, async () => {
		while (cursor < items.length) {
			const index = cursor++;
			const item = items[index];
			if (item !== undefined) {
				results[index] = await fn(item);
			}
		}
	});
	await Promise.all(workers);
	return results;
}
```
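The worker-pool pattern above is worth seeing in isolation: a fixed number of workers pull indices from a shared cursor, so at most `limit` calls are in flight while output order still matches input order. Below is a standalone sketch (the helper is re-created locally since the module above is internal to the loader package); the `mapLimited` name and the instrumented high-water mark are illustrative.

```typescript
// Standalone sketch of the bounded-concurrency mapper. `limit` workers share
// a cursor (safe in single-threaded JS), and results land at their original
// index, preserving input order.
async function mapLimited<T, R>(
	items: readonly T[],
	limit: number,
	fn: (item: T) => Promise<R>,
): Promise<R[]> {
	const results: R[] = new Array(items.length);
	let cursor = 0;
	const workers = Array.from({ length: Math.min(limit, items.length) }, async () => {
		while (cursor < items.length) {
			const index = cursor++; // no race: the increment is synchronous
			results[index] = await fn(items[index] as T);
		}
	});
	await Promise.all(workers);
	return results;
}

// Track the high-water mark of in-flight calls to confirm the bound holds.
let inFlight = 0;
let peak = 0;
const doubled = await mapLimited([1, 2, 3, 4, 5, 6, 7, 8], 3, async (n) => {
	inFlight++;
	peak = Math.max(peak, inFlight);
	await new Promise((resolve) => setTimeout(resolve, 5));
	inFlight--;
	return n * 2;
});
// doubled is [2, 4, 6, 8, 10, 12, 14, 16]; peak never exceeds 3.
```

With `limit: 3`, only three reads hit storage at a time regardless of how many items there are, which is the back-pressure behavior the `maxReadConcurrency` comment describes.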
```typescript
/**
 * Parses the `gc` subtree of a base snapshot. Returns `undefined` if the
 * snapshot has no GC tree (GC disabled or pre-GC document).
 */
export async function parseGcSnapshotData(
	baseSnapshot: ISnapshotTree,
	storage: Pick<IDocumentStorageService, "readBlob">,
): Promise<IGcSnapshotData | undefined> {
	const gcSnapshotTree = baseSnapshot.trees[gcTreeKey];
	if (gcSnapshotTree === undefined) {
		return undefined;
	}
	let gcState: IGcState | undefined;
	let tombstones: string[] | undefined;
	let deletedNodes: string[] | undefined;
	for (const [key, blobId] of Object.entries(gcSnapshotTree.blobs)) {
		if (key === gcDeletedBlobKey) {
			deletedNodes = await readAndParse<string[]>(storage, blobId);
		} else if (key === gcTombstoneBlobKey) {
			tombstones = await readAndParse<string[]>(storage, blobId);
		} else if (key.startsWith(gcBlobPrefix)) {
			const partial = await readAndParse<IGcState>(storage, blobId);
			if (gcState === undefined) {
				gcState = { gcNodes: { ...partial.gcNodes } };
			} else {
				for (const [nodeId, nodeData] of Object.entries(partial.gcNodes)) {
					gcState.gcNodes[nodeId] ??= nodeData;
				}
			}
		}
	}
	return { gcState, tombstones, deletedNodes };
}
```
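The merge step above uses `??=`, so when GC state is split across multiple `__gc`-prefixed blobs, the first entry seen for a node id wins and later partials cannot overwrite it. A toy illustration (hand-built data, not the real snapshot format):

```typescript
// Toy re-creation of the partial GC-state merge: `??=` assigns only when the
// key is not already present, so earlier __gc blobs take precedence.
interface GcNode {
	outboundRoutes: string[];
	unreferencedTimestampMs?: number;
}

function mergeGcBlobs(
	partials: { gcNodes: Record<string, GcNode> }[],
): { gcNodes: Record<string, GcNode> } | undefined {
	let merged: { gcNodes: Record<string, GcNode> } | undefined;
	for (const partial of partials) {
		if (merged === undefined) {
			merged = { gcNodes: { ...partial.gcNodes } };
		} else {
			for (const [id, node] of Object.entries(partial.gcNodes)) {
				merged.gcNodes[id] ??= node; // first writer wins
			}
		}
	}
	return merged;
}

const merged = mergeGcBlobs([
	{ gcNodes: { "/a": { outboundRoutes: ["/b"] } } },
	{
		gcNodes: {
			"/a": { outboundRoutes: [] }, // ignored: "/a" already merged
			"/b": { outboundRoutes: [], unreferencedTimestampMs: 42 },
		},
	},
]);
```

Here `merged.gcNodes["/a"]` keeps its `["/b"]` route from the first blob, while `"/b"` is added from the second.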
```typescript
/**
 * Walks a snapshot and inlines the contents of every blob reachable without
 * crossing an `unreferenced` subtree boundary. Subtrees flagged
 * `unreferenced: true` are skipped entirely — the summarizer sets that flag
 * from GC state, so honouring it filters out dead subtrees without a
 * separate GC-path traversal.
 *
 * The root-level `.blobs` subtree is special-cased: only its `.redirectTable`
 * blob is read, because attachment blob contents are captured separately via
 * {@link captureReferencedAttachmentBlobs}.
 */
export async function readReferencedSnapshotBlobs(
	snapshot: ISnapshot | ISnapshotTree,
	storage: Pick<IDocumentStorageService, "readBlob">,
): Promise<ISerializableBlobContents> {
	const { tree, read } = toTreeAndReader(snapshot, storage);
	const ids = new Set<string>();
	collectReferencedBlobIds(tree, true, ids);
	const blobs: ISerializableBlobContents = {};
	await mapWithConcurrency([...ids], maxReadConcurrency, async (id) => {
		const data = await read(id);
		blobs[id] = bufferToString(data, "utf8");
	});
	return blobs;
}
```
```typescript
/**
 * Synchronously walks the snapshot tree and gathers the set of blob ids that
 * should be inlined. Subtrees flagged `unreferenced: true` are skipped
 * entirely. The root-level `.blobs` subtree is special-cased: only its
 * `.redirectTable` id is collected, because attachment blob contents are
 * captured separately via {@link captureReferencedAttachmentBlobs}.
 */
function collectReferencedBlobIds(
	tree: ISnapshotTree,
	isRoot: boolean,
	ids: Set<string>,
): void {
	if (tree.unreferenced === true) {
		return;
	}
	for (const blobId of Object.values(tree.blobs)) {
		ids.add(blobId);
	}
	for (const [key, subTree] of Object.entries(tree.trees)) {
		if (isRoot && key === blobsTreeName) {
			const tableBlobId = subTree.blobs[redirectTableBlobName];
			if (tableBlobId !== undefined) {
				ids.add(tableBlobId);
			}
		} else {
			collectReferencedBlobIds(subTree, false, ids);
		}
	}
}
```
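To make the two pruning rules concrete, here is the same walk re-created on a hand-built tree. The `Tree` type below models only the fields the walk reads (`blobs`, `trees`, `unreferenced`); the real `ISnapshotTree` has more.

```typescript
// Minimal re-creation of collectReferencedBlobIds on a toy tree:
// - subtrees flagged unreferenced are skipped wholesale
// - the root-level ".blobs" subtree contributes only its ".redirectTable" id
interface Tree {
	blobs: Record<string, string>;
	trees: Record<string, Tree>;
	unreferenced?: true;
}

function collectIds(tree: Tree, isRoot: boolean, ids: Set<string>): void {
	if (tree.unreferenced === true) return;
	for (const blobId of Object.values(tree.blobs)) ids.add(blobId);
	for (const [key, subTree] of Object.entries(tree.trees)) {
		if (isRoot && key === ".blobs") {
			const tableId = subTree.blobs[".redirectTable"];
			if (tableId !== undefined) ids.add(tableId);
		} else {
			collectIds(subTree, false, ids);
		}
	}
}

const root: Tree = {
	blobs: { ".metadata": "blob-meta" },
	trees: {
		".blobs": {
			blobs: { ".redirectTable": "blob-table", "attach-1": "blob-attach" },
			trees: {},
		},
		dead: { unreferenced: true, blobs: { x: "blob-dead" }, trees: {} },
		live: { blobs: { y: "blob-live" }, trees: {} },
	},
};
const ids = new Set<string>();
collectIds(root, true, ids);
// ids contains blob-meta, blob-table, blob-live; the dead subtree and the
// attachment blob entry are excluded.
```

Note that `blob-attach` is excluded even though it sits under `.blobs`: attachment contents come from the separate GC-filtered capture path, not from this walk.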
```typescript
function toTreeAndReader(
	snapshot: ISnapshot | ISnapshotTree,
	storage: Pick<IDocumentStorageService, "readBlob">,
): { tree: ISnapshotTree; read: BlobReader } {
	if ("snapshotTree" in snapshot) {
		const blobContents = snapshot.blobContents;
		return {
			tree: snapshot.snapshotTree,
			read: async (id) => blobContents.get(id) ?? storage.readBlob(id),
		};
	}
	return { tree: snapshot, read: async (id) => storage.readBlob(id) };
}
```
```typescript
/**
 * Fetches attachment blob contents from a snapshot, filtered by GC
 * reachability. Blobs GC has explicitly marked unreferenced, tombstoned, or
 * deleted are skipped. Blobs absent from the GC graph are kept — GC state
 * lags behind recent attachments and dropping them would lose live data.
 * If `gcData` is `undefined`, every attachment blob is returned.
 *
 * The returned map is keyed by attachment blob storage id and can be merged
 * directly into a pending-state {@link ISerializableBlobContents} map.
 */
export async function captureReferencedAttachmentBlobs(
	baseSnapshot: ISnapshotTree,
	storage: Pick<IDocumentStorageService, "readBlob">,
	gcData: IGcSnapshotData | undefined,
): Promise<ISerializableBlobContents> {
	const blobsTree = baseSnapshot.trees[blobsTreeName];
	if (blobsTree === undefined) {
		return {};
	}
	const localIdToStorageId = await readRedirectTable(blobsTree, storage);
	if (localIdToStorageId.size === 0) {
		return {};
	}

	const unreferencedLocalIds =
		gcData === undefined ? undefined : collectUnreferencedBlobLocalIds(gcData);

	const storageIdsToFetch = new Set<string>();
	for (const [localId, storageId] of localIdToStorageId) {
		if (unreferencedLocalIds?.has(localId) !== true) {
			storageIdsToFetch.add(storageId);
		}
	}

	const contents: ISerializableBlobContents = {};
	await mapWithConcurrency([...storageIdsToFetch], maxReadConcurrency, async (storageId) => {
		const buffer = await storage.readBlob(storageId);
		contents[storageId] = bufferToString(buffer, "utf8");
	});
	return contents;
}
```
```typescript
/**
 * Reconstructs the BlobManager's redirect table from a `.blobs` subtree.
 * Mirrors `toRedirectTable` in blobManagerSnapSum.ts.
 */
async function readRedirectTable(
	blobsTree: ISnapshotTree,
	storage: Pick<IDocumentStorageService, "readBlob">,
): Promise<Map<string, string>> {
	const redirectTable = new Map<string, string>();
	const tableBlobId = blobsTree.blobs[redirectTableBlobName];
	if (tableBlobId !== undefined) {
		const entries = await readAndParse<[string, string][]>(storage, tableBlobId);
		for (const [localId, storageId] of entries) {
			redirectTable.set(localId, storageId);
		}
	}
	for (const [key, storageId] of Object.entries(blobsTree.blobs)) {
		if (key !== redirectTableBlobName) {
			// Identity mapping: storage ids referenced directly in handles (legacy).
			redirectTable.set(storageId, storageId);
		}
	}
	return redirectTable;
}
```
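The two sources of table entries above (the parsed `.redirectTable` blob, plus legacy identity mappings for every other key under `.blobs`) can be sketched without any storage plumbing. The `buildRedirectTable` helper below is illustrative, taking pre-parsed data in place of `readAndParse` calls:

```typescript
// Sketch of the redirect-table reconstruction: the ".redirectTable" blob
// holds [localId, storageId] pairs, and any other key under the ".blobs"
// subtree is a legacy storage id that maps to itself.
function buildRedirectTable(
	tableEntries: [string, string][],
	blobKeys: Record<string, string>, // key -> storage id, as in blobsTree.blobs
): Map<string, string> {
	const table = new Map<string, string>(tableEntries);
	for (const [key, storageId] of Object.entries(blobKeys)) {
		if (key !== ".redirectTable") {
			table.set(storageId, storageId); // legacy identity mapping
		}
	}
	return table;
}

const table = buildRedirectTable(
	[
		["local-1", "storage-A"],
		["local-2", "storage-B"],
	],
	{ ".redirectTable": "blob-table", "legacy-key": "storage-C" },
);
// table: local-1 -> storage-A, local-2 -> storage-B, storage-C -> storage-C
```

The identity mapping matters downstream: when the GC filter checks `localIdToStorageId` keys, legacy blobs are keyed by their storage id, so GC paths that reference storage ids directly still resolve.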
```typescript
/**
 * Collects the set of blob localIds that GC has explicitly marked as
 * unreferenced (via `unreferencedTimestampMs` on a gc node), tombstoned, or
 * deleted. Tombstones and deletedNodes are applied regardless of whether
 * `gcState` is present — they are authoritative on their own and must not
 * be silently dropped when gc state is absent but tombstone/deleted lists
 * exist.
 */
function collectUnreferencedBlobLocalIds(gcData: IGcSnapshotData): Set<string> {
	const blobPathPrefix = `/${blobManagerBasePath}/`;
	const unreferenced = new Set<string>();
	if (gcData.gcState !== undefined) {
		for (const [nodePath, nodeData] of Object.entries(gcData.gcState.gcNodes)) {
			if (
				nodePath.startsWith(blobPathPrefix) &&
				nodeData.unreferencedTimestampMs !== undefined
			) {
				unreferenced.add(nodePath.slice(blobPathPrefix.length));
			}
		}
	}
	for (const nodePath of [...(gcData.tombstones ?? []), ...(gcData.deletedNodes ?? [])]) {
		if (nodePath.startsWith(blobPathPrefix)) {
			unreferenced.add(nodePath.slice(blobPathPrefix.length));
		}
	}
	return unreferenced;
}
```
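The path manipulation here is simple but easy to get wrong off by one: GC node paths for attachment blobs have the shape `/_blobs/<localId>`, and the localId is everything after the prefix. A small sketch with illustrative paths:

```typescript
// Sketch of the localId extraction used above: keep only paths under the
// blob manager's base path and strip the prefix to recover the localId.
const blobPathPrefix = "/_blobs/";

function localIdsFromPaths(paths: string[]): Set<string> {
	const out = new Set<string>();
	for (const p of paths) {
		if (p.startsWith(blobPathPrefix)) {
			out.add(p.slice(blobPathPrefix.length));
		}
	}
	return out;
}

const localIds = localIdsFromPaths([
	"/_blobs/abc", // attachment blob -> localId "abc"
	"/datastore-1/dds-2", // datastore path: not a blob node, ignored
	"/_blobs/def",
]);
// localIds: { "abc", "def" }
```

Datastore and DDS paths fall through the `startsWith` check, so only blob-manager nodes contribute localIds to the unreferenced set.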
```typescript
/**
 * Enumerates the set of loading-group ids declared by datastores in the
 * snapshot, skipping any subtree flagged `unreferenced`.
 */
function collectGroupIds(tree: ISnapshotTree): Set<string> {
	const ids = new Set<string>();
	const visit = (node: ISnapshotTree): void => {
		if (node.unreferenced === true) {
			return;
		}
		if (node.groupId !== undefined) {
			ids.add(node.groupId);
		}
		for (const child of Object.values(node.trees)) {
			visit(child);
		}
	};
	visit(tree);
	return ids;
}
```
```typescript
/**
 * Fetches each loading-group snapshot declared by the base snapshot and
 * serializes it into {@link SerializedSnapshotInfo} form. Each group
 * snapshot is walked with the same `unreferenced` filter as the main
 * snapshot.
 *
 * Returns an empty record if the driver lacks `getSnapshot` support or the
 * snapshot has no group ids. Callers can place the result directly into
 * `IPendingContainerState.loadedGroupIdSnapshots`.
 */
export async function captureGroupIdSnapshots(
	baseSnapshot: ISnapshotTree,
	storage: Pick<IDocumentStorageService, "readBlob"> & {
		getSnapshot?: IDocumentStorageService["getSnapshot"];
	},
	versionId: string | undefined,
	scenarioName: string,
): Promise<Record<string, SerializedSnapshotInfo>> {
	// Bind to preserve `this` when the method is extracted — real driver
	// implementations (e.g. LocalDocumentStorageService.getSnapshot)
	// reference `this`, and detaching the method would TypeError in strict
	// mode. Matches the pattern used by protocolTreeDocumentStorageService.
	const getSnapshot = storage.getSnapshot?.bind(storage);
	if (getSnapshot === undefined) {
		return {};
	}
	const groupIds = collectGroupIds(baseSnapshot);
	if (groupIds.size === 0) {
		return {};
	}
	const result: Record<string, SerializedSnapshotInfo> = {};
	await mapWithConcurrency([...groupIds], maxSnapshotFetchConcurrency, async (groupId) => {
		const groupSnapshot = await getSnapshot({
			cacheSnapshot: false,
			versionId,
			loadingGroupIds: [groupId],
			scenarioName: `${scenarioName}.group`,
		});
		const snapshotBlobs = await readReferencedSnapshotBlobs(groupSnapshot, storage);
		let sequenceNumber = groupSnapshot.sequenceNumber;
		if (sequenceNumber === undefined) {
			const groupAttributes = await getDocumentAttributes(storage, groupSnapshot.snapshotTree);
			sequenceNumber = groupAttributes.sequenceNumber;
		}
		result[groupId] = {
			baseSnapshot: groupSnapshot.snapshotTree,
			snapshotBlobs,
			snapshotSequenceNumber: sequenceNumber,
		};
	});
	return result;
}
```
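The `bind(storage)` in `captureGroupIdSnapshots` guards a general JavaScript pitfall: class methods are strict-mode code, so a method detached from its instance runs with `this === undefined` and any field access throws. A minimal demonstration (the `FakeStorage` class is a made-up stand-in, not a real driver):

```typescript
// Why detaching a method without bind() fails: class bodies are strict-mode,
// so `this` is undefined inside a detached method and field access throws.
class FakeStorage {
	private readonly snapshots = new Map([["g1", "snapshot-g1"]]);
	getSnapshot(groupId: string): string {
		return this.snapshots.get(groupId) ?? "missing";
	}
}

const storage = new FakeStorage();
const detached = storage.getSnapshot; // `this` is lost
const bound = storage.getSnapshot.bind(storage); // `this` is preserved

let detachedThrew = false;
try {
	detached("g1"); // TypeError: cannot read `snapshots` of undefined
} catch {
	detachedThrew = true;
}

const boundResult = bound("g1"); // works: "snapshot-g1"
```

An optional-chained bind, `storage.getSnapshot?.bind(storage)`, additionally collapses the "method missing" and "method detached" cases into a single `undefined` check, which is exactly the shape the early return in `captureGroupIdSnapshots` relies on.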