Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions helm/siclaw/templates/gateway-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ spec:
{{- include "siclaw.selectorLabels" (dict "ctx" . "component" "gateway") | nindent 8 }}
spec:
serviceAccountName: {{ include "siclaw.fullname" . }}-gateway
{{- if gt (int .Values.gateway.replicas) 1 }}
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 100
podAffinityTerm:
labelSelector:
matchLabels:
{{- include "siclaw.selectorLabels" (dict "ctx" . "component" "gateway") | nindent 20 }}
topologyKey: kubernetes.io/hostname
{{- end }}
containers:
- name: gateway
image: {{ include "siclaw.image" (dict "component" "gateway" "ctx" .) }}
Expand Down Expand Up @@ -52,6 +63,14 @@ spec:
{{- end }}
- name: SICLAW_SKILLS_DIR
value: ".siclaw/skills"
- name: SICLAW_POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: SICLAW_POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: SICLAW_GATEWAY_HOSTNAME
value: "{{ include "siclaw.fullname" . }}-gateway.{{ .Values.namespace }}.svc.cluster.local"
{{- if .Values.database.existingSecret.name }}
Expand Down
6 changes: 6 additions & 0 deletions helm/siclaw/templates/gateway-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ metadata:
{{- include "siclaw.labels" (dict "ctx" . "component" "gateway") | nindent 4 }}
spec:
type: {{ .Values.gateway.service.type }}
{{- if gt (int .Values.gateway.replicas) 1 }}
sessionAffinity: ClientIP
sessionAffinityConfig:
clientIP:
timeoutSeconds: {{ .Values.gateway.sessionAffinityTimeout | default 10800 }}
{{- end }}
ports:
- port: {{ .Values.gateway.service.port }}
targetPort: http
Expand Down
1 change: 1 addition & 0 deletions helm/siclaw/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ image:
# -- Gateway
gateway:
replicas: 1
sessionAffinityTimeout: 10800 # 3 hours, only effective when replicas > 1
service:
type: NodePort
port: 80
Expand Down
7 changes: 7 additions & 0 deletions src/gateway-main.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type http from "node:http";
import os from "node:os";
import { loadGatewayConfig } from "./gateway/config.js";
import { startGateway } from "./gateway/server.js";
import { AgentBoxManager, LocalSpawner, K8sSpawner, ProcessSpawner } from "./gateway/agentbox/index.js";
Expand Down Expand Up @@ -56,12 +57,18 @@ try {
// Internal plugin not available — running in open-source mode
}

// Instance ID for multi-replica cron coordination (K8s mode only)
const instanceId = useK8s
? (process.env.SICLAW_POD_NAME || os.hostname())
: undefined;

// Start HTTP + WebSocket server
const gateway = await startGateway({
config,
agentBoxManager,
spawner,
extraHttpHandlers,
instanceId,
});

// --- Channel subsystem ---
Expand Down
151 changes: 142 additions & 9 deletions src/gateway/cron/cron-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
* Replaces the standalone cron process. Runs timers directly in the
* Gateway process and calls configRepo for DB access (no HTTP proxy).
* Job execution delegates to /api/internal/agent-prompt on localhost.
*
* Multi-replica coordination: when `instanceId` is set (K8s mode),
* each replica registers as a cron instance, heartbeats, and only
* schedules jobs assigned to it. Single-instance mode (local/dev)
* skips coordination and schedules all active jobs.
*/

import crypto from "node:crypto";
Expand All @@ -20,10 +25,15 @@ const SESSION_SOFT_DELETE_DAYS = 180;
const SESSION_HARD_DELETE_DAYS = 30;
const STATS_RETENTION_DAYS = 90;

// Coordination constants (tuned values from old CronCoordinator — commit 7c41fa6)
const RECONCILE_INTERVAL_MS = 15_000; // 15s (includes heartbeat)
const DEAD_THRESHOLD_MS = 30_000; // 30s = 2× reconcile interval

export type SendToUserFn = (userId: string, event: string, payload: Record<string, unknown>) => void;

export class CronService {
readonly scheduler: CronScheduler;
readonly instanceId: string | undefined;
/** Channel notification callback — set by gateway-main after construction */
onNotify?: (data: { userId: string; jobName: string; result: string; resultText: string; error?: string }) => void;

Expand All @@ -34,30 +44,41 @@ export class CronService {
private staleLockTimer: NodeJS.Timeout | null = null;
private purgeTimer: NodeJS.Timeout | null = null;
private sessionPurgeTimer: NodeJS.Timeout | null = null;
private reconcileTimer: NodeJS.Timeout | null = null;

constructor(opts: {
configRepo: ConfigRepository;
notifRepo: NotificationRepository;
sendToUser: SendToUserFn;
gatewayPort: number;
instanceId?: string;
}) {
this.configRepo = opts.configRepo;
this.notifRepo = opts.notifRepo;
this.sendToUser = opts.sendToUser;
this.gatewayPort = opts.gatewayPort;
this.instanceId = opts.instanceId;
this.scheduler = new CronScheduler((job) => this.execute(job));
}

/** Load all active jobs and start background timers */
async start(): Promise<void> {
// Load all active jobs
const jobs = await this.configRepo.listAllActiveCronJobs();
for (const job of jobs) {
this.scheduler.addOrUpdate(job as CronJobRow);
// Register this instance (multi-replica mode only)
if (this.instanceId) {
const endpoint = `http://${process.env.SICLAW_POD_IP || "localhost"}:${this.gatewayPort}`;
await this.configRepo.registerCronInstance(this.instanceId, endpoint);
}
console.log(`[cron-service] Loaded ${jobs.length} active jobs`);

// Stale lock cleanup every 6 minutes
// Initial reconcile — loads jobs (all in single-instance, assigned in multi)
await this.reconcile();

// Reconcile loop (15s — includes heartbeat in multi-replica mode)
if (this.instanceId) {
this.reconcileTimer = setInterval(() => this.reconcileTick(), RECONCILE_INTERVAL_MS);
this.reconcileTimer.unref();
}

// Stale lock cleanup every 6 minutes (single-instance keeps this too)
this.staleLockTimer = setInterval(async () => {
try {
await this.configRepo.clearStaleLocks(STALE_LOCK_CLEANUP_MS);
Expand All @@ -77,9 +98,99 @@ export class CronService {
this.sessionPurgeTimer = setInterval(() => this.purgeSessions(), PURGE_INTERVAL_MS);
this.sessionPurgeTimer.unref();

console.log("[cron-service] Started");
console.log(`[cron-service] Started${this.instanceId ? ` (instance=${this.instanceId})` : ""}`);
}

// ── Coordination (ported from old CronCoordinator — commit 7c41fa6) ───

/** Main reconcile loop — single-instance fast path or full coordination */
private async reconcile(): Promise<void> {
if (!this.instanceId) {
// Single-instance: schedule everything (backward compatible)
const jobs = await this.configRepo.listAllActiveCronJobs();
for (const job of jobs) this.scheduler.addOrUpdate(job as CronJobRow);
console.log(`[cron-service] Loaded ${jobs.length} active jobs`);
return;
}

// 0. Heartbeat
await this.configRepo.updateHeartbeat(this.instanceId, this.scheduler.jobCount);

// 1. Clear stale execution locks
await this.configRepo.clearStaleLocks(STALE_LOCK_CLEANUP_MS);

// 2. Detect dead instances → reassign orphaned jobs
const dead = await this.configRepo.getDeadInstances(DEAD_THRESHOLD_MS);
for (const inst of dead) {
const target = await this.configRepo.getLeastLoadedInstance(DEAD_THRESHOLD_MS);
if (target) {
await this.configRepo.reassignOrphanedJobs(inst.instanceId, target.instanceId);
console.log(`[cron-service] Reassigned jobs from dead instance ${inst.instanceId} → ${target.instanceId}`);
}
await this.configRepo.deleteInstance(inst.instanceId);
}

// 3. Cancel stale local timers (DB deleted/paused/reassigned)
await this.cancelStaleJobs();

// 4. Claim unassigned jobs (only if we are least-loaded)
await this.claimUnassignedJobs();

// 5. Sync: load DB-assigned jobs missing from local scheduler
await this.syncAssignedJobs();
}

/** Claim unassigned active jobs — only if this instance is least-loaded */
private async claimUnassignedJobs(): Promise<void> {
const unassigned = await this.configRepo.getUnassignedActiveJobs();
if (!unassigned.length) return;

for (const job of unassigned) {
const leastLoaded = await this.configRepo.getLeastLoadedInstance(DEAD_THRESHOLD_MS);
if (!leastLoaded) break;
if (leastLoaded.instanceId !== this.instanceId!) break; // not least-loaded, let another instance claim

const claimed = await this.configRepo.claimUnassignedJob(job.id, this.instanceId!);
if (!claimed) continue; // another instance claimed first

await this.configRepo.updateHeartbeat(this.instanceId!, this.scheduler.jobCount + 1);
this.scheduler.addOrUpdate(job as CronJobRow);
console.log(`[cron-service] Claimed job ${job.id} (${job.name})`);
}
}

/** Cancel local timers for jobs that are deleted/paused/reassigned in DB */
private async cancelStaleJobs(): Promise<void> {
for (const jobId of this.scheduler.scheduledJobIds) {
const dbJob = await this.configRepo.getCronJobById(jobId);
if (!dbJob || dbJob.status !== "active" || dbJob.assignedTo !== this.instanceId) {
this.scheduler.cancel(jobId);
}
}
}

/** Load DB-assigned jobs that are missing from local scheduler */
private async syncAssignedJobs(): Promise<void> {
const dbJobs = await this.configRepo.listCronJobsByInstance(this.instanceId!);
const scheduled = new Set(this.scheduler.scheduledJobIds);
for (const job of dbJobs) {
if (!scheduled.has(job.id)) {
this.scheduler.addOrUpdate(job as CronJobRow);
}
}
}

/** Timer callback for reconcile loop */
private async reconcileTick(): Promise<void> {
try {
await this.reconcile();
} catch (err) {
console.warn("[cron-service] Reconcile failed:", err instanceof Error ? err.message : err);
}
}

// ── Job execution ─────────────────────────────────────────────────────

/** Execute a single cron job */
private async execute(job: CronJobRow): Promise<void> {
// Re-validate from DB
Expand All @@ -89,6 +200,13 @@ export class CronService {
return;
}

// In multi-replica mode, verify we still own this job
if (this.instanceId && current.assignedTo !== this.instanceId) {
console.log(`[cron-service] Job ${job.id} reassigned to ${current.assignedTo}, skipping`);
this.scheduler.cancel(job.id);
return;
}

// Soft concurrent-execution limit — skip this run, retry next cycle
const executing = await this.configRepo.countCurrentlyExecutingJobs();
if (executing >= CRON_LIMITS.MAX_CONCURRENT_EXECUTIONS) {
Expand Down Expand Up @@ -225,8 +343,8 @@ export class CronService {
this.scheduler.cancel(jobId);
}

/** Stop all timers and clean up */
stop(): void {
/** Stop all timers and clean up. Deregisters instance in multi-replica mode. */
async stop(): Promise<void> {
this.scheduler.stop();
if (this.staleLockTimer) {
clearInterval(this.staleLockTimer);
Expand All @@ -240,6 +358,21 @@ export class CronService {
clearInterval(this.sessionPurgeTimer);
this.sessionPurgeTimer = null;
}
if (this.reconcileTimer) {
clearInterval(this.reconcileTimer);
this.reconcileTimer = null;
}

// Deregister instance and release jobs (multi-replica only)
if (this.instanceId) {
try {
await this.configRepo.releaseInstanceJobs(this.instanceId);
await this.configRepo.deleteInstance(this.instanceId);
} catch (err) {
console.warn("[cron-service] Deregister failed:", err instanceof Error ? err.message : err);
}
}

console.log("[cron-service] Stopped");
}

Expand Down
23 changes: 20 additions & 3 deletions src/gateway/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ export interface StartGatewayOptions {
spawner?: import("./agentbox/spawner.js").BoxSpawner;
extraRpcMethods?: Map<string, RpcHandler>;
extraHttpHandlers?: Map<string, (req: http.IncomingMessage, res: http.ServerResponse) => void>;
/** Pod name for multi-replica cron coordination. undefined = single-instance mode. */
instanceId?: string;
}

export async function startGateway(opts: StartGatewayOptions): Promise<GatewayServer> {
const { config, agentBoxManager, spawner, extraRpcMethods, extraHttpHandlers } = opts;
const { config, agentBoxManager, spawner, extraRpcMethods, extraHttpHandlers, instanceId } = opts;

// Track users with active SSE prompt streams (web UI)
const activePromptUsers = new Set<string>();
Expand Down Expand Up @@ -163,7 +165,7 @@ export async function startGateway(opts: StartGatewayOptions): Promise<GatewaySe

// In-process cron service (replaces standalone cron process)
const cronService = (configRepo && notifRepo)
? new CronService({ configRepo, notifRepo, sendToUser, gatewayPort: config.port })
? new CronService({ configRepo, notifRepo, sendToUser, gatewayPort: config.port, instanceId })
: null;

// System config repo (used by JWT, SSO, cert-manager, metrics cache, etc.)
Expand Down Expand Up @@ -302,6 +304,20 @@ export async function startGateway(opts: StartGatewayOptions): Promise<GatewaySe
}

await refreshSsoCache();

// Config cache TTL — poll DB every 30s so non-local replicas pick up changes
const CONFIG_CACHE_TTL_MS = 30_000;
const configCacheTimer = setInterval(async () => {
try {
await refreshCspCache();
await refreshMetricsConfig();
await refreshSsoCache();
} catch (err) {
console.warn("[gateway] Config cache refresh failed:", err instanceof Error ? err.message : err);
}
}, CONFIG_CACHE_TTL_MS);
configCacheTimer.unref();

if (cachedOAuth2Config) {
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion -- TS can't narrow let vars captured by closures
const cfg = cachedOAuth2Config as OAuth2Config;
Expand Down Expand Up @@ -1188,7 +1204,8 @@ export async function startGateway(opts: StartGatewayOptions): Promise<GatewaySe
buildCredentialPayload,
agentBoxTlsOptions,
async close() {
cronService?.stop();
clearInterval(configCacheTimer);
await cronService?.stop();
bindCodeStore.dispose();
await agentBoxManager.cleanup();
for (const ws of clients) {
Expand Down
Loading