32 changes: 23 additions & 9 deletions packages/runtime/container-runtime/src/containerRuntime.ts
@@ -1890,20 +1890,34 @@ export class ContainerRuntime
this.mc.config.getNumber("Fluid.ContainerRuntime.StagingModeAutoFlushThreshold") ??
runtimeOptions.stagingModeAutoFlushThreshold ??
defaultStagingModeAutoFlushThreshold;
this.batchIdTrackingEnabled =
this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") ??
this.mc.config.getBoolean("Fluid.ContainerRuntime.enableBatchIdTracking") ??
false;

if (this.batchIdTrackingEnabled && this._flushMode !== FlushMode.TurnBased) {
// BatchId tracking powers DuplicateBatchDetector (catching forked-container duplicates)
// and is also a prerequisite for the Offline Load feature. It is enabled by default in
// TurnBased mode; the kill-switch below allows disabling it without a code change if a
// regression is observed. Offline Load still requires TurnBased, so consumers that
// explicitly opt into it with FlushMode.Immediate continue to get a UsageError.
const offlineLoadRequested =
this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") === true;
const disableBatchIdTracking =
this.mc.config.getBoolean("Fluid.ContainerRuntime.DisableBatchIdTracking") === true;

if (offlineLoadRequested && this._flushMode !== FlushMode.TurnBased) {
markfields marked this conversation as resolved.
const error = new UsageError("Offline mode is only supported in turn-based mode");
this.closeFn(error);
throw error;
}

Copilot AI May 1, 2026

If Fluid.Container.enableOfflineFull is true while the kill-switch Fluid.ContainerRuntime.DisableBatchIdTracking is also true, the runtime will allow Offline Load in TurnBased mode but will disable batchIdTrackingEnabled (and thus disable batchId stamping + DuplicateBatchDetector). That seems to violate the comment that tracking is a prerequisite for Offline Load and could re-enable unsafe fork scenarios silently. Consider either (a) ignoring the kill-switch when Offline Load is explicitly requested, or (b) throwing a UsageError when both flags are set so the configuration can't be enabled in an unsupported state.

Suggested change
if (offlineLoadRequested && disableBatchIdTracking) {
const error = new UsageError(
"Offline mode requires batchId tracking and cannot be used when Fluid.ContainerRuntime.DisableBatchIdTracking is enabled.",
);
this.closeFn(error);
throw error;
}

Contributor Author

offline load flag will also be removed in a follow-up

Contributor

Deep Review: Following up on this thread now that batchId tracking is default-on. The asymmetry the original comment flagged is still live in the current diff: the offlineLoadRequested UsageError validates only flush mode, not the kill-switch, and the constructor comment immediately above still describes batchId tracking as the prerequisite for Offline Load. Pre-PR, enableOfflineFull=true unconditionally enabled tracking; post-PR, setting enableOfflineFull=true together with Fluid.ContainerRuntime.DisableBatchIdTracking=true in TurnBased silently strips the prerequisite — Offline Load proceeds with fork detection disabled, no error, no telemetry. The PR description says Offline Load is "back-compat preserved" but only the FlushMode portion currently is.

@dannimad you mentioned enableOfflineFull will be removed in a follow-up — that resolves this cleanly if it's imminent. A couple of cheap options either way:

  • (a) Link the follow-up issue here and add a one-line comment in the constructor noting the asymmetry is intentionally transient, or
  • (b) Add the symmetric guard now — extend the existing UsageError to fire when offlineLoadRequested && disableBatchIdTracking, or emit a BatchIdTrackingDisabledForOfflineLoad warning telemetry so the misconfig is at least observable while the follow-up lands.

Either closes the loop on this thread.
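A minimal standalone sketch of option (b)'s fail-fast guard. The two setting names and the error message come from the diff; `UsageError` and the config interface are hypothetical stand-ins for the real Fluid types, not the actual runtime wiring:

```typescript
// Hypothetical stand-in for Fluid's UsageError; only the name comes from the diff.
class UsageError extends Error {}

// Minimal config surface for illustration; the real IConfigProviderBase differs.
interface ConfigReader {
  getBoolean(name: string): boolean | undefined;
}

// Option (b): refuse to load when Offline Load is requested but the
// batchId-tracking kill-switch is also set, instead of proceeding silently
// with fork detection disabled.
function validateOfflineLoadConfig(config: ConfigReader): void {
  const offlineLoadRequested =
    config.getBoolean("Fluid.Container.enableOfflineFull") === true;
  const disableBatchIdTracking =
    config.getBoolean("Fluid.ContainerRuntime.DisableBatchIdTracking") === true;
  if (offlineLoadRequested && disableBatchIdTracking) {
    throw new UsageError(
      "Offline mode requires batchId tracking and cannot be used when Fluid.ContainerRuntime.DisableBatchIdTracking is enabled.",
    );
  }
}
```

Either flag alone passes; only the conflicting combination throws, so the unsupported state can't be entered silently.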

dannimad marked this conversation as resolved.
// DuplicateBatchDetection is only enabled if Offline Load is enabled
// It maintains a cache of all batchIds/sequenceNumbers within the collab window.
// Don't waste resources doing so if not needed.
this.batchIdTrackingEnabled =
!disableBatchIdTracking && this._flushMode === FlushMode.TurnBased;

this.mc.logger.sendTelemetryEvent({
eventName: "BatchIdTrackingEnablement",
enabled: this.batchIdTrackingEnabled,
disableBatchIdTracking,
flushMode: FlushMode[this._flushMode],
});

// DuplicateBatchDetector maintains a cache of all batchIds/sequenceNumbers within the
// collab window. Skip allocating it when batchId tracking is off.
if (this.batchIdTrackingEnabled) {
this.duplicateBatchDetector = new DuplicateBatchDetector(recentBatchInfo);
}
@@ -28,6 +28,16 @@ export class DuplicateBatchDetector
*/
private readonly batchIdsBySeqNum = new Map<number, string>();

/**
* Number of inbound batches processed since the last summary. Reset by getRecentBatchInfoForSummary.
*/
private processedBatchCount = 0;

/**
* Largest tracked-batch count observed since the last summary. Reset by getRecentBatchInfoForSummary.
*/
private peakTrackedBatchCount = 0;

/**
* Initialize from snapshot data if provided - otherwise initialize empty
*/
@@ -37,6 +47,7 @@ export class DuplicateBatchDetector {
this.batchIdsBySeqNum.set(seqNum, batchId);
this.seqNumByBatchId.set(batchId, seqNum);
}
this.peakTrackedBatchCount = this.batchIdsBySeqNum.size;
}
}

@@ -50,6 +61,7 @@
batchStart: BatchStartInfo,
): { duplicate: true; otherSequenceNumber: number } | { duplicate: false } {
const { sequenceNumber, minimumSequenceNumber } = batchStart.keyMessage;
this.processedBatchCount++;

// Glance at this batch's MSN. Any batchIds we're tracking with a lower sequence number are now safe to forget.
// Why? Because any other client holding the same batch locally would have seen the earlier batch and closed before submitting its duplicate.
@@ -80,6 +92,9 @@
// Add new batch
this.batchIdsBySeqNum.set(sequenceNumber, batchId);
this.seqNumByBatchId.set(batchId, sequenceNumber);
if (this.batchIdsBySeqNum.size > this.peakTrackedBatchCount) {
this.peakTrackedBatchCount = this.batchIdsBySeqNum.size;
}

return { duplicate: false };
}
@@ -108,16 +123,22 @@
public getRecentBatchInfoForSummary(
telemetryContext?: ITelemetryContext,
): [number, string][] | undefined {
if (telemetryContext !== undefined) {
const prefix = "fluid_DuplicateBatchDetector_";
telemetryContext.set(prefix, "recentBatchCount", this.batchIdsBySeqNum.size);
telemetryContext.set(prefix, "peakRecentBatchCount", this.peakTrackedBatchCount);
telemetryContext.set(prefix, "processedBatchCount", this.processedBatchCount);
}

// Reset per-window perf counters so each summary covers only the activity since the
// previous one. Peak resets to the current size (the floor for the next window).
this.processedBatchCount = 0;
this.peakTrackedBatchCount = this.batchIdsBySeqNum.size;

if (this.batchIdsBySeqNum.size === 0) {
return undefined;
}

telemetryContext?.set(
"fluid_DuplicateBatchDetector_",
"recentBatchCount",
this.batchIdsBySeqNum.size,
);

return [...this.batchIdsBySeqNum.entries()];
}
}
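The per-window counter bookkeeping above can be sketched in isolation — assuming only the semantics visible in the diff (the processed count resets to zero at each summary; the peak resets to the current live size, which is its floor for the next window). The class and method names here are illustrative, not the real API:

```typescript
// Illustrative sketch of DuplicateBatchDetector's per-summary-window counters.
class WindowedCounters {
  private processed = 0; // inbound batches since the last summary
  private peak = 0; // high-water mark of tracked entries since the last summary
  private size = 0; // current number of tracked entries

  // Call after each inbound batch, passing the map size after MSN cleanup + insert.
  public onBatch(newSize: number): void {
    this.processed++;
    this.size = newSize;
    if (newSize > this.peak) {
      this.peak = newSize;
    }
  }

  // Call at summary time: report the window, then reset for the next one.
  public snapshotAndReset(): { processed: number; peak: number; current: number } {
    const out = { processed: this.processed, peak: this.peak, current: this.size };
    this.processed = 0;
    this.peak = this.size; // peak only grows within a window; its floor is the carry-over size
    return out;
  }
}
```

This mirrors why the unit test below expects `peakRecentBatchCount` to stay at 2 in the second window even though only one entry remains live: the peak carries the prior window's size forward as its starting floor.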
132 changes: 104 additions & 28 deletions packages/runtime/container-runtime/src/test/containerRuntime.spec.ts
@@ -437,11 +437,7 @@ describe("Runtime", () => {
let batchEnd = 0;
let callsToEnsure = 0;
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: {
"Fluid.ContainerRuntime.enableBatchIdTracking": true,
},
}) as IContainerContext,
context: getMockContext() as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: {},
@@ -481,13 +477,19 @@
assert.strictEqual(containerRuntime.isDirty, false);
});

for (const enableBatchIdTracking of [true, undefined])
it("Replaying ops should resend in correct order, with batch ID if applicable", async () => {
// BatchId tracking is on by default (TurnBased mode); the kill-switch suppresses stamping.
for (const variant of [
{ name: "default (no settings)", settings: {}, expectStamped: true },
{
name: "kill-switch active",
settings: { "Fluid.ContainerRuntime.DisableBatchIdTracking": true },
expectStamped: false,
},
])
it(`Replaying ops should resend in correct order, with batch ID if applicable (${variant.name})`, async () => {
const { runtime } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: {
"Fluid.ContainerRuntime.enableBatchIdTracking": enableBatchIdTracking, // batchId only stamped if true
},
settings: variant.settings,
}) as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
@@ -525,7 +527,7 @@
);
}

if (enableBatchIdTracking === true) {
if (variant.expectStamped) {
assert(
batchIdMatchesUnsentFormat(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
@@ -921,8 +923,20 @@ describe("Runtime", () => {
{ batch: false },
];

// In TurnBased mode batchId tracking is on by default, so resubmitted batches
// will also carry a batchId on their first message. Strip it before comparing
// — this test cares about batch boundaries, not batchId stamping.
const normalizedMetadata = submittedOpsMetadata.map((m) => {
if (m === undefined) {
return m;
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-unsafe-assignment
const { batchId: _ignored, ...rest } = m as { batchId?: string };
return Object.keys(rest).length === 0 ? undefined : rest;
});

assert.deepStrictEqual(
submittedOpsMetadata,
normalizedMetadata,
expectedBatchMetadata,
"batch metadata does not match",
);
@@ -2343,13 +2357,19 @@ describe("Runtime", () => {
});

describe("Duplicate Batch Detection", () => {
for (const enableBatchIdTracking of [undefined, true]) {
it(`DuplicateBatchDetector is disabled if Batch Id Tracking isn't needed (${enableBatchIdTracking === true ? "ENABLED" : "DISABLED"})`, async () => {
// BatchId tracking is on by default (TurnBased); the kill-switch suppresses detection.
for (const variant of [
{ name: "default (no settings)", settings: {}, expectDetection: true },
{
name: "kill-switch active",
settings: { "Fluid.ContainerRuntime.DisableBatchIdTracking": true },
expectDetection: false,
},
]) {
it(`DuplicateBatchDetector reflects batch-id tracking enablement (${variant.name})`, async () => {
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: {
"Fluid.ContainerRuntime.enableBatchIdTracking": enableBatchIdTracking,
},
settings: variant.settings,
}) as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
@@ -2370,8 +2390,9 @@
false,
);
// Process a duplicate batch "batchId1" with different seqNum 234
const assertThrowsOnlyIfExpected =
enableBatchIdTracking === true ? assert.throws : assert.doesNotThrow;
const assertThrowsOnlyIfExpected = variant.expectDetection
? assert.throws
: assert.doesNotThrow;
const errorPredicate = (e: Error) =>
e.message === "Duplicate batch - The same batch was sequenced twice";
assertThrowsOnlyIfExpected(
@@ -2387,20 +2408,76 @@
);
},
errorPredicate,
"Expected duplicate batch detection to match Offline Load enablement",
"Expected duplicate batch detection to match enablement",
);
});
}

it("Can roundtrip DuplicateBatchDetector state through summary/snapshot", async () => {
// Duplicate Batch Detection requires OfflineLoad enabled
const settings_enableOfflineLoad = {
"Fluid.ContainerRuntime.enableBatchIdTracking": true,
it("Default-on tracking is silently skipped in FlushMode.Immediate (no UsageError)", async () => {
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext() as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: {
flushMode: FlushMode.Immediate,
enableRuntimeIdCompressor: "on",
},
provideEntryPoint: mockProvideEntryPoint,
});
// Sending a duplicate batchId should not throw because tracking is inactive in Immediate mode.
containerRuntime.process(
{
sequenceNumber: 123,
type: MessageType.Operation,
contents: { type: ContainerMessageType.Rejoin, contents: undefined },
metadata: { batchId: "batchId1" },
} satisfies Partial<ISequencedDocumentMessage> as ISequencedDocumentMessage,
false,
);
assert.doesNotThrow(() =>
containerRuntime.process(
{
sequenceNumber: 234,
type: MessageType.Operation,
contents: { type: ContainerMessageType.Rejoin, contents: undefined },
metadata: { batchId: "batchId1" },
} satisfies Partial<ISequencedDocumentMessage> as ISequencedDocumentMessage,
false,
),
);
});

it("Offline Load opt-in still errors in FlushMode.Immediate (back-compat)", async () => {
const containerErrors: ICriticalContainerError[] = [];
const context = {
...getMockContext({
settings: {
"Fluid.Container.enableOfflineFull": true,
},
}),
closeFn: (error?: ICriticalContainerError): void => {
if (error !== undefined) {
containerErrors.push(error);
}
},
};
await assert.rejects(
ContainerRuntime.loadRuntime2({
context: context as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: { flushMode: FlushMode.Immediate },
provideEntryPoint: mockProvideEntryPoint,
}),
(error: Error) => error instanceof UsageError,
);
assert.strictEqual(containerErrors.length, 1);
});

it("Can roundtrip DuplicateBatchDetector state through summary/snapshot", async () => {
// Duplicate Batch Detection is on by default in TurnBased mode.
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: settings_enableOfflineLoad,
}) as IContainerContext,
context: getMockContext() as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: {
@@ -2432,7 +2509,6 @@
};
const { runtime: containerRuntime2 } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: settings_enableOfflineLoad,
baseSnapshot: {
trees: {},
blobs: { [recentBatchInfoBlobName]: "nonempty_id_ignored_by_mockStorage" },
@@ -217,13 +217,11 @@ describe("DuplicateBatchDetector", () => {
detector.processInboundBatch(inboundBatch1);
detector.processInboundBatch(inboundBatch2);

let setCalled = 0;
const telemetrySets = new Map<string, unknown>();
const telemetryContext = {
set: (key: string, subKey: string, value: unknown) => {
++setCalled;
assert.equal(key, "fluid_DuplicateBatchDetector_");
assert.equal(subKey, "recentBatchCount");
assert.equal(value, 2);
telemetrySets.set(subKey, value);
},
} satisfies Partial<ITelemetryContext> as ITelemetryContext;

@@ -237,7 +235,57 @@
],
"Incorrect recentBatchInfo",
);
assert.equal(setCalled, 1, "Expected telemetryContext.set to be called once");
assert.equal(telemetrySets.get("recentBatchCount"), 2);
assert.equal(telemetrySets.get("peakRecentBatchCount"), 2);
assert.equal(telemetrySets.get("processedBatchCount"), 2);
});

it("Per-window perf counters reset after each summary", () => {
detector.processInboundBatch(
makeBatch({
sequenceNumber: seqNum++, // 1
minimumSequenceNumber: 0,
batchId: "batch1",
}),
);
detector.processInboundBatch(
makeBatch({
sequenceNumber: seqNum++, // 2
minimumSequenceNumber: 0,
batchId: "batch2",
}),
);

const firstWindow = new Map<string, unknown>();
detector.getRecentBatchInfoForSummary({
set: (_key: string, subKey: string, value: unknown) => {
firstWindow.set(subKey, value);
},
} satisfies Partial<ITelemetryContext> as ITelemetryContext);
assert.equal(firstWindow.get("processedBatchCount"), 2);
assert.equal(firstWindow.get("peakRecentBatchCount"), 2);

// Process one more batch; MSN advances enough to drop both prior entries.
detector.processInboundBatch(
makeBatch({
sequenceNumber: seqNum++, // 3
minimumSequenceNumber: 3,
batchId: "batch3",
}),
);

const secondWindow = new Map<string, unknown>();
detector.getRecentBatchInfoForSummary({
set: (_key: string, subKey: string, value: unknown) => {
secondWindow.set(subKey, value);
},
} satisfies Partial<ITelemetryContext> as ITelemetryContext);
// Only one batch processed since the prior summary.
assert.equal(secondWindow.get("processedBatchCount"), 1);
// Peak in this window starts at the size carried over from the prior window (2)
// — peak only ever grows during a window. Current size after cleanup is 1.
assert.equal(secondWindow.get("peakRecentBatchCount"), 2);
assert.equal(secondWindow.get("recentBatchCount"), 1);
});
});
});
@@ -105,7 +105,7 @@ describeCompat("Flushing ops", "NoCompat", (getTestObjectProvider, apis) => {
registry,
loaderProps: {
configProvider: configProvider({
"Fluid.ContainerRuntime.enableBatchIdTracking": true,
"Fluid.Container.enableOfflineFull": true,
}),
},
Comment on lines 105 to 110
Copilot AI May 1, 2026
This suite now enables Fluid.Container.enableOfflineFull for all tests. That changes behavior beyond just batchId tracking (and forces callers to remember to pass disableOfflineLoad=true for Immediate mode). If the goal is only to keep the one Offline Load validation test, consider scoping enableOfflineFull to that specific test/setup instead of setting it globally for the whole describeCompat, to avoid running unrelated batching tests under the Offline Load configuration.

};
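The scoping Copilot suggests can be sketched as follows — build the settings per test instead of once for the suite, so only the Offline Load test layers `enableOfflineFull` on top of a shared base. The `configProvider` shape (`getRawConfig`) mirrors the helper used in the diff; everything else is a hypothetical arrangement:

```typescript
// Settings value type, as commonly used by Fluid test config providers.
type ConfigTypes = boolean | number | string | undefined;

// Mirrors the configProvider helper visible in the diff: a thin wrapper that
// serves raw settings by name.
const configProvider = (settings: Record<string, ConfigTypes>) => ({
  getRawConfig: (name: string): ConfigTypes => settings[name],
});

// Shared base for the suite: no Offline Load flag, so unrelated batching tests
// run under the default configuration.
const baseSettings: Record<string, ConfigTypes> = {};

// Only the Offline Load validation test opts in, by layering the flag locally.
const offlineLoadSettings: Record<string, ConfigTypes> = {
  ...baseSettings,
  "Fluid.Container.enableOfflineFull": true,
};
```

Each test then passes its own `configProvider(...)` in `loaderProps`, keeping the Offline Load configuration out of the suite-wide setup.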