Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 81 additions & 6 deletions cli/src/execution/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { syncPrdToIssue } from "../git/issue-sync.ts";
import {
abortMerge,
analyzePreMerge,
createIntegrationBranch,
deleteLocalBranch,
mergeAgentBranch,
sortByConflictLikelihood,
Expand Down Expand Up @@ -310,6 +311,9 @@ export async function runParallel(
// Track processed tasks in dry-run mode (since we don't modify the source file)
const dryRunProcessedIds = new Set<string>();

// Track the current base branch for chaining (integration branch pattern)
let currentBaseBranch = originalBaseBranch;

// Process tasks in batches
let iteration = 0;

Expand Down Expand Up @@ -409,7 +413,7 @@ export async function runParallel(
engine,
task,
globalAgentNum,
baseBranch,
currentBaseBranch,
isolationBase,
workDir,
prdSource,
Expand Down Expand Up @@ -466,11 +470,14 @@ export async function runParallel(
worktreeDir,
task.title,
agentNum,
originalBaseBranch,
currentBaseBranch,
);

if (commitResult.success) {
// Update branches map or result to ensure we track the generated branch
branchName = commitResult.branchName;
// Update the agentResult in place so we can find it later
agentResult.branchName = branchName;
logDebug(
`Agent ${agentNum}: Committed ${commitResult.filesCommitted} files to ${branchName}`,
);
Expand Down Expand Up @@ -611,14 +618,77 @@ export async function runParallel(
const batchDuration = formatDuration(Date.now() - batchStartTime);
logInfo(`Batch ${iteration} completed in ${batchDuration}`);
// If any retryable failure occurred, stop the run to allow retry later
// CHAINING LOGIC: Merge this batch's work into an integration branch
// Only do this if we have successful branches and we aren't stopping early
if (!skipMerge && !dryRun && !sawRetryableFailure && completedBranches.length > 0) {
// We need to find which branches were successful in THIS batch
// refetching from the results array is safer
const currentBatchBranches = results
.map(r => r.branchName) // This might be empty if failed
.filter(b => b && completedBranches.includes(b));

if (currentBatchBranches.length > 0) {
try {
logInfo(`Creating integration branch for batch ${iteration}...`);
// Create integration branch from current base
const integrationBranch = await createIntegrationBranch(
iteration,
currentBaseBranch,
workDir,
originalBaseBranch // Use original base branch as prefix for namespacing
);

logInfo(`Merging ${currentBatchBranches.length} branch(es) into ${integrationBranch}...`);

// Merge the batch's branches into the integration branch
// We use a cleaner merge function or the existing mergeCompletedBranches logic?
// Use mergeCompletedBranches but pointed at the integration branch
await mergeCompletedBranches(
currentBatchBranches,
integrationBranch,
engine,
workDir,
modelOverride,
engineArgs
);

// Update current base branch for the next batch
currentBaseBranch = integrationBranch;
logSuccess(`Batch ${iteration} integrated into ${currentBaseBranch}`);

// Remove merged branches from completedBranches to avoid re-merging them at the end
for (const branch of currentBatchBranches) {
const idx = completedBranches.indexOf(branch);
if (idx !== -1) {
completedBranches.splice(idx, 1);
}
}
} catch (err) {
logError(`Failed to create integration branch: ${err}`);
logWarn("Integration failed, stopping execution to preserve dependency chain.");
// Stop execution because subsequent batches depend on this integration
break;
}
Comment thread
BasselBlal marked this conversation as resolved.
Outdated
}
}

if (sawRetryableFailure) {
logWarn("Stopping early due to retryable errors. Try again later.");
break;
}
}

// Merge phase: merge completed branches back to base branch
if (!skipMerge && !dryRun && completedBranches.length > 0) {
// Merge phase: merge final integration branch back to base branch
// If we used integration branches, 'currentBaseBranch' holds the accumulated work.
// We need to merge 'currentBaseBranch' into 'originalBaseBranch'.
// We ALSO need to merge any leftover branches in 'completedBranches' (e.g. if a batch failed to integrate).

const branchesToMerge = [...completedBranches];
if (currentBaseBranch !== originalBaseBranch) {
branchesToMerge.push(currentBaseBranch);
}

if (!skipMerge && !dryRun && branchesToMerge.length > 0) {
const git = simpleGit(workDir);
let stashed = false;
try {
Expand All @@ -634,13 +704,18 @@ export async function runParallel(
}

try {
if (currentBaseBranch !== originalBaseBranch) {
// We have a chain of integration branches.
logInfo(`Merging final integration branch ${currentBaseBranch} and ${completedBranches.length} other(s) into ${originalBaseBranch}`);
}

await mergeCompletedBranches(
completedBranches,
branchesToMerge,
originalBaseBranch,
engine,
workDir,
modelOverride,
engineArgs,
engineArgs
);

// Restore starting branch if we're not already on it
Expand Down
10 changes: 9 additions & 1 deletion cli/src/git/merge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,17 @@ export async function createIntegrationBranch(
groupNum: number,
baseBranch: string,
workDir: string,
prefix?: string,
): Promise<string> {
const git: SimpleGit = simpleGit(workDir);
const branchName = `ralphy/integration-group-${groupNum}`;

let branchName = `ralphy/integration-group-${groupNum}`;

if (prefix) {
// Sanitize prefix: replace non-alphanumeric chars (mostly slashes) with dashes
const sanitizedPrefix = prefix.replace(/[^a-zA-Z0-9-]/g, "-");
branchName = `ralphy/${sanitizedPrefix}-integration-group-${groupNum}`;
}

// Checkout base branch first
await git.checkout(baseBranch);
Expand Down