diff --git a/broker/broker.ts b/broker/broker.ts index a2e1cc5..785e9cf 100644 --- a/broker/broker.ts +++ b/broker/broker.ts @@ -4,7 +4,7 @@ import { join } from "path"; import { homedir } from "os"; import { randomUUID } from "crypto"; import { writeMessage, createMessageReader } from "./framing.js"; -import { getBrokerSocketPath } from "./paths.js"; +import { getBrokerSocketPath, isTcpMode, parseTcpPort, writeBrokerPort } from "./paths.js"; import type { SessionInfo, Message, Attachment, BrokerMessage } from "../types.js"; const INTERCOM_DIR = join(homedir(), ".pi/agent/intercom"); @@ -112,9 +112,16 @@ class IntercomBroker { } start(): void { - this.server.listen(SOCKET_PATH, () => { + const listenOpts: any = isTcpMode(SOCKET_PATH) + ? { port: parseTcpPort(SOCKET_PATH), host: '127.0.0.1' } + : SOCKET_PATH; + + this.server.listen(listenOpts, () => { writeFileSync(PID_PATH, String(process.pid)); - console.log(`Intercom broker started (pid: ${process.pid})`); + if (isTcpMode(SOCKET_PATH)) { + writeBrokerPort(parseTcpPort(SOCKET_PATH)); + } + console.log(`Intercom broker started (pid: ${process.pid}, transport: ${isTcpMode(SOCKET_PATH) ? 'TCP port ' + SOCKET_PATH : 'pipe ' + SOCKET_PATH})`); }); process.on("SIGTERM", () => this.shutdown()); process.on("SIGINT", () => this.shutdown()); diff --git a/broker/client.ts b/broker/client.ts index a6647a7..4bba4b8 100644 --- a/broker/client.ts +++ b/broker/client.ts @@ -2,7 +2,7 @@ import { EventEmitter } from "events"; import net from "net"; import { randomUUID } from "crypto"; import { writeMessage, createMessageReader } from "./framing.js"; -import { getBrokerSocketPath } from "./paths.js"; +import { getBrokerSocketPath, isTcpMode, parseTcpPort } from "./paths.js"; import type { SessionInfo, Message, Attachment } from "../types.js"; const BROKER_SOCKET = getBrokerSocketPath(); @@ -155,7 +155,9 @@ export class IntercomClient extends EventEmitter { } return new Promise((resolve, reject) => { - const socket = net.connect(BROKER_SOCKET); + const socket = isTcpMode(BROKER_SOCKET) + ? net.connect({ port: parseTcpPort(BROKER_SOCKET), host: '127.0.0.1' }) + : net.connect(BROKER_SOCKET); this.socket = socket; this.disconnectError = null; let settled = false; diff --git a/broker/paths.ts b/broker/paths.ts index 981e69f..e72e96b 100644 --- a/broker/paths.ts +++ b/broker/paths.ts @@ -1,20 +1,82 @@ import { join } from "path"; import { homedir } from "os"; +import { writeFileSync, readFileSync, existsSync } from "fs"; -function sanitizePipeSegment(value: string): string { - return value - .replace(/[^a-zA-Z0-9]+/g, "-") - .replace(/^-+|-+$/g, "") - .toLowerCase() || "default"; +const INTERCOM_DIR = join(homedir(), ".pi/agent/intercom"); +const PORT_FILE = join(INTERCOM_DIR, "broker.port"); + +/** + * Check if TCP mode is forced via env var or config. + */ +function shouldUseTcp(platform: NodeJS.Platform = process.platform): boolean { + // Force TCP if env var is set + if (process.env.PI_INTERCOM_TCP === "1") return true; + // Force TCP on Windows if named pipes are blocked + if (platform === "win32") return true; + return false; +} + +/** + * Get the TCP port for the broker. + * Reads from port file if exists, otherwise uses a fixed port. + */ +function getTcpPort(): number { + const FIXED_PORT = 19315; + try { + if (existsSync(PORT_FILE)) { + const port = parseInt(readFileSync(PORT_FILE, "utf-8").trim(), 10); + if (Number.isFinite(port) && port > 0 && port < 65536) return port; + } + } catch {} + return FIXED_PORT; +} + +/** + * Write the TCP port to the port file so clients can discover it. + */ +export function writeBrokerPort(port: number): void { + writeFileSync(PORT_FILE, String(port), "utf-8"); +} + +/** + * Read the broker TCP port from the port file. + */ +export function readBrokerPort(): number { + return getTcpPort(); } export function getBrokerSocketPath( platform: NodeJS.Platform = process.platform, homeDir: string = homedir(), ): string { + if (shouldUseTcp(platform)) { + return String(getTcpPort()); + } + + // Original Unix named pipe / Windows named pipe logic if (platform === "win32") { + function sanitizePipeSegment(value: string): string { + return value + .replace(/[^a-zA-Z0-9]+/g, "-") + .replace(/^-+|-+$/g, "") + .toLowerCase() || "default"; + } return `\\\\.\\pipe\\pi-intercom-${sanitizePipeSegment(homeDir)}`; } return join(homeDir, ".pi/agent/intercom/broker.sock"); } + +/** + * Check if the socket path is a TCP port (numeric string). + */ +export function isTcpMode(socketPath: string): boolean { + return /^\d+$/.test(socketPath); +} + +/** + * Get the TCP port from a socket path that is a numeric string. + */ +export function parseTcpPort(socketPath: string): number { + return parseInt(socketPath, 10); +} \ No newline at end of file diff --git a/broker/spawn.ts b/broker/spawn.ts index 24296be..b04f716 100644 --- a/broker/spawn.ts +++ b/broker/spawn.ts @@ -4,7 +4,7 @@ import { join, dirname } from "path"; import { fileURLToPath } from "url"; import { homedir } from "os"; import net from "net"; -import { getBrokerSocketPath } from "./paths.js"; +import { getBrokerSocketPath, isTcpMode, parseTcpPort } from "./paths.js"; const INTERCOM_DIR = join(homedir(), ".pi/agent/intercom"); const EXTENSION_DIR = join(dirname(fileURLToPath(import.meta.url)), ".."); @@ -212,7 +212,9 @@ async function isBrokerRunning(): Promise { function checkSocketConnectable(): Promise { return new Promise((resolve) => { - const socket = net.connect(BROKER_SOCKET); + const socket = isTcpMode(BROKER_SOCKET) + ? net.connect({ port: parseTcpPort(BROKER_SOCKET), host: '127.0.0.1' }) + : net.connect(BROKER_SOCKET); const finish = (isConnected: boolean) => { clearTimeout(timeout); socket.off("connect", onConnect);