Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion packages/2-sql/5-runtime/src/sql-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ class SqlRuntimeImpl<TContract extends Contract<SqlStorage> = Contract<SqlStorag
private codecRegistryValidated: boolean;
private verified: boolean;
private startupVerified: boolean;
// Shared promise for an in-flight cold-start marker read (onFirstUse /
// startup modes). Concurrent callers that arrive before the first one
// flips `verified = true` await the same promise instead of issuing
// their own redundant marker round-trip. Intentionally not used for
// `always` mode, where each call must observe a fresh marker read.
private verifyInFlight: Promise<void> | undefined;
private _telemetry: RuntimeTelemetryEvent | null;

constructor(options: RuntimeOptions<TContract>) {
Expand Down Expand Up @@ -186,6 +192,7 @@ class SqlRuntimeImpl<TContract extends Contract<SqlStorage> = Contract<SqlStorag
this.codecRegistryValidated = false;
this.verified = verify.mode === 'startup' ? false : verify.mode === 'always';
this.startupVerified = false;
this.verifyInFlight = undefined;
this._telemetry = null;

if (verify.mode === 'startup') {
Expand Down Expand Up @@ -417,14 +424,33 @@ class SqlRuntimeImpl<TContract extends Contract<SqlStorage> = Contract<SqlStorag
}

private async verifyMarker(): Promise<void> {
// `always` mode requires a fresh marker read per call — never share an
// in-flight verify, otherwise concurrent callers would silently degrade
// to `onFirstUse` semantics by satisfying their re-verify with another
// caller's read.
if (this.verify.mode === 'always') {
this.verified = false;
await this.runMarkerRead();
return;
}

if (this.verified) {
return;
}

if (this.verifyInFlight) {
await this.verifyInFlight;
return;
}

this.verifyInFlight = this.runMarkerRead();
try {
await this.verifyInFlight;
} finally {
this.verifyInFlight = undefined;
}
}

private async runMarkerRead(): Promise<void> {
const readStatement = this.familyAdapter.markerReader.readMarkerStatement();
const result = await this.driver.query(readStatement.sql, readStatement.params);

Expand Down
170 changes: 170 additions & 0 deletions packages/2-sql/5-runtime/test/sql-runtime.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ interface DriverMockSpies {
transactionCommit: ReturnType<typeof vi.fn>;
transactionRollback: ReturnType<typeof vi.fn>;
driverClose: ReturnType<typeof vi.fn>;
driverQuery: ReturnType<typeof vi.fn>;
}

type MockSqlDriver = SqlDriver & { __spies: DriverMockSpies };
Expand Down Expand Up @@ -160,6 +161,7 @@ function createMockDriver(): MockSqlDriver {
transactionCommit: transaction.commit,
transactionRollback: transaction.rollback,
driverClose,
driverQuery: query,
},
});
}
Expand Down Expand Up @@ -665,6 +667,174 @@ describe('createRuntime', () => {
});
});

describe('marker verification dedupe', () => {
// The marker reader's parser tolerates an empty row set when
// requireMarker is false (the runtime treats "no marker" as verified).
// The mock driver returns rows: [] by default, so a single marker-read
// round-trip is enough to flip `verified = true` for every caller that
// shares the in-flight verify promise.
const markerEmpty = { rows: [], rowCount: 0 };

/**
* Drains queued microtasks so concurrent `runtime.execute(...)` callers
* advance from the synchronous entry into their `await self.verifyMarker()`
* suspension point. Without this, only the first generator may have
* stepped past `await encodeParams` by the time we resolve the marker
* read, masking concurrent-arrival behavior.
*/
async function flushMicrotasks(rounds = 5): Promise<void> {
for (let i = 0; i < rounds; i++) {
await Promise.resolve();
}
}

it('dedupes concurrent cold-start marker reads in onFirstUse mode', async () => {
const { stackInstance, context, driver } = createTestSetup();
const runtime = createRuntime({
stackInstance,
context,
driver,
verify: { mode: 'onFirstUse', requireMarker: false },
});

let resolveMarker: (value: typeof markerEmpty) => void = () => {};
const heldMarker = new Promise<typeof markerEmpty>((resolve) => {
resolveMarker = resolve;
});
driver.__spies.driverQuery.mockReturnValueOnce(heldMarker);

const callers = 5;
const promises = Array.from({ length: callers }, () =>
runtime.execute(createRawExecutionPlan()).toArray(),
);

await flushMicrotasks();
resolveMarker(markerEmpty);
await Promise.all(promises);

expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(1);
expect(driver.__spies.rootExecute).toHaveBeenCalledTimes(callers);
});

it('dedupes concurrent cold-start marker reads in startup mode', async () => {
const { stackInstance, context, driver } = createTestSetup();
const runtime = createRuntime({
stackInstance,
context,
driver,
verify: { mode: 'startup', requireMarker: false },
});

let resolveMarker: (value: typeof markerEmpty) => void = () => {};
const heldMarker = new Promise<typeof markerEmpty>((resolve) => {
resolveMarker = resolve;
});
driver.__spies.driverQuery.mockReturnValueOnce(heldMarker);

const callers = 5;
const promises = Array.from({ length: callers }, () =>
runtime.execute(createRawExecutionPlan()).toArray(),
);

await flushMicrotasks();
resolveMarker(markerEmpty);
await Promise.all(promises);

expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(1);
});

it('skips marker read for warm callers in onFirstUse mode', async () => {
const { stackInstance, context, driver } = createTestSetup();
const runtime = createRuntime({
stackInstance,
context,
driver,
verify: { mode: 'onFirstUse', requireMarker: false },
});

await runtime.execute(createRawExecutionPlan()).toArray();
expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(1);

driver.__spies.driverQuery.mockClear();
await Promise.all([
runtime.execute(createRawExecutionPlan()).toArray(),
runtime.execute(createRawExecutionPlan()).toArray(),
runtime.execute(createRawExecutionPlan()).toArray(),
]);

expect(driver.__spies.driverQuery).not.toHaveBeenCalled();
});

it('does not dedupe in always mode (every caller round-trips)', async () => {
const { stackInstance, context, driver } = createTestSetup();
const runtime = createRuntime({
stackInstance,
context,
driver,
verify: { mode: 'always', requireMarker: false },
});

const callers = 5;
const promises = Array.from({ length: callers }, () =>
runtime.execute(createRawExecutionPlan()).toArray(),
);
await Promise.all(promises);

expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(callers);
});

it('shares an in-flight verify failure across concurrent callers', async () => {
const { stackInstance, context, driver } = createTestSetup();
const runtime = createRuntime({
stackInstance,
context,
driver,
verify: { mode: 'onFirstUse', requireMarker: true },
});

let rejectMarker: (reason: unknown) => void = () => {};
const heldMarker = new Promise<typeof markerEmpty>((_resolve, reject) => {
rejectMarker = reject;
});
driver.__spies.driverQuery.mockReturnValueOnce(heldMarker);

const callers = 3;
const promises = Array.from({ length: callers }, () =>
runtime.execute(createRawExecutionPlan()).toArray(),
);

await flushMicrotasks();
const driverFailure = new Error('connection refused');
rejectMarker(driverFailure);

const results = await Promise.allSettled(promises);
expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(1);
for (const result of results) {
expect(result.status).toBe('rejected');
expect((result as PromiseRejectedResult).reason).toBe(driverFailure);
}
expect(driver.__spies.rootExecute).not.toHaveBeenCalled();
});

it('clears the in-flight slot after a failure so the next caller retries', async () => {
const { stackInstance, context, driver } = createTestSetup();
const runtime = createRuntime({
stackInstance,
context,
driver,
verify: { mode: 'onFirstUse', requireMarker: false },
});

driver.__spies.driverQuery.mockRejectedValueOnce(new Error('transient'));
await expect(runtime.execute(createRawExecutionPlan()).toArray()).rejects.toThrow('transient');

// Subsequent call falls back to the default mockResolvedValue (rows: [])
// and verifies successfully — proving the in-flight slot was cleared.
await runtime.execute(createRawExecutionPlan()).toArray();
expect(driver.__spies.driverQuery).toHaveBeenCalledTimes(2);
});
});

describe('withTransaction', () => {
function createRuntimeForTransaction() {
const { stackInstance, context, driver } = createTestSetup();
Expand Down
Loading