Skip to content
25 changes: 24 additions & 1 deletion src/core/event_bus/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,28 @@ pub enum DomainEvent {
},

// ── Memory ──────────────────────────────────────────────────────────
/// The configured embedding provider is unreachable or the requested model
/// is not installed, so the memory pipeline fell back to an alternative.
///
/// Published by `memory_store::factories` (once per process via the
/// `OLLAMA_HEALTH_REPORTED` latch) so the UI can surface a user-visible
/// warning with an actionable fix hint. The `message` field is a
/// pre-formatted human-readable string safe to show in a notification.
///
/// NOTE(review): the publisher visible in `memory_store::factories` fires on
/// the "daemon unreachable" fallback path; confirm whether a missing model
/// (daemon reachable) also produces this event.
EmbeddingModelUnhealthy {
/// Short provider slug, e.g. `"ollama"`.
provider: String,
/// The model that was intended but could not be reached / found,
/// e.g. `"bge-m3"`.
model: String,
/// The provider that will serve embeddings for this session instead,
/// e.g. `"cloud"`.
fallback_provider: String,
/// Human-readable explanation with an actionable fix, e.g.
/// ``"Local embedding model unreachable — falling back to cloud
/// embeddings. Run `ollama pull bge-m3` to fix."``.
message: String,
},

/// A memory entry was stored.
MemoryStored {
key: String,
Expand Down Expand Up @@ -583,7 +605,8 @@ impl DomainEvent {
| Self::SubagentCompleted { .. }
| Self::SubagentFailed { .. } => "agent",

Self::MemoryStored { .. }
Self::EmbeddingModelUnhealthy { .. }
| Self::MemoryStored { .. }
| Self::MemoryRecalled { .. }
| Self::MemorySyncRequested { .. }
| Self::MemorySyncStageChanged { .. }
Expand Down
140 changes: 140 additions & 0 deletions src/openhuman/doctor/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ pub struct DoctorReport {

// ── Public entry point ───────────────────────────────────────────

/// Build the full doctor report.
///
/// `ops::doctor_report` runs this in `tokio::task::spawn_blocking` because the
/// checks are synchronous and may touch the file system, sqlite, or local HTTP
/// endpoints. Keep this function blocking-only; add async probes in the caller
/// or behind their own runtime boundary instead of introducing `.await` here.
pub fn run(config: &Config) -> Result<DoctorReport> {
let mut items: Vec<DiagnosticItem> = Vec::new();

Expand All @@ -73,6 +79,7 @@ pub fn run(config: &Config) -> Result<DoctorReport> {
check_daemon_state(config, &mut items);
check_environment(&mut items);
check_memory_tree_db(config, &mut items);
check_embedding_model_health(config, &mut items);

let errors = items
.iter()
Expand Down Expand Up @@ -836,6 +843,131 @@ fn check_memory_tree_db(config: &Config, items: &mut Vec<DiagnosticItem>) {
}
}

// ── Embedding model health ───────────────────────────────────────

/// Probe the configured embedding provider and model, appending exactly one
/// [`DiagnosticItem`] to `items`.
///
/// Outcomes, in evaluation order:
///
/// - Intended provider is not `"ollama"` (e.g. cloud): `Ok` — no local daemon
///   is involved and there is nothing to diagnose here.
/// - The HTTP client cannot be built: `Warn` (the probe itself failed).
/// - The request to `<base_url>/api/tags` fails or returns a non-success
///   status: `Error` with the `ollama pull <model>` fix hint.
/// - The response body cannot be read or is not valid JSON: `Warn` — the
///   probe is inconclusive, so the model is *not* reported as missing.
/// - The daemon answered but the configured model is not listed: `Error`
///   with `ollama pull <model>` guidance.
/// - The model is listed: `Ok`.
///
/// This check is synchronous (it uses `reqwest::blocking`), so it fits the
/// blocking `run()` contract and must not be called directly from an async
/// context. The request timeout is 3 s to avoid stalling `openhuman doctor`
/// on a very slow Ollama daemon.
fn check_embedding_model_health(config: &Config, items: &mut Vec<DiagnosticItem>) {
    let cat = "embedding_model";

    // Resolve the effective (intended, non-probed) embedding settings.
    let local_embedding_model = config.workload_local_model("embeddings");
    let (provider, model, _dims) =
        crate::openhuman::memory_store::factories::effective_embedding_settings(
            &config.memory,
            local_embedding_model.as_deref(),
        );

    log::debug!("[doctor] check_embedding_model_health: provider={provider} model={model}");

    if provider != "ollama" {
        // Cloud or custom provider — no local daemon to probe.
        items.push(DiagnosticItem::ok(
            cat,
            format!("embedding provider: {provider} (model: {model}) — no local daemon required"),
        ));
        return;
    }

    // Ollama path: probe reachability, then model availability.
    let base_url = crate::openhuman::inference::local::ollama_base_url();
    let tags_url = format!("{}/api/tags", base_url.trim_end_matches('/'));

    log::debug!("[doctor] probing ollama at {tags_url} for embedding model {model}");

    let client = match reqwest::blocking::Client::builder()
        .timeout(std::time::Duration::from_secs(3))
        .build()
    {
        Ok(c) => c,
        Err(e) => {
            // Failure to build the client says nothing about Ollama itself,
            // hence a warning rather than an error.
            items.push(DiagnosticItem::warn(
                cat,
                format!("could not build HTTP client for Ollama probe: {e}"),
            ));
            return;
        }
    };

    let resp = match client.get(&tags_url).send() {
        Ok(r) => r,
        Err(e) => {
            items.push(DiagnosticItem::error(
                cat,
                format!(
                    "Ollama daemon unreachable at {base_url} — embedding model `{model}` cannot be used. \
                     Start Ollama, then run: ollama pull {model} (error: {e})"
                ),
            ));
            return;
        }
    };

    if !resp.status().is_success() {
        items.push(DiagnosticItem::error(
            cat,
            format!(
                "Ollama /api/tags returned {} at {base_url} — cannot verify embedding model `{model}`. \
                 Start Ollama and run: ollama pull {model}",
                resp.status()
            ),
        ));
        return;
    }

    let body = match resp.text() {
        Ok(t) => t,
        Err(e) => {
            items.push(DiagnosticItem::warn(
                cat,
                format!("Ollama /api/tags response could not be read: {e}"),
            ));
            return;
        }
    };

    // A body that is not JSON means the probe is inconclusive (e.g. something
    // other than Ollama is answering on that port). Reporting "model not
    // installed" with an `ollama pull` hint would be misleading, so warn.
    let tags: serde_json::Value = match serde_json::from_str(&body) {
        Ok(v) => v,
        Err(e) => {
            items.push(DiagnosticItem::warn(
                cat,
                format!(
                    "Ollama /api/tags response at {base_url} is not valid JSON — \
                     cannot verify embedding model `{model}`: {e}"
                ),
            ));
            return;
        }
    };

    // A missing or non-array `models` field is treated as "no models
    // installed". Borrow the parsed tree instead of cloning it.
    let model_found = tags
        .get("models")
        .and_then(serde_json::Value::as_array)
        .map_or(false, |models| {
            models.iter().any(|entry| {
                entry
                    .get("name")
                    .and_then(serde_json::Value::as_str)
                    .map_or(false, |name| model_matches(name, &model))
            })
        });

    if model_found {
        items.push(DiagnosticItem::ok(
            cat,
            format!("embedding model `{model}` is installed and reachable at {base_url}"),
        ));
    } else {
        items.push(DiagnosticItem::error(
            cat,
            format!(
                "embedding model `{model}` is NOT installed on Ollama at {base_url}. \
                 Run: ollama pull {model}"
            ),
        ));
    }
}

// ── Helpers ──────────────────────────────────────────────────────

fn parse_rfc3339(input: &str) -> Option<DateTime<Utc>> {
Expand All @@ -844,6 +976,14 @@ fn parse_rfc3339(input: &str) -> Option<DateTime<Utc>> {
.map(|dt| dt.with_timezone(&Utc))
}

/// Decide whether an installed Ollama model name satisfies the configured one.
///
/// Names have the form `base[:tag]`. The rules are:
///
/// - identical strings always match;
/// - different base names never match;
/// - if both sides carry an explicit tag, the tags must be equal
///   (`bge-m3:latest` does not satisfy `bge-m3:v1.0`);
/// - if either side omits the tag, the base name alone decides.
///
/// NOTE(review): the last rule is deliberately lenient — an untagged
/// configured name is accepted against any installed tag, as the existing
/// unit tests pin. Confirm against how the daemon resolves untagged names.
fn model_matches(installed: &str, configured: &str) -> bool {
    if installed == configured {
        return true;
    }
    if model_base(installed) != model_base(configured) {
        return false;
    }

    // Same base: only reject when both sides pin a (different) tag.
    let installed_tag = installed.split_once(':').map(|(_, tag)| tag);
    let configured_tag = configured.split_once(':').map(|(_, tag)| tag);
    match (installed_tag, configured_tag) {
        (Some(have), Some(want)) => have == want,
        _ => true,
    }
}

/// Return the part of `model` before the first `:` (the name without its
/// tag), or the whole string when there is no tag.
fn model_base(model: &str) -> &str {
    // `split` always yields at least one item; fall back to the input rather
    // than carrying a panic path.
    model.split(':').next().unwrap_or(model)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

fn truncate_for_display(text: &str, max_len: usize) -> String {
if text.chars().count() <= max_len {
return text.to_string();
Expand Down
14 changes: 14 additions & 0 deletions src/openhuman/doctor/core_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ fn embedding_provider_validation_rejects_malformed_url() {
assert!(err.contains("invalid custom provider URL"), "{err}");
}

/// An installed model satisfies the configured one when the names are equal
/// or when only one side carries a tag.
#[test]
fn model_matches_accepts_exact_and_tagged_variants() {
    // (installed, configured)
    let accepted = [
        ("bge-m3", "bge-m3"),
        ("bge-m3:latest", "bge-m3"),
        ("bge-m3", "bge-m3:latest"),
        ("bge-m3:v1.0", "bge-m3"),
    ];

    for (installed, configured) in accepted {
        assert!(
            model_matches(installed, configured),
            "installed `{installed}` should satisfy configured `{configured}`"
        );
    }
}

/// Models with different base names never match, whatever their tags.
#[test]
fn model_matches_rejects_different_base_models() {
    // (installed, configured)
    let rejected = [
        ("nomic-embed-text:latest", "bge-m3"),
        ("bge-m3:latest", "nomic-embed-text"),
    ];

    for (installed, configured) in rejected {
        assert!(
            !model_matches(installed, configured),
            "installed `{installed}` must not satisfy configured `{configured}`"
        );
    }
}

// ── check_memory_tree_db tests (#2206) ───────────────────────────────────────

/// When the workspace exists but the DB file has never been created,
Expand Down
9 changes: 8 additions & 1 deletion src/openhuman/doctor/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,14 @@ use crate::openhuman::doctor::{self, DoctorReport, ModelProbeReport};
use crate::rpc::RpcOutcome;

/// Build the doctor report for the RPC layer.
///
/// `doctor::run` is synchronous and (via `check_embedding_model_health`) uses
/// `reqwest::blocking::Client`, which panics when driven from inside a tokio
/// runtime. The whole run is therefore moved onto a blocking thread, and it
/// is executed exactly once.
///
/// # Errors
///
/// Returns the stringified error when the blocking task cannot be joined
/// (panic or cancellation) or when `doctor::run` itself fails.
pub async fn doctor_report(config: &Config) -> Result<RpcOutcome<DoctorReport>, String> {
    // The closure must be `'static`, so it owns a clone of the config rather
    // than borrowing the caller's reference.
    let config_clone = config.clone();
    let report = tokio::task::spawn_blocking(move || doctor::run(&config_clone))
        .await
        .map_err(|e| format!("doctor task join error: {e}"))?
        .map_err(|e| e.to_string())?;
    Ok(RpcOutcome::single_log(report, "doctor report generated"))
}

Expand Down
56 changes: 43 additions & 13 deletions src/openhuman/memory_store/factories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,58 @@ use crate::openhuman::memory_store::unified::UnifiedMemory;
static OLLAMA_HEALTH_REPORTED: AtomicBool = AtomicBool::new(false);

/// Reports the Ollama-unreachable fallback to Sentry at most once per
/// process. Returns `true` on the firing call, `false` afterwards — callers
/// use the return value only for logging context.
fn report_ollama_health_gate_once(base_url: &str) -> bool {
/// process and publishes an [`EmbeddingModelUnhealthy`] domain event.
///
/// Returns `true` on the firing call, `false` afterwards — callers use the
/// return value only for logging context.
///
/// [`EmbeddingModelUnhealthy`]: crate::core::event_bus::events::DomainEvent::EmbeddingModelUnhealthy
fn report_ollama_health_gate_once(base_url: &str, model: &str) -> bool {
if OLLAMA_HEALTH_REPORTED
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
log::debug!(
"[memory::factory] ollama health-gate fallback already reported this process; suppressing duplicate at {base_url}"
"[memory::factory] ollama health-gate fallback already reported this process; suppressing duplicate at {base_url} model={model}"
);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
return false;
}
// Tags are indexed and grouped on; keep them low-cardinality and free of
// credentials. Full URL stays in the message body for diagnostics.
let host_tag = redact_ollama_host(base_url);
let message = format!(
let sentry_message = format!(
"ollama embeddings opted-in but daemon unreachable at {base_url}; falling back to cloud embeddings for this session"
);
// Call report_error_message directly to avoid a redundant format!("{:#}") round-trip
// that report_error would perform on an already-formatted &str.
crate::core::observability::report_error_message(
&message,
&sentry_message,
"memory",
"ollama_health_gate",
&[("ollama_host", host_tag), ("fallback", "cloud")],
);

// Publish a user-visible domain event so the UI can surface a notification
// with an actionable fix hint. The event bus is best-effort (no runtime
// present in unit-test contexts without `init_global`), so we fire-and-
// forget and ignore any lagged-receiver errors.
let user_message = format!(
"Local embedding model unreachable — falling back to cloud embeddings. \
Run `ollama pull {model}` to fix."
);
log::debug!(
"[memory::factory] publishing EmbeddingModelUnhealthy event: provider=ollama model={model} fallback=cloud"
);
let event = crate::core::event_bus::DomainEvent::EmbeddingModelUnhealthy {
provider: "ollama".to_string(),
model: model.to_string(),
fallback_provider: "cloud".to_string(),
message: user_message,
};
// publish_global is infallible (drops the event when no receivers are
// registered, which is fine for the health-gate use case).
crate::core::event_bus::publish_global(event);

true
}

Expand Down Expand Up @@ -244,9 +270,10 @@ pub async fn effective_embedding_settings_probed(
// doesn't recreate the per-embed flood we're fixing. Then fall back to
// cloud so the user has a working app.
log::warn!(
"[memory::factory] ollama unreachable at {base_url}; falling back to cloud embedder for this session"
"[memory::factory] ollama unreachable at {base_url} (model={}); falling back to cloud embedder for this session",
intended.1
);
report_ollama_health_gate_once(&base_url);
report_ollama_health_gate_once(&base_url, &intended.1);
cloud_embedding_fallback()
}

Expand Down Expand Up @@ -364,9 +391,10 @@ fn create_memory_full(
intended
} else {
log::warn!(
"[memory::factory] ollama unreachable at {base_url}; falling back to cloud embedder for this session"
"[memory::factory] ollama unreachable at {base_url} (model={}); falling back to cloud embedder for this session",
intended.1
);
report_ollama_health_gate_once(&base_url);
report_ollama_health_gate_once(&base_url, &intended.1);
gate_triggered = true;
cloud_embedding_fallback()
}
Expand Down Expand Up @@ -707,6 +735,8 @@ mod tests {
/// subsequent calls in the same process must be suppressed. We can't
/// observe the Sentry side effect directly here, but the boolean return
/// value is the gate's contract — covers the once-per-process guarantee.
/// Event publication is fire-and-forget via the global event bus and is
/// verified manually/log-side rather than by this unit test.
///
/// Acquires the local-AI domain mutex to serialize with `probed_settings_*`
/// tests that also touch the latch; without that, parallel test execution
Expand All @@ -719,15 +749,15 @@ mod tests {
reset_health_gate_for_test();

assert!(
report_ollama_health_gate_once("http://127.0.0.1:1"),
report_ollama_health_gate_once("http://127.0.0.1:1", "bge-m3"),
"first call must fire the report"
);
assert!(
!report_ollama_health_gate_once("http://127.0.0.1:1"),
!report_ollama_health_gate_once("http://127.0.0.1:1", "bge-m3"),
"second call must be suppressed"
);
assert!(
!report_ollama_health_gate_once("http://example.invalid:11434"),
!report_ollama_health_gate_once("http://example.invalid:11434", "nomic-embed-text"),
"different URL also suppressed — gate is process-scoped, not per-URL"
);
}
Expand Down
13 changes: 7 additions & 6 deletions src/openhuman/memory_tree/score/embed/ollama.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,13 @@ impl Embedder for OllamaEmbedder {
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
anyhow::bail!(format_embedding_status_error(
status,
&body,
&self.endpoint,
&self.model
));
let msg = format_embedding_status_error(status, &body, &self.endpoint, &self.model);
// Log at WARN so missing-model failures surface in traces without
// requiring debug-level logging to be enabled. Missing-model 404s
// include the `ollama pull` remediation hint from
// `format_embedding_status_error`.
log::warn!("[embeddings] {msg}");
anyhow::bail!(msg);
}

let payload: EmbedResponse = resp
Expand Down