Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions broker/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { join } from "path";
import { homedir } from "os";
import { randomUUID } from "crypto";
import { writeMessage, createMessageReader } from "./framing.js";
import { getBrokerSocketPath } from "./paths.js";
import { getBrokerSocketPath, isTcpMode, parseTcpPort, writeBrokerPort } from "./paths.js";
import type { SessionInfo, Message, Attachment, BrokerMessage } from "../types.js";

const INTERCOM_DIR = join(homedir(), ".pi/agent/intercom");
Expand Down Expand Up @@ -112,9 +112,16 @@ class IntercomBroker {
}

start(): void {
this.server.listen(SOCKET_PATH, () => {
const listenOpts: any = isTcpMode(SOCKET_PATH)
? { port: parseTcpPort(SOCKET_PATH), host: '127.0.0.1' }
: SOCKET_PATH;

this.server.listen(listenOpts, () => {
writeFileSync(PID_PATH, String(process.pid));
console.log(`Intercom broker started (pid: ${process.pid})`);
if (isTcpMode(SOCKET_PATH)) {
writeBrokerPort(parseTcpPort(SOCKET_PATH));
}
console.log(`Intercom broker started (pid: ${process.pid}, transport: ${isTcpMode(SOCKET_PATH) ? 'TCP port ' + SOCKET_PATH : 'pipe ' + SOCKET_PATH})`);
});
process.on("SIGTERM", () => this.shutdown());
process.on("SIGINT", () => this.shutdown());
Expand Down
6 changes: 4 additions & 2 deletions broker/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EventEmitter } from "events";
import net from "net";
import { randomUUID } from "crypto";
import { writeMessage, createMessageReader } from "./framing.js";
import { getBrokerSocketPath } from "./paths.js";
import { getBrokerSocketPath, isTcpMode, parseTcpPort } from "./paths.js";
import type { SessionInfo, Message, Attachment } from "../types.js";

const BROKER_SOCKET = getBrokerSocketPath();
Expand Down Expand Up @@ -155,7 +155,9 @@ export class IntercomClient extends EventEmitter {
}

return new Promise((resolve, reject) => {
const socket = net.connect(BROKER_SOCKET);
const socket = isTcpMode(BROKER_SOCKET)
? net.connect({ port: parseTcpPort(BROKER_SOCKET), host: '127.0.0.1' })
: net.connect(BROKER_SOCKET);
this.socket = socket;
this.disconnectError = null;
let settled = false;
Expand Down
72 changes: 67 additions & 5 deletions broker/paths.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,82 @@
import { join } from "path";
import { homedir } from "os";
import { writeFileSync, readFileSync, existsSync } from "fs";

function sanitizePipeSegment(value: string): string {
return value
.replace(/[^a-zA-Z0-9]+/g, "-")
.replace(/^-+|-+$/g, "")
.toLowerCase() || "default";
const INTERCOM_DIR = join(homedir(), ".pi/agent/intercom");
const PORT_FILE = join(INTERCOM_DIR, "broker.port");

/**
* Check if TCP mode is forced via env var or config.
*/
function shouldUseTcp(platform: NodeJS.Platform = process.platform): boolean {
// Force TCP if env var is set
if (process.env.PI_INTERCOM_TCP === "1") return true;
// Force TCP on Windows if named pipes are blocked
if (platform === "win32") return true;
return false;
}

/**
* Get the TCP port for the broker.
* Reads from port file if exists, otherwise uses a fixed port.
*/
function getTcpPort(): number {
const FIXED_PORT = 19315;
try {
if (existsSync(PORT_FILE)) {
const port = parseInt(readFileSync(PORT_FILE, "utf-8").trim(), 10);
if (Number.isFinite(port) && port > 0 && port < 65536) return port;
}
} catch {}
return FIXED_PORT;
}

/**
* Write the TCP port to the port file so clients can discover it.
*/
export function writeBrokerPort(port: number): void {
writeFileSync(PORT_FILE, String(port), "utf-8");
}

/**
* Read the broker TCP port from the port file.
*/
export function readBrokerPort(): number {
return getTcpPort();
}

export function getBrokerSocketPath(
platform: NodeJS.Platform = process.platform,
homeDir: string = homedir(),
): string {
if (shouldUseTcp(platform)) {
return String(getTcpPort());
}

// Original Unix named pipe / Windows named pipe logic
if (platform === "win32") {
function sanitizePipeSegment(value: string): string {
return value
.replace(/[^a-zA-Z0-9]+/g, "-")
.replace(/^-+|-+$/g, "")
.toLowerCase() || "default";
}
return `\\\\.\\pipe\\pi-intercom-${sanitizePipeSegment(homeDir)}`;
}

return join(homeDir, ".pi/agent/intercom/broker.sock");
}

/**
* Check if the socket path is a TCP port (numeric string).
*/
export function isTcpMode(socketPath: string): boolean {
return /^\d+$/.test(socketPath);
}

/**
* Get the TCP port from a socket path that is a numeric string.
*/
export function parseTcpPort(socketPath: string): number {
return parseInt(socketPath, 10);
}
6 changes: 4 additions & 2 deletions broker/spawn.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { join, dirname } from "path";
import { fileURLToPath } from "url";
import { homedir } from "os";
import net from "net";
import { getBrokerSocketPath } from "./paths.js";
import { getBrokerSocketPath, isTcpMode, parseTcpPort } from "./paths.js";

const INTERCOM_DIR = join(homedir(), ".pi/agent/intercom");
const EXTENSION_DIR = join(dirname(fileURLToPath(import.meta.url)), "..");
Expand Down Expand Up @@ -212,7 +212,9 @@ async function isBrokerRunning(): Promise<boolean> {

function checkSocketConnectable(): Promise<boolean> {
return new Promise((resolve) => {
const socket = net.connect(BROKER_SOCKET);
const socket = isTcpMode(BROKER_SOCKET)
? net.connect({ port: parseTcpPort(BROKER_SOCKET), host: '127.0.0.1' })
: net.connect(BROKER_SOCKET);
const finish = (isConnected: boolean) => {
clearTimeout(timeout);
socket.off("connect", onConnect);
Expand Down