Skip to content

Commit ba006eb

Browse files
leosvelpereznartc
authored andcommitted
feat(core): collect resource usage
1 parent fa45d79 commit ba006eb

File tree

22 files changed

+1442
-32
lines changed

22 files changed

+1442
-32
lines changed

packages/nx/src/daemon/client/client.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,14 +623,20 @@ export class DaemonClient {
623623
if (this._daemonStatus == DaemonStatus.DISCONNECTED) {
624624
this._daemonStatus = DaemonStatus.CONNECTING;
625625

626+
let daemonPid: number | null = null;
626627
if (!(await this.isServerAvailable())) {
627-
await this.startInBackground();
628+
daemonPid = await this.startInBackground();
628629
}
629630
this.setUpConnection();
630631
this._daemonStatus = DaemonStatus.CONNECTED;
631632
this._daemonReady();
633+
634+
daemonPid ??= getDaemonProcessIdSync();
635+
await this.registerDaemonProcessWithMetricsService(daemonPid);
632636
} else if (this._daemonStatus == DaemonStatus.CONNECTING) {
633637
await this._waitForDaemonReady;
638+
const daemonPid = getDaemonProcessIdSync();
639+
await this.registerDaemonProcessWithMetricsService(daemonPid);
634640
}
635641
// An open promise isn't enough to keep the event loop
636642
// alive, so we set a timeout here and clear it when we hear
@@ -649,6 +655,23 @@ export class DaemonClient {
649655
});
650656
}
651657

658+
private async registerDaemonProcessWithMetricsService(
659+
daemonPid: number | null
660+
) {
661+
if (!daemonPid) {
662+
return;
663+
}
664+
665+
try {
666+
const { getProcessMetricsService } = await import(
667+
'../../tasks-runner/process-metrics-service'
668+
);
669+
getProcessMetricsService().registerDaemonProcess(daemonPid);
670+
} catch {
671+
// don't error, this is a secondary concern that should not break task execution
672+
}
673+
}
674+
652675
private retryMessageAfterNewDaemonStarts() {
653676
const [msg, res, rej] = [
654677
this.currentMessage,

packages/nx/src/executors/run-commands/run-commands.impl.ts

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import * as yargsParser from 'yargs-parser';
22
import { ExecutorContext } from '../../config/misc-interfaces';
33
import { isTuiEnabled } from '../../tasks-runner/is-tui-enabled';
44
import { PseudoTerminal } from '../../tasks-runner/pseudo-terminal';
5+
import { createTaskId } from '../../tasks-runner/utils';
56
import { NoopChildProcess } from '../../tasks-runner/running-tasks/noop-child-process';
67
import {
78
ParallelRunningTasks,
@@ -97,7 +98,8 @@ export default async function (
9798

9899
export async function runCommands(
99100
options: RunCommandsOptions,
100-
context: ExecutorContext
101+
context: ExecutorContext,
102+
taskId?: string
101103
) {
102104
const normalized = normalizeOptions(options);
103105

@@ -138,11 +140,27 @@ export async function runCommands(
138140
const tuiEnabled = isTuiEnabled();
139141

140142
try {
143+
const resolvedTaskId =
144+
taskId ??
145+
createTaskId(
146+
context.projectName,
147+
context.targetName,
148+
context.configurationName
149+
);
141150
const runningTask = isSingleCommandAndCanUsePseudoTerminal
142-
? await runSingleCommandWithPseudoTerminal(normalized, context)
151+
? await runSingleCommandWithPseudoTerminal(
152+
normalized,
153+
context,
154+
resolvedTaskId
155+
)
143156
: options.parallel
144-
? new ParallelRunningTasks(normalized, context)
145-
: new SeriallyRunningTasks(normalized, context, tuiEnabled);
157+
? new ParallelRunningTasks(normalized, context, resolvedTaskId)
158+
: new SeriallyRunningTasks(
159+
normalized,
160+
context,
161+
tuiEnabled,
162+
resolvedTaskId
163+
);
146164
return runningTask;
147165
} catch (e) {
148166
if (process.env.NX_VERBOSE_LOGGING === 'true') {

packages/nx/src/executors/run-commands/running-tasks.ts

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
loadAndExpandDotEnvFile,
1515
unloadDotEnvFile,
1616
} from '../../tasks-runner/task-env';
17+
import { getProcessMetricsService } from '../../tasks-runner/process-metrics-service';
1718
import { signalToCode } from '../../utils/exit-codes';
1819
import {
1920
LARGE_BUFFER,
@@ -30,7 +31,11 @@ export class ParallelRunningTasks implements RunningTask {
3031
[];
3132
private outputCallbacks: Array<(terminalOutput: string) => void> = [];
3233

33-
constructor(options: NormalizedRunCommandsOptions, context: ExecutorContext) {
34+
constructor(
35+
options: NormalizedRunCommandsOptions,
36+
context: ExecutorContext,
37+
taskId: string
38+
) {
3439
this.childProcesses = options.commands.map(
3540
(commandConfig) =>
3641
new RunningNodeProcess(
@@ -40,7 +45,8 @@ export class ParallelRunningTasks implements RunningTask {
4045
options.env ?? {},
4146
options.readyWhenStatus,
4247
options.streamOutput,
43-
options.envFile
48+
options.envFile,
49+
taskId
4450
)
4551
);
4652
this.readyWhenStatus = options.readyWhenStatus;
@@ -228,7 +234,8 @@ export class SeriallyRunningTasks implements RunningTask {
228234
constructor(
229235
options: NormalizedRunCommandsOptions,
230236
context: ExecutorContext,
231-
private readonly tuiEnabled: boolean
237+
private readonly tuiEnabled: boolean,
238+
private readonly taskId: string
232239
) {
233240
this.run(options, context)
234241
.catch((e) => {
@@ -279,6 +286,7 @@ export class SeriallyRunningTasks implements RunningTask {
279286
options.color,
280287
calculateCwd(options.cwd, context),
281288
options.processEnv ?? options.env ?? {},
289+
this.taskId,
282290
options.usePty,
283291
options.streamOutput,
284292
options.tty,
@@ -314,6 +322,7 @@ export class SeriallyRunningTasks implements RunningTask {
314322
color: boolean,
315323
cwd: string,
316324
env: Record<string, string>,
325+
taskId: string,
317326
usePty: boolean = true,
318327
streamOutput: boolean = true,
319328
tty: boolean,
@@ -330,7 +339,7 @@ export class SeriallyRunningTasks implements RunningTask {
330339
const pseudoTerminal = createPseudoTerminal();
331340
registerProcessListener(this, pseudoTerminal);
332341

333-
return createProcessWithPseudoTty(
342+
const pseudoTtyProcess = await createProcessWithPseudoTty(
334343
pseudoTerminal,
335344
commandConfig,
336345
color,
@@ -340,6 +349,15 @@ export class SeriallyRunningTasks implements RunningTask {
340349
tty,
341350
envFile
342351
);
352+
353+
// Register process for metrics collection (direct run-commands execution)
354+
// Skip registration if we're in a forked executor - the fork wrapper already registered
355+
const pid = pseudoTtyProcess.getPid();
356+
if (pid && !process.env.NX_FORKED_TASK_EXECUTOR) {
357+
getProcessMetricsService().registerTaskProcess(taskId, pid);
358+
}
359+
360+
return pseudoTtyProcess;
343361
}
344362

345363
return new RunningNodeProcess(
@@ -349,7 +367,8 @@ export class SeriallyRunningTasks implements RunningTask {
349367
env,
350368
[],
351369
streamOutput,
352-
envFile
370+
envFile,
371+
taskId
353372
);
354373
}
355374
}
@@ -369,7 +388,8 @@ class RunningNodeProcess implements RunningTask {
369388
env: Record<string, string>,
370389
private readyWhenStatus: { stringToMatch: string; found: boolean }[],
371390
streamOutput = true,
372-
envFile: string
391+
envFile: string,
392+
private taskId: string
373393
) {
374394
env = processEnv(color, cwd, env, envFile);
375395
this.command = commandConfig.command;
@@ -384,6 +404,15 @@ class RunningNodeProcess implements RunningTask {
384404
windowsHide: false,
385405
});
386406

407+
// Register process for metrics collection
408+
// Skip registration if we're in a forked executor - the fork wrapper already registered
409+
if (this.childProcess.pid && !process.env.NX_FORKED_TASK_EXECUTOR) {
410+
getProcessMetricsService().registerTaskProcess(
411+
this.taskId,
412+
this.childProcess.pid
413+
);
414+
}
415+
387416
this.addListeners(commandConfig, streamOutput);
388417
}
389418

@@ -508,7 +537,8 @@ class RunningNodeProcess implements RunningTask {
508537

509538
export async function runSingleCommandWithPseudoTerminal(
510539
normalized: NormalizedRunCommandsOptions,
511-
context: ExecutorContext
540+
context: ExecutorContext,
541+
taskId: string
512542
): Promise<PseudoTtyProcess> {
513543
const pseudoTerminal = createPseudoTerminal();
514544
const pseudoTtyProcess = await createProcessWithPseudoTty(
@@ -521,6 +551,14 @@ export async function runSingleCommandWithPseudoTerminal(
521551
pseudoTerminal ? normalized.isTTY : false,
522552
normalized.envFile
523553
);
554+
555+
// Register process for metrics collection (direct run-commands execution)
556+
// Skip registration if we're in a forked executor - the fork wrapper already registered
557+
const pid = pseudoTtyProcess.getPid();
558+
if (pid && !process.env.NX_FORKED_TASK_EXECUTOR) {
559+
getProcessMetricsService().registerTaskProcess(taskId, pid);
560+
}
561+
524562
registerProcessListener(pseudoTtyProcess, pseudoTerminal);
525563
return pseudoTtyProcess;
526564
}

packages/nx/src/native/index.d.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export declare class AppLifeCycle {
2727

2828
export declare class ChildProcess {
2929
getParserAndWriter(): ExternalObject<[ParserArc, WriterArc]>
30+
getPid(): number
3031
kill(signal?: NodeJS.Signals): void
3132
onExit(callback: (message: string) => void): void
3233
onOutput(callback: (message: string) => void): void
@@ -92,6 +93,38 @@ export declare class NxTaskHistory {
9293
getEstimatedTaskTimings(targets: Array<TaskTarget>): Record<string, number>
9394
}
9495

96+
/**
97+
* NAPI wrapper for the process metrics collector
98+
* Provides a JavaScript-friendly interface for metrics collection
99+
*/
100+
export declare class ProcessMetricsCollector {
101+
/** Create a new metrics collector with default configuration */
102+
constructor()
103+
/**
104+
* Start metrics collection
105+
* Idempotent - safe to call multiple times
106+
*/
107+
startCollection(): void
108+
/**
109+
* Stop metrics collection
110+
* Returns true if collection was stopped, false if not running
111+
*/
112+
stopCollection(): boolean
113+
/** Register the main CLI process for metrics collection */
114+
registerMainCliProcess(pid: number): void
115+
/** Register the daemon process for metrics collection */
116+
registerDaemonProcess(pid: number): void
117+
/**
118+
* Register a process for a specific task
119+
* Automatically creates the task if it doesn't exist
120+
*/
121+
registerTaskProcess(taskId: string, pid: number): void
122+
/** Register a batch with multiple tasks sharing a worker */
123+
registerBatch(batchId: string, taskIds: Array<string>, pid: number): void
124+
/** Subscribe to push-based metrics notifications from TypeScript */
125+
subscribe(callback: (err: Error | null, event: MetricsUpdate) => void): void
126+
}
127+
95128
export declare class RunningTasksService {
96129
constructor(db: ExternalObject<NxDbConnection>)
97130
getRunningTasks(ids: Array<string>): Array<string>
@@ -153,6 +186,13 @@ export declare class WorkspaceContext {
153186
getFilesInDirectory(directory: string): Array<string>
154187
}
155188

189+
/** Batch metrics snapshot */
190+
export interface BatchMetricsSnapshot {
191+
batchId: string
192+
taskIds: Array<string>
193+
processes: Array<ProcessMetrics>
194+
}
195+
156196
export interface CachedResult {
157197
code: number
158198
terminalOutput?: string
@@ -170,6 +210,12 @@ export declare export declare function connectToNxDb(cacheDir: string, nxVersion
170210

171211
export declare export declare function copy(src: string, dest: string): number
172212

213+
/** Daemon metrics with main process and subprocesses */
214+
export interface DaemonMetrics {
215+
main: ProcessMetrics
216+
subprocesses: Array<ProcessMetrics>
217+
}
218+
173219
export interface DepsOutputsInput {
174220
dependentTasksOutputFiles: string
175221
transitive?: boolean
@@ -264,6 +310,20 @@ export declare export declare function isEditorInstalled(editor: SupportedEditor
264310

265311
export declare export declare function logDebug(message: string): void
266312

313+
/** Metrics data for collection cycle */
314+
export interface MetricsData {
315+
timestamp: number
316+
mainCliProcess?: ProcessSnapshot
317+
daemonProcesses: Array<ProcessSnapshot>
318+
tasks: Record<string, Array<ProcessSnapshot>>
319+
}
320+
321+
/** Metrics update sent every collection cycle */
322+
export interface MetricsUpdate {
323+
metrics: ProcessMetricsSnapshot
324+
metadata?: Record<string, ProcessMetadata>
325+
}
326+
267327
/** Stripped version of the NxJson interface for use in rust */
268328
export interface NxJson {
269329
namedInputs?: Record<string, Array<JsInputs>>
@@ -283,6 +343,43 @@ export interface NxWorkspaceFilesExternals {
283343

284344
export declare export declare function parseTaskStatus(stringStatus: string): TaskStatus
285345

346+
/** Process metadata (static, doesn't change during process lifetime) */
347+
export interface ProcessMetadata {
348+
ppid: number
349+
name: string
350+
command: string
351+
exePath: string
352+
cwd: string
353+
}
354+
355+
/** Process metrics (dynamic, changes every collection) */
356+
export interface ProcessMetrics {
357+
pid: number
358+
cpu: number
359+
memory: number
360+
}
361+
362+
/** Organized collection of process metrics with timestamp */
363+
export interface ProcessMetricsSnapshot {
364+
timestamp: number
365+
mainCli?: ProcessMetrics
366+
daemon?: DaemonMetrics
367+
tasks: Record<string, Array<ProcessMetrics>>
368+
batches: Record<string, BatchMetricsSnapshot>
369+
}
370+
371+
/** Process snapshot with full metadata and metrics */
372+
export interface ProcessSnapshot {
373+
pid: number
374+
ppid: number
375+
name: string
376+
command: string
377+
exePath: string
378+
cwd: string
379+
cpu: number
380+
memory: number
381+
}
382+
286383
export interface Project {
287384
root: string
288385
namedInputs?: Record<string, Array<JsInputs>>

0 commit comments

Comments
 (0)