8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,14 @@ All notable changes to kronaxis-router. Format loosely follows [Keep a Changelog

## [Unreleased]

### Added: 5 advanced routing/execution features (all off by default, opt-in)

- **Predictive SLA routing** — each backend keeps a rolling p95 latency window (`sla.go`); a rule's `max_ttft_ms` drops backends over budget (never empties the candidate set).
- **Spot-market arbitrage** — `server.cost_aware_routing` prefers the cheapest eligible backend; an optional `price_feed_url` (`pricefeed.go`) keeps effective per-backend cost live.
- **Semantic / fuzzy prompt cache** — `semantic_cache.enabled` embeds the prompt and returns a cached answer on cosine ≥ `min_similarity` (default 0.96). Reuses the graphify embedder + pgvector (`semcache.go`); response header `X-Kronaxis-Cache: SEMANTIC`. Only on already-cacheable (deterministic) requests.
- **System-2 reflection** — `X-Kronaxis-Reflect: 1` runs a review pass on the model's answer (`reflect.go`); header `X-Kronaxis-Reflected: true`. Non-streaming, best-effort.
- **Adversarial consensus** — `X-Kronaxis-Consensus: 1` dispatches to several backends, returns the agreed answer (Jaccard ≥ 0.8) or resolves divergence with `server.consensus_arbiter` (`consensus.go`); header `X-Kronaxis-Consensus: agreed|arbitrated`.

### Added: per-request response-schema validation (wired the quality gate)

The `SchemaValidator` shipped in v0.3.0 but was never reachable from a request — there was no way to supply a schema. Now `X-Kronaxis-Response-Schema: <json-schema>` on a request makes the quality gate validate the model's JSON output against it and silently retry on the fallback backend on violation, so the client receives schema-valid JSON. A request-supplied schema activates gating regardless of the global `QUALITY_GATE_ENABLED` flag (streaming excluded); the retry needs `QUALITY_GATE_FALLBACK` set, else the original response is returned unchanged. Wired through both sequential and parallel gate modes.
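
As a rough illustration of the client side, the sketch below builds a chat-completions request that carries a schema in `X-Kronaxis-Response-Schema`. The helper name `newSchemaRequest`, the router address, and the model name are assumptions for the example; only the header name and the `/v1/chat/completions` path come from this PR.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// newSchemaRequest is a hypothetical helper: it marshals the chat body and
// attaches the JSON schema as a compact, single-line header value.
func newSchemaRequest(routerURL string, chatBody, schema any) (*http.Request, error) {
	body, err := json.Marshal(chatBody)
	if err != nil {
		return nil, fmt.Errorf("marshal body: %w", err)
	}
	schemaJSON, err := json.Marshal(schema) // json.Marshal emits no newlines
	if err != nil {
		return nil, fmt.Errorf("marshal schema: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, routerURL+"/v1/chat/completions", bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Kronaxis-Response-Schema", string(schemaJSON))
	return req, nil
}

func main() {
	chat := map[string]any{
		"model": "default", // assumed model name
		"messages": []map[string]string{
			{"role": "user", "content": "Reply as JSON with an `answer` field."},
		},
	}
	schema := map[string]any{
		"type":     "object",
		"required": []string{"answer"},
	}
	// The address is an assumption; point it at your router.
	req, err := newSchemaRequest("http://localhost:8080", chat, schema)
	if err != nil {
		panic(err)
	}
	fmt.Println(req.Header.Get("X-Kronaxis-Response-Schema"))
}
```

Keeping the schema small and ASCII-only is a sensible precaution here, since it travels as a header value and header size limits vary between proxies.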
16 changes: 16 additions & 0 deletions README.md
@@ -612,6 +612,22 @@ Shipped in v0.3.0. All off by default; enable per need.
- **Cost forecasting** — linear burn-rate extrapolation per service ("`my-api` hits its $50 budget at 2:14 PM"). `GET /api/costs/forecast`.
- **DPO dataset export** — every quality-gate fallback (cheap fails, expensive succeeds) is logged as a preference pair (rejected/chosen) to build a fine-tuning dataset. Enable with `DPO_EXPORT_PATH=...`; inspect via `GET /api/dpo`.

## Advanced Routing & Execution

More opt-in strategies. All off by default; each adds cost/latency only when enabled.

- **Predictive SLA routing** — each backend keeps a rolling p95 latency window; set `max_ttft_ms` on a rule and the router drops backends whose p95 exceeds it (never leaving zero candidates). Despite the name, this is reactive today: it routes away from latency spikes that have already been observed rather than forecasting them.
- **Spot-market arbitrage** — `server.cost_aware_routing: true` routes to the cheapest *eligible* backend (after health/SLA/cost filters). An optional `server.price_feed_url` (JSON map of backend → `{input_1m, output_1m}`, polled on `price_feed_interval`) keeps effective costs live. Cost takes precedence over cache warmth in this mode.
- **Semantic / fuzzy prompt cache** — on an exact-cache miss, embeds the prompt and returns a cached answer if a stored prompt is cosine **≥ `min_similarity`** (default 0.96). Reuses the graphify embedder + pgvector; only fires on already-cacheable (deterministic) requests. Response header `X-Kronaxis-Cache: SEMANTIC`.

  ```yaml
  semantic_cache:
    enabled: true
    min_similarity: 0.96   # high by default — a near-duplicate returns a prior answer
  ```
- **System-2 reflection** — send `X-Kronaxis-Reflect: 1` and the router asks the model to review/correct its own answer before returning it (one extra round-trip, non-streaming). Response header `X-Kronaxis-Reflected: true`.
- **Adversarial consensus** — send `X-Kronaxis-Consensus: 1` to dispatch to several backends; if they agree (Jaccard ≥ 0.8) the agreed answer is returned, otherwise `server.consensus_arbiter` resolves the disagreement. Response header `X-Kronaxis-Consensus: agreed|arbitrated`. Costs up to N+1 calls (N backends plus one arbiter call when they disagree), so reserve it for high-stakes requests.
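
The spot-market bullet above can be sketched roughly as follows. This is an illustration, not the code in `pricefeed.go`: the `price` type, `parsePriceFeed`, and `cheapestEligible` are hypothetical names, and ranking by the sum of input and output cost per 1M tokens is an assumption. Only the feed shape (backend → `{input_1m, output_1m}`) comes from the README.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// price mirrors one entry of the price feed: cost per 1M tokens.
type price struct {
	Input1M  float64 `json:"input_1m"`
	Output1M float64 `json:"output_1m"`
}

// parsePriceFeed decodes a JSON map of backend name to price.
func parsePriceFeed(data []byte) (map[string]price, error) {
	feed := make(map[string]price)
	if err := json.Unmarshal(data, &feed); err != nil {
		return nil, err
	}
	return feed, nil
}

// effectiveCost is an assumed ranking key: input plus output cost.
func effectiveCost(p price) float64 { return p.Input1M + p.Output1M }

// cheapestEligible picks the lowest-cost backend among those that already
// passed the health/SLA/cost filters. Live feed prices override static ones;
// backends with no known price are skipped. Ties keep the earlier backend.
func cheapestEligible(eligible []string, static, live map[string]price) (string, bool) {
	best, bestCost, found := "", 0.0, false
	for _, name := range eligible {
		p, ok := live[name]
		if !ok {
			p, ok = static[name]
		}
		if !ok {
			continue
		}
		if c := effectiveCost(p); !found || c < bestCost {
			best, bestCost, found = name, c, true
		}
	}
	return best, found
}

func main() {
	static := map[string]price{
		"local-large": {Input1M: 0.3, Output1M: 0.3},
		"cloud-fast":  {Input1M: 0.5, Output1M: 1.5},
	}
	live, err := parsePriceFeed([]byte(`{"cloud-fast": {"input_1m": 0.1, "output_1m": 0.1}}`))
	if err != nil {
		panic(err)
	}
	name, ok := cheapestEligible([]string{"local-large", "cloud-fast"}, static, live)
	fmt.Println(name, ok)
}
```

Falling back to static prices when a backend is missing from the feed matches the documented behaviour for an empty `price_feed_url`; how the router handles a partially populated feed is an assumption here.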

## Agent Gateway

Optional sub-service at `agent-gateway/`. Exposes CLI agents as OpenAI-compatible endpoints, so any kronaxis service that already speaks OpenAI can talk to a real agentic loop without changing client code.
5 changes: 4 additions & 1 deletion backends.go
@@ -48,7 +48,10 @@ type Backend struct {
QueueDepth atomic.Int64
ActiveInference atomic.Int64
QueueScraped atomic.Bool
mu sync.RWMutex
// latency is a rolling window of recent end-to-end request latencies, used
// by predictive SLA routing (max_ttft_ms). Populated on each served request.
latency latencyWindow
mu sync.RWMutex
}

// QueueLoad returns the backend's total in-flight pressure as seen at the
32 changes: 26 additions & 6 deletions config.go
@@ -23,6 +23,14 @@ type Config struct {
	Batching      BatchingConfig      `yaml:"batching"`
	Defaults      DefaultsConfig      `yaml:"defaults"`
	Graphify      GraphifyConfig      `yaml:"graphify"`
	SemanticCache SemanticCacheConfig `yaml:"semantic_cache"`
}

// SemanticCacheConfig configures fuzzy prompt caching (reuses the graphify
// embedder + pgvector). Off by default.
type SemanticCacheConfig struct {
	Enabled       bool    `yaml:"enabled"`
	MinSimilarity float64 `yaml:"min_similarity"` // cosine threshold; 0 → 0.96
}

// GraphifyConfig configures the graphify pre-stage middleware (token-saving
@@ -246,6 +254,17 @@ type ServerConfig struct {
	QueueAwareRouting bool `yaml:"queue_aware_routing"`
	// QueueScrapeInterval is how often the QueueScraper polls /metrics. 0 → 5s.
	QueueScrapeInterval Duration `yaml:"queue_scrape_interval"`
	// CostAwareRouting (spot-market arbitrage): prefer the cheapest eligible
	// backend (after health/SLA/cost filters). Cost trumps cache warmth.
	CostAwareRouting bool `yaml:"cost_aware_routing"`
	// PriceFeedURL is an operator-supplied JSON endpoint mapping backend name →
	// {input_1m, output_1m}; polled to keep effective costs live. Empty = use
	// static per-backend costs from config.
	PriceFeedURL      string   `yaml:"price_feed_url"`
	PriceFeedInterval Duration `yaml:"price_feed_interval"` // 0 → 5m
	// ConsensusArbiter is the backend that resolves disagreements for
	// X-Kronaxis-Consensus requests. Empty → use the first candidate.
	ConsensusArbiter string `yaml:"consensus_arbiter"`
}

type BrandingConfig struct {
@@ -299,12 +318,13 @@ type KVPinningConfig struct {
}

type RoutingRule struct {
Name string `yaml:"name" json:"name"`
Priority int `yaml:"priority" json:"priority"`
Match RuleMatch `yaml:"match" json:"match"`
Backends []string `yaml:"backends" json:"backends"`
MaxCost float64 `yaml:"max_cost_1m" json:"max_cost_1m"`
Required []string `yaml:"required_capabilities" json:"required_capabilities"`
Name string `yaml:"name" json:"name"`
Priority int `yaml:"priority" json:"priority"`
Match RuleMatch `yaml:"match" json:"match"`
Backends []string `yaml:"backends" json:"backends"`
MaxCost float64 `yaml:"max_cost_1m" json:"max_cost_1m"`
Required []string `yaml:"required_capabilities" json:"required_capabilities"`
MaxTTFTMs int `yaml:"max_ttft_ms" json:"max_ttft_ms"` // predictive SLA: drop backends whose p95 latency exceeds this
}

type RuleMatch struct {
19 changes: 19 additions & 0 deletions config.yaml
@@ -20,6 +20,14 @@ server:
# node. Composes with per-backend kv_pinning. Only matters with 2+ vLLM nodes.
queue_aware_routing: false
queue_scrape_interval: 5s
# Spot-market arbitrage: route to the cheapest eligible backend. Optional
# price_feed_url is a JSON map {"<backend>": {"input_1m": x, "output_1m": y}}.
cost_aware_routing: false
price_feed_url: ""
price_feed_interval: 5m
# Adversarial consensus arbiter (for X-Kronaxis-Consensus requests). Empty =
# use the first candidate to resolve disagreements.
consensus_arbiter: ""
branding:
headers: true # X-Powered-By: Kronaxis Router on every response
header_name: "Kronaxis Router"
@@ -170,6 +178,7 @@ rules:
match:
priority_level: interactive
backends: [local-large, cloud-fast]
# max_ttft_ms: 800 # predictive SLA: drop backends whose rolling p95 exceeds this

# ── Principle 5: Vision routes to vision-capable backends ──────────
- name: vision-tasks
@@ -396,3 +405,13 @@ graphify:
# cosine: 1.0
# tsvector: 0.0
# recency: 0.0

# ──────────────────────────────────────────────────────────────────────
# SEMANTIC CACHE (fuzzy prompt caching)
# On an exact-cache miss, embed the prompt and return a cached answer if a
# stored prompt is cosine >= min_similarity. Reuses the graphify embedder +
# pgvector (needs graphify embedder + DATABASE_URL). Off by default.
# ──────────────────────────────────────────────────────────────────────
semantic_cache:
  enabled: false
  min_similarity: 0.96   # high by default — a near-duplicate returns a prior answer
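
For reviewers, the `min_similarity` check can be pictured with the in-memory sketch below. The actual lookup goes through pgvector in `semcache.go`, which is not shown in this diff; `cosine`, `effectiveThreshold`, and `isSemanticHit` are hypothetical names. The 0 → 0.96 default follows the `SemanticCacheConfig` comment.

```go
package main

import (
	"fmt"
	"math"
)

// cosine returns the cosine similarity of two embeddings. It returns 0 for
// mismatched lengths or zero-length vectors rather than dividing by zero.
func cosine(a, b []float64) float64 {
	if len(a) != len(b) || len(a) == 0 {
		return 0
	}
	var dot, na, nb float64
	for i := range a {
		dot += a[i] * b[i]
		na += a[i] * a[i]
		nb += b[i] * b[i]
	}
	if na == 0 || nb == 0 {
		return 0
	}
	return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

// effectiveThreshold applies the documented default: 0 means 0.96.
func effectiveThreshold(minSimilarity float64) float64 {
	if minSimilarity <= 0 {
		return 0.96
	}
	return minSimilarity
}

// isSemanticHit reports whether a stored prompt is close enough to reuse.
func isSemanticHit(query, stored []float64, minSimilarity float64) bool {
	return cosine(query, stored) >= effectiveThreshold(minSimilarity)
}

func main() {
	query := []float64{1, 0}
	fmt.Println(isSemanticHit(query, []float64{1, 0.1}, 0)) // near-duplicate
	fmt.Println(isSemanticHit(query, []float64{1, 0.5}, 0)) // too far apart
}
```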
132 changes: 132 additions & 0 deletions consensus.go
@@ -0,0 +1,132 @@
package main

import (
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"sync/atomic"
)

// Adversarial consensus (ROADMAP #18). Opt-in via X-Kronaxis-Consensus: 1.
// The request is dispatched to several backends concurrently; if their answers
// agree (Jaccard similarity over the configured threshold) the agreed answer is
// returned, otherwise an arbiter model resolves the disagreement. Costs up to
// N+1 calls (N candidates plus one arbiter call), so opt-in only and never on
// streaming.

var (
	consensusAgreedTotal     atomic.Uint64
	consensusArbitratedTotal atomic.Uint64
)

const (
	defaultConsensusN     = 3
	defaultConsensusAgree = 0.8
)

// runConsensus dispatches to up to N candidates concurrently and returns
// (responseBody, statusCode, mode) where mode is "agreed", "arbitrated", or ""
// (couldn't run — caller should fall back to normal dispatch). A nil body also
// signals fall-back.
func runConsensus(req *ChatRequest, body []byte, meta RouteRequest, candidates []RouteResult) ([]byte, int, string) {
	n := defaultConsensusN
	if n > len(candidates) {
		n = len(candidates)
	}
	if n < 2 {
		return nil, 0, "" // need at least two opinions
	}
	picks := candidates[:n]

	type res struct {
		body    []byte
		status  int
		content string
		err     error
	}
	results := make([]res, n)
	var wg sync.WaitGroup
	for i := range picks {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			st, _, b, err := forwardToBackend(picks[i].Backend, picks[i].ModelName, body, req, meta)
			results[i] = res{body: b, status: st, content: extractContent(b), err: err}
		}(i)
	}
	wg.Wait()

	// Keep the successful, non-empty answers.
	var ok []res
	for _, r := range results {
		if r.err == nil && r.status < 400 && r.content != "" {
			ok = append(ok, r)
		}
	}
	if len(ok) == 0 {
		return nil, 0, "" // all failed; let caller do normal dispatch
	}
	if len(ok) == 1 {
		return ok[0].body, ok[0].status, "agreed" // only one opinion survived
	}

	// Agreement: every pair must be at least `agree` similar.
	agree := defaultConsensusAgree
	agreed := true
	for i := 0; i < len(ok) && agreed; i++ {
		for j := i + 1; j < len(ok); j++ {
			if jaccardSimilarity(ok[i].content, ok[j].content) < agree {
				agreed = false
				break
			}
		}
	}
	if agreed {
		consensusAgreedTotal.Add(1)
		return ok[0].body, ok[0].status, "agreed"
	}

	// Divergence: ask an arbiter to resolve.
	arb := consensusArbiter(picks)
	if arb == nil {
		// No arbiter available: return the first opinion rather than fail.
		return ok[0].body, ok[0].status, "agreed"
	}
	question := lastUserMessage(req)
	var b strings.Builder
	fmt.Fprintf(&b, "%d models were asked the same question and disagreed. Here are their answers:\n\n", len(ok))
	for i, r := range ok {
		fmt.Fprintf(&b, "--- Answer %d ---\n%s\n\n", i+1, r.content)
	}
	b.WriteString("Resolve the disagreement and produce the single correct, final answer. Reply with ONLY that answer.")

	arbReq := *req
	arbReq.Stream = false
	arbReq.Messages = []ChatMessage{
		{Role: "user", Content: question + "\n\n" + b.String()},
	}
	arbBody, err := json.Marshal(arbReq)
	if err != nil {
		return ok[0].body, ok[0].status, "agreed"
	}
	st, _, arbResp, err := forwardToBackend(arb, arb.Config.ModelName, arbBody, &arbReq, meta)
	if err != nil || st >= 400 || extractContent(arbResp) == "" {
		return ok[0].body, ok[0].status, "agreed"
	}
	consensusArbitratedTotal.Add(1)
	return arbResp, st, "arbitrated"
}

// consensusArbiter resolves the arbiter backend: the configured
// server.consensus_arbiter if available, else the first candidate's backend.
func consensusArbiter(picks []RouteResult) *Backend {
	if cfg != nil && cfg.Server.ConsensusArbiter != "" {
		if b := pool.Get(cfg.Server.ConsensusArbiter); b != nil && b.IsAvailable() {
			return b
		}
	}
	if len(picks) > 0 {
		return picks[0].Backend
	}
	return nil
}
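
`jaccardSimilarity` is called in `runConsensus` but is not part of this diff. For readers following the agreement check, one plausible shape is a Jaccard index over lowercased whitespace tokens, sketched below under that assumption. `tokenJaccard` and `allPairsAgree` are hypothetical names, and the router's real tokenisation may differ.

```go
package main

import (
	"fmt"
	"strings"
)

// tokenSet lowercases s and returns its distinct whitespace-separated tokens.
func tokenSet(s string) map[string]struct{} {
	set := make(map[string]struct{})
	for _, tok := range strings.Fields(strings.ToLower(s)) {
		set[tok] = struct{}{}
	}
	return set
}

// tokenJaccard returns |A ∩ B| / |A ∪ B| over the two token sets.
// Two empty inputs are treated as identical (similarity 1).
func tokenJaccard(a, b string) float64 {
	sa, sb := tokenSet(a), tokenSet(b)
	if len(sa) == 0 && len(sb) == 0 {
		return 1
	}
	inter := 0
	for tok := range sa {
		if _, ok := sb[tok]; ok {
			inter++
		}
	}
	union := len(sa) + len(sb) - inter
	return float64(inter) / float64(union)
}

// allPairsAgree mirrors the pairwise loop in runConsensus: every pair of
// answers must reach the threshold for the set to count as agreed.
func allPairsAgree(answers []string, threshold float64) bool {
	for i := 0; i < len(answers); i++ {
		for j := i + 1; j < len(answers); j++ {
			if tokenJaccard(answers[i], answers[j]) < threshold {
				return false
			}
		}
	}
	return true
}

func main() {
	answers := []string{"The answer is 42", "the answer is 42", "I am not sure"}
	fmt.Println(allPairsAgree(answers[:2], 0.8))
	fmt.Println(allPairsAgree(answers, 0.8))
}
```

A token-set measure like this is cheap, but it treats differently worded answers with the same meaning as disagreement, which sends more requests to the arbiter.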
6 changes: 5 additions & 1 deletion docs/api-reference.md
@@ -25,6 +25,8 @@ OpenAI-compatible chat completions proxy. This is the main endpoint.
| `X-Kronaxis-Session-Create` | `true` | Store this transcript and return a session id. |
| `X-Kronaxis-Session-ID` | string | Hydrate a stored session; send only the new turn. |
| `X-Kronaxis-Session-TTL` | duration | Override the session TTL on create. |
| `X-Kronaxis-Reflect` | `1` | Run a System-2 review pass on the answer before returning (non-streaming). |
| `X-Kronaxis-Consensus` | `1` | Dispatch to several backends; return the agreed answer or an arbiter's resolution. |

**Response:** Standard OpenAI ChatCompletion response.

@@ -36,7 +38,9 @@ OpenAI-compatible chat completions proxy. This is the main endpoint.
| `X-Kronaxis-Router-Version` | Router version |
| `X-Kronaxis-Backend` | Backend that served the request |
| `X-Kronaxis-Rule` | Rule that matched |
| `X-Kronaxis-Cache` | `HIT` if served from cache |
| `X-Kronaxis-Cache` | `HIT` (exact) or `SEMANTIC` (fuzzy near-duplicate) if served from cache |
| `X-Kronaxis-Reflected` | `true` if a System-2 reflection pass refined the answer |
| `X-Kronaxis-Consensus` | `agreed` or `arbitrated` when consensus mode ran |
| `X-Kronaxis-Graphify` | Mode actually used (`lossless` / `compress` / `augment`; only present when it ran) |
| `X-Kronaxis-Graphify-Chunks` | Number of chunks injected |
| `X-Kronaxis-Graphify-Tokens-Saved` | Approximate input tokens saved by compression |
24 changes: 24 additions & 0 deletions main.go
@@ -203,6 +203,17 @@ func runServer() {
logger.Printf("queue-aware routing enabled: scraping vLLM /metrics every %s", cfg.Server.QueueScrapeInterval.Duration)
}

	// Spot-market arbitrage: cheapest-eligible routing + optional live price feed.
	costAwareRouting = cfg.Server.CostAwareRouting
	if costAwareRouting {
		logger.Printf("cost-aware routing enabled (cheapest eligible backend wins)")
		if cfg.Server.PriceFeedURL != "" {
			pf := newPriceFeed(cfg.Server.PriceFeedURL, cfg.Server.PriceFeedInterval.Duration)
			go pf.Run(ctx)
			logger.Printf("price feed enabled: %s every %s", cfg.Server.PriceFeedURL, cfg.Server.PriceFeedInterval.Duration)
		}
	}

// Graphify pre-stage: optional embedder + middleware. If embedder is
// configured but unreachable, log and continue (router still works).
//
@@ -269,6 +280,19 @@ func runServer() {
}
}

	// Semantic prompt cache (reuses the graphify embedder + pgvector).
	if cfg.SemanticCache.Enabled && graphifyEmbedder != nil && db != nil {
		semCache = newSemanticCache(graphifyEmbedder, cfg.SemanticCache.MinSimilarity)
		if err := semCache.ensureSchema(ctx); err != nil {
			logger.Printf("semantic cache disabled: %v", err)
			semCache = nil
		} else {
			logger.Printf("semantic cache enabled: threshold=%.2f", semCache.threshold)
		}
	} else if cfg.SemanticCache.Enabled {
		logger.Printf("semantic cache enabled in config but needs a graphify embedder + DATABASE_URL; disabled")
	}

// Register routes
mux := http.NewServeMux()
mux.HandleFunc("/v1/chat/completions", handleChatCompletions)
12 changes: 12 additions & 0 deletions middleware.go
@@ -32,6 +32,18 @@ func extractHeaders(r *http.Request) RouteRequest {
		Tier:           tier,
		PersonaID:      r.Header.Get("X-Kronaxis-PersonaID"),
		ResponseSchema: r.Header.Get("X-Kronaxis-Response-Schema"),
		Reflect:        isTruthyHeader(r.Header.Get("X-Kronaxis-Reflect")),
		Consensus:      isTruthyHeader(r.Header.Get("X-Kronaxis-Consensus")),
	}
}

// isTruthyHeader treats "1"/"true"/"yes" (case-insensitive) as enabled.
func isTruthyHeader(v string) bool {
	switch strings.ToLower(strings.TrimSpace(v)) {
	case "1", "true", "yes":
		return true
	default:
		return false
	}
}
