Set timeout for LLM (#153) #154

Merged
merged 7 commits on Dec 30, 2024
2 changes: 1 addition & 1 deletion fire_seq_search_server/debug_server.sh
@@ -4,7 +4,7 @@ rm -f ./fire_seq_search_server
 cargo build --features llm
 cp target/debug/fire_seq_search_server ./fire_seq_search_server
 
-export RUST_LOG="warn,fire_seq_search_server=debug"
+export RUST_LOG="warn,fire_seq_search_server=info"
 #export RUST_LOG="debug"
 export RUST_BACKTRACE=1
 #RAYON_NUM_THREADS=1
2 changes: 1 addition & 1 deletion fire_seq_search_server/deny.toml
@@ -25,6 +25,7 @@ allow = [
     "BSD-2-Clause", "BSD-3-Clause",
     "CC0-1.0",
     "MPL-2.0",
+    "Unicode-3.0",
 ]
 # The confidence threshold for detecting a license from license text.
 # The higher the value, the more closely the license text must be to the
@@ -35,7 +36,6 @@ confidence-threshold = 0.8
 # aren't accepted for every possible crate as with the normal allow list
 exceptions = [
     { name = "fastdivide", allow = ["zlib-acknowledgement"] },
-    { name = "unicode-ident", allow = ["Unicode-DFS-2016"] },
 ]
 
 # This section is considered when running `cargo deny check bans`.
1 change: 1 addition & 0 deletions fire_seq_search_server/src/language_tools/tokenizer.rs
@@ -13,6 +13,7 @@ use log::{debug, info};
 pub fn filter_out_stopwords<'a,'b>(term_tokens: &'a [String], nltk: &'b HashSet<String>) -> Vec<&'a str> {
     let term_ref: Vec<&str> = term_tokens.iter().map(|s| &**s).collect();
     let terms_selected: Vec<&str> = term_ref.into_iter()
+        .filter(|&s| ! (s.trim().is_empty() ) )
         .filter(|&s| !nltk.contains(&(&s).to_lowercase() ) )
         .collect();
     terms_selected
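For context on the hunk above: the added filter drops whitespace-only tokens before the stopword lookup, so blank strings no longer survive tokenization. A minimal, self-contained sketch of the resulting behavior (illustrative only, not code from this PR):

use std::collections::HashSet;

fn main() {
    let nltk: HashSet<String> = ["the", "a"].iter().map(|s| s.to_string()).collect();
    let tokens: Vec<String> = vec!["  ".into(), "The".into(), "server".into()];
    // Mirror filter_out_stopwords after this PR: blank tokens are dropped first,
    // then anything whose lowercase form is a stopword.
    let kept: Vec<&str> = tokens.iter()
        .map(|s| s.as_str())
        .filter(|s| !s.trim().is_empty())
        .filter(|s| !nltk.contains(&s.to_lowercase()))
        .collect();
    assert_eq!(kept, vec!["server"]); // "  " and "The" were filtered out
}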
1 change: 1 addition & 0 deletions fire_seq_search_server/src/lib.rs
@@ -174,6 +174,7 @@ pub fn generate_server_info_for_test() -> ServerInformation {
         convert_underline_hierarchy: true,
         host: "127.0.0.1:22024".to_string(),
         llm_enabled: false,
+        llm_max_waiting_time: 60,
     };
     server_info
 }
45 changes: 34 additions & 11 deletions fire_seq_search_server/src/local_llm/mod.rs
@@ -10,7 +10,7 @@ use std::sync::Arc;
 use tokio::sync::Mutex;
 use tokio::task::yield_now;
 use tokio::task;
-use tokio::time;
+use tokio;
 
 use std::borrow::Cow;
 use std::borrow::Cow::Borrowed;
@@ -88,9 +88,17 @@ pub struct HealthCheck {
 
 const LLM_SERVER_PORT: &str = "8081"; // TODO Remove this magic number
 
+
+#[derive(Debug)]
+pub struct LlmJob {
+    pub title: String,
+    pub body : String,
+    pub time : std::time::Instant, /* 16 bytes */
+}
+
 struct JobProcessor {
     done_job: HashMap<String, String>,
-    job_queue: VecDeque<DocData>,
+    job_queue: VecDeque<LlmJob>,
 }
 
 impl JobProcessor {
@@ -104,22 +112,29 @@ impl JobProcessor {
         let title: &str = &doc.title;
         info!("Job posted for {}", &title);
         if !self.done_job.contains_key(title) {
-            self.job_queue.push_back(doc);
+            let job: LlmJob = LlmJob {
+                title: doc.title,
+                body: doc.body,
+                time: std::time::Instant::now(),
+            };
+            self.job_queue.push_back(job);
         }
     }
 }
 
+use crate::ServerInformation;
+
 pub struct LlmEngine {
     endpoint: String,
     client: reqwest::Client,
     job_cache: Arc<Mutex<JobProcessor>>,
     //job_cache :Arc<Mutex<HashMap<String, Option<String> >>>,
+    server_info: Arc<ServerInformation>,
 }
 
 
 
 impl LlmEngine {
-    pub async fn llm_init() -> Self {
+    pub async fn llm_init(server_info: Arc<ServerInformation>) -> Self {
         info!("llm called");
 
         let lfile = locate_llamafile().await;
@@ -129,27 +144,25 @@
             .args([ "-n", "19",
                 &lfile, "--nobrowser",
                 "--port", LLM_SERVER_PORT,
-                //">/tmp/llamafile.stdout", "2>/tmp/llamafile.stderr",
             ])
             .stdout(Stdio::from(File::create("/tmp/llamafile.stdout.txt").unwrap()))
             .stderr(Stdio::from(File::create("/tmp/llamafile.stderr.txt").unwrap()))
             .spawn()
             .expect("llm model failed to launch");
 
         yield_now().await;
-        let wait_llm = time::Duration::from_millis(500);
+        let wait_llm = tokio::time::Duration::from_millis(500);
         tokio::time::sleep(wait_llm).await;
         task::yield_now().await;
 
         let endpoint = format!("http://127.0.0.1:{}", LLM_SERVER_PORT).to_string();
 
         loop {
             let resp = reqwest::get(endpoint.to_owned() + "/health").await;
             let resp = match resp {
                 Err(_e) => {
                     info!("llm not ready");
-                    let wait_llm = time::Duration::from_millis(1000);
+                    let wait_llm = tokio::time::Duration::from_millis(1000);
                     tokio::time::sleep(wait_llm).await;
                     task::yield_now().await;
                     continue;
@@ -171,7 +184,8 @@
         Self {
             endpoint,
             client,
-            job_cache: map
+            job_cache: map,
+            server_info,
         }
     }
 
@@ -230,7 +244,7 @@ impl LlmEngine{
             return;
         }
 
-        let next_job: Option<DocData>;
+        let next_job: Option<LlmJob>;
 
         let mut jcache = self.job_cache.lock().await;//.unwrap();
         next_job = jcache.job_queue.pop_front();
@@ -249,6 +263,15 @@
         }
         drop(jcache);
 
+        let waiting_time = doc.time.elapsed().as_secs();
+        let allowed_wait = self.server_info.llm_max_waiting_time;
+        if waiting_time > allowed_wait {
+            info!("Waiting for {} for {} seconds, discard",
+                &title, waiting_time);
+            return;
+        }
+
+
         info!("Start summarize job: {}", &title);
         let summarize_result = self.summarize(&doc.body).await;
         info!("Finished summarize job: {}", &title);
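This is the heart of the PR: every queued job now carries the Instant at which it was posted, and the worker discards any job that has already waited longer than llm_max_waiting_time instead of summarizing a stale document. A self-contained sketch of that discard policy using only std (names are illustrative; the real logic lives in LlmEngine above):

use std::collections::VecDeque;
use std::time::{Duration, Instant};

struct Job { title: String, time: Instant }

// Pop one job and discard it when it sat in the queue longer than allowed,
// mirroring the elapsed()-based check added in this file.
fn take_job(queue: &mut VecDeque<Job>, llm_max_waiting_time: u64) -> Option<Job> {
    let job = queue.pop_front()?;
    if job.time.elapsed().as_secs() > llm_max_waiting_time {
        println!("{} waited too long, discarding", job.title);
        return None;
    }
    Some(job)
}

fn main() {
    let mut queue = VecDeque::new();
    // Simulate a job enqueued 200 seconds ago (fine on any system whose
    // monotonic clock has been running at least that long).
    queue.push_back(Job {
        title: "stale page".to_string(),
        time: Instant::now() - Duration::from_secs(200),
    });
    assert!(take_job(&mut queue, 180).is_none()); // over the 180 s default: dropped
}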
17 changes: 7 additions & 10 deletions fire_seq_search_server/src/main.rs
@@ -58,17 +58,17 @@ async fn main() {
         .format_target(false)
         .init();
 
+    info!("main thread running");
+    let matches = Cli::parse();
+    let server_info: ServerInformation = build_server_info(matches);
+
     let mut llm_loader = None;
     if cfg!(feature="llm") {
         info!("LLM Enabled");
-        //tokio::task::JoinHandle<LlmEngine>
-        llm_loader = Some(task::spawn( async { LlmEngine::llm_init().await }));
+        let serv_info = Arc::new(server_info.clone());
+        llm_loader = Some(task::spawn( async { LlmEngine::llm_init( serv_info ).await }));
     }
 
-    info!("main thread running");
-    let matches = Cli::parse();
-    let server_info: ServerInformation = build_server_info(matches);
-
     let mut engine = QueryEngine::construct(server_info).await;
 
     info!("query engine build finished");
@@ -100,14 +100,10 @@ async fn main() {
     let listener = tokio::net::TcpListener::bind(&engine_arc.server_info.host)
         .await.unwrap();
     axum::serve(listener, app).await.unwrap();
-    // let llm = llm.await.unwrap();
-    //llm.summarize("hi my friend").await;
 }
 
 
-
-
 
 fn build_server_info(args: Cli) -> ServerInformation {
     let notebook_name = match args.notebook_name {
         Some(x) => x.to_string(),
@@ -136,6 +132,7 @@ fn build_server_info(args: Cli) -> ServerInformation {
         convert_underline_hierarchy: true,
         host,
         llm_enabled: cfg!(feature="llm"),
+        llm_max_waiting_time: 180,
     }
 
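A note on the main.rs reordering above: CLI parsing now happens before the LLM task is spawned, because llm_init needs an Arc<ServerInformation> while build_server_info's result must stay available for QueryEngine::construct. A stripped-down sketch of that share-then-move pattern (hypothetical struct; assumes the tokio crate with its rt-multi-thread and macros features):

use std::sync::Arc;

#[derive(Clone)]
struct ServerInformation { llm_max_waiting_time: u64 }

#[tokio::main]
async fn main() {
    // Build the configuration first...
    let server_info = ServerInformation { llm_max_waiting_time: 180 };
    // ...then hand an Arc of a clone to the background loader, leaving the
    // original value free to be moved into the query engine afterwards.
    let serv_info = Arc::new(server_info.clone());
    let loader = tokio::spawn(async move {
        serv_info.llm_max_waiting_time // stand-in for LlmEngine::llm_init(serv_info).await
    });
    assert_eq!(loader.await.unwrap(), 180);
    let _engine_owns_it = server_info; // QueryEngine::construct(server_info) in the PR
}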
1 change: 1 addition & 0 deletions fire_seq_search_server/src/query_engine/mod.rs
@@ -33,6 +33,7 @@ pub struct ServerInformation {
     pub host: String,
 
     pub llm_enabled: bool,
+    pub llm_max_waiting_time: u64, /* in secs */
 }
 
 use crate::language_tools::tokenizer::FireSeqTokenizer;