From 354c31a4f395846b29e7ca2dff3586bcdce62142 Mon Sep 17 00:00:00 2001 From: Alexey Orlenko Date: Wed, 6 May 2026 16:07:17 +0200 Subject: [PATCH] fix(runtime): dedupe in-flight contract verification (TML-2303) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit K concurrent queries against a cold runtime in onFirstUse/startup modes used to issue K separate marker round-trips before the first one flipped verified=true. They now share a single in-flight verify promise so cold page loads pay one marker_read RTT instead of (K-1) extra ones. always mode keeps its per-call semantics — concurrent callers each observe a fresh marker read, so re-verify under concurrency does not silently degrade to onFirstUse. Co-Authored-By: Claude Opus 4.7 (1M context) --- packages/2-sql/5-runtime/src/sql-runtime.ts | 28 ++- .../2-sql/5-runtime/test/sql-runtime.test.ts | 170 ++++++++++++++++++ 2 files changed, 197 insertions(+), 1 deletion(-) diff --git a/packages/2-sql/5-runtime/src/sql-runtime.ts b/packages/2-sql/5-runtime/src/sql-runtime.ts index 518f377c44..8db9aa63a1 100644 --- a/packages/2-sql/5-runtime/src/sql-runtime.ts +++ b/packages/2-sql/5-runtime/src/sql-runtime.ts @@ -149,6 +149,12 @@ class SqlRuntimeImpl = Contract | undefined; private _telemetry: RuntimeTelemetryEvent | null; constructor(options: RuntimeOptions) { @@ -186,6 +192,7 @@ class SqlRuntimeImpl = Contract = Contract { + // `always` mode requires a fresh marker read per call — never share an + // in-flight verify, otherwise concurrent callers would silently degrade + // to `onFirstUse` semantics by satisfying their re-verify with another + // caller's read. if (this.verify.mode === 'always') { - this.verified = false; + await this.runMarkerRead(); + return; } if (this.verified) { return; } + if (this.verifyInFlight) { + await this.verifyInFlight; + return; + } + + this.verifyInFlight = this.runMarkerRead(); + try { + await this.verifyInFlight; + } finally { + this.verifyInFlight = undefined; + } + } + + private async runMarkerRead(): Promise { const readStatement = this.familyAdapter.markerReader.readMarkerStatement(); const result = await this.driver.query(readStatement.sql, readStatement.params); diff --git a/packages/2-sql/5-runtime/test/sql-runtime.test.ts b/packages/2-sql/5-runtime/test/sql-runtime.test.ts index 44e92b2e32..df5cc1de3e 100644 --- a/packages/2-sql/5-runtime/test/sql-runtime.test.ts +++ b/packages/2-sql/5-runtime/test/sql-runtime.test.ts @@ -61,6 +61,7 @@ interface DriverMockSpies { transactionCommit: ReturnType; transactionRollback: ReturnType; driverClose: ReturnType; + driverQuery: ReturnType; } type MockSqlDriver = SqlDriver & { __spies: DriverMockSpies }; @@ -160,6 +161,7 @@ function createMockDriver(): MockSqlDriver { transactionCommit: transaction.commit, transactionRollback: transaction.rollback, driverClose, + driverQuery: query, }, }); } @@ -665,6 +667,174 @@ describe('createRuntime', () => { }); }); +describe('marker verification dedupe', () => { + // The marker reader's parser tolerates an empty row set when + // requireMarker is false (the runtime treats "no marker" as verified). + // The mock driver returns rows: [] by default, so a single marker-read + // round-trip is enough to flip `verified = true` for every caller that + // shares the in-flight verify promise. + const markerEmpty = { rows: [], rowCount: 0 }; + + /** + * Drains queued microtasks so concurrent `runtime.execute(...)` callers + * advance from the synchronous entry into their `await self.verifyMarker()` + * suspension point. Without this, only the first generator may have + * stepped past `await encodeParams` by the time we resolve the marker + * read, masking concurrent-arrival behavior. + */ + async function flushMicrotasks(rounds = 5): Promise { + for (let i = 0; i < rounds; i++) { + await Promise.resolve(); + } + } + + it('dedupes concurrent cold-start marker reads in onFirstUse mode', async () => { + const { stackInstance, context, driver } = createTestSetup(); + const runtime = createRuntime({ + stackInstance, + context, + driver, + verify: { mode: 'onFirstUse', requireMarker: false }, + }); + + let resolveMarker: (value: typeof markerEmpty) => void = () => {}; + const heldMarker = new Promise((resolve) => { + resolveMarker = resolve; + }); + driver.__spies.driverQuery.mockReturnValueOnce(heldMarker); + + const callers = 5; + const promises = Array.from({ length: callers }, () => + runtime.execute(createRawExecutionPlan()).toArray(), + ); + + await flushMicrotasks(); + resolveMarker(markerEmpty); + await Promise.all(promises); + + expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(1); + expect(driver.__spies.rootExecute).toHaveBeenCalledTimes(callers); + }); + + it('dedupes concurrent cold-start marker reads in startup mode', async () => { + const { stackInstance, context, driver } = createTestSetup(); + const runtime = createRuntime({ + stackInstance, + context, + driver, + verify: { mode: 'startup', requireMarker: false }, + }); + + let resolveMarker: (value: typeof markerEmpty) => void = () => {}; + const heldMarker = new Promise((resolve) => { + resolveMarker = resolve; + }); + driver.__spies.driverQuery.mockReturnValueOnce(heldMarker); + + const callers = 5; + const promises = Array.from({ length: callers }, () => + runtime.execute(createRawExecutionPlan()).toArray(), + ); + + await flushMicrotasks(); + resolveMarker(markerEmpty); + await Promise.all(promises); + + expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(1); + }); + + it('skips marker read for warm callers in onFirstUse mode', async () => { + const { stackInstance, context, driver } = createTestSetup(); + const runtime = createRuntime({ + stackInstance, + context, + driver, + verify: { mode: 'onFirstUse', requireMarker: false }, + }); + + await runtime.execute(createRawExecutionPlan()).toArray(); + expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(1); + + driver.__spies.driverQuery.mockClear(); + await Promise.all([ + runtime.execute(createRawExecutionPlan()).toArray(), + runtime.execute(createRawExecutionPlan()).toArray(), + runtime.execute(createRawExecutionPlan()).toArray(), + ]); + + expect(driver.__spies.driverQuery).not.toHaveBeenCalled(); + }); + + it('does not dedupe in always mode (every caller round-trips)', async () => { + const { stackInstance, context, driver } = createTestSetup(); + const runtime = createRuntime({ + stackInstance, + context, + driver, + verify: { mode: 'always', requireMarker: false }, + }); + + const callers = 5; + const promises = Array.from({ length: callers }, () => + runtime.execute(createRawExecutionPlan()).toArray(), + ); + await Promise.all(promises); + + expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(callers); + }); + + it('shares an in-flight verify failure across concurrent callers', async () => { + const { stackInstance, context, driver } = createTestSetup(); + const runtime = createRuntime({ + stackInstance, + context, + driver, + verify: { mode: 'onFirstUse', requireMarker: true }, + }); + + let rejectMarker: (reason: unknown) => void = () => {}; + const heldMarker = new Promise((_resolve, reject) => { + rejectMarker = reject; + }); + driver.__spies.driverQuery.mockReturnValueOnce(heldMarker); + + const callers = 3; + const promises = Array.from({ length: callers }, () => + runtime.execute(createRawExecutionPlan()).toArray(), + ); + + await flushMicrotasks(); + const driverFailure = new Error('connection refused'); + rejectMarker(driverFailure); + + const results = await Promise.allSettled(promises); + expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(1); + for (const result of results) { + expect(result.status).toBe('rejected'); + expect((result as PromiseRejectedResult).reason).toBe(driverFailure); + } + expect(driver.__spies.rootExecute).not.toHaveBeenCalled(); + }); + + it('clears the in-flight slot after a failure so the next caller retries', async () => { + const { stackInstance, context, driver } = createTestSetup(); + const runtime = createRuntime({ + stackInstance, + context, + driver, + verify: { mode: 'onFirstUse', requireMarker: false }, + }); + + driver.__spies.driverQuery.mockRejectedValueOnce(new Error('transient')); + await expect(runtime.execute(createRawExecutionPlan()).toArray()).rejects.toThrow('transient'); + + // Subsequent call falls back to the default mockResolvedValue (rows: []) + // and verifies successfully — proving the in-flight slot was cleared. + await runtime.execute(createRawExecutionPlan()).toArray(); + expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(2); + }); +}); + describe('withTransaction', () => { function createRuntimeForTransaction() { const { stackInstance, context, driver } = createTestSetup();