From 9863d78ef668428e2fa3b754bcee5de1ebba1133 Mon Sep 17 00:00:00 2001 From: Zhenbo Li <3221521+Endle@users.noreply.github.com> Date: Sun, 29 Dec 2024 20:31:25 -0500 Subject: [PATCH] Set timeout for LLM (#153) (#154) --- fire_seq_search_server/debug_server.sh | 2 +- fire_seq_search_server/deny.toml | 2 +- .../src/language_tools/tokenizer.rs | 1 + fire_seq_search_server/src/lib.rs | 1 + fire_seq_search_server/src/local_llm/mod.rs | 45 ++++++++++++++----- fire_seq_search_server/src/main.rs | 17 +++---- .../src/query_engine/mod.rs | 1 + 7 files changed, 46 insertions(+), 23 deletions(-) diff --git a/fire_seq_search_server/debug_server.sh b/fire_seq_search_server/debug_server.sh index fbc3c0d..55cae89 100644 --- a/fire_seq_search_server/debug_server.sh +++ b/fire_seq_search_server/debug_server.sh @@ -4,7 +4,7 @@ rm -f ./fire_seq_search_server cargo build --features llm cp target/debug/fire_seq_search_server ./fire_seq_search_server -export RUST_LOG="warn,fire_seq_search_server=debug" +export RUST_LOG="warn,fire_seq_search_server=info" #export RUST_LOG="debug" export RUST_BACKTRACE=1 #RAYON_NUM_THREADS=1 diff --git a/fire_seq_search_server/deny.toml b/fire_seq_search_server/deny.toml index 7e4fa45..b98577a 100644 --- a/fire_seq_search_server/deny.toml +++ b/fire_seq_search_server/deny.toml @@ -25,6 +25,7 @@ allow = [ "BSD-2-Clause", "BSD-3-Clause", "CC0-1.0", "MPL-2.0", + "Unicode-3.0", ] # The confidence threshold for detecting a license from license text. # The higher the value, the more closely the license text must be to the @@ -35,7 +36,6 @@ confidence-threshold = 0.8 # aren't accepted for every possible crate as with the normal allow list exceptions = [ { name = "fastdivide", allow = ["zlib-acknowledgement"] }, - { name = "unicode-ident", allow = ["Unicode-DFS-2016"] }, ] # This section is considered when running `cargo deny check bans`. diff --git a/fire_seq_search_server/src/language_tools/tokenizer.rs b/fire_seq_search_server/src/language_tools/tokenizer.rs index 7cac6d9..abff1fe 100644 --- a/fire_seq_search_server/src/language_tools/tokenizer.rs +++ b/fire_seq_search_server/src/language_tools/tokenizer.rs @@ -13,6 +13,7 @@ use log::{debug, info}; pub fn filter_out_stopwords<'a,'b>(term_tokens: &'a [String], nltk: &'b HashSet) -> Vec<&'a str> { let term_ref: Vec<&str> = term_tokens.iter().map(|s| &**s).collect(); let terms_selected: Vec<&str> = term_ref.into_iter() + .filter(|&s| ! (s.trim().is_empty() ) ) .filter(|&s| !nltk.contains(&(&s).to_lowercase() ) ) .collect(); terms_selected diff --git a/fire_seq_search_server/src/lib.rs b/fire_seq_search_server/src/lib.rs index bc397b3..2b93e02 100644 --- a/fire_seq_search_server/src/lib.rs +++ b/fire_seq_search_server/src/lib.rs @@ -174,6 +174,7 @@ pub fn generate_server_info_for_test() -> ServerInformation { convert_underline_hierarchy: true, host: "127.0.0.1:22024".to_string(), llm_enabled: false, + llm_max_waiting_time: 60, }; server_info } diff --git a/fire_seq_search_server/src/local_llm/mod.rs b/fire_seq_search_server/src/local_llm/mod.rs index f845232..ba3c3f6 100644 --- a/fire_seq_search_server/src/local_llm/mod.rs +++ b/fire_seq_search_server/src/local_llm/mod.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use tokio::sync::Mutex; use tokio::task::yield_now; use tokio::task; -use tokio::time; +use tokio; use std::borrow::Cow; use std::borrow::Cow::Borrowed; @@ -88,9 +88,17 @@ pub struct HealthCheck { const LLM_SERVER_PORT: &str = "8081"; // TODO Remove this magic number + +#[derive(Debug)] +pub struct LlmJob { + pub title: String, + pub body : String, + pub time : std::time::Instant, /* 16 bytes */ +} + struct JobProcessor { done_job: HashMap, - job_queue: VecDeque, + job_queue: VecDeque, } impl JobProcessor { @@ -104,22 +112,29 @@ impl JobProcessor { let title: &str = &doc.title; info!("Job posted for {}", &title); if !self.done_job.contains_key(title) { - self.job_queue.push_back(doc); + let job: LlmJob = LlmJob { + title: doc.title, + body: doc.body, + time: std::time::Instant::now(), + }; + self.job_queue.push_back(job); } } } +use crate::ServerInformation; + pub struct LlmEngine { endpoint: String, client: reqwest::Client, job_cache: Arc>, - //job_cache :Arc >>>, + server_info: Arc, } impl LlmEngine { - pub async fn llm_init() -> Self { + pub async fn llm_init(server_info: Arc) -> Self { info!("llm called"); let lfile = locate_llamafile().await; @@ -129,7 +144,6 @@ impl LlmEngine { .args([ "-n", "19", &lfile, "--nobrowser", "--port", LLM_SERVER_PORT, - //">/tmp/llamafile.stdout", "2>/tmp/llamafile.stderr", ]) .stdout(Stdio::from(File::create("/tmp/llamafile.stdout.txt").unwrap())) .stderr(Stdio::from(File::create("/tmp/llamafile.stderr.txt").unwrap())) @@ -137,19 +151,18 @@ impl LlmEngine { .expect("llm model failed to launch"); yield_now().await; - let wait_llm = time::Duration::from_millis(500); + let wait_llm = tokio::time::Duration::from_millis(500); tokio::time::sleep(wait_llm).await; task::yield_now().await; let endpoint = format!("http://127.0.0.1:{}", LLM_SERVER_PORT).to_string(); - loop { let resp = reqwest::get(endpoint.to_owned() + "/health").await; let resp = match resp { Err(_e) => { info!("llm not ready"); - let wait_llm = time::Duration::from_millis(1000); + let wait_llm = tokio::time::Duration::from_millis(1000); tokio::time::sleep(wait_llm).await; task::yield_now().await; continue; @@ -171,7 +184,8 @@ impl LlmEngine { Self { endpoint, client, - job_cache: map + job_cache: map, + server_info, } } @@ -230,7 +244,7 @@ impl LlmEngine{ return; } - let next_job: Option; + let next_job: Option; let mut jcache = self.job_cache.lock().await;//.unwrap(); next_job = jcache.job_queue.pop_front(); @@ -249,6 +263,15 @@ impl LlmEngine{ } drop(jcache); + let waiting_time = doc.time.elapsed().as_secs(); + let allowed_wait = self.server_info.llm_max_waiting_time; + if waiting_time > allowed_wait { + info!("Waiting for {} for {} seconds, discard", + &title, waiting_time); + return; + } + + info!("Start summarize job: {}", &title); let summarize_result = self.summarize(&doc.body).await; info!("Finished summarize job: {}", &title); diff --git a/fire_seq_search_server/src/main.rs b/fire_seq_search_server/src/main.rs index 37f7951..1b47020 100644 --- a/fire_seq_search_server/src/main.rs +++ b/fire_seq_search_server/src/main.rs @@ -58,17 +58,17 @@ async fn main() { .format_target(false) .init(); + info!("main thread running"); + let matches = Cli::parse(); + let server_info: ServerInformation = build_server_info(matches); + let mut llm_loader = None; if cfg!(feature="llm") { info!("LLM Enabled"); - //tokio::task::JoinHandle - llm_loader = Some(task::spawn( async { LlmEngine::llm_init().await })); + let serv_info = Arc::new(server_info.clone()); + llm_loader = Some(task::spawn( async { LlmEngine::llm_init( serv_info ).await })); } - info!("main thread running"); - let matches = Cli::parse(); - let server_info: ServerInformation = build_server_info(matches); - let mut engine = QueryEngine::construct(server_info).await; info!("query engine build finished"); @@ -100,14 +100,10 @@ async fn main() { let listener = tokio::net::TcpListener::bind(&engine_arc.server_info.host) .await.unwrap(); axum::serve(listener, app).await.unwrap(); - // let llm = llm.await.unwrap(); - //llm.summarize("hi my friend").await; } - - fn build_server_info(args: Cli) -> ServerInformation { let notebook_name = match args.notebook_name { Some(x) => x.to_string(), @@ -136,6 +132,7 @@ fn build_server_info(args: Cli) -> ServerInformation { convert_underline_hierarchy: true, host, llm_enabled: cfg!(feature="llm"), + llm_max_waiting_time: 180, } } diff --git a/fire_seq_search_server/src/query_engine/mod.rs b/fire_seq_search_server/src/query_engine/mod.rs index 8451e05..d6dfc80 100644 --- a/fire_seq_search_server/src/query_engine/mod.rs +++ b/fire_seq_search_server/src/query_engine/mod.rs @@ -33,6 +33,7 @@ pub struct ServerInformation { pub host: String, pub llm_enabled: bool, + pub llm_max_waiting_time: u64, /* in secs */ } use crate::language_tools::tokenizer::FireSeqTokenizer;