Skip to content

Commit

Permalink
trying to add job queue
Browse files Browse the repository at this point in the history
  • Loading branch information
Endle committed Aug 22, 2024
1 parent 33e9997 commit 123033c
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 22 deletions.
8 changes: 5 additions & 3 deletions fire_seq_search_server/debug_server_mac.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ rm -f ./fire_seq_search_server
#nix-shell -p cargo -p rustc -p libiconv --run "cargo build"
cargo build
cp target/debug/fire_seq_search_server ./fire_seq_search_server
RUST_BACKTRACE=1 RUST_LOG=debug ./fire_seq_search_server \
--notebook_path /Users/zhenboli/logseq \
--exclude-zotero-items

export RUST_LOG="warn,fire_seq_search_server=info"
#export RUST_LOG="debug"
export RUST_BACKTRACE=1
./fire_seq_search_server --notebook_path ~/logseq --enable-journal-query
52 changes: 33 additions & 19 deletions fire_seq_search_server/src/local_llm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,37 @@ use log::{info, error};
use crate::query_engine::ServerInformation;
use reqwest;
use std::collections::HashMap;
use std::collections::VecDeque;



const LLM_SERVER_PORT: &str = "8081"; // TODO Remove this magic number
use std::sync::Arc;
use std::sync::Mutex;

/// Bookkeeping for summarize jobs: a queue of pending titles plus a map
/// reserved for finished jobs.
struct JobProcessor {
    /// Finished jobs keyed by `String`.
    /// NOTE(review): presumably maps a document title to its result; nothing
    /// visible here writes to it yet, so confirm once the worker side exists.
    done_job: HashMap<String, String>,
    /// Titles waiting to be processed, in the order they were posted.
    job_queue: VecDeque<String>,
}

impl JobProcessor {
pub fn new() -> Self {
JobProcessor {
done_job: HashMap::new(),
job_queue: VecDeque::new(),
}
}
pub fn add(&mut self, title:String) {
info!("Job posted for {}", &title);
self.job_queue.push_back(title);
}
}

pub struct LlmEngine {
endpoint: String,
client: reqwest::Client,
job_cache :Arc<Mutex<HashMap<String, Option<String> >>>,
job_cache: Arc<Mutex<JobProcessor>>,
//job_cache :Arc<Mutex<HashMap<String, Option<String> >>>,
}

use serde::{Serialize, Deserialize};
Expand All @@ -30,7 +50,7 @@ pub struct Message {
}

use tokio::task;
use crate::query_engine::DocData;
use crate::query_engine::DocData;
impl LlmEngine {
pub async fn llm_init() -> Self {
info!("llm called");
Expand All @@ -51,6 +71,8 @@ impl LlmEngine {
.expect("llm model failed to launch");

use tokio::time;
use tokio::task::yield_now;
yield_now().await;
let wait_llm = time::Duration::from_millis(500);
tokio::time::sleep(wait_llm).await;
task::yield_now().await;
Expand Down Expand Up @@ -81,7 +103,8 @@ impl LlmEngine {
let client = reqwest::Client::new();

info!("llm engine initialized");
let mut map = Arc::new(Mutex::new(HashMap::new()));
let mut map = Arc::new(Mutex::new(
JobProcessor::new()));
Self {
endpoint,
client,
Expand Down Expand Up @@ -125,17 +148,9 @@ impl LlmEngine{
}

pub async fn post_summarize_job(&self, doc: DocData) {
info!("Job posted for {}", &doc.title);
let mut jcache = self.job_cache.lock().unwrap(); //TODO error handler?
match jcache.get(&doc.title) {
Some(status) => {
},
None => {
info!("Create task for {}", &doc.title);
jcache.insert(doc.title.to_owned(), None);
//TODO
}
};
//TODO error handler?
let mut jcache = self.job_cache.lock().unwrap();
jcache.add(doc.title.to_owned());
}

pub async fn health(&self) -> Result<(), Box<dyn std::error::Error>> {
Expand All @@ -162,7 +177,6 @@ struct LlamaFileDef {

async fn locate_llamafile() -> Option<String> {
// TODO
//use sha256::try_digest;
let mut lf = LlamaFileDef {
filename: "mistral-7b-instruct-v0.2.Q4_0.llamafile".to_owned(),
filepath: None,
Expand All @@ -172,14 +186,14 @@ async fn locate_llamafile() -> Option<String> {

// TODO hack in dev
//let lf_path = "/var/home/lizhenbo/Downloads/mistral-7b-instruct-v0.2.Q4_0.llamafile";
let lf_base = "/Users/zhenboli/.llamafile/"
let lf_base = "/Users/zhenboli/.llamafile/";
let lf_path = lf_base.to_owned() + &lf.filename;
lf.filepath = Some( lf_path.to_owned() );
info!("lf {:?}", &lf);

let ppath = std::path::Path::new(lf_path);
let val = try_digest(ppath).unwrap();
//let val = "1903778f7defd921347b25327ebe5dd902f29417ba524144a8e4f7c32d83dee8";
let ppath = std::path::Path::new(&lf_path);
//let val = sha256::try_digest(ppath).unwrap();
let val = "1903778f7defd921347b25327ebe5dd902f29417ba524144a8e4f7c32d83dee8";
if val != lf.sha256 {
error!("Wrong sha256sum for the model. Quit");
return None;
Expand Down
2 changes: 2 additions & 0 deletions fire_seq_search_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ async fn main() {
let server_info: ServerInformation = build_server_info(matches);

let mut engine = QueryEngine::construct(server_info);

info!("query engine build finished");
if cfg!(feature="llm") {
let llm:LlmEngine = llm_loader.unwrap().await.unwrap();
engine.llm = Some(llm);
Expand Down

0 comments on commit 123033c

Please sign in to comment.