diff --git a/CHANGELOG.md b/CHANGELOG.md index 7350b3d..f83fba2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,14 @@ All notable changes to kronaxis-router. Format loosely follows [Keep a Changelog ## [Unreleased] +### Added: 5 advanced routing/execution features (all off by default, opt-in) + +- **Predictive SLA routing** — each backend keeps a rolling p95 latency window (`sla.go`); a rule's `max_ttft_ms` drops backends over budget (never empties the candidate set). +- **Spot-market arbitrage** — `server.cost_aware_routing` prefers the cheapest eligible backend; an optional `price_feed_url` (`pricefeed.go`) keeps effective per-backend cost live. +- **Semantic / fuzzy prompt cache** — `semantic_cache.enabled` embeds the prompt and returns a cached answer on cosine ≥ `min_similarity` (default 0.96). Reuses the graphify embedder + pgvector (`semcache.go`); response header `X-Kronaxis-Cache: SEMANTIC`. Only on already-cacheable (deterministic) requests. +- **System-2 reflection** — `X-Kronaxis-Reflect: 1` runs a review pass on the model's answer (`reflect.go`); header `X-Kronaxis-Reflected: true`. Non-streaming, best-effort. +- **Adversarial consensus** — `X-Kronaxis-Consensus: 1` dispatches to several backends, returns the agreed answer (Jaccard ≥ 0.8) or resolves divergence with `server.consensus_arbiter` (`consensus.go`); header `X-Kronaxis-Consensus: agreed|arbitrated`. + ### Added: per-request response-schema validation (wired the quality gate) The `SchemaValidator` shipped in v0.3.0 but was never reachable from a request — there was no way to supply a schema. Now `X-Kronaxis-Response-Schema: ` on a request makes the quality gate validate the model's JSON output against it and silently retry on the fallback backend on violation, so the client receives schema-valid JSON. A request-supplied schema activates gating regardless of the global `QUALITY_GATE_ENABLED` flag (streaming excluded); the retry needs `QUALITY_GATE_FALLBACK` set, else the original response is returned unchanged. Wired through both sequential and parallel gate modes. diff --git a/README.md b/README.md index 513c0a6..ab18157 100644 --- a/README.md +++ b/README.md @@ -612,6 +612,22 @@ Shipped in v0.3.0. All off by default; enable per need. - **Cost forecasting** — linear burn-rate extrapolation per service ("`my-api` hits its $50 budget at 2:14 PM"). `GET /api/costs/forecast`. - **DPO dataset export** — every quality-gate fallback (cheap fails, expensive succeeds) is logged as a preference pair (rejected/chosen) to build a fine-tuning dataset. Enable with `DPO_EXPORT_PATH=...`; inspect via `GET /api/dpo`. +## Advanced Routing & Execution + +More opt-in strategies. All off by default; each adds cost/latency only when enabled. + +- **Predictive SLA routing** — each backend keeps a rolling p95 latency window; set `max_ttft_ms` on a rule and the router drops backends whose p95 exceeds it (never leaving zero candidates). Reactive today (route away from observed spikes). +- **Spot-market arbitrage** — `server.cost_aware_routing: true` routes to the cheapest *eligible* backend (after health/SLA/cost filters). An optional `server.price_feed_url` (JSON map of backend → `{input_1m, output_1m}`, polled on `price_feed_interval`) keeps effective costs live. Cost takes precedence over cache warmth in this mode. +- **Semantic / fuzzy prompt cache** — on an exact-cache miss, embeds the prompt and returns a cached answer if a stored prompt is cosine **≥ `min_similarity`** (default 0.96). Reuses the graphify embedder + pgvector; only fires on already-cacheable (deterministic) requests. Response header `X-Kronaxis-Cache: SEMANTIC`. + + ```yaml + semantic_cache: + enabled: true + min_similarity: 0.96 # high by default — a near-duplicate returns a prior answer + ``` +- **System-2 reflection** — send `X-Kronaxis-Reflect: 1` and the router asks the model to review/correct its own answer before returning it (one extra round-trip, non-streaming). Response header `X-Kronaxis-Reflected: true`. +- **Adversarial consensus** — send `X-Kronaxis-Consensus: 1` to dispatch to several backends; if they agree (Jaccard ≥ 0.8) the agreed answer is returned, otherwise `server.consensus_arbiter` resolves the disagreement. Response header `X-Kronaxis-Consensus: agreed|arbitrated`. Costs N×+1 calls — high-stakes opt-in. + ## Agent Gateway Optional sub-service at `agent-gateway/`. Exposes CLI agents as OpenAI-compatible endpoints, so any kronaxis service that already speaks OpenAI can talk to a real agentic loop without changing client code. diff --git a/backends.go b/backends.go index 7ee94ee..759feed 100644 --- a/backends.go +++ b/backends.go @@ -48,7 +48,10 @@ type Backend struct { QueueDepth atomic.Int64 ActiveInference atomic.Int64 QueueScraped atomic.Bool - mu sync.RWMutex + // latency is a rolling window of recent end-to-end request latencies, used + // by predictive SLA routing (max_ttft_ms). Populated on each served request. + latency latencyWindow + mu sync.RWMutex } // QueueLoad returns the backend's total in-flight pressure as seen at the diff --git a/config.go b/config.go index bbcb28c..c284525 100644 --- a/config.go +++ b/config.go @@ -23,6 +23,14 @@ type Config struct { Batching BatchingConfig `yaml:"batching"` Defaults DefaultsConfig `yaml:"defaults"` Graphify GraphifyConfig `yaml:"graphify"` + SemanticCache SemanticCacheConfig `yaml:"semantic_cache"` +} + +// SemanticCacheConfig configures fuzzy prompt caching (reuses the graphify +// embedder + pgvector). Off by default. +type SemanticCacheConfig struct { + Enabled bool `yaml:"enabled"` + MinSimilarity float64 `yaml:"min_similarity"` // cosine threshold; 0 → 0.96 } // GraphifyConfig configures the graphify pre-stage middleware (token-saving @@ -246,6 +254,17 @@ type ServerConfig struct { QueueAwareRouting bool `yaml:"queue_aware_routing"` // QueueScrapeInterval is how often the QueueScraper polls /metrics. 0 → 5s. QueueScrapeInterval Duration `yaml:"queue_scrape_interval"` + // CostAwareRouting (spot-market arbitrage): prefer the cheapest eligible + // backend (after health/SLA/cost filters). Cost trumps cache warmth. + CostAwareRouting bool `yaml:"cost_aware_routing"` + // PriceFeedURL is an operator-supplied JSON endpoint mapping backend name → + // {input_1m, output_1m}; polled to keep effective costs live. Empty = use + // static per-backend costs from config. + PriceFeedURL string `yaml:"price_feed_url"` + PriceFeedInterval Duration `yaml:"price_feed_interval"` // 0 → 5m + // ConsensusArbiter is the backend that resolves disagreements for + // X-Kronaxis-Consensus requests. Empty → use the first candidate. + ConsensusArbiter string `yaml:"consensus_arbiter"` } type BrandingConfig struct { @@ -299,12 +318,13 @@ type KVPinningConfig struct { } type RoutingRule struct { - Name string `yaml:"name" json:"name"` - Priority int `yaml:"priority" json:"priority"` - Match RuleMatch `yaml:"match" json:"match"` - Backends []string `yaml:"backends" json:"backends"` - MaxCost float64 `yaml:"max_cost_1m" json:"max_cost_1m"` - Required []string `yaml:"required_capabilities" json:"required_capabilities"` + Name string `yaml:"name" json:"name"` + Priority int `yaml:"priority" json:"priority"` + Match RuleMatch `yaml:"match" json:"match"` + Backends []string `yaml:"backends" json:"backends"` + MaxCost float64 `yaml:"max_cost_1m" json:"max_cost_1m"` + Required []string `yaml:"required_capabilities" json:"required_capabilities"` + MaxTTFTMs int `yaml:"max_ttft_ms" json:"max_ttft_ms"` // predictive SLA: drop backends whose p95 latency exceeds this } type RuleMatch struct { diff --git a/config.yaml b/config.yaml index 2705de8..6c1bdbc 100644 --- a/config.yaml +++ b/config.yaml @@ -20,6 +20,14 @@ server: # node. Composes with per-backend kv_pinning. Only matters with 2+ vLLM nodes. queue_aware_routing: false queue_scrape_interval: 5s + # Spot-market arbitrage: route to the cheapest eligible backend. Optional + # price_feed_url is a JSON map {"": {"input_1m": x, "output_1m": y}}. + cost_aware_routing: false + price_feed_url: "" + price_feed_interval: 5m + # Adversarial consensus arbiter (for X-Kronaxis-Consensus requests). Empty = + # use the first candidate to resolve disagreements. + consensus_arbiter: "" branding: headers: true # X-Powered-By: Kronaxis Router on every response header_name: "Kronaxis Router" @@ -170,6 +178,7 @@ rules: match: priority_level: interactive backends: [local-large, cloud-fast] + # max_ttft_ms: 800 # predictive SLA: drop backends whose rolling p95 exceeds this # ── Principle 5: Vision routes to vision-capable backends ────────── - name: vision-tasks @@ -396,3 +405,13 @@ graphify: # cosine: 1.0 # tsvector: 0.0 # recency: 0.0 + +# ────────────────────────────────────────────────────────────────────── +# SEMANTIC CACHE (fuzzy prompt caching) +# On an exact-cache miss, embed the prompt and return a cached answer if a +# stored prompt is cosine >= min_similarity. Reuses the graphify embedder + +# pgvector (needs graphify embedder + DATABASE_URL). Off by default. +# ────────────────────────────────────────────────────────────────────── +semantic_cache: + enabled: false + min_similarity: 0.96 # high by default — a near-duplicate returns a prior answer diff --git a/consensus.go b/consensus.go new file mode 100644 index 0000000..1eff9ed --- /dev/null +++ b/consensus.go @@ -0,0 +1,132 @@ +package main + +import ( + "encoding/json" + "fmt" + "strings" + "sync" + "sync/atomic" +) + +// Adversarial consensus (ROADMAP #18). Opt-in via X-Kronaxis-Consensus: 1. +// The request is dispatched to several backends concurrently; if their answers +// agree (Jaccard similarity over the configured threshold) the agreed answer is +// returned, otherwise an arbiter model resolves the disagreement. Costs N×+1 +// calls, so opt-in only and never on streaming. + +var ( + consensusAgreedTotal atomic.Uint64 + consensusArbitratedTotal atomic.Uint64 +) + +const ( + defaultConsensusN = 3 + defaultConsensusAgree = 0.8 +) + +// runConsensus dispatches to up to N candidates concurrently and returns +// (responseBody, statusCode, mode) where mode is "agreed", "arbitrated", or "" +// (couldn't run — caller should fall back to normal dispatch). A nil body also +// signals fall-back. +func runConsensus(req *ChatRequest, body []byte, meta RouteRequest, candidates []RouteResult) ([]byte, int, string) { + n := defaultConsensusN + if n > len(candidates) { + n = len(candidates) + } + if n < 2 { + return nil, 0, "" // need at least two opinions + } + picks := candidates[:n] + + type res struct { + body []byte + status int + content string + err error + } + results := make([]res, n) + var wg sync.WaitGroup + for i := range picks { + wg.Add(1) + go func(i int) { + defer wg.Done() + st, _, b, err := forwardToBackend(picks[i].Backend, picks[i].ModelName, body, req, meta) + results[i] = res{body: b, status: st, content: extractContent(b), err: err} + }(i) + } + wg.Wait() + + // Keep the successful, non-empty answers. + var ok []res + for _, r := range results { + if r.err == nil && r.status < 400 && r.content != "" { + ok = append(ok, r) + } + } + if len(ok) == 0 { + return nil, 0, "" // all failed; let caller do normal dispatch + } + if len(ok) == 1 { + return ok[0].body, ok[0].status, "agreed" // only one opinion survived + } + + // Agreement: every pair must be at least `agree` similar. + agree := defaultConsensusAgree + agreed := true + for i := 0; i < len(ok) && agreed; i++ { + for j := i + 1; j < len(ok); j++ { + if jaccardSimilarity(ok[i].content, ok[j].content) < agree { + agreed = false + break + } + } + } + if agreed { + consensusAgreedTotal.Add(1) + return ok[0].body, ok[0].status, "agreed" + } + + // Divergence: ask an arbiter to resolve. + arb := consensusArbiter(picks) + if arb == nil { + // No arbiter available: return the first opinion rather than fail. + return ok[0].body, ok[0].status, "agreed" + } + question := lastUserMessage(req) + var b strings.Builder + fmt.Fprintf(&b, "%d models were asked the same question and disagreed. Here are their answers:\n\n", len(ok)) + for i, r := range ok { + fmt.Fprintf(&b, "--- Answer %d ---\n%s\n\n", i+1, r.content) + } + b.WriteString("Resolve the disagreement and produce the single correct, final answer. Reply with ONLY that answer.") + + arbReq := *req + arbReq.Stream = false + arbReq.Messages = []ChatMessage{ + {Role: "user", Content: question + "\n\n" + b.String()}, + } + arbBody, err := json.Marshal(arbReq) + if err != nil { + return ok[0].body, ok[0].status, "agreed" + } + st, _, arbResp, err := forwardToBackend(arb, arb.Config.ModelName, arbBody, &arbReq, meta) + if err != nil || st >= 400 || extractContent(arbResp) == "" { + return ok[0].body, ok[0].status, "agreed" + } + consensusArbitratedTotal.Add(1) + return arbResp, st, "arbitrated" +} + +// consensusArbiter resolves the arbiter backend: the configured +// server.consensus_arbiter if available, else the first candidate's backend. +func consensusArbiter(picks []RouteResult) *Backend { + if cfg != nil && cfg.Server.ConsensusArbiter != "" { + if b := pool.Get(cfg.Server.ConsensusArbiter); b != nil && b.IsAvailable() { + return b + } + } + if len(picks) > 0 { + return picks[0].Backend + } + return nil +} diff --git a/docs/api-reference.md b/docs/api-reference.md index 44bcc80..768c106 100644 --- a/docs/api-reference.md +++ b/docs/api-reference.md @@ -25,6 +25,8 @@ OpenAI-compatible chat completions proxy. This is the main endpoint. | `X-Kronaxis-Session-Create` | `true` | Store this transcript and return a session id. | | `X-Kronaxis-Session-ID` | string | Hydrate a stored session; send only the new turn. | | `X-Kronaxis-Session-TTL` | duration | Override the session TTL on create. | +| `X-Kronaxis-Reflect` | `1` | Run a System-2 review pass on the answer before returning (non-streaming). | +| `X-Kronaxis-Consensus` | `1` | Dispatch to several backends; return the agreed answer or an arbiter's resolution. | **Response:** Standard OpenAI ChatCompletion response. @@ -36,7 +38,9 @@ OpenAI-compatible chat completions proxy. This is the main endpoint. | `X-Kronaxis-Router-Version` | Router version | | `X-Kronaxis-Backend` | Backend that served the request | | `X-Kronaxis-Rule` | Rule that matched | -| `X-Kronaxis-Cache` | `HIT` if served from cache | +| `X-Kronaxis-Cache` | `HIT` (exact) or `SEMANTIC` (fuzzy near-duplicate) if served from cache | +| `X-Kronaxis-Reflected` | `true` if a System-2 reflection pass refined the answer | +| `X-Kronaxis-Consensus` | `agreed` or `arbitrated` when consensus mode ran | | `X-Kronaxis-Graphify` | Mode actually used (`lossless` / `compress` / `augment`; only present when it ran) | | `X-Kronaxis-Graphify-Chunks` | Number of chunks injected | | `X-Kronaxis-Graphify-Tokens-Saved` | Approximate input tokens saved by compression | diff --git a/main.go b/main.go index db89ebc..04186d1 100644 --- a/main.go +++ b/main.go @@ -203,6 +203,17 @@ func runServer() { logger.Printf("queue-aware routing enabled: scraping vLLM /metrics every %s", cfg.Server.QueueScrapeInterval.Duration) } + // Spot-market arbitrage: cheapest-eligible routing + optional live price feed. + costAwareRouting = cfg.Server.CostAwareRouting + if costAwareRouting { + logger.Printf("cost-aware routing enabled (cheapest eligible backend wins)") + if cfg.Server.PriceFeedURL != "" { + pf := newPriceFeed(cfg.Server.PriceFeedURL, cfg.Server.PriceFeedInterval.Duration) + go pf.Run(ctx) + logger.Printf("price feed enabled: %s every %s", cfg.Server.PriceFeedURL, cfg.Server.PriceFeedInterval.Duration) + } + } + // Graphify pre-stage: optional embedder + middleware. If embedder is // configured but unreachable, log and continue (router still works). // @@ -269,6 +280,19 @@ func runServer() { } } + // Semantic prompt cache (reuses the graphify embedder + pgvector). + if cfg.SemanticCache.Enabled && graphifyEmbedder != nil && db != nil { + semCache = newSemanticCache(graphifyEmbedder, cfg.SemanticCache.MinSimilarity) + if err := semCache.ensureSchema(ctx); err != nil { + logger.Printf("semantic cache disabled: %v", err) + semCache = nil + } else { + logger.Printf("semantic cache enabled: threshold=%.2f", semCache.threshold) + } + } else if cfg.SemanticCache.Enabled { + logger.Printf("semantic cache enabled in config but needs a graphify embedder + DATABASE_URL; disabled") + } + // Register routes mux := http.NewServeMux() mux.HandleFunc("/v1/chat/completions", handleChatCompletions) diff --git a/middleware.go b/middleware.go index e605e86..e062097 100644 --- a/middleware.go +++ b/middleware.go @@ -32,6 +32,18 @@ func extractHeaders(r *http.Request) RouteRequest { Tier: tier, PersonaID: r.Header.Get("X-Kronaxis-PersonaID"), ResponseSchema: r.Header.Get("X-Kronaxis-Response-Schema"), + Reflect: isTruthyHeader(r.Header.Get("X-Kronaxis-Reflect")), + Consensus: isTruthyHeader(r.Header.Get("X-Kronaxis-Consensus")), + } +} + +// isTruthyHeader treats "1"/"true"/"yes" (case-insensitive) as enabled. +func isTruthyHeader(v string) bool { + switch strings.ToLower(strings.TrimSpace(v)) { + case "1", "true", "yes": + return true + default: + return false } } diff --git a/pricefeed.go b/pricefeed.go new file mode 100644 index 0000000..cb3bc0c --- /dev/null +++ b/pricefeed.go @@ -0,0 +1,109 @@ +package main + +import ( + "context" + "encoding/json" + "io" + "net/http" + "sync" + "time" +) + +// Spot-market arbitrage (ROADMAP #14). A periodic price feed lets effective +// per-backend cost track live provider pricing; when cost_aware_routing is on, +// routing prefers the cheapest eligible backend (after health/SLA/cost filters). +// +// Honest framing: LLM prices don't move per-second, so this is a periodic +// refresh + cheapest-that-meets-SLA, not a high-frequency order book. The feed +// is an operator-supplied JSON map keyed by backend name: +// +// { "cloud-fast": {"input_1m": 2.5, "output_1m": 10.0}, ... } + +type backendPrice struct { + Input1M float64 `json:"input_1m"` + Output1M float64 `json:"output_1m"` +} + +var ( + priceMu sync.RWMutex + priceOverrides = map[string]backendPrice{} + costAwareRouting bool +) + +// EffectiveCost returns the backend's current input cost per 1M tokens: the +// live price-feed override if present, else the static config cost. +func (b *Backend) EffectiveCost() float64 { + priceMu.RLock() + p, ok := priceOverrides[b.Config.Name] + priceMu.RUnlock() + if ok { + return p.Input1M + } + return b.Config.CostInput1M +} + +// PriceFeed polls an operator-supplied pricing endpoint on an interval. +type PriceFeed struct { + url string + interval time.Duration + client *http.Client +} + +func newPriceFeed(url string, interval time.Duration) *PriceFeed { + if interval <= 0 { + interval = 5 * time.Minute + } + return &PriceFeed{url: url, interval: interval, client: &http.Client{Timeout: 10 * time.Second}} +} + +func (pf *PriceFeed) Run(ctx context.Context) { + pf.refresh(ctx) + ticker := time.NewTicker(pf.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + pf.refresh(ctx) + } + } +} + +func (pf *PriceFeed) refresh(ctx context.Context) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, pf.url, nil) + if err != nil { + return + } + resp, err := pf.client.Do(req) + if err != nil { + logger.Printf("price feed: fetch failed (%v); keeping last prices", err) + return + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + logger.Printf("price feed: HTTP %d; keeping last prices", resp.StatusCode) + return + } + data, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) + if err != nil { + return + } + m, err := parsePriceFeed(data) + if err != nil { + logger.Printf("price feed: parse failed (%v); keeping last prices", err) + return + } + priceMu.Lock() + priceOverrides = m + priceMu.Unlock() + logger.Printf("price feed: updated %d backend prices", len(m)) +} + +func parsePriceFeed(data []byte) (map[string]backendPrice, error) { + var m map[string]backendPrice + if err := json.Unmarshal(data, &m); err != nil { + return nil, err + } + return m, nil +} diff --git a/proxy.go b/proxy.go index f3773d8..1b81d35 100644 --- a/proxy.go +++ b/proxy.go @@ -14,8 +14,8 @@ import ( // Shared HTTP clients for connection reuse. var ( - llmClient = &http.Client{Timeout: 120 * time.Second} - streamClient = &http.Client{Timeout: 180 * time.Second} + llmClient = &http.Client{Timeout: 120 * time.Second} + streamClient = &http.Client{Timeout: 180 * time.Second} ) // OpenAI-compatible request/response structures. @@ -114,8 +114,8 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) { // transcript replaces req.Messages here. New sessions get their ID // surfaced via the X-Kronaxis-Session-ID response header below. var ( - sessionID string - newSession bool + sessionID string + newSession bool ) if sessionStore != nil { ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second) @@ -163,6 +163,23 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) { } } + // Semantic cache: on an exact-cache miss, a near-duplicate prompt (cosine + // >= threshold) returns the prior answer. Keyed on the ORIGINAL prompt + // (captured here, before graphify) so the store side matches re-asks. + var semKey string + if cacheable && semCache != nil { + semKey = FlattenMessages(req.Messages) + sctx, scancel := context.WithTimeout(r.Context(), 2*time.Second) + cb, cst, ok := semCache.Lookup(sctx, semKey) + scancel() + if ok { + w.Header().Set("X-Kronaxis-Cache", "SEMANTIC") + w.WriteHeader(cst) + w.Write(cb) + return + } + } + // Graphify pre-stage: compress fat context or augment thin context // before classifier + forwarding. Skips silently when disabled or on // retrieval errors (we never fail a request because graphify is sad). @@ -223,6 +240,27 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) { return } + // Adversarial consensus: dispatch to several backends + arbiter (opt-in, + // non-streaming). nil body => fall through to normal dispatch. + if meta.Consensus && !req.Stream && len(candidates) >= 2 { + consensusStart := time.Now() + if cbody, cstatus, cmode := runConsensus(&req, body, meta, candidates); cbody != nil { + latency := time.Since(consensusStart) + addBrandingHeaders(w, candidates[0]) + responseBody := postProcessResponse(cbody, candidates[0].Backend) + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Kronaxis-Consensus", cmode) + w.WriteHeader(cstatus) + w.Write(responseBody) + var chatResp ChatResponse + json.Unmarshal(cbody, &chatResp) + inputTokens, outputTokens := estimateTokens(&req, &chatResp) + recordStat(meta, candidates[0], latency, cstatus < 400) + logRequest(meta, candidates[0], inputTokens, outputTokens, latency, cstatus < 400, "") + return + } + } + // Budget downgrade: prepend cheaper backend if over budget if budgetResult.action == "downgrade" && budgetResult.downgradeTarget != "" { downgraded := pool.Get(budgetResult.downgradeTarget) @@ -361,7 +399,16 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) { } } + // System-2 reflection: optional review pass on the answer (opt-in). + if meta.Reflect && statusCode < 400 { + if refined, did := runReflection(&reqCopy, meta, routeResult.Backend, routeResult.ModelName, respBody); did { + respBody = refined + w.Header().Set("X-Kronaxis-Reflected", "true") + } + } + latency := time.Since(start) + routeResult.Backend.latency.record(latency) // predictive SLA window var chatResp ChatResponse json.Unmarshal(respBody, &chatResp) inputTokens, outputTokens := estimateTokens(&reqCopy, &chatResp) @@ -377,6 +424,9 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) { success := statusCode < 400 if success && cacheable { respCache.Set(cacheHit, responseBody, statusCode, respHeaders) + if semCache != nil && semKey != "" { + semCache.Store(r.Context(), semKey, responseBody, statusCode) + } } // Quality validation: sample cheap-model responses if success && qualVal.ShouldSample() { @@ -570,9 +620,9 @@ func forwardToGemini(backend *Backend, _ []byte, req *ChatRequest) (int, map[str } type geminiRequest struct { - Contents []geminiContent `json:"contents"` - SystemInstruction *geminiContent `json:"systemInstruction,omitempty"` - GenerationConfig *geminiGenerationConfig `json:"generationConfig,omitempty"` + Contents []geminiContent `json:"contents"` + SystemInstruction *geminiContent `json:"systemInstruction,omitempty"` + GenerationConfig *geminiGenerationConfig `json:"generationConfig,omitempty"` } type geminiContent struct { @@ -581,8 +631,8 @@ type geminiContent struct { } type geminiPart struct { - Text string `json:"text,omitempty"` - InlineData *geminiInline `json:"inlineData,omitempty"` + Text string `json:"text,omitempty"` + InlineData *geminiInline `json:"inlineData,omitempty"` } type geminiInline struct { diff --git a/reflect.go b/reflect.go new file mode 100644 index 0000000..4f729ae --- /dev/null +++ b/reflect.go @@ -0,0 +1,58 @@ +package main + +import ( + "encoding/json" + "sync/atomic" +) + +// System-2 reflection loops (ROADMAP #15). When a request opts in via +// X-Kronaxis-Reflect: 1, the router takes the model's first answer, asks the +// same backend to review it for errors/omissions, and returns the corrected +// answer. Opt-in only (it costs a second round-trip) and never on streaming. +// Best-effort: any failure returns the original answer unchanged. + +const reflectionPrompt = "Review your previous answer for logical errors, incorrect claims, or omissions. " + + "If it is already correct, return it unchanged. Reply with ONLY the corrected, final answer — no preamble, no explanation of changes." + +var reflectionsTotal atomic.Uint64 + +// runReflection issues one review pass on the same backend and returns the +// refined response body. Returns (refinedBody, true) on success, or +// (initialBody, false) if anything went wrong (so the caller keeps the original). +func runReflection(req *ChatRequest, meta RouteRequest, backend *Backend, modelName string, initialBody []byte) ([]byte, bool) { + var resp ChatResponse + if err := json.Unmarshal(initialBody, &resp); err != nil || len(resp.Choices) == 0 { + return initialBody, false + } + answer, ok := resp.Choices[0].Message.Content.(string) + if !ok || answer == "" { + return initialBody, false + } + + // Build the review turn: original context + the model's answer + the review ask. + rr := *req + rr.Messages = append(append([]ChatMessage{}, req.Messages...), + ChatMessage{Role: "assistant", Content: answer}, + ChatMessage{Role: "user", Content: reflectionPrompt}, + ) + rr.Stream = false + body, err := json.Marshal(rr) + if err != nil { + return initialBody, false + } + + status, _, refined, err := forwardToBackend(backend, modelName, body, &rr, meta) + if err != nil || status >= 400 || len(refined) == 0 { + return initialBody, false + } + // Sanity: the refined body must be a parseable chat response with content. + var rresp ChatResponse + if err := json.Unmarshal(refined, &rresp); err != nil || len(rresp.Choices) == 0 { + return initialBody, false + } + if s, ok := rresp.Choices[0].Message.Content.(string); !ok || s == "" { + return initialBody, false + } + reflectionsTotal.Add(1) + return refined, true +} diff --git a/roadmap5_test.go b/roadmap5_test.go new file mode 100644 index 0000000..46129dd --- /dev/null +++ b/roadmap5_test.go @@ -0,0 +1,105 @@ +package main + +import ( + "testing" + "time" +) + +// ---- #7 predictive SLA ---- + +func TestLatencyWindowPercentile(t *testing.T) { + var w latencyWindow + for i := 1; i <= 100; i++ { + w.record(time.Duration(i) * time.Millisecond) + } + // window only holds the last slaWindowSize samples (37..100 for size 64). + p95, n := w.percentile(95) + if n != slaWindowSize { + t.Errorf("samples = %d, want %d", n, slaWindowSize) + } + // p95 of 37..100 should be near the top of the range. + if p95 < 90*time.Millisecond || p95 > 100*time.Millisecond { + t.Errorf("p95 = %v, want ~95ms", p95) + } + if _, n0 := (&latencyWindow{}).percentile(95); n0 != 0 { + t.Error("empty window should report 0 samples") + } +} + +func TestFilterBySLA(t *testing.T) { + fast := &Backend{Config: BackendConfig{Name: "fast"}} + slow := &Backend{Config: BackendConfig{Name: "slow"}} + for i := 0; i < slaMinSamples+2; i++ { + fast.latency.record(100 * time.Millisecond) + slow.latency.record(5 * time.Second) + } + cands := []RouteResult{{Backend: fast}, {Backend: slow}} + + // Budget 1s: slow (p95 5s) dropped, fast kept. + got := filterBySLA(cands, 1000) + if len(got) != 1 || got[0].Backend.Config.Name != "fast" { + t.Errorf("expected only 'fast' under 1s SLA; got %+v", got) + } + // maxTTFTMs 0 → no filtering. + if len(filterBySLA(cands, 0)) != 2 { + t.Error("0 budget should not filter") + } + // Both over SLA → never empty (returns input). + if len(filterBySLA(cands, 10)) != 2 { + t.Error("when all over SLA, must return all rather than empty") + } +} + +func TestFilterBySLAInsufficientSamples(t *testing.T) { + b := &Backend{Config: BackendConfig{Name: "new"}} + b.latency.record(9 * time.Second) // 1 sample < slaMinSamples + cands := []RouteResult{{Backend: b}, {Backend: &Backend{Config: BackendConfig{Name: "x"}}}} + if len(filterBySLA(cands, 100)) != 2 { + t.Error("backends below slaMinSamples must not be filtered out") + } +} + +// ---- #14 spot-market arbitrage ---- + +func TestParsePriceFeed(t *testing.T) { + m, err := parsePriceFeed([]byte(`{"cloud-fast":{"input_1m":2.5,"output_1m":10},"local":{"input_1m":0.01,"output_1m":0.01}}`)) + if err != nil { + t.Fatal(err) + } + if m["cloud-fast"].Input1M != 2.5 || m["local"].Input1M != 0.01 { + t.Errorf("parsed prices wrong: %+v", m) + } +} + +func TestEffectiveCost(t *testing.T) { + b := &Backend{Config: BackendConfig{Name: "b1", CostInput1M: 5.0}} + // No override → config cost. + if b.EffectiveCost() != 5.0 { + t.Errorf("EffectiveCost = %v, want 5.0 (config)", b.EffectiveCost()) + } + // Override wins. + priceMu.Lock() + priceOverrides = map[string]backendPrice{"b1": {Input1M: 1.25}} + priceMu.Unlock() + if b.EffectiveCost() != 1.25 { + t.Errorf("EffectiveCost = %v, want 1.25 (live override)", b.EffectiveCost()) + } + priceMu.Lock() + priceOverrides = map[string]backendPrice{} + priceMu.Unlock() +} + +// ---- header parsing (#15/#18 opt-in) ---- + +func TestIsTruthyHeader(t *testing.T) { + for _, v := range []string{"1", "true", "TRUE", "Yes"} { + if !isTruthyHeader(v) { + t.Errorf("%q should be truthy", v) + } + } + for _, v := range []string{"", "0", "false", "no", "off"} { + if isTruthyHeader(v) { + t.Errorf("%q should be falsy", v) + } + } +} diff --git a/router.go b/router.go index eb211a5..bf23c2b 100644 --- a/router.go +++ b/router.go @@ -27,6 +27,14 @@ type RouteRequest struct { // violation, so the client always receives schema-valid JSON. ResponseSchema string + // Reflect, set via X-Kronaxis-Reflect: 1, runs a System-2 review pass on the + // model's first answer before returning it (non-streaming only). + Reflect bool + + // Consensus, set via X-Kronaxis-Consensus: 1, dispatches to several backends + // and resolves divergence with an arbiter before returning. + Consensus bool + // KVPrompt is the flattened prompt content used for KV-cache-aware // prefix matching. Populated by the caller (proxy.go) from the // incoming messages array. Empty when KV pinning is disabled or the @@ -162,6 +170,21 @@ func (r *Router) balanceCandidatesPrompt(candidates []RouteResult, kvPrompt stri return candidates } + // Spot-market arbitrage: when enabled, the cheapest eligible backend wins + // (candidates are already filtered by health/SLA/max_cost). Cost takes + // precedence over cache warmth here — that's the point of arbitrage mode. + // Load is the tiebreaker among equally-priced backends. + if costAwareRouting { + sort.Slice(candidates, func(i, j int) bool { + ci, cj := candidates[i].Backend.EffectiveCost(), candidates[j].Backend.EffectiveCost() + if ci != cj { + return ci < cj + } + return backendLoad(candidates[i].Backend) < backendLoad(candidates[j].Backend) + }) + return candidates + } + if kvIndex != nil && kvPrompt != "" { reordered, depth := kvIndex.ChooseByKVDepth(candidates, kvPrompt) if depth > 0 { @@ -275,6 +298,9 @@ func (r *Router) resolveAllBackends(rule *RoutingRule, req RouteRequest, loraAda Complexity: req.ComplexityScore, }) } + // Predictive SLA: drop candidates whose rolling p95 latency blows the + // rule's max_ttft_ms budget (never empties the set). Off unless set. + results = filterBySLA(results, rule.MaxTTFTMs) return results } diff --git a/semcache.go b/semcache.go new file mode 100644 index 0000000..f3ffa98 --- /dev/null +++ b/semcache.go @@ -0,0 +1,112 @@ +package main + +import ( + "context" + "database/sql" + "fmt" + "sync/atomic" +) + +// Semantic / fuzzy prompt caching (ROADMAP #4). Deterministic SHA-256 caching +// misses identical intents phrased differently. This embeds the prompt and, on +// a near-duplicate (cosine >= threshold) hit, returns the cached response +// without calling the LLM. Reuses the graphify embedder + pgvector. +// +// Lossy by nature (a "close enough" prompt returns a prior answer), so it is +// applied ONLY to requests already deemed cacheable (deterministic / temp 0), +// and gated behind a high default threshold (0.96). Off by default. + +var ( + semCache *SemanticCache + semCacheHits atomic.Uint64 + semCacheStores atomic.Uint64 + defaultSemCacheSim = 0.96 +) + +type SemanticCache struct { + emb Embedder + threshold float64 +} + +func newSemanticCache(emb Embedder, threshold float64) *SemanticCache { + if threshold <= 0 || threshold > 1 { + threshold = defaultSemCacheSim + } + return &SemanticCache{emb: emb, threshold: threshold} +} + +// ensureSchema creates the kr_semcache table + HNSW index if absent. +func (sc *SemanticCache) ensureSchema(ctx context.Context) error { + if db == nil { + return fmt.Errorf("semantic cache: no database") + } + stmts := []string{ + fmt.Sprintf(`CREATE TABLE IF NOT EXISTS kr_semcache ( + id BIGSERIAL PRIMARY KEY, + embedding VECTOR(%d), + response BYTEA NOT NULL, + status INT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() +)`, sc.emb.Dim()), + `CREATE INDEX IF NOT EXISTS kr_semcache_embedding_hnsw ON kr_semcache USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 64)`, + } + for _, s := range stmts { + if _, err := db.ExecContext(ctx, s); err != nil { + return fmt.Errorf("semantic cache schema: %w", err) + } + } + return nil +} + +// Lookup returns a cached response for a prompt semantically >= threshold to a +// stored one. ok=false on miss or any error (caller proceeds normally). +func (sc *SemanticCache) Lookup(ctx context.Context, prompt string) (body []byte, status int, ok bool) { + if db == nil || prompt == "" { + return nil, 0, false + } + vecs, err := sc.emb.Embed(ctx, "query", []string{prompt}) + if err != nil || len(vecs) != 1 { + return nil, 0, false + } + q := vectorLiteral(vecs[0]) + var ( + resp []byte + st int + sim float64 + ) + row := db.QueryRowContext(ctx, ` + SELECT response, status, 1 - (embedding <=> $1::vector) AS sim + FROM kr_semcache + ORDER BY embedding <=> $1::vector + LIMIT 1`, q) + if err := row.Scan(&resp, &st, &sim); err != nil { + if err != sql.ErrNoRows { + // transient/db error: treat as miss + } + return nil, 0, false + } + if sim < sc.threshold { + return nil, 0, false + } + semCacheHits.Add(1) + return resp, st, true +} + +// Store records a prompt→response pair. Best-effort; errors are logged only. +func (sc *SemanticCache) Store(ctx context.Context, prompt string, body []byte, status int) { + if db == nil || prompt == "" || len(body) == 0 { + return + } + vecs, err := sc.emb.Embed(ctx, "query", []string{prompt}) + if err != nil || len(vecs) != 1 { + return + } + q := vectorLiteral(vecs[0]) + if _, err := db.ExecContext(ctx, + `INSERT INTO kr_semcache (embedding, response, status) VALUES ($1::vector, $2, $3)`, + q, body, status); err != nil { + logger.Printf("semantic cache store failed: %v", err) + return + } + semCacheStores.Add(1) +} diff --git a/sla.go b/sla.go new file mode 100644 index 0000000..160cdbf --- /dev/null +++ b/sla.go @@ -0,0 +1,82 @@ +package main + +import ( + "sort" + "sync" + "time" +) + +// Predictive SLA routing (ROADMAP #7). Each backend keeps a rolling window of +// recent end-to-end latencies; routing can drop or deprioritise backends whose +// p95 exceeds a rule's max_ttft_ms. Reactive today (route away from observed +// spikes); the window is the basis for a predictive policy later. + +const slaWindowSize = 64 // recent samples kept per backend + +// latencyWindow is a fixed-size ring of recent latencies with percentile query. +type latencyWindow struct { + mu sync.Mutex + buf [slaWindowSize]time.Duration + n int // total recorded (caps reads at min(n, size)) + next int +} + +func (w *latencyWindow) record(d time.Duration) { + w.mu.Lock() + w.buf[w.next] = d + w.next = (w.next + 1) % slaWindowSize + w.n++ + w.mu.Unlock() +} + +// percentile returns the p-th percentile (0–100) over the window, and the +// number of samples it was computed from. p95 = percentile(95). +func (w *latencyWindow) percentile(p float64) (time.Duration, int) { + w.mu.Lock() + count := w.n + if count > slaWindowSize { + count = slaWindowSize + } + if count == 0 { + w.mu.Unlock() + return 0, 0 + } + s := make([]time.Duration, count) + copy(s, w.buf[:count]) + w.mu.Unlock() + + sort.Slice(s, func(i, j int) bool { return s[i] < s[j] }) + idx := int(float64(count-1) * p / 100.0) + if idx < 0 { + idx = 0 + } + if idx >= count { + idx = count - 1 + } + return s[idx], count +} + +// slaMinSamples is the floor before SLA filtering trusts the window. Below this +// we don't have enough signal to exclude a backend. +const slaMinSamples = 8 + +// filterBySLA drops candidates whose p95 latency exceeds maxTTFTMs, but never +// returns empty: if every candidate is over SLA (or there isn't enough data) it +// returns the input unchanged so routing still has somewhere to go. +func filterBySLA(candidates []RouteResult, maxTTFTMs int) []RouteResult { + if maxTTFTMs <= 0 || len(candidates) <= 1 { + return candidates + } + budget := time.Duration(maxTTFTMs) * time.Millisecond + var ok []RouteResult + for _, c := range candidates { + p95, samples := c.Backend.latency.percentile(95) + if samples < slaMinSamples || p95 <= budget { + ok = append(ok, c) + } + } + if len(ok) == 0 { + return candidates + } + return ok +}