Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
"default_mode": "auto",
"max_iterations": 12,
"default_attempts": 1
},
"ats-cli": {
"channel": "ats-cli",
"repo": "/home/openclaw/projects/ats-cli",
"github": "difflabai/ats-skill",
"default_mode": "auto",
"max_iterations": 10,
"default_attempts": 1
}
},
"claude_bin": "/usr/bin/claude",
Expand Down
86 changes: 28 additions & 58 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { readFileSync, existsSync } from 'node:fs';
import { fileURLToPath } from 'node:url';
import { dirname, join } from 'node:path';
import { parseArgs } from 'node:util';
import { parseJsonObjects, parseHumanReadable, trimBuffer } from './lib/watch-parser.js';

const __dirname = dirname(fileURLToPath(import.meta.url));
const VERSION = '3.0.0';
Expand Down Expand Up @@ -591,46 +592,23 @@ async function watchMode() {
buffer += chunk.toString();

// Try to extract JSON objects from the buffer
// The ats watch output interleaves JSON objects with human-readable lines
let startIdx;
while ((startIdx = buffer.indexOf('{')) !== -1) {
// Find the matching closing brace
let depth = 0;
let endIdx = -1;
for (let i = startIdx; i < buffer.length; i++) {
if (buffer[i] === '{') depth++;
else if (buffer[i] === '}') {
depth--;
if (depth === 0) {
endIdx = i;
break;
}
}
}

if (endIdx === -1) break; // incomplete JSON, wait for more data

const jsonStr = buffer.slice(startIdx, endIdx + 1);
buffer = buffer.slice(endIdx + 1);
const jsonResult = parseJsonObjects(buffer);
for (const event of jsonResult.events) {
handleWatchEvent(event, name, channel, project);
}
buffer = jsonResult.remaining;

try {
const event = JSON.parse(jsonStr);
// Fallback: parse human-readable format if no JSON was found
if (jsonResult.events.length === 0) {
const textResult = parseHumanReadable(buffer);
for (const event of textResult.events) {
log('info', 'Fallback text parser matched task', { channel, taskId: event.task_id });
handleWatchEvent(event, name, channel, project);
} catch {
// Not valid JSON, skip
log('debug', 'Failed to parse watch JSON', { channel, json: jsonStr.slice(0, 200) });
}
buffer = textResult.remaining;
}

// If buffer gets too large without valid JSON, trim non-JSON prefix
if (buffer.length > 10000) {
const lastBrace = buffer.lastIndexOf('{');
if (lastBrace > 0) {
buffer = buffer.slice(lastBrace);
} else {
buffer = '';
}
}
buffer = trimBuffer(buffer);
});

watcher.stderr.on('data', (chunk) => {
Expand Down Expand Up @@ -670,31 +648,23 @@ async function watchMode() {

watcher.stdout.on('data', (chunk) => {
buffer += chunk.toString();
let startIdx;
while ((startIdx = buffer.indexOf('{')) !== -1) {
let depth = 0;
let endIdx = -1;
for (let i = startIdx; i < buffer.length; i++) {
if (buffer[i] === '{') depth++;
else if (buffer[i] === '}') {
depth--;
if (depth === 0) { endIdx = i; break; }
}
}
if (endIdx === -1) break;
const jsonStr = buffer.slice(startIdx, endIdx + 1);
buffer = buffer.slice(endIdx + 1);
try {
const event = JSON.parse(jsonStr);

const jsonResult = parseJsonObjects(buffer);
for (const event of jsonResult.events) {
handleWatchEvent(event, name, channel, project);
}
buffer = jsonResult.remaining;

if (jsonResult.events.length === 0) {
const textResult = parseHumanReadable(buffer);
for (const event of textResult.events) {
log('info', 'Fallback text parser matched task', { channel, taskId: event.task_id });
handleWatchEvent(event, name, channel, project);
} catch {
log('debug', 'Failed to parse watch JSON', { channel, json: jsonStr.slice(0, 200) });
}
buffer = textResult.remaining;
}
if (buffer.length > 10000) {
const lastBrace = buffer.lastIndexOf('{');
buffer = lastBrace > 0 ? buffer.slice(lastBrace) : '';
}

buffer = trimBuffer(buffer);
});

watcher.stderr.on('data', () => {});
Expand Down Expand Up @@ -734,7 +704,7 @@ async function watchMode() {

// === Handle a watch event ===
function handleWatchEvent(event, projectName, channel, project) {
const taskId = event.id || event.uuid;
const taskId = event.task_id || event.id || event.uuid;
if (!taskId) return;

// Skip if already seen
Expand Down
102 changes: 102 additions & 0 deletions lib/watch-parser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/**
* Watch mode parsers — extracted for testability.
*
* parseJsonObjects(buffer) - Extract JSON objects from a raw stdout buffer.
* parseHumanReadable(buffer) - Fallback: extract task.created events from
* human-readable ATS watch output.
*/

/**
* Attempt to extract one or more complete JSON objects from `buffer`.
*
* Returns { events: [ parsed objects ], remaining: unprocessed buffer tail }.
*/
export function parseJsonObjects(buffer) {
const events = [];
let pos = 0;

while (true) {
const startIdx = buffer.indexOf('{', pos);
if (startIdx === -1) break;

let depth = 0;
let endIdx = -1;
for (let i = startIdx; i < buffer.length; i++) {
if (buffer[i] === '{') depth++;
else if (buffer[i] === '}') {
depth--;
if (depth === 0) {
endIdx = i;
break;
}
}
}

if (endIdx === -1) {
// Incomplete JSON — return everything from startIdx onward as remaining
return { events, remaining: buffer.slice(startIdx) };
}

const jsonStr = buffer.slice(startIdx, endIdx + 1);
pos = endIdx + 1;

try {
events.push(JSON.parse(jsonStr));
} catch {
// Not valid JSON, skip this candidate and keep scanning
}
}

// Everything was consumed (or no braces found at all)
return { events, remaining: buffer.slice(pos) };
Comment thread
mikkel marked this conversation as resolved.
Outdated
}

/**
* Parse human-readable ATS watch output.
*
* Looks for lines containing "task.created" followed by a line matching
* "Task #<id>: <title>".
*
* Returns { events: [ { task_id, title, event } ], remaining: unprocessed tail }.
*/
export function parseHumanReadable(buffer) {
const events = [];
const lines = buffer.split('\n');
const consumedIndices = new Set();

for (let i = 0; i < lines.length; i++) {
if (consumedIndices.has(i)) continue;
if (/task\.created/.test(lines[i])) {
const window = lines.slice(i, i + 3).join('\n');
const taskMatch = window.match(/Task #(\d+):\s*(.+)/);
if (taskMatch) {
events.push({
task_id: parseInt(taskMatch[1], 10),
title: taskMatch[2].trim(),
event: 'task.created',
});
consumedIndices.add(i);
consumedIndices.add(i + 1);
consumedIndices.add(i + 2);
Comment thread
mikkel marked this conversation as resolved.
Outdated
}
}
}

if (consumedIndices.size === 0) {
return { events, remaining: buffer };
}

const lastConsumed = Math.max(...consumedIndices);
const remaining = lines.slice(lastConsumed + 1).join('\n');
return { events, remaining };
}

/**
* Trim an oversized buffer to avoid unbounded memory growth.
* Keeps content from the last '{' onward, or clears the buffer entirely.
*/
export function trimBuffer(buffer, maxLength = 10000) {
if (buffer.length <= maxLength) return buffer;
const lastBrace = buffer.lastIndexOf('{');
return lastBrace > 0 ? buffer.slice(lastBrace) : '';
}
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"ats-project-runner": "./index.js"
},
"scripts": {
"start": "node index.js"
"start": "node index.js",
"test": "node --test 'test/**/*.test.js'"
}
}
Loading