Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1890,20 +1890,38 @@ export class ContainerRuntime
this.mc.config.getNumber("Fluid.ContainerRuntime.StagingModeAutoFlushThreshold") ??
runtimeOptions.stagingModeAutoFlushThreshold ??
defaultStagingModeAutoFlushThreshold;
this.batchIdTrackingEnabled =
this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") ??
this.mc.config.getBoolean("Fluid.ContainerRuntime.enableBatchIdTracking") ??
false;

if (this.batchIdTrackingEnabled && this._flushMode !== FlushMode.TurnBased) {
// BatchId tracking powers DuplicateBatchDetector (catching forked-container duplicates)
// and is also a prerequisite for the Offline Load feature. It is enabled by default
// when both TurnBased flush mode and grouped batching are active; the kill-switch
// below allows disabling it without a code change if a regression is observed.
// Grouped batching is required because resubmits can produce empty batches that must
// still be sent on the wire as a placeholder grouped batch to preserve their batchId
// (see OpGroupingManager.createEmptyGroupedBatch / outbox.flushEmptyBatch).
// Offline Load requires both prerequisites, so a consumer that opts into it without
// them gets an explicit UsageError rather than silent degradation.
const offlineLoadRequested =
this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") === true;
const disableBatchIdTracking =
this.mc.config.getBoolean("Fluid.ContainerRuntime.DisableBatchIdTracking") === true;

if (offlineLoadRequested && this._flushMode !== FlushMode.TurnBased) {
const error = new UsageError("Offline mode is only supported in turn-based mode");
this.closeFn(error);
throw error;
}
if (offlineLoadRequested && !this.groupedBatchingEnabled) {
const error = new UsageError("Offline mode requires grouped batching to be enabled");
this.closeFn(error);
throw error;
}

this.batchIdTrackingEnabled =
!disableBatchIdTracking &&
this._flushMode === FlushMode.TurnBased &&
this.groupedBatchingEnabled;
Comment on lines +1918 to +1921
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deep Review: Confirming this concern — independent analysis landed on the same finding. The PR's own added comment at containerRuntime.ts:1893 declares the intent: "Offline Load requires both prerequisites, so a consumer that opts into it without them gets an explicit UsageError rather than silent degradation." The constructor enforces that for the FlushMode and grouped-batching prereqs (the two existing if (offlineLoadRequested && ...) throw new UsageError(...) checks at lines ~1916), but not for the new kill-switch.

When a host sets enableOfflineFull=true and an operator flips DisableBatchIdTracking=true (the most likely path during an incident), batchIdTrackingEnabled becomes false, no DuplicateBatchDetector is constructed, batchIds stop being stamped on resubmits (the same flag also gates the metadata path at containerRuntime.ts:4995-4999), and any recentBatchInfo blob loaded from a prior snapshot is silently dropped on the next summary. Offline resubmission semantics break with no error or telemetry.

Suggested fix — add a third precondition check immediately after the existing two:

if (offlineLoadRequested && disableBatchIdTracking) {
    throw new UsageError("Offline Load requires batch ID tracking; remove Fluid.ContainerRuntime.DisableBatchIdTracking or disable Offline Load.");
}

Or make offlineLoadRequested override disableBatchIdTracking and emit a logged warning. Either choice restores the contract for all three preconditions. Add a unit test parallel to the two existing prereq-error tests in containerRuntime.spec.ts.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enableOfflineFull will be removed in a follow-up PR. For now we just wanted to remove the batch id tracking flag safely and eventually just have one feature flag for all offline features.


// DuplicateBatchDetection is only enabled if Offline Load is enabled
// It maintains a cache of all batchIds/sequenceNumbers within the collab window.
// Don't waste resources doing so if not needed.
// DuplicateBatchDetector maintains a cache of all batchIds/sequenceNumbers within the
// collab window. Skip allocating it when batchId tracking is off.
if (this.batchIdTrackingEnabled) {
this.duplicateBatchDetector = new DuplicateBatchDetector(recentBatchInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ export class DuplicateBatchDetector {
*/
private readonly batchIdsBySeqNum = new Map<number, string>();

/**
* Number of inbound batches processed since the last summary. Reset by getRecentBatchInfoForSummary.
*/
private processedBatchCount = 0;

/**
* Largest tracked-batch count observed since the last summary. Reset by getRecentBatchInfoForSummary.
*/
private peakTrackedBatchCount = 0;

/**
* Initialize from snapshot data if provided - otherwise initialize empty
*/
Expand All @@ -37,6 +47,7 @@ export class DuplicateBatchDetector {
this.batchIdsBySeqNum.set(seqNum, batchId);
this.seqNumByBatchId.set(batchId, seqNum);
}
this.peakTrackedBatchCount = this.batchIdsBySeqNum.size;
}
}

Expand All @@ -50,6 +61,7 @@ export class DuplicateBatchDetector {
batchStart: BatchStartInfo,
): { duplicate: true; otherSequenceNumber: number } | { duplicate: false } {
const { sequenceNumber, minimumSequenceNumber } = batchStart.keyMessage;
this.processedBatchCount++;

// Glance at this batch's MSN. Any batchIds we're tracking with a lower sequence number are now safe to forget.
// Why? Because any other client holding the same batch locally would have seen the earlier batch and closed before submitting its duplicate.
Expand Down Expand Up @@ -80,6 +92,9 @@ export class DuplicateBatchDetector {
// Add new batch
this.batchIdsBySeqNum.set(sequenceNumber, batchId);
this.seqNumByBatchId.set(batchId, sequenceNumber);
if (this.batchIdsBySeqNum.size > this.peakTrackedBatchCount) {
this.peakTrackedBatchCount = this.batchIdsBySeqNum.size;
}

return { duplicate: false };
}
Expand Down Expand Up @@ -108,16 +123,22 @@ export class DuplicateBatchDetector {
public getRecentBatchInfoForSummary(
telemetryContext?: ITelemetryContext,
): [number, string][] | undefined {
if (telemetryContext !== undefined) {
const prefix = "fluid_DuplicateBatchDetector_";
telemetryContext.set(prefix, "recentBatchCount", this.batchIdsBySeqNum.size);
telemetryContext.set(prefix, "peakRecentBatchCount", this.peakTrackedBatchCount);
telemetryContext.set(prefix, "processedBatchCount", this.processedBatchCount);
}

// Reset per-window perf counters so each summary covers only the activity since the
// previous one. Peak resets to the current size (the floor for the next window).
this.processedBatchCount = 0;
this.peakTrackedBatchCount = this.batchIdsBySeqNum.size;

if (this.batchIdsBySeqNum.size === 0) {
return undefined;
}

telemetryContext?.set(
"fluid_DuplicateBatchDetector_",
"recentBatchCount",
this.batchIdsBySeqNum.size,
);

return [...this.batchIdsBySeqNum.entries()];
}
}
197 changes: 169 additions & 28 deletions packages/runtime/container-runtime/src/test/containerRuntime.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,11 +437,7 @@ describe("Runtime", () => {
let batchEnd = 0;
let callsToEnsure = 0;
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: {
"Fluid.ContainerRuntime.enableBatchIdTracking": true,
},
}) as IContainerContext,
context: getMockContext() as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: {},
Expand Down Expand Up @@ -481,13 +477,19 @@ describe("Runtime", () => {
assert.strictEqual(containerRuntime.isDirty, false);
});

for (const enableBatchIdTracking of [true, undefined])
it("Replaying ops should resend in correct order, with batch ID if applicable", async () => {
// BatchId tracking is on by default (TurnBased mode); the kill-switch suppresses stamping.
for (const variant of [
{ name: "default (no settings)", settings: {}, expectStamped: true },
{
name: "kill-switch active",
settings: { "Fluid.ContainerRuntime.DisableBatchIdTracking": true },
expectStamped: false,
},
])
it(`Replaying ops should resend in correct order, with batch ID if applicable (${variant.name})`, async () => {
const { runtime } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: {
"Fluid.ContainerRuntime.enableBatchIdTracking": enableBatchIdTracking, // batchId only stamped if true
},
settings: variant.settings,
}) as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
Expand Down Expand Up @@ -525,7 +527,7 @@ describe("Runtime", () => {
);
}

if (enableBatchIdTracking === true) {
if (variant.expectStamped) {
assert(
batchIdMatchesUnsentFormat(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
Expand Down Expand Up @@ -921,8 +923,20 @@ describe("Runtime", () => {
{ batch: false },
];

// In TurnBased mode batchId tracking is on by default, so resubmitted batches
// will also carry a batchId on their first message. Strip it before comparing
// — this test cares about batch boundaries, not batchId stamping.
const normalizedMetadata = submittedOpsMetadata.map((m) => {
if (m === undefined) {
return undefined;
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-unsafe-assignment
const { batchId: _ignored, ...rest } = m as { batchId?: string };
return Object.keys(rest).length === 0 ? undefined : rest;
});

assert.deepStrictEqual(
submittedOpsMetadata,
normalizedMetadata,
expectedBatchMetadata,
"batch metadata does not match",
);
Expand Down Expand Up @@ -2343,13 +2357,19 @@ describe("Runtime", () => {
});

describe("Duplicate Batch Detection", () => {
for (const enableBatchIdTracking of [undefined, true]) {
it(`DuplicateBatchDetector is disabled if Batch Id Tracking isn't needed (${enableBatchIdTracking === true ? "ENABLED" : "DISABLED"})`, async () => {
// BatchId tracking is on by default (TurnBased); the kill-switch suppresses detection.
for (const variant of [
{ name: "default (no settings)", settings: {}, expectDetection: true },
{
name: "kill-switch active",
settings: { "Fluid.ContainerRuntime.DisableBatchIdTracking": true },
expectDetection: false,
},
]) {
it(`DuplicateBatchDetector reflects batch-id tracking enablement (${variant.name})`, async () => {
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: {
"Fluid.ContainerRuntime.enableBatchIdTracking": enableBatchIdTracking,
},
settings: variant.settings,
}) as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
Expand All @@ -2370,8 +2390,9 @@ describe("Runtime", () => {
false,
);
// Process a duplicate batch "batchId1" with different seqNum 234
const assertThrowsOnlyIfExpected =
enableBatchIdTracking === true ? assert.throws : assert.doesNotThrow;
const assertThrowsOnlyIfExpected = variant.expectDetection
? assert.throws
: assert.doesNotThrow;
const errorPredicate = (e: Error) =>
e.message === "Duplicate batch - The same batch was sequenced twice";
assertThrowsOnlyIfExpected(
Expand All @@ -2387,20 +2408,141 @@ describe("Runtime", () => {
);
},
errorPredicate,
"Expected duplicate batch detection to match Offline Load enablement",
"Expected duplicate batch detection to match enablement",
);
});
}

it("Can roundrip DuplicateBatchDetector state through summary/snapshot", async () => {
// Duplicate Batch Detection requires OfflineLoad enabled
const settings_enableOfflineLoad = {
"Fluid.ContainerRuntime.enableBatchIdTracking": true,
it("Default-on tracking is silently skipped in FlushMode.Immediate (no UsageError)", async () => {
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext() as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: {
flushMode: FlushMode.Immediate,
enableRuntimeIdCompressor: "on",
},
provideEntryPoint: mockProvideEntryPoint,
});
// Sending a duplicate batchId should not throw because tracking is inactive in Immediate mode.
containerRuntime.process(
{
sequenceNumber: 123,
type: MessageType.Operation,
contents: { type: ContainerMessageType.Rejoin, contents: undefined },
metadata: { batchId: "batchId1" },
} satisfies Partial<ISequencedDocumentMessage> as ISequencedDocumentMessage,
false,
);
assert.doesNotThrow(() =>
containerRuntime.process(
{
sequenceNumber: 234,
type: MessageType.Operation,
contents: { type: ContainerMessageType.Rejoin, contents: undefined },
metadata: { batchId: "batchId1" },
} satisfies Partial<ISequencedDocumentMessage> as ISequencedDocumentMessage,
false,
),
);
});

it("Offline Load opt-in still errors in FlushMode.Immediate (back-compat)", async () => {
const containerErrors: ICriticalContainerError[] = [];
const context = {
...getMockContext({
settings: {
"Fluid.Container.enableOfflineFull": true,
},
}),
closeFn: (error?: ICriticalContainerError): void => {
if (error !== undefined) {
containerErrors.push(error);
}
},
};
await assert.rejects(
ContainerRuntime.loadRuntime2({
context: context as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: { flushMode: FlushMode.Immediate },
provideEntryPoint: mockProvideEntryPoint,
}),
(error: Error) => error instanceof UsageError,
);
assert.strictEqual(containerErrors.length, 1);
});

// Regression: enabling default-on batchId tracking when grouped batching is disabled
// asserts 0xa00 in OpGroupingManager.createEmptyGroupedBatch the first time a resubmit
// produces an empty batch. Tracking must be silently skipped in that configuration.
it("Default-on tracking is silently skipped when grouped batching is disabled", async () => {
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext() as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: {
enableGroupedBatching: false,
enableRuntimeIdCompressor: "on",
},
provideEntryPoint: mockProvideEntryPoint,
});
// Sending a duplicate batchId should not throw because tracking is inactive
// when grouped batching is off.
containerRuntime.process(
{
sequenceNumber: 123,
type: MessageType.Operation,
contents: { type: ContainerMessageType.Rejoin, contents: undefined },
metadata: { batchId: "batchId1" },
} satisfies Partial<ISequencedDocumentMessage> as ISequencedDocumentMessage,
false,
);
assert.doesNotThrow(() =>
containerRuntime.process(
{
sequenceNumber: 234,
type: MessageType.Operation,
contents: { type: ContainerMessageType.Rejoin, contents: undefined },
metadata: { batchId: "batchId1" },
} satisfies Partial<ISequencedDocumentMessage> as ISequencedDocumentMessage,
false,
),
);
});

it("Offline Load opt-in errors when grouped batching is disabled", async () => {
const containerErrors: ICriticalContainerError[] = [];
const context = {
...getMockContext({
settings: {
"Fluid.Container.enableOfflineFull": true,
},
}),
closeFn: (error?: ICriticalContainerError): void => {
if (error !== undefined) {
containerErrors.push(error);
}
},
};
await assert.rejects(
ContainerRuntime.loadRuntime2({
context: context as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: { enableGroupedBatching: false },
provideEntryPoint: mockProvideEntryPoint,
}),
(error: Error) => error instanceof UsageError,
);
assert.strictEqual(containerErrors.length, 1);
});

it("Can roundtrip DuplicateBatchDetector state through summary/snapshot", async () => {
// Duplicate Batch Detection is on by default in TurnBased mode.
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: settings_enableOfflineLoad,
}) as IContainerContext,
context: getMockContext() as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: {
Expand Down Expand Up @@ -2432,7 +2574,6 @@ describe("Runtime", () => {
};
const { runtime: containerRuntime2 } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: settings_enableOfflineLoad,
baseSnapshot: {
trees: {},
blobs: { [recentBatchInfoBlobName]: "nonempty_id_ignored_by_mockStorage" },
Expand Down
Loading
Loading