Commit

Set timeout for LLM (#153) (#154)
Endle authored Dec 30, 2024
1 parent 2ea9ca0 commit 9863d78
Showing 7 changed files with 46 additions and 23 deletions.
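
In short: the commit adds an llm_max_waiting_time field (in seconds) to ServerInformation, stamps each queued summarization job with std::time::Instant::now() when it is posted, and lets the worker discard any job that has already waited longer than the configured limit instead of sending it to the LLM.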
2 changes: 1 addition & 1 deletion fire_seq_search_server/debug_server.sh
@@ -4,7 +4,7 @@ rm -f ./fire_seq_search_server
cargo build --features llm
cp target/debug/fire_seq_search_server ./fire_seq_search_server

-export RUST_LOG="warn,fire_seq_search_server=debug"
+export RUST_LOG="warn,fire_seq_search_server=info"
#export RUST_LOG="debug"
export RUST_BACKTRACE=1
#RAYON_NUM_THREADS=1
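
The only change here is the log directive: the server's own crate drops from debug to info, with everything else still capped at warn. A minimal sketch of how an env_logger-based binary would interpret the directive (assuming the env_logger crate, consistent with the Builder-style init visible in main.rs below):

    use log::{debug, info, warn};

    fn main() {
        // With RUST_LOG="warn,fire_seq_search_server=info", the default level
        // is warn, and only the named crate is raised to info.
        env_logger::init();
        warn!("visible for every crate");
        info!("visible only for crates raised to info");
        debug!("suppressed after this change");
    }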
2 changes: 1 addition & 1 deletion fire_seq_search_server/deny.toml
@@ -25,6 +25,7 @@ allow = [
"BSD-2-Clause", "BSD-3-Clause",
"CC0-1.0",
"MPL-2.0",
"Unicode-3.0",
]
# The confidence threshold for detecting a license from license text.
# The higher the value, the more closely the license text must be to the
@@ -35,7 +36,6 @@ confidence-threshold = 0.8
# aren't accepted for every possible crate as with the normal allow list
exceptions = [
{ name = "fastdivide", allow = ["zlib-acknowledgement"] },
{ name = "unicode-ident", allow = ["Unicode-DFS-2016"] },
]

# This section is considered when running `cargo deny check bans`.
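
Presumably this pairing tracks unicode-ident's relicensing from Unicode-DFS-2016 to Unicode-3.0: once Unicode-3.0 is on the global allow list, the per-crate exception becomes redundant. With cargo-deny installed, cargo deny check licenses would confirm the updated policy still passes.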
1 change: 1 addition & 0 deletions fire_seq_search_server/src/language_tools/tokenizer.rs
@@ -13,6 +13,7 @@ use log::{debug, info};
pub fn filter_out_stopwords<'a,'b>(term_tokens: &'a [String], nltk: &'b HashSet<String>) -> Vec<&'a str> {
let term_ref: Vec<&str> = term_tokens.iter().map(|s| &**s).collect();
let terms_selected: Vec<&str> = term_ref.into_iter()
.filter(|&s| ! (s.trim().is_empty() ) )
.filter(|&s| !nltk.contains(&(&s).to_lowercase() ) )
.collect();
terms_selected
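
The added guard drops tokens that are empty after trimming before the stopword lookup runs, so whitespace-only tokens can no longer reach the NLTK set. A self-contained sketch of the same filter chain with made-up inputs (not the project's test suite):

    use std::collections::HashSet;

    fn main() {
        let nltk: HashSet<String> = ["the".to_string()].into_iter().collect();
        let tokens = vec!["  ".to_string(), "The".to_string(), "rust".to_string()];
        // "  " is dropped by the new emptiness guard; "The" is lowercased and
        // removed as a stopword; only "rust" survives.
        let kept: Vec<&str> = tokens.iter()
            .map(|s| s.as_str())
            .filter(|s| !s.trim().is_empty())
            .filter(|s| !nltk.contains(&s.to_lowercase()))
            .collect();
        assert_eq!(kept, vec!["rust"]);
    }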
1 change: 1 addition & 0 deletions fire_seq_search_server/src/lib.rs
@@ -174,6 +174,7 @@ pub fn generate_server_info_for_test() -> ServerInformation {
convert_underline_hierarchy: true,
host: "127.0.0.1:22024".to_string(),
llm_enabled: false,
+llm_max_waiting_time: 60,
};
server_info
}
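
Note the test fixture uses a 60-second window, while build_server_info in main.rs (below) sets the production default to 180 seconds.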
45 changes: 34 additions & 11 deletions fire_seq_search_server/src/local_llm/mod.rs
@@ -10,7 +10,7 @@ use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::task::yield_now;
use tokio::task;
-use tokio::time;
+use tokio;

use std::borrow::Cow;
use std::borrow::Cow::Borrowed;
@@ -88,9 +88,17 @@ pub struct HealthCheck {

const LLM_SERVER_PORT: &str = "8081"; // TODO Remove this magic number


+#[derive(Debug)]
+pub struct LlmJob {
+pub title: String,
+pub body : String,
+pub time : std::time::Instant, /* 16 bytes */
+}

struct JobProcessor {
done_job: HashMap<String, String>,
job_queue: VecDeque<DocData>,
job_queue: VecDeque<LlmJob>,
}

impl JobProcessor {
@@ -104,22 +112,29 @@ impl JobProcessor {
let title: &str = &doc.title;
info!("Job posted for {}", &title);
if !self.done_job.contains_key(title) {
-self.job_queue.push_back(doc);
+let job: LlmJob = LlmJob {
+title: doc.title,
+body: doc.body,
+time: std::time::Instant::now(),
+};
+self.job_queue.push_back(job);
}
}
}

+use crate::ServerInformation;

pub struct LlmEngine {
endpoint: String,
client: reqwest::Client,
job_cache: Arc<Mutex<JobProcessor>>,
//job_cache :Arc<Mutex<HashMap<String, Option<String> >>>,
+server_info: Arc<ServerInformation>,
}



impl LlmEngine {
-pub async fn llm_init() -> Self {
+pub async fn llm_init(server_info: Arc<ServerInformation>) -> Self {
info!("llm called");

let lfile = locate_llamafile().await;
@@ -129,27 +144,25 @@ impl LlmEngine {
.args([ "-n", "19",
&lfile, "--nobrowser",
"--port", LLM_SERVER_PORT,
//">/tmp/llamafile.stdout", "2>/tmp/llamafile.stderr",
])
.stdout(Stdio::from(File::create("/tmp/llamafile.stdout.txt").unwrap()))
.stderr(Stdio::from(File::create("/tmp/llamafile.stderr.txt").unwrap()))
.spawn()
.expect("llm model failed to launch");

yield_now().await;
-let wait_llm = time::Duration::from_millis(500);
+let wait_llm = tokio::time::Duration::from_millis(500);
tokio::time::sleep(wait_llm).await;
task::yield_now().await;

let endpoint = format!("http://127.0.0.1:{}", LLM_SERVER_PORT).to_string();


loop {
let resp = reqwest::get(endpoint.to_owned() + "/health").await;
let resp = match resp {
Err(_e) => {
info!("llm not ready");
-let wait_llm = time::Duration::from_millis(1000);
+let wait_llm = tokio::time::Duration::from_millis(1000);
tokio::time::sleep(wait_llm).await;
task::yield_now().await;
continue;
@@ -171,7 +184,8 @@ Self {
Self {
endpoint,
client,
-job_cache: map
+job_cache: map,
+server_info,
}
}

@@ -230,7 +244,7 @@ impl LlmEngine{
return;
}

let next_job: Option<DocData>;
let next_job: Option<LlmJob>;

let mut jcache = self.job_cache.lock().await;//.unwrap();
next_job = jcache.job_queue.pop_front();
@@ -249,6 +263,15 @@
}
drop(jcache);

+let waiting_time = doc.time.elapsed().as_secs();
+let allowed_wait = self.server_info.llm_max_waiting_time;
+if waiting_time > allowed_wait {
+info!("Waiting for {} for {} seconds, discard",
+&title, waiting_time);
+return;
+}


info!("Start summarize job: {}", &title);
let summarize_result = self.summarize(&doc.body).await;
info!("Finished summarize job: {}", &title);
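
This hunk is the heart of the change: every LlmJob now records the Instant at which it was enqueued, and the worker returns early, discarding the job, once the queue delay exceeds llm_max_waiting_time. A minimal self-contained sketch of that pattern (simplified names, not the actual LlmEngine API):

    use std::collections::VecDeque;
    use std::time::Instant;

    struct Job { title: String, time: Instant }

    fn pop_fresh(queue: &mut VecDeque<Job>, max_wait_secs: u64) -> Option<Job> {
        let job = queue.pop_front()?;
        if job.time.elapsed().as_secs() > max_wait_secs {
            // Queued for too long: drop the job instead of calling the LLM.
            println!("discarding '{}' after {}s in queue",
                job.title, job.time.elapsed().as_secs());
            return None;
        }
        Some(job)
    }

    fn main() {
        let mut queue = VecDeque::new();
        queue.push_back(Job { title: "note".to_string(), time: Instant::now() });
        // A freshly enqueued job is well inside a 60-second window.
        assert!(pop_fresh(&mut queue, 60).is_some());
    }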
17 changes: 7 additions & 10 deletions fire_seq_search_server/src/main.rs
@@ -58,17 +58,17 @@ async fn main() {
.format_target(false)
.init();

info!("main thread running");
let matches = Cli::parse();
let server_info: ServerInformation = build_server_info(matches);

let mut llm_loader = None;
if cfg!(feature="llm") {
info!("LLM Enabled");
-//tokio::task::JoinHandle<LlmEngine>
-llm_loader = Some(task::spawn( async { LlmEngine::llm_init().await }));
+let serv_info = Arc::new(server_info.clone());
+llm_loader = Some(task::spawn( async { LlmEngine::llm_init( serv_info ).await }));
}

info!("main thread running");
let matches = Cli::parse();
let server_info: ServerInformation = build_server_info(matches);

let mut engine = QueryEngine::construct(server_info).await;

info!("query engine build finished");
@@ -100,14 +100,10 @@ async fn main() {
let listener = tokio::net::TcpListener::bind(&engine_arc.server_info.host)
.await.unwrap();
axum::serve(listener, app).await.unwrap();
-// let llm = llm.await.unwrap();
-//llm.summarize("hi my friend").await;
}



fn build_server_info(args: Cli) -> ServerInformation {
let notebook_name = match args.notebook_name {
Some(x) => x.to_string(),
@@ -136,6 +132,7 @@ fn build_server_info(args: Cli) -> ServerInformation {
convert_underline_hierarchy: true,
host,
llm_enabled: cfg!(feature="llm"),
+llm_max_waiting_time: 180,
}
}

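
The reordering is the functional point of this hunk: Cli::parse() and build_server_info now run before the LLM task is spawned, because llm_init takes the configuration, and the spawned task receives its own Arc clone while the main thread keeps using server_info. A stand-in sketch of that handoff (hypothetical Config type; assumes tokio with the rt and macros features):

    use std::sync::Arc;

    #[derive(Clone)]
    struct Config { llm_max_waiting_time: u64 }

    #[tokio::main]
    async fn main() {
        let config = Config { llm_max_waiting_time: 180 };
        let shared = Arc::new(config.clone());
        // Stand-in for LlmEngine::llm_init(serv_info): the task owns its Arc.
        let loader = tokio::task::spawn(async move { shared.llm_max_waiting_time });
        // `config` is still usable here, as server_info is for QueryEngine::construct.
        assert_eq!(loader.await.unwrap(), config.llm_max_waiting_time);
    }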
1 change: 1 addition & 0 deletions fire_seq_search_server/src/query_engine/mod.rs
@@ -33,6 +33,7 @@ pub struct ServerInformation {
pub host: String,

pub llm_enabled: bool,
+pub llm_max_waiting_time: u64, /* in secs */
}

use crate::language_tools::tokenizer::FireSeqTokenizer;
