diff --git a/helm/siclaw/templates/gateway-deployment.yaml b/helm/siclaw/templates/gateway-deployment.yaml index 72872e89..27f92ab8 100644 --- a/helm/siclaw/templates/gateway-deployment.yaml +++ b/helm/siclaw/templates/gateway-deployment.yaml @@ -16,6 +16,17 @@ spec: {{- include "siclaw.selectorLabels" (dict "ctx" . "component" "gateway") | nindent 8 }} spec: serviceAccountName: {{ include "siclaw.fullname" . }}-gateway + {{- if gt (int .Values.gateway.replicas) 1 }} + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - weight: 100 + podAffinityTerm: + labelSelector: + matchLabels: + {{- include "siclaw.selectorLabels" (dict "ctx" . "component" "gateway") | nindent 20 }} + topologyKey: kubernetes.io/hostname + {{- end }} containers: - name: gateway image: {{ include "siclaw.image" (dict "component" "gateway" "ctx" .) }} @@ -52,6 +63,14 @@ spec: {{- end }} - name: SICLAW_SKILLS_DIR value: ".siclaw/skills" + - name: SICLAW_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: SICLAW_POD_IP + valueFrom: + fieldRef: + fieldPath: status.podIP - name: SICLAW_GATEWAY_HOSTNAME value: "{{ include "siclaw.fullname" . }}-gateway.{{ .Values.namespace }}.svc.cluster.local" {{- if .Values.database.existingSecret.name }} diff --git a/helm/siclaw/templates/gateway-service.yaml b/helm/siclaw/templates/gateway-service.yaml index ba377701..83a32be0 100644 --- a/helm/siclaw/templates/gateway-service.yaml +++ b/helm/siclaw/templates/gateway-service.yaml @@ -7,6 +7,12 @@ metadata: {{- include "siclaw.labels" (dict "ctx" . "component" "gateway") | nindent 4 }} spec: type: {{ .Values.gateway.service.type }} + {{- if gt (int .Values.gateway.replicas) 1 }} + sessionAffinity: ClientIP + sessionAffinityConfig: + clientIP: + timeoutSeconds: {{ .Values.gateway.sessionAffinityTimeout | default 10800 }} + {{- end }} ports: - port: {{ .Values.gateway.service.port }} targetPort: http diff --git a/helm/siclaw/values.yaml b/helm/siclaw/values.yaml index 46b3d9ee..6d2137b8 100644 --- a/helm/siclaw/values.yaml +++ b/helm/siclaw/values.yaml @@ -12,6 +12,7 @@ image: # -- Gateway gateway: replicas: 1 + sessionAffinityTimeout: 10800 # 3 hours, only effective when replicas > 1 service: type: NodePort port: 80 diff --git a/src/gateway-main.ts b/src/gateway-main.ts index 5c11eeff..98c02730 100644 --- a/src/gateway-main.ts +++ b/src/gateway-main.ts @@ -1,4 +1,5 @@ import type http from "node:http"; +import os from "node:os"; import { loadGatewayConfig } from "./gateway/config.js"; import { startGateway } from "./gateway/server.js"; import { AgentBoxManager, LocalSpawner, K8sSpawner, ProcessSpawner } from "./gateway/agentbox/index.js"; @@ -56,12 +57,18 @@ try { // Internal plugin not available — running in open-source mode } +// Instance ID for multi-replica cron coordination (K8s mode only) +const instanceId = useK8s + ? (process.env.SICLAW_POD_NAME || os.hostname()) + : undefined; + // Start HTTP + WebSocket server const gateway = await startGateway({ config, agentBoxManager, spawner, extraHttpHandlers, + instanceId, }); // --- Channel subsystem --- diff --git a/src/gateway/cron/cron-service.ts b/src/gateway/cron/cron-service.ts index cbd48e6d..6e5c53bc 100644 --- a/src/gateway/cron/cron-service.ts +++ b/src/gateway/cron/cron-service.ts @@ -4,6 +4,11 @@ * Replaces the standalone cron process. Runs timers directly in the * Gateway process and calls configRepo for DB access (no HTTP proxy). * Job execution delegates to /api/internal/agent-prompt on localhost. + * + * Multi-replica coordination: when `instanceId` is set (K8s mode), + * each replica registers as a cron instance, heartbeats, and only + * schedules jobs assigned to it. Single-instance mode (local/dev) + * skips coordination and schedules all active jobs. */ import crypto from "node:crypto"; @@ -20,10 +25,15 @@ const SESSION_SOFT_DELETE_DAYS = 180; const SESSION_HARD_DELETE_DAYS = 30; const STATS_RETENTION_DAYS = 90; +// Coordination constants (tuned values from old CronCoordinator — commit 7c41fa6) +const RECONCILE_INTERVAL_MS = 15_000; // 15s (includes heartbeat) +const DEAD_THRESHOLD_MS = 30_000; // 30s = 2× reconcile interval + export type SendToUserFn = (userId: string, event: string, payload: Record) => void; export class CronService { readonly scheduler: CronScheduler; + readonly instanceId: string | undefined; /** Channel notification callback — set by gateway-main after construction */ onNotify?: (data: { userId: string; jobName: string; result: string; resultText: string; error?: string }) => void; @@ -34,30 +44,41 @@ export class CronService { private staleLockTimer: NodeJS.Timeout | null = null; private purgeTimer: NodeJS.Timeout | null = null; private sessionPurgeTimer: NodeJS.Timeout | null = null; + private reconcileTimer: NodeJS.Timeout | null = null; constructor(opts: { configRepo: ConfigRepository; notifRepo: NotificationRepository; sendToUser: SendToUserFn; gatewayPort: number; + instanceId?: string; }) { this.configRepo = opts.configRepo; this.notifRepo = opts.notifRepo; this.sendToUser = opts.sendToUser; this.gatewayPort = opts.gatewayPort; + this.instanceId = opts.instanceId; this.scheduler = new CronScheduler((job) => this.execute(job)); } /** Load all active jobs and start background timers */ async start(): Promise { - // Load all active jobs - const jobs = await this.configRepo.listAllActiveCronJobs(); - for (const job of jobs) { - this.scheduler.addOrUpdate(job as CronJobRow); + // Register this instance (multi-replica mode only) + if (this.instanceId) { + const endpoint = `http://${process.env.SICLAW_POD_IP || "localhost"}:${this.gatewayPort}`; + await this.configRepo.registerCronInstance(this.instanceId, endpoint); } - console.log(`[cron-service] Loaded ${jobs.length} active jobs`); - // Stale lock cleanup every 6 minutes + // Initial reconcile — loads jobs (all in single-instance, assigned in multi) + await this.reconcile(); + + // Reconcile loop (15s — includes heartbeat in multi-replica mode) + if (this.instanceId) { + this.reconcileTimer = setInterval(() => this.reconcileTick(), RECONCILE_INTERVAL_MS); + this.reconcileTimer.unref(); + } + + // Stale lock cleanup every 6 minutes (single-instance keeps this too) this.staleLockTimer = setInterval(async () => { try { await this.configRepo.clearStaleLocks(STALE_LOCK_CLEANUP_MS); @@ -77,9 +98,99 @@ export class CronService { this.sessionPurgeTimer = setInterval(() => this.purgeSessions(), PURGE_INTERVAL_MS); this.sessionPurgeTimer.unref(); - console.log("[cron-service] Started"); + console.log(`[cron-service] Started${this.instanceId ? ` (instance=${this.instanceId})` : ""}`); + } + + // ── Coordination (ported from old CronCoordinator — commit 7c41fa6) ─── + + /** Main reconcile loop — single-instance fast path or full coordination */ + private async reconcile(): Promise { + if (!this.instanceId) { + // Single-instance: schedule everything (backward compatible) + const jobs = await this.configRepo.listAllActiveCronJobs(); + for (const job of jobs) this.scheduler.addOrUpdate(job as CronJobRow); + console.log(`[cron-service] Loaded ${jobs.length} active jobs`); + return; + } + + // 0. Heartbeat + await this.configRepo.updateHeartbeat(this.instanceId, this.scheduler.jobCount); + + // 1. Clear stale execution locks + await this.configRepo.clearStaleLocks(STALE_LOCK_CLEANUP_MS); + + // 2. Detect dead instances → reassign orphaned jobs + const dead = await this.configRepo.getDeadInstances(DEAD_THRESHOLD_MS); + for (const inst of dead) { + const target = await this.configRepo.getLeastLoadedInstance(DEAD_THRESHOLD_MS); + if (target) { + await this.configRepo.reassignOrphanedJobs(inst.instanceId, target.instanceId); + console.log(`[cron-service] Reassigned jobs from dead instance ${inst.instanceId} → ${target.instanceId}`); + } + await this.configRepo.deleteInstance(inst.instanceId); + } + + // 3. Cancel stale local timers (DB deleted/paused/reassigned) + await this.cancelStaleJobs(); + + // 4. Claim unassigned jobs (only if we are least-loaded) + await this.claimUnassignedJobs(); + + // 5. Sync: load DB-assigned jobs missing from local scheduler + await this.syncAssignedJobs(); + } + + /** Claim unassigned active jobs — only if this instance is least-loaded */ + private async claimUnassignedJobs(): Promise { + const unassigned = await this.configRepo.getUnassignedActiveJobs(); + if (!unassigned.length) return; + + for (const job of unassigned) { + const leastLoaded = await this.configRepo.getLeastLoadedInstance(DEAD_THRESHOLD_MS); + if (!leastLoaded) break; + if (leastLoaded.instanceId !== this.instanceId!) break; // not least-loaded, let another instance claim + + const claimed = await this.configRepo.claimUnassignedJob(job.id, this.instanceId!); + if (!claimed) continue; // another instance claimed first + + await this.configRepo.updateHeartbeat(this.instanceId!, this.scheduler.jobCount + 1); + this.scheduler.addOrUpdate(job as CronJobRow); + console.log(`[cron-service] Claimed job ${job.id} (${job.name})`); + } + } + + /** Cancel local timers for jobs that are deleted/paused/reassigned in DB */ + private async cancelStaleJobs(): Promise { + for (const jobId of this.scheduler.scheduledJobIds) { + const dbJob = await this.configRepo.getCronJobById(jobId); + if (!dbJob || dbJob.status !== "active" || dbJob.assignedTo !== this.instanceId) { + this.scheduler.cancel(jobId); + } + } } + /** Load DB-assigned jobs that are missing from local scheduler */ + private async syncAssignedJobs(): Promise { + const dbJobs = await this.configRepo.listCronJobsByInstance(this.instanceId!); + const scheduled = new Set(this.scheduler.scheduledJobIds); + for (const job of dbJobs) { + if (!scheduled.has(job.id)) { + this.scheduler.addOrUpdate(job as CronJobRow); + } + } + } + + /** Timer callback for reconcile loop */ + private async reconcileTick(): Promise { + try { + await this.reconcile(); + } catch (err) { + console.warn("[cron-service] Reconcile failed:", err instanceof Error ? err.message : err); + } + } + + // ── Job execution ───────────────────────────────────────────────────── + /** Execute a single cron job */ private async execute(job: CronJobRow): Promise { // Re-validate from DB @@ -89,6 +200,13 @@ export class CronService { return; } + // In multi-replica mode, verify we still own this job + if (this.instanceId && current.assignedTo !== this.instanceId) { + console.log(`[cron-service] Job ${job.id} reassigned to ${current.assignedTo}, skipping`); + this.scheduler.cancel(job.id); + return; + } + // Soft concurrent-execution limit — skip this run, retry next cycle const executing = await this.configRepo.countCurrentlyExecutingJobs(); if (executing >= CRON_LIMITS.MAX_CONCURRENT_EXECUTIONS) { @@ -225,8 +343,8 @@ export class CronService { this.scheduler.cancel(jobId); } - /** Stop all timers and clean up */ - stop(): void { + /** Stop all timers and clean up. Deregisters instance in multi-replica mode. */ + async stop(): Promise { this.scheduler.stop(); if (this.staleLockTimer) { clearInterval(this.staleLockTimer); @@ -240,6 +358,21 @@ export class CronService { clearInterval(this.sessionPurgeTimer); this.sessionPurgeTimer = null; } + if (this.reconcileTimer) { + clearInterval(this.reconcileTimer); + this.reconcileTimer = null; + } + + // Deregister instance and release jobs (multi-replica only) + if (this.instanceId) { + try { + await this.configRepo.releaseInstanceJobs(this.instanceId); + await this.configRepo.deleteInstance(this.instanceId); + } catch (err) { + console.warn("[cron-service] Deregister failed:", err instanceof Error ? err.message : err); + } + } + console.log("[cron-service] Stopped"); } diff --git a/src/gateway/server.ts b/src/gateway/server.ts index ffc1ab81..100f4fc9 100644 --- a/src/gateway/server.ts +++ b/src/gateway/server.ts @@ -122,10 +122,12 @@ export interface StartGatewayOptions { spawner?: import("./agentbox/spawner.js").BoxSpawner; extraRpcMethods?: Map; extraHttpHandlers?: Map void>; + /** Pod name for multi-replica cron coordination. undefined = single-instance mode. */ + instanceId?: string; } export async function startGateway(opts: StartGatewayOptions): Promise { - const { config, agentBoxManager, spawner, extraRpcMethods, extraHttpHandlers } = opts; + const { config, agentBoxManager, spawner, extraRpcMethods, extraHttpHandlers, instanceId } = opts; // Track users with active SSE prompt streams (web UI) const activePromptUsers = new Set(); @@ -163,7 +165,7 @@ export async function startGateway(opts: StartGatewayOptions): Promise { + try { + await refreshCspCache(); + await refreshMetricsConfig(); + await refreshSsoCache(); + } catch (err) { + console.warn("[gateway] Config cache refresh failed:", err instanceof Error ? err.message : err); + } + }, CONFIG_CACHE_TTL_MS); + configCacheTimer.unref(); + if (cachedOAuth2Config) { // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion -- TS can't narrow let vars captured by closures const cfg = cachedOAuth2Config as OAuth2Config; @@ -1188,7 +1204,8 @@ export async function startGateway(opts: StartGatewayOptions): Promise