Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 9 additions & 16 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1890,27 +1890,20 @@ export class ContainerRuntime
this.mc.config.getNumber("Fluid.ContainerRuntime.StagingModeAutoFlushThreshold") ??
runtimeOptions.stagingModeAutoFlushThreshold ??
defaultStagingModeAutoFlushThreshold;
// BatchId tracking powers DuplicateBatchDetector (catching forked-container duplicates)
// and is also a prerequisite for the Offline Load feature. It is enabled by default in
// TurnBased mode; the kill-switch below allows disabling it without a code change if a
// regression is observed. Offline Load still requires TurnBased, so consumers that
// explicitly opt into it with FlushMode.Immediate continue to get a UsageError.
const offlineLoadRequested =
this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") === true;
const disableBatchIdTracking =
this.mc.config.getBoolean("Fluid.ContainerRuntime.DisableBatchIdTracking") === true;

if (offlineLoadRequested && this._flushMode !== FlushMode.TurnBased) {
this.batchIdTrackingEnabled =
this.mc.config.getBoolean("Fluid.Container.enableOfflineFull") ??
this.mc.config.getBoolean("Fluid.ContainerRuntime.enableBatchIdTracking") ??
false;
Comment on lines +1894 to +1896

if (this.batchIdTrackingEnabled && this._flushMode !== FlushMode.TurnBased) {
const error = new UsageError("Offline mode is only supported in turn-based mode");
this.closeFn(error);
throw error;
}
Comment on lines +1898 to 1902

this.batchIdTrackingEnabled =
!disableBatchIdTracking && this._flushMode === FlushMode.TurnBased;

// DuplicateBatchDetector maintains a cache of all batchIds/sequenceNumbers within the
// collab window. Skip allocating it when batchId tracking is off.
// DuplicateBatchDetector is only created when batchId tracking is enabled (e.g. for Offline Load).
// It maintains a cache of all batchIds/sequenceNumbers within the collab window,
// so don't waste resources on it if it's not needed.
if (this.batchIdTrackingEnabled) {
this.duplicateBatchDetector = new DuplicateBatchDetector(recentBatchInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,6 @@ export class DuplicateBatchDetector {
*/
private readonly batchIdsBySeqNum = new Map<number, string>();

/**
* Number of inbound batches processed since the last summary. Reset by getRecentBatchInfoForSummary.
*/
private processedBatchCount = 0;

/**
* Largest tracked-batch count observed since the last summary. Reset by getRecentBatchInfoForSummary.
*/
private peakTrackedBatchCount = 0;

/**
* Initialize from snapshot data if provided - otherwise initialize empty
*/
Expand All @@ -47,7 +37,6 @@ export class DuplicateBatchDetector {
this.batchIdsBySeqNum.set(seqNum, batchId);
this.seqNumByBatchId.set(batchId, seqNum);
}
this.peakTrackedBatchCount = this.batchIdsBySeqNum.size;
}
}

Expand All @@ -61,7 +50,6 @@ export class DuplicateBatchDetector {
batchStart: BatchStartInfo,
): { duplicate: true; otherSequenceNumber: number } | { duplicate: false } {
const { sequenceNumber, minimumSequenceNumber } = batchStart.keyMessage;
this.processedBatchCount++;

// Glance at this batch's MSN. Any batchIds we're tracking with a lower sequence number are now safe to forget.
// Why? Because any other client holding the same batch locally would have seen the earlier batch and closed before submitting its duplicate.
Expand Down Expand Up @@ -92,9 +80,6 @@ export class DuplicateBatchDetector {
// Add new batch
this.batchIdsBySeqNum.set(sequenceNumber, batchId);
this.seqNumByBatchId.set(batchId, sequenceNumber);
if (this.batchIdsBySeqNum.size > this.peakTrackedBatchCount) {
this.peakTrackedBatchCount = this.batchIdsBySeqNum.size;
}

return { duplicate: false };
}
Expand Down Expand Up @@ -123,22 +108,16 @@ export class DuplicateBatchDetector {
public getRecentBatchInfoForSummary(
telemetryContext?: ITelemetryContext,
): [number, string][] | undefined {
if (telemetryContext !== undefined) {
const prefix = "fluid_DuplicateBatchDetector_";
telemetryContext.set(prefix, "recentBatchCount", this.batchIdsBySeqNum.size);
telemetryContext.set(prefix, "peakRecentBatchCount", this.peakTrackedBatchCount);
telemetryContext.set(prefix, "processedBatchCount", this.processedBatchCount);
}

// Reset per-window perf counters so each summary covers only the activity since the
// previous one. Peak resets to the current size (the floor for the next window).
this.processedBatchCount = 0;
this.peakTrackedBatchCount = this.batchIdsBySeqNum.size;

if (this.batchIdsBySeqNum.size === 0) {
return undefined;
}

telemetryContext?.set(
"fluid_DuplicateBatchDetector_",
"recentBatchCount",
this.batchIdsBySeqNum.size,
);

return [...this.batchIdsBySeqNum.entries()];
}
}
132 changes: 28 additions & 104 deletions packages/runtime/container-runtime/src/test/containerRuntime.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,11 @@ describe("Runtime", () => {
let batchEnd = 0;
let callsToEnsure = 0;
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext() as IContainerContext,
context: getMockContext({
settings: {
"Fluid.ContainerRuntime.enableBatchIdTracking": true,
},
}) as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: {},
Expand Down Expand Up @@ -477,19 +481,13 @@ describe("Runtime", () => {
assert.strictEqual(containerRuntime.isDirty, false);
});

// BatchId tracking is on by default (TurnBased mode); the kill-switch suppresses stamping.
for (const variant of [
{ name: "default (no settings)", settings: {}, expectStamped: true },
{
name: "kill-switch active",
settings: { "Fluid.ContainerRuntime.DisableBatchIdTracking": true },
expectStamped: false,
},
])
it(`Replaying ops should resend in correct order, with batch ID if applicable (${variant.name})`, async () => {
for (const enableBatchIdTracking of [true, undefined])
it("Replaying ops should resend in correct order, with batch ID if applicable", async () => {
const { runtime } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: variant.settings,
settings: {
"Fluid.ContainerRuntime.enableBatchIdTracking": enableBatchIdTracking, // batchId only stamped if true
},
Comment on lines +484 to +490
}) as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
Expand Down Expand Up @@ -527,7 +525,7 @@ describe("Runtime", () => {
);
}

if (variant.expectStamped) {
if (enableBatchIdTracking === true) {
assert(
batchIdMatchesUnsentFormat(
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
Expand Down Expand Up @@ -923,20 +921,8 @@ describe("Runtime", () => {
{ batch: false },
];

// In TurnBased mode batchId tracking is on by default, so resubmitted batches
// will also carry a batchId on their first message. Strip it before comparing
// — this test cares about batch boundaries, not batchId stamping.
const normalizedMetadata = submittedOpsMetadata.map((m) => {
if (m === undefined) {
return undefined;
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars, @typescript-eslint/no-unsafe-assignment
const { batchId: _ignored, ...rest } = m as { batchId?: string };
return Object.keys(rest).length === 0 ? undefined : rest;
});

assert.deepStrictEqual(
normalizedMetadata,
submittedOpsMetadata,
expectedBatchMetadata,
"batch metadata does not match",
);
Expand Down Expand Up @@ -2357,19 +2343,13 @@ describe("Runtime", () => {
});

describe("Duplicate Batch Detection", () => {
// BatchId tracking is on by default (TurnBased); the kill-switch suppresses detection.
for (const variant of [
{ name: "default (no settings)", settings: {}, expectDetection: true },
{
name: "kill-switch active",
settings: { "Fluid.ContainerRuntime.DisableBatchIdTracking": true },
expectDetection: false,
},
]) {
it(`DuplicateBatchDetector reflects batch-id tracking enablement (${variant.name})`, async () => {
for (const enableBatchIdTracking of [undefined, true]) {
it(`DuplicateBatchDetector is disabled if Batch Id Tracking isn't needed (${enableBatchIdTracking === true ? "ENABLED" : "DISABLED"})`, async () => {
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: variant.settings,
settings: {
"Fluid.ContainerRuntime.enableBatchIdTracking": enableBatchIdTracking,
},
}) as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
Expand All @@ -2390,9 +2370,8 @@ describe("Runtime", () => {
false,
);
// Process a duplicate batch "batchId1" with different seqNum 234
const assertThrowsOnlyIfExpected = variant.expectDetection
? assert.throws
: assert.doesNotThrow;
const assertThrowsOnlyIfExpected =
enableBatchIdTracking === true ? assert.throws : assert.doesNotThrow;
const errorPredicate = (e: Error) =>
e.message === "Duplicate batch - The same batch was sequenced twice";
assertThrowsOnlyIfExpected(
Expand All @@ -2408,76 +2387,20 @@ describe("Runtime", () => {
);
},
errorPredicate,
"Expected duplicate batch detection to match enablement",
"Expected duplicate batch detection to match Offline Load enablement",
);
});
}

it("Default-on tracking is silently skipped in FlushMode.Immediate (no UsageError)", async () => {
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext() as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: {
flushMode: FlushMode.Immediate,
enableRuntimeIdCompressor: "on",
},
provideEntryPoint: mockProvideEntryPoint,
});
// Sending a duplicate batchId should not throw because tracking is inactive in Immediate mode.
containerRuntime.process(
{
sequenceNumber: 123,
type: MessageType.Operation,
contents: { type: ContainerMessageType.Rejoin, contents: undefined },
metadata: { batchId: "batchId1" },
} satisfies Partial<ISequencedDocumentMessage> as ISequencedDocumentMessage,
false,
);
assert.doesNotThrow(() =>
containerRuntime.process(
{
sequenceNumber: 234,
type: MessageType.Operation,
contents: { type: ContainerMessageType.Rejoin, contents: undefined },
metadata: { batchId: "batchId1" },
} satisfies Partial<ISequencedDocumentMessage> as ISequencedDocumentMessage,
false,
),
);
});

it("Offline Load opt-in still errors in FlushMode.Immediate (back-compat)", async () => {
const containerErrors: ICriticalContainerError[] = [];
const context = {
...getMockContext({
settings: {
"Fluid.Container.enableOfflineFull": true,
},
}),
closeFn: (error?: ICriticalContainerError): void => {
if (error !== undefined) {
containerErrors.push(error);
}
},
it("Can roundtrip DuplicateBatchDetector state through summary/snapshot", async () => {
// Duplicate Batch Detection requires batchId tracking to be enabled (as Offline Load does)
const settings_enableOfflineLoad = {
"Fluid.ContainerRuntime.enableBatchIdTracking": true,
};
await assert.rejects(
ContainerRuntime.loadRuntime2({
context: context as IContainerContext,
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: { flushMode: FlushMode.Immediate },
provideEntryPoint: mockProvideEntryPoint,
}),
(error: Error) => error instanceof UsageError,
);
assert.strictEqual(containerErrors.length, 1);
});

it("Can roundtrip DuplicateBatchDetector state through summary/snapshot", async () => {
// Duplicate Batch Detection is on by default in TurnBased mode.
const { runtime: containerRuntime } = await ContainerRuntime.loadRuntime2({
context: getMockContext() as IContainerContext,
context: getMockContext({
settings: settings_enableOfflineLoad,
}) as IContainerContext,
Comment on lines +2396 to +2403
registry: new FluidDataStoreRegistry([]),
existing: false,
runtimeOptions: {
Expand Down Expand Up @@ -2509,6 +2432,7 @@ describe("Runtime", () => {
};
const { runtime: containerRuntime2 } = await ContainerRuntime.loadRuntime2({
context: getMockContext({
settings: settings_enableOfflineLoad,
baseSnapshot: {
trees: {},
blobs: { [recentBatchInfoBlobName]: "nonempty_id_ignored_by_mockStorage" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,13 @@ describe("DuplicateBatchDetector", () => {
detector.processInboundBatch(inboundBatch1);
detector.processInboundBatch(inboundBatch2);

const telemetrySets = new Map<string, unknown>();
let setCalled = 0;
const telemetryContext = {
set: (key: string, subKey: string, value: unknown) => {
++setCalled;
assert.equal(key, "fluid_DuplicateBatchDetector_");
telemetrySets.set(subKey, value);
assert.equal(subKey, "recentBatchCount");
assert.equal(value, 2);
},
} satisfies Partial<ITelemetryContext> as ITelemetryContext;

Expand All @@ -235,57 +237,7 @@ describe("DuplicateBatchDetector", () => {
],
"Incorrect recentBatchInfo",
);
assert.equal(telemetrySets.get("recentBatchCount"), 2);
assert.equal(telemetrySets.get("peakRecentBatchCount"), 2);
assert.equal(telemetrySets.get("processedBatchCount"), 2);
});

it("Per-window perf counters reset after each summary", () => {
detector.processInboundBatch(
makeBatch({
sequenceNumber: seqNum++, // 1
minimumSequenceNumber: 0,
batchId: "batch1",
}),
);
detector.processInboundBatch(
makeBatch({
sequenceNumber: seqNum++, // 2
minimumSequenceNumber: 0,
batchId: "batch2",
}),
);

const firstWindow = new Map<string, unknown>();
detector.getRecentBatchInfoForSummary({
set: (_key: string, subKey: string, value: unknown) => {
firstWindow.set(subKey, value);
},
} satisfies Partial<ITelemetryContext> as ITelemetryContext);
assert.equal(firstWindow.get("processedBatchCount"), 2);
assert.equal(firstWindow.get("peakRecentBatchCount"), 2);

// Process one more batch; MSN advances enough to drop both prior entries.
detector.processInboundBatch(
makeBatch({
sequenceNumber: seqNum++, // 3
minimumSequenceNumber: 3,
batchId: "batch3",
}),
);

const secondWindow = new Map<string, unknown>();
detector.getRecentBatchInfoForSummary({
set: (_key: string, subKey: string, value: unknown) => {
secondWindow.set(subKey, value);
},
} satisfies Partial<ITelemetryContext> as ITelemetryContext);
// Only one batch processed since the prior summary.
assert.equal(secondWindow.get("processedBatchCount"), 1);
// Peak in this window starts at the size carried over from the prior window (2)
// — peak only ever grows during a window. Current size after cleanup is 1.
assert.equal(secondWindow.get("peakRecentBatchCount"), 2);
assert.equal(secondWindow.get("recentBatchCount"), 1);
assert.equal(setCalled, 1, "Expected telemetryContext.set to be called once");
});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ describe("Container Runtime", () => {
for (let i = 0; i < count; i++) {
const message: Partial<ISequencedDocumentMessage> = {
clientId,
clientSequenceNumber: seq,
minimumSequenceNumber: 0,
sequenceNumber: seq++,
type: MessageType.Operation,
Expand Down
2 changes: 1 addition & 1 deletion packages/test/snapshots/content
Submodule content updated 196 files
Loading
Loading