Skip to content

Commit 6e29505

Browse files
wenshaoclaude
andauthored
feat(core): intelligent tool parallelism with Kind-based batching and shell read-only detection (#2864)
* feat(core): implement intelligent tool parallelism via Kind-based batching Replace the hard-coded Agent-vs-others split with consecutive batching based on tool Kind. Read-only tools (Read, Search, Fetch, Think) now execute in parallel; mutating tools (Edit, Execute) run sequentially. - Add CONCURRENCY_SAFE_KINDS set to tools.ts - Add partitionToolCalls() for consecutive batch grouping - Add isConcurrencySafe() helper (Agent name + Kind check) - Add runConcurrently() with configurable concurrency cap (QWEN_CODE_MAX_TOOL_CONCURRENCY env var, default 10) - Update MockTool to support custom Kind for testing Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(core): add conditional concurrency for shell read-only commands Shell commands detected as read-only (e.g., git log, cat, ls) now run concurrently with other safe tools instead of breaking parallel batches. Uses the existing isShellCommandReadOnly() checker (synchronous, fail-closed). Commands that can't be verified as read-only remain sequential. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: address Copilot review on tool parallelism - Remove Kind.Think from CONCURRENCY_SAFE_KINDS (save_memory and todo_write write to disk) - Use .finally() instead of .then() in runConcurrently for cleanup - Validate maxConcurrency (clamp to >= 1, default 10) - Add comment explaining why sync checker is used over async AST - Add test for mixed safe/unsafe tool batch partitioning Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: update comment to match CONCURRENCY_SAFE_KINDS (remove Think) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: remove abort break in runConcurrently to prevent stuck scheduled calls Let all calls go through executeSingleToolCall which handles abort internally, ensuring every tool reaches a terminal state. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * test: isolate concurrency tests from QWEN_CODE_MAX_TOOL_CONCURRENCY env Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: address Copilot review - comment, test label, shell test - Update batching comment to clarify Execute conditional safety - Rename describe block to "Concurrent tool execution" - Add test for shell read-only concurrency (git log + ls parallel, npm install sequential) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: add indexOf !== -1 guards to concurrency test assertions Prevents false-positive test passes when expected log entries are missing (indexOf returns -1 which is always < any positive index). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent dddb56d commit 6e29505

File tree

4 files changed

+344
-36
lines changed

4 files changed

+344
-36
lines changed

packages/core/src/core/coreToolScheduler.test.ts

Lines changed: 237 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
* SPDX-License-Identifier: Apache-2.0
55
*/
66

7-
import { describe, it, expect, vi, beforeEach } from 'vitest';
7+
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
88
import type { Mock } from 'vitest';
99
import type {
1010
Config,
@@ -3004,7 +3004,20 @@ describe('Fire hook functions integration', () => {
30043004
});
30053005
});
30063006

3007-
describe('Concurrent agent tool execution', () => {
3007+
describe('Concurrent tool execution', () => {
3008+
// Ensure tests are deterministic regardless of environment.
3009+
const origEnv = process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'];
3010+
beforeEach(() => {
3011+
delete process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'];
3012+
});
3013+
afterEach(() => {
3014+
if (origEnv !== undefined) {
3015+
process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'] = origEnv;
3016+
} else {
3017+
delete process.env['QWEN_CODE_MAX_TOOL_CONCURRENCY'];
3018+
}
3019+
});
3020+
30083021
function createScheduler(
30093022
tools: Map<string, MockTool>,
30103023
onAllToolCallsComplete: Mock,
@@ -3132,7 +3145,7 @@ describe('Fire hook functions integration', () => {
31323145
expect(startIndices.every((i) => i < firstEnd)).toBe(true);
31333146
});
31343147

3135-
it('should run agent tools concurrently while other tools run sequentially', async () => {
3148+
it('should run concurrency-safe tools in parallel and unsafe tools sequentially', async () => {
31363149
const executionLog: string[] = [];
31373150

31383151
const agentTool = new MockTool({
@@ -3151,10 +3164,11 @@ describe('Fire hook functions integration', () => {
31513164

31523165
const readTool = new MockTool({
31533166
name: 'read_file',
3167+
kind: Kind.Read,
31543168
execute: async (params) => {
31553169
const id = (params as { id: string }).id;
31563170
executionLog.push(`read:start:${id}`);
3157-
await new Promise((r) => setTimeout(r, 20));
3171+
await new Promise((r) => setTimeout(r, 50));
31583172
executionLog.push(`read:end:${id}`);
31593173
return {
31603174
llmContent: `Read ${id} done`,
@@ -3176,6 +3190,8 @@ describe('Fire hook functions integration', () => {
31763190
);
31773191

31783192
const abortController = new AbortController();
3193+
// All 4 calls are concurrency-safe (read_file=Kind.Read, agent=Agent name)
3194+
// so they form one parallel batch and all run concurrently.
31793195
const requests = [
31803196
{
31813197
callId: '1',
@@ -3215,20 +3231,226 @@ describe('Fire hook functions integration', () => {
32153231
expect(completedCalls).toHaveLength(4);
32163232
expect(completedCalls.every((c) => c.status === 'success')).toBe(true);
32173233

3218-
// Non-agent tools should execute sequentially: read:1 finishes before read:2 starts
3219-
const read1End = executionLog.indexOf('read:end:1');
3220-
const read2Start = executionLog.indexOf('read:start:2');
3221-
expect(read1End).toBeLessThan(read2Start);
3222-
3223-
// Agent tools should execute concurrently: both start before either ends
3224-
const agentAStart = executionLog.indexOf('agent:start:A');
3225-
const agentBStart = executionLog.indexOf('agent:start:B');
3226-
const firstAgentEnd = Math.min(
3234+
// All 4 tools are concurrency-safe → they should all start
3235+
// before any of them finishes (parallel execution).
3236+
const allStarts = [
3237+
executionLog.indexOf('read:start:1'),
3238+
executionLog.indexOf('agent:start:A'),
3239+
executionLog.indexOf('read:start:2'),
3240+
executionLog.indexOf('agent:start:B'),
3241+
];
3242+
const firstEnd = Math.min(
3243+
executionLog.indexOf('read:end:1'),
32273244
executionLog.indexOf('agent:end:A'),
3245+
executionLog.indexOf('read:end:2'),
32283246
executionLog.indexOf('agent:end:B'),
32293247
);
3230-
expect(agentAStart).toBeLessThan(firstAgentEnd);
3231-
expect(agentBStart).toBeLessThan(firstAgentEnd);
3248+
// Ensure all entries exist before comparing ordering
3249+
for (const start of allStarts) {
3250+
expect(start).not.toBe(-1);
3251+
}
3252+
expect(firstEnd).not.toBe(-1);
3253+
for (const start of allStarts) {
3254+
expect(start).toBeLessThan(firstEnd);
3255+
}
3256+
});
3257+
3258+
it('should partition mixed safe/unsafe tools into correct batches', async () => {
3259+
const executionLog: string[] = [];
3260+
3261+
const readTool = new MockTool({
3262+
name: 'read_file',
3263+
kind: Kind.Read,
3264+
execute: async (params) => {
3265+
const id = (params as { id: string }).id;
3266+
executionLog.push(`read:start:${id}`);
3267+
await new Promise((r) => setTimeout(r, 50));
3268+
executionLog.push(`read:end:${id}`);
3269+
return {
3270+
llmContent: `Read ${id} done`,
3271+
returnDisplay: `Read ${id} done`,
3272+
};
3273+
},
3274+
});
3275+
3276+
const editTool = new MockTool({
3277+
name: 'edit',
3278+
kind: Kind.Edit,
3279+
execute: async (params) => {
3280+
const id = (params as { id: string }).id;
3281+
executionLog.push(`edit:start:${id}`);
3282+
await new Promise((r) => setTimeout(r, 20));
3283+
executionLog.push(`edit:end:${id}`);
3284+
return {
3285+
llmContent: `Edit ${id} done`,
3286+
returnDisplay: `Edit ${id} done`,
3287+
};
3288+
},
3289+
});
3290+
3291+
const tools = new Map<string, MockTool>([
3292+
['read_file', readTool],
3293+
['edit', editTool],
3294+
]);
3295+
const onAllToolCallsComplete = vi.fn();
3296+
const onToolCallsUpdate = vi.fn();
3297+
const scheduler = createScheduler(
3298+
tools,
3299+
onAllToolCallsComplete,
3300+
onToolCallsUpdate,
3301+
);
3302+
3303+
// [Read₁, Read₂, Edit, Read₃]
3304+
// Expected batches: [Read₁,Read₂](parallel) → [Edit](seq) → [Read₃](seq)
3305+
const requests = [
3306+
{
3307+
callId: '1',
3308+
name: 'read_file',
3309+
args: { id: '1' },
3310+
isClientInitiated: false,
3311+
prompt_id: 'p1',
3312+
},
3313+
{
3314+
callId: '2',
3315+
name: 'read_file',
3316+
args: { id: '2' },
3317+
isClientInitiated: false,
3318+
prompt_id: 'p1',
3319+
},
3320+
{
3321+
callId: '3',
3322+
name: 'edit',
3323+
args: { id: 'E' },
3324+
isClientInitiated: false,
3325+
prompt_id: 'p1',
3326+
},
3327+
{
3328+
callId: '4',
3329+
name: 'read_file',
3330+
args: { id: '3' },
3331+
isClientInitiated: false,
3332+
prompt_id: 'p1',
3333+
},
3334+
];
3335+
3336+
await scheduler.schedule(requests, new AbortController().signal);
3337+
3338+
expect(onAllToolCallsComplete).toHaveBeenCalled();
3339+
const completedCalls = onAllToolCallsComplete.mock
3340+
.calls[0][0] as ToolCall[];
3341+
expect(completedCalls).toHaveLength(4);
3342+
expect(completedCalls.every((c) => c.status === 'success')).toBe(true);
3343+
3344+
// Batch 1: Read₁ and Read₂ run in parallel (both start before either ends)
3345+
const read1Start = executionLog.indexOf('read:start:1');
3346+
const read2Start = executionLog.indexOf('read:start:2');
3347+
const firstReadEnd = Math.min(
3348+
executionLog.indexOf('read:end:1'),
3349+
executionLog.indexOf('read:end:2'),
3350+
);
3351+
expect(read1Start).not.toBe(-1);
3352+
expect(read2Start).not.toBe(-1);
3353+
expect(firstReadEnd).not.toBe(-1);
3354+
expect(read1Start).toBeLessThan(firstReadEnd);
3355+
expect(read2Start).toBeLessThan(firstReadEnd);
3356+
3357+
// Batch 2: Edit starts after both reads complete
3358+
const lastReadEnd = Math.max(
3359+
executionLog.indexOf('read:end:1'),
3360+
executionLog.indexOf('read:end:2'),
3361+
);
3362+
const editStart = executionLog.indexOf('edit:start:E');
3363+
expect(editStart).not.toBe(-1);
3364+
expect(editStart).toBeGreaterThan(lastReadEnd);
3365+
3366+
// Batch 3: Read₃ starts after Edit completes
3367+
const editEnd = executionLog.indexOf('edit:end:E');
3368+
const read3Start = executionLog.indexOf('read:start:3');
3369+
expect(editEnd).not.toBe(-1);
3370+
expect(read3Start).not.toBe(-1);
3371+
expect(read3Start).toBeGreaterThan(editEnd);
3372+
});
3373+
3374+
it('should run read-only shell commands concurrently and non-read-only sequentially', async () => {
3375+
const executionLog: string[] = [];
3376+
3377+
const shellTool = new MockTool({
3378+
name: 'run_shell_command',
3379+
kind: Kind.Execute,
3380+
execute: async (params) => {
3381+
const cmd = (params as { command: string }).command;
3382+
executionLog.push(`shell:start:${cmd}`);
3383+
await new Promise((r) => setTimeout(r, 50));
3384+
executionLog.push(`shell:end:${cmd}`);
3385+
return {
3386+
llmContent: `Shell ${cmd} done`,
3387+
returnDisplay: `Shell ${cmd} done`,
3388+
};
3389+
},
3390+
});
3391+
3392+
const tools = new Map<string, MockTool>([
3393+
['run_shell_command', shellTool],
3394+
]);
3395+
const onAllToolCallsComplete = vi.fn();
3396+
const onToolCallsUpdate = vi.fn();
3397+
const scheduler = createScheduler(
3398+
tools,
3399+
onAllToolCallsComplete,
3400+
onToolCallsUpdate,
3401+
);
3402+
3403+
// "git log" and "ls" are read-only → concurrent
3404+
// "npm install" is not read-only → sequential, breaks the batch
3405+
const requests = [
3406+
{
3407+
callId: '1',
3408+
name: 'run_shell_command',
3409+
args: { command: 'git log' },
3410+
isClientInitiated: false,
3411+
prompt_id: 'p1',
3412+
},
3413+
{
3414+
callId: '2',
3415+
name: 'run_shell_command',
3416+
args: { command: 'ls' },
3417+
isClientInitiated: false,
3418+
prompt_id: 'p1',
3419+
},
3420+
{
3421+
callId: '3',
3422+
name: 'run_shell_command',
3423+
args: { command: 'npm install' },
3424+
isClientInitiated: false,
3425+
prompt_id: 'p1',
3426+
},
3427+
];
3428+
3429+
await scheduler.schedule(requests, new AbortController().signal);
3430+
3431+
expect(onAllToolCallsComplete).toHaveBeenCalled();
3432+
3433+
// "git log" and "ls" should start concurrently (both before either ends)
3434+
const gitStart = executionLog.indexOf('shell:start:git log');
3435+
const lsStart = executionLog.indexOf('shell:start:ls');
3436+
const firstReadOnlyEnd = Math.min(
3437+
executionLog.indexOf('shell:end:git log'),
3438+
executionLog.indexOf('shell:end:ls'),
3439+
);
3440+
expect(gitStart).not.toBe(-1);
3441+
expect(lsStart).not.toBe(-1);
3442+
expect(firstReadOnlyEnd).not.toBe(-1);
3443+
expect(gitStart).toBeLessThan(firstReadOnlyEnd);
3444+
expect(lsStart).toBeLessThan(firstReadOnlyEnd);
3445+
3446+
// "npm install" should start after both read-only commands complete
3447+
const lastReadOnlyEnd = Math.max(
3448+
executionLog.indexOf('shell:end:git log'),
3449+
executionLog.indexOf('shell:end:ls'),
3450+
);
3451+
const npmStart = executionLog.indexOf('shell:start:npm install');
3452+
expect(npmStart).not.toBe(-1);
3453+
expect(npmStart).toBeGreaterThan(lastReadOnlyEnd);
32323454
});
32333455
});
32343456
});

0 commit comments

Comments
 (0)