diff --git a/coprocessor/fhevm-engine/.sqlx/query-0be7f94ac1356de126688b56b95593e80509b7834f14f39e8aed9a4f15fad410.json b/coprocessor/fhevm-engine/.sqlx/query-0be7f94ac1356de126688b56b95593e80509b7834f14f39e8aed9a4f15fad410.json new file mode 100644 index 000000000..7050a3732 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-0be7f94ac1356de126688b56b95593e80509b7834f14f39e8aed9a4f15fad410.json @@ -0,0 +1,28 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT status, worker_id FROM dependence_chain WHERE dependence_chain_id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "status", + "type_info": "Text" + }, + { + "ordinal": 1, + "name": "worker_id", + "type_info": "Uuid" + } + ], + "parameters": { + "Left": [ + "Bytea" + ] + }, + "nullable": [ + false, + true + ] + }, + "hash": "0be7f94ac1356de126688b56b95593e80509b7834f14f39e8aed9a4f15fad410" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-133f13945ef86d97a139636c8d3feab7b76acafe242bf4c4ca89f85aec539164.json b/coprocessor/fhevm-engine/.sqlx/query-133f13945ef86d97a139636c8d3feab7b76acafe242bf4c4ca89f85aec539164.json new file mode 100644 index 000000000..39ecadec6 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-133f13945ef86d97a139636c8d3feab7b76acafe242bf4c4ca89f85aec539164.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE dependence_chain \n SET \n worker_id = NULL,\n lock_acquired_at = NULL,\n lock_expires_at = NULL,\n status = CASE \n WHEN status = 'processing' THEN 'processed'\n ELSE status\n END\n WHERE worker_id = $1 AND dependence_chain_id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "133f13945ef86d97a139636c8d3feab7b76acafe242bf4c4ca89f85aec539164" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-356ad05cf8677b0e561e56e0b7d5298b39471d8431093f3297da926b3f97273e.json b/coprocessor/fhevm-engine/.sqlx/query-356ad05cf8677b0e561e56e0b7d5298b39471d8431093f3297da926b3f97273e.json new file mode 100644 index 000000000..69b08b581 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-356ad05cf8677b0e561e56e0b7d5298b39471d8431093f3297da926b3f97273e.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "TRUNCATE TABLE dependence_chain", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "356ad05cf8677b0e561e56e0b7d5298b39471d8431093f3297da926b3f97273e" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-5ed3357cb17bbeb4c9c195203319d4c52c23e042141c6e4574edcf6416aaa282.json b/coprocessor/fhevm-engine/.sqlx/query-5ed3357cb17bbeb4c9c195203319d4c52c23e042141c6e4574edcf6416aaa282.json new file mode 100644 index 000000000..8e085c6a5 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-5ed3357cb17bbeb4c9c195203319d4c52c23e042141c6e4574edcf6416aaa282.json @@ -0,0 +1,24 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE dependence_chain AS dc\n SET\n lock_expires_at = NOW() + make_interval(secs => $3)\n WHERE dependence_chain_id = $1 AND worker_id = $2\n RETURNING dc.lock_expires_at::timestamptz AS \"lock_expires_at: chrono::DateTime\";\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "lock_expires_at: chrono::DateTime", + "type_info": "Timestamptz" + } + ], + "parameters": { + "Left": [ + "Bytea", + "Uuid", + "Float8" + ] + }, + "nullable": [ + true + ] + }, + "hash": "5ed3357cb17bbeb4c9c195203319d4c52c23e042141c6e4574edcf6416aaa282" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-8f7a80b924a8cc486b806a8c89d92bc46ae3f8342223e75b46a6f370cc701c13.json b/coprocessor/fhevm-engine/.sqlx/query-8f7a80b924a8cc486b806a8c89d92bc46ae3f8342223e75b46a6f370cc701c13.json new file mode 100644 index 000000000..1e34231fa --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-8f7a80b924a8cc486b806a8c89d92bc46ae3f8342223e75b46a6f370cc701c13.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "UPDATE dependence_chain\n SET status = 'updated', last_updated_at = NOW()\n WHERE dependence_chain_id = $1", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "8f7a80b924a8cc486b806a8c89d92bc46ae3f8342223e75b46a6f370cc701c13" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-92ed51d010ff1133aed35ae7594933e19b8cc81314d65d5c89044059121e02b1.json b/coprocessor/fhevm-engine/.sqlx/query-92ed51d010ff1133aed35ae7594933e19b8cc81314d65d5c89044059121e02b1.json new file mode 100644 index 000000000..0f1e100e4 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-92ed51d010ff1133aed35ae7594933e19b8cc81314d65d5c89044059121e02b1.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO dependence_chain (dependence_chain_id, status, last_updated_at)\n VALUES ($1, 'updated', NOW())\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bytea" + ] + }, + "nullable": [] + }, + "hash": "92ed51d010ff1133aed35ae7594933e19b8cc81314d65d5c89044059121e02b1" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-63f097acd50a5ec09f959668fce297d0c2c1c9851741c2a5bbb3238af2fe5b75.json b/coprocessor/fhevm-engine/.sqlx/query-ae7f1215a401825938f784ca18311fa74acfa6ae7c7bc13dc6e9079528c165ec.json similarity index 57% rename from coprocessor/fhevm-engine/.sqlx/query-63f097acd50a5ec09f959668fce297d0c2c1c9851741c2a5bbb3238af2fe5b75.json rename to coprocessor/fhevm-engine/.sqlx/query-ae7f1215a401825938f784ca18311fa74acfa6ae7c7bc13dc6e9079528c165ec.json index e1e917d42..b63d574e7 100644 --- a/coprocessor/fhevm-engine/.sqlx/query-63f097acd50a5ec09f959668fce297d0c2c1c9851741c2a5bbb3238af2fe5b75.json +++ b/coprocessor/fhevm-engine/.sqlx/query-ae7f1215a401825938f784ca18311fa74acfa6ae7c7bc13dc6e9079528c165ec.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\nWITH selected_computations AS (\n (\n SELECT DISTINCT\n c_creation_order.transaction_id\n FROM (\n SELECT transaction_id\n FROM computations \n WHERE is_completed = FALSE\n AND is_error = FALSE\n AND is_allowed = TRUE\n ORDER BY created_at\n LIMIT $1\n ) as c_creation_order\n UNION ALL\n SELECT DISTINCT\n c_schedule_order.transaction_id\n FROM (\n SELECT transaction_id\n FROM computations \n WHERE is_completed = FALSE\n AND is_error = FALSE\n AND is_allowed = TRUE\n ORDER BY schedule_order\n LIMIT $1\n ) as c_schedule_order\n )\n)\n-- Acquire all computations from this transaction set\nSELECT\n c.tenant_id, \n c.output_handle, \n c.dependencies, \n c.fhe_operation, \n c.is_scalar,\n c.is_allowed, \n c.dependence_chain_id,\n c.transaction_id\nFROM computations c\nJOIN selected_computations sc\n ON c.transaction_id = sc.transaction_id\nFOR UPDATE SKIP LOCKED ", + "query": "\nWITH selected_computations AS (\n (\n SELECT DISTINCT\n c_creation_order.transaction_id\n FROM (\n SELECT transaction_id\n FROM computations \n WHERE is_completed = FALSE\n AND is_error = FALSE\n AND is_allowed = TRUE\n AND ($1::bytea IS NULL OR dependence_chain_id = $1)\n ORDER BY created_at\n LIMIT $2\n ) as c_creation_order\n UNION ALL\n SELECT DISTINCT\n c_schedule_order.transaction_id\n FROM (\n SELECT transaction_id\n FROM computations \n WHERE is_completed = FALSE\n AND is_error = FALSE\n AND is_allowed = TRUE\n AND ($1::bytea IS NULL OR dependence_chain_id = $1)\n ORDER BY schedule_order\n LIMIT $2\n ) as c_schedule_order\n )\n)\n-- Acquire all computations from this transaction set\nSELECT\n c.tenant_id, \n c.output_handle, \n c.dependencies, \n c.fhe_operation, \n c.is_scalar,\n c.is_allowed, \n c.dependence_chain_id,\n c.transaction_id\nFROM computations c\nJOIN selected_computations sc\n ON c.transaction_id = sc.transaction_id\nFOR UPDATE SKIP LOCKED ", "describe": { "columns": [ { @@ -46,6 +46,7 @@ ], "parameters": { "Left": [ + "Bytea", "Int8" ] }, @@ -60,5 +61,5 @@ false ] }, - "hash": "63f097acd50a5ec09f959668fce297d0c2c1c9851741c2a5bbb3238af2fe5b75" + "hash": "ae7f1215a401825938f784ca18311fa74acfa6ae7c7bc13dc6e9079528c165ec" } diff --git a/coprocessor/fhevm-engine/.sqlx/query-b460c45cb303e25260533c32b25bf5d3606d7d8fa338f9f1b5480fb915853686.json b/coprocessor/fhevm-engine/.sqlx/query-b460c45cb303e25260533c32b25bf5d3606d7d8fa338f9f1b5480fb915853686.json new file mode 100644 index 000000000..a22728f59 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-b460c45cb303e25260533c32b25bf5d3606d7d8fa338f9f1b5480fb915853686.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": " \n UPDATE dependence_chain\n SET \n worker_id = NULL,\n lock_acquired_at = NULL,\n lock_expires_at = NULL,\n status = CASE \n WHEN status = 'processing' THEN 'processed'\n ELSE status\n END\n WHERE worker_id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "b460c45cb303e25260533c32b25bf5d3606d7d8fa338f9f1b5480fb915853686" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-b70ea209992428946075c428fb31645d2a857bfddd4f1f6c628d6965cf6ef2fe.json b/coprocessor/fhevm-engine/.sqlx/query-b70ea209992428946075c428fb31645d2a857bfddd4f1f6c628d6965cf6ef2fe.json new file mode 100644 index 000000000..276ce13b7 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-b70ea209992428946075c428fb31645d2a857bfddd4f1f6c628d6965cf6ef2fe.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*) as count FROM dependence_chain\n WHERE (status = 'updated' AND worker_id IS NULL) OR (lock_expires_at < NOW())", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "b70ea209992428946075c428fb31645d2a857bfddd4f1f6c628d6965cf6ef2fe" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-db960d1e67219284c082dbb56187c75efe1b9389d9e8a703b6f3399586369bac.json b/coprocessor/fhevm-engine/.sqlx/query-db960d1e67219284c082dbb56187c75efe1b9389d9e8a703b6f3399586369bac.json new file mode 100644 index 000000000..e9263ce0b --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-db960d1e67219284c082dbb56187c75efe1b9389d9e8a703b6f3399586369bac.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE dependence_chain\n SET\n error_message = CASE\n WHEN status = 'processing' THEN $3\n ELSE error_message\n END\n WHERE worker_id = $1 AND dependence_chain_id = $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Bytea", + "Text" + ] + }, + "nullable": [] + }, + "hash": "db960d1e67219284c082dbb56187c75efe1b9389d9e8a703b6f3399586369bac" +} diff --git a/coprocessor/fhevm-engine/.sqlx/query-e83798919f9929634fec3ed1d1ed3ec8891fbb77ce3b9831054589567a0e084a.json b/coprocessor/fhevm-engine/.sqlx/query-e83798919f9929634fec3ed1d1ed3ec8891fbb77ce3b9831054589567a0e084a.json new file mode 100644 index 000000000..60d591974 --- /dev/null +++ b/coprocessor/fhevm-engine/.sqlx/query-e83798919f9929634fec3ed1d1ed3ec8891fbb77ce3b9831054589567a0e084a.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO dependence_chain(\n dependence_chain_id,\n status,\n last_updated_at\n )\n VALUES (unnest($1::bytea[]), 'updated', statement_timestamp())\n ON CONFLICT (dependence_chain_id) DO UPDATE\n SET status = EXCLUDED.status,\n last_updated_at = EXCLUDED.last_updated_at\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "ByteaArray" + ] + }, + "nullable": [] + }, + "hash": "e83798919f9929634fec3ed1d1ed3ec8891fbb77ce3b9831054589567a0e084a" +} diff --git a/coprocessor/fhevm-engine/Cargo.lock b/coprocessor/fhevm-engine/Cargo.lock index 1ed7ff2af..b24699295 100644 --- a/coprocessor/fhevm-engine/Cargo.lock +++ b/coprocessor/fhevm-engine/Cargo.lock @@ -6975,6 +6975,7 @@ dependencies = [ "base64 0.22.1", "bigdecimal", "bytes", + "chrono", "crc", "crossbeam-queue", "either", @@ -7053,6 +7054,7 @@ dependencies = [ "bitflags 2.9.4", "byteorder", "bytes", + "chrono", "crc", "digest 0.10.7", "dotenvy", @@ -7097,6 +7099,7 @@ dependencies = [ "bigdecimal", "bitflags 2.9.4", "byteorder", + "chrono", "crc", "dotenvy", "etcetera 0.8.0", @@ -7134,6 +7137,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2d12fe70b2c1b4401038055f90f151b78208de1f9f89a7dbfd41587a10c3eea" dependencies = [ "atoi", + "chrono", "flume", "futures-channel", "futures-core", @@ -7575,6 +7579,7 @@ dependencies = [ "alloy", "bigdecimal", "bincode", + "chrono", "clap", "criterion", "fhevm-engine-common", @@ -7595,6 +7600,7 @@ dependencies = [ "scheduler", "serde", "serde_json", + "serial_test", "sha3", "sqlx", "strum 0.26.3", @@ -7611,6 +7617,7 @@ dependencies = [ "tonic-web", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -8330,6 +8337,7 @@ version = "1.18.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2f87b8aa10b915a06587d0dec516c282ff295b475d94abf425d62b57710070a2" dependencies = [ + "getrandom 0.3.3", "js-sys", "wasm-bindgen", ] diff --git a/coprocessor/fhevm-engine/Cargo.toml b/coprocessor/fhevm-engine/Cargo.toml index 65b8a4805..a16b04154 100644 --- a/coprocessor/fhevm-engine/Cargo.toml +++ b/coprocessor/fhevm-engine/Cargo.toml @@ -72,6 +72,7 @@ sqlx = { version = "0.8.6", default-features = false, features = [ "time", "postgres", "uuid", + "chrono" ] } testcontainers = "0.24.0" thiserror = "2.0.12" @@ -93,6 +94,7 @@ tracing-subscriber = { version = "0.3.20", features = ["fmt", "json"] } humantime = "2.2.0" bytesize = "2.0.1" http = "1.3.1" +chrono = { version = "0.4.41", features = ["serde"] } [profile.dev.package.tfhe] overflow-checks = false diff --git a/coprocessor/fhevm-engine/db-migration/migrations/20251205154454_create_dependence_chain_table.sql b/coprocessor/fhevm-engine/db-migration/migrations/20251205154454_create_dependence_chain_table.sql new file mode 100644 index 000000000..8dd8c1f4a --- /dev/null +++ b/coprocessor/fhevm-engine/db-migration/migrations/20251205154454_create_dependence_chain_table.sql @@ -0,0 +1,27 @@ +CREATE TABLE dependence_chain ( + dependence_chain_id bytea PRIMARY KEY, + + -- Scheduling / Coordination + status TEXT NOT NULL CHECK (status IN ( + 'updated', 'processing', 'processed' + )), + error_message TEXT, -- optional error message if processing failed + + -- Worker Ownership (updated by tfhe-workers) + worker_id UUID, + lock_acquired_at TIMESTAMPTZ, + lock_expires_at TIMESTAMPTZ, + + -- Execution (updated by host-listener(s)) + last_updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX idx_pending_dependence_chain + ON dependence_chain USING BTREE (last_updated_at) + WHERE status = 'updated' AND worker_id IS NULL; + +CREATE INDEX idx_dependence_chain_worker_id + ON dependence_chain (worker_id); + +CREATE INDEX idx_dependence_chain_worker_id_and_dependence_chain_id + ON dependence_chain (dependence_chain_id, worker_id); diff --git a/coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs b/coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs new file mode 100644 index 000000000..dcb6e1e8e --- /dev/null +++ b/coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs @@ -0,0 +1,443 @@ +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet, VecDeque}; + +use tracing::{error, warn}; + +use crate::database::tfhe_event_propagate::{ + tfhe_inputs_handle, tfhe_result_handle, +}; +use crate::database::tfhe_event_propagate::{ + ChainCache, ChainDependency, Chains, Handle, LogTfhe, TransactionHash, +}; + +type HandleToTransaction = HashMap; +type Transactions = VecDeque; +type TransactionToChain = HashMap; + +fn scan_transactions(logs: &[LogTfhe]) -> (HandleToTransaction, Transactions) { + let nb_logs = logs.len(); + let mut prev_tx = None; + let mut seen_txs = HashSet::with_capacity(nb_logs); + let mut txs = VecDeque::with_capacity(nb_logs); + let mut handle_to_tx = HashMap::with_capacity(nb_logs); + for (age, log) in logs.iter().enumerate() { + let tx = log.transaction_hash.unwrap_or_default(); + if prev_tx != Some(tx) && !seen_txs.contains(&tx) { + prev_tx = Some(tx); + seen_txs.insert(tx); + txs.push_front(tx); + } + if let Some(handle) = tfhe_result_handle(&log.event) { + // eprintln!("Mapping handle {:?} to tx {:?}", handle, tx); + handle_to_tx.insert(handle, (age, tx)); + } + } + (handle_to_tx, txs) +} + +async fn scan_transactions_dependencies( + logs: &[LogTfhe], + handle_to_tx: HandleToTransaction, + nb_txs: usize, + past_chains: &ChainCache, +) -> ( + HashMap, + TransactionToChain, +) { + // tx to its direct dependences or chain + let mut tx_direct_tx_dependency: HashMap< + TransactionHash, + (usize, TransactionHash), + > = HashMap::with_capacity(nb_txs); + let mut tx_chain_dependency = HashMap::with_capacity(nb_txs); + for log in logs { + // eprintln!("Scanning dependencies for log {:?}", log.event.data); + let tx = log.transaction_hash.unwrap_or_default(); + let log_inputs = tfhe_inputs_handle(&log.event); + for input in log_inputs { + // eprintln!("\tScanning input {:?}", input); + let input_tx = handle_to_tx.get(&input); + if let Some((_, dep_tx)) = input_tx { + if &tx == dep_tx { + // eprintln!("Ignore local"); + continue; + } + } + let same_block_dep = tx_direct_tx_dependency.entry(tx); + // eprintln!( + // "\tSame block dep for tx {:?}: {:?}", + // tx, same_block_dep + // ); + let already_one = matches!(same_block_dep, Entry::Occupied(_)); + if let Some((age, dep_tx)) = input_tx { + // block local, we keep the most recent tx only, assuming correct order + let should_insert = match same_block_dep { + Entry::Vacant(_) => true, + Entry::Occupied(ref e) => *age > e.get().0, + }; + if should_insert { + eprintln!( + "Tx {:?} depends on tx {:?} (age {})", + tx, dep_tx, age + ); + if &tx != dep_tx { + same_block_dep.insert_entry((*age, *dep_tx)); + } + } + continue; + } + if already_one { + // this tx already has a local dependency, skip checking past ones + continue; + } + let pre_block_dep = tx_chain_dependency + .entry(log.transaction_hash.unwrap_or_default()); + if let Entry::Occupied(_) = pre_block_dep { + // this tx already has a chain dependency, skip checking past ones + continue; + } + if let Some(&dep_tx) = past_chains.write().await.get(&input) { + // from a previous block + pre_block_dep.insert_entry(dep_tx); + } + // no dependency or only unknown ones + } + } + (tx_direct_tx_dependency, tx_chain_dependency) +} + +fn assign_tx_a_chain_dependency( + mut txs: Transactions, + tx_direct_tx_dependency: &HashMap< + TransactionHash, + (usize, TransactionHash), + >, + tx_chain_dependency: &HashMap, +) -> (TransactionToChain, Chains) { + let nb_txs = txs.len(); + let mut txn_to_chain_dep = HashMap::with_capacity(nb_txs); + let mut chains = HashSet::with_capacity(nb_txs); + // tx to its chain dependency + while let Some(tx) = txs.pop_back() { + let chain_dep = txn_to_chain_dep.entry(tx); + if let Entry::Occupied(_) = chain_dep { + // already done + continue; + } + // in block dependency we propagate the chain + let chain = if let Some(direct_deps) = tx_direct_tx_dependency.get(&tx) + { + let (_age, tx_dep) = direct_deps; + assert!(*tx_dep != tx); + let Some(chain) = txn_to_chain_dep.get(tx_dep) else { + warn!(?tx, "Out of order transactions"); + // only happens if logs are out of order + txs.push_back(tx); + // let's do its dependency first + txs.push_back(*tx_dep); + continue; + }; + *chain + } else if let Some(chain) = tx_chain_dependency.get(&tx) { + *chain + } else { + // no dependency or unknown ones + // createa new chain + tx + }; + // eprintln!("Assign tx {:?} to chain {:?}", tx, chain); + txn_to_chain_dep.insert(tx, chain); + chains.insert(chain); + } + (txn_to_chain_dep, chains) +} + +pub async fn assign_logs_a_chain_dependency( + logs: &mut [LogTfhe], + txn_to_chain_dep: &TransactionToChain, + past_chains: &ChainCache, +) { + let mut past_chains_write = past_chains.write().await; + for log in logs { + let tx = log.transaction_hash.unwrap_or_default(); + let chain_dep = txn_to_chain_dep.get(&tx); + if chain_dep.is_none() { + error!(?tx, "No chain dependency found for transaction"); + } + let chain_dep = *chain_dep.unwrap_or(&tx); + log.dependence_chain = chain_dep; + if log.is_allowed == false { + // cannot be reused in future blocks + continue; + } + // update past chains cache for next block + if let Some(handle) = tfhe_result_handle(&log.event) { + past_chains_write.put(handle, chain_dep); + } + } +} + +pub async fn dependence_chains( + logs: &mut [LogTfhe], + past_chains: &ChainCache, +) -> Chains { + // handle to transaction + let (handle_to_tx, txs) = scan_transactions(logs); + + let nb_txs = txs.len(); + let (tx_direct_tx_dependency, tx_chain_dependency) = + scan_transactions_dependencies(logs, handle_to_tx, nb_txs, past_chains) + .await; + + let (txn_to_chain_dep, chains) = assign_tx_a_chain_dependency( + txs, + &tx_direct_tx_dependency, + &tx_chain_dependency, + ); + + assign_logs_a_chain_dependency(logs, &txn_to_chain_dep, past_chains).await; + chains +} + +#[cfg(test)] +mod tests { + use alloy::primitives::FixedBytes; + use alloy_primitives::Address; + + use crate::contracts::TfheContract as C; + use crate::contracts::TfheContract::TfheContractEvents as E; + use crate::database::dependence_chains::dependence_chains; + use crate::database::tfhe_event_propagate::{ChainCache, LogTfhe}; + use crate::database::tfhe_event_propagate::{ + ClearConst, Handle, TransactionHash, + }; + + fn caller() -> Address { + Address::from_slice(&[0x11u8; 20]) + } + + fn tfhe_event(data: E) -> alloy::primitives::Log { + let address = "0x0000000000000000000000000000000000000000" + .parse() + .unwrap(); + alloy::primitives::Log:: { address, data } + } + + fn push_event( + e: E, + logs: &mut Vec, + is_allowed: bool, + tx: TransactionHash, + ) { + logs.push(LogTfhe { + event: tfhe_event(e), + is_allowed, + block_number: 0, + block_timestamp: sqlx::types::time::PrimitiveDateTime::MIN, + transaction_hash: Some(tx), + dependence_chain: TransactionHash::ZERO, + }) + } + + fn new_handle() -> Handle { + static HANDLE_COUNTER: std::sync::atomic::AtomicU64 = + std::sync::atomic::AtomicU64::new(1); + let id = + HANDLE_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + Handle::with_last_byte(id as u8) + } + + fn input_handle(logs: &mut Vec, tx: TransactionHash) -> Handle { + let result = new_handle(); + push_event( + E::TrivialEncrypt(C::TrivialEncrypt { + caller: caller(), + pt: ClearConst::from_be_slice(&[0]), + toType: 0, + result, + }), + logs, + false, + tx, + ); + result + } + + fn op1( + handle: Handle, + logs: &mut Vec, + tx: TransactionHash, + ) -> Handle { + let result = new_handle(); + push_event( + E::FheAdd(C::FheAdd { + lhs: handle, + rhs: handle, + scalarByte: FixedBytes::from_slice(&[0]), + result, + caller: caller(), + }), + logs, + true, + tx, + ); + result + } + + fn op2( + handle1: Handle, + handle2: Handle, + logs: &mut Vec, + tx: TransactionHash, + ) -> Handle { + let result = new_handle(); + push_event( + E::FheAdd(C::FheAdd { + lhs: handle1, + rhs: handle2, + scalarByte: FixedBytes::from_slice(&[0]), + result, + caller: caller(), + }), + logs, + true, + tx, + ); + result + } + + #[tokio::test] + async fn test_dependence_chains_1_local_chain() { + let cache = ChainCache::new(lru::LruCache::new( + std::num::NonZeroUsize::new(100).unwrap(), + )); + let mut logs = vec![]; + let tx1 = TransactionHash::with_last_byte(0); + let v0 = input_handle(&mut logs, tx1); + let _v1 = op1(v0, &mut logs, tx1); + let chains = dependence_chains(&mut logs, &cache).await; + assert_eq!(chains.len(), 1); + assert!(logs.iter().all(|log| log.dependence_chain == tx1)); + assert_eq!(cache.read().await.len(), 1); + } + + #[tokio::test] + async fn test_dependence_chains_2_local_chain() { + let cache = ChainCache::new(lru::LruCache::new( + std::num::NonZeroUsize::new(100).unwrap(), + )); + let mut logs = vec![]; + let tx1 = TransactionHash::with_last_byte(0); + let tx2 = TransactionHash::with_last_byte(1); + + let va_1 = input_handle(&mut logs, tx1); + let _vb_1 = op1(va_1, &mut logs, tx1); + let va_2 = input_handle(&mut logs, tx2); + let _vb_2 = op1(va_2, &mut logs, tx2); + let chains = dependence_chains(&mut logs, &cache).await; + assert_eq!(chains.len(), 2); + assert!(logs[0..2].iter().all(|log| log.dependence_chain == tx1)); + assert!(logs[2..4].iter().all(|log| log.dependence_chain == tx2)); + assert_eq!(cache.read().await.len(), 2); + } + + #[tokio::test] + async fn test_dependence_chains_2_local_chain_mixed() { + let cache = ChainCache::new(lru::LruCache::new( + std::num::NonZeroUsize::new(100).unwrap(), + )); + let mut logs = vec![]; + let tx1 = TransactionHash::with_last_byte(0); + let tx2 = TransactionHash::with_last_byte(1); + let tx3 = TransactionHash::with_last_byte(2); + let va_1 = input_handle(&mut logs, tx1); + let vb_1 = op1(va_1, &mut logs, tx1); + let va_2 = input_handle(&mut logs, tx2); + let vb_2 = op1(va_2, &mut logs, tx2); + let _vc_1 = op2(vb_1, vb_2, &mut logs, tx3); + let chains = dependence_chains(&mut logs, &cache).await; + assert!(logs[0..2].iter().all(|log| log.dependence_chain == tx1)); + assert!(logs[2..4].iter().all(|log| log.dependence_chain == tx2)); + assert!(logs[5..].iter().all(|log| log.dependence_chain == tx2)); + assert_eq!(chains.len(), 2); + assert_eq!(cache.read().await.len(), 3); + } + + #[tokio::test] + async fn test_dependence_chains_2_local_chain_mixed_bis() { + // check that the last tx dependency is kept + let cache = ChainCache::new(lru::LruCache::new( + std::num::NonZeroUsize::new(100).unwrap(), + )); + let mut logs = vec![]; + let tx1 = TransactionHash::with_last_byte(0); + let tx2 = TransactionHash::with_last_byte(1); + let tx3 = TransactionHash::with_last_byte(2); + let va_1 = input_handle(&mut logs, tx1); + let va_2 = input_handle(&mut logs, tx2); + let vb_2 = op1(va_2, &mut logs, tx2); + let vb_1 = op1(va_1, &mut logs, tx1); + let _vc_1 = op2(vb_1, vb_2, &mut logs, tx3); + let chains = dependence_chains(&mut logs, &cache).await; + assert_eq!(chains.len(), 2); + assert_eq!(logs[0].dependence_chain, tx1); + assert_eq!(logs[1].dependence_chain, tx2); + assert_eq!(logs[2].dependence_chain, tx2); + assert_eq!(logs[3].dependence_chain, tx1); + assert_eq!(logs[4].dependence_chain, tx1); + assert_eq!(cache.read().await.len(), 3); + } + + #[tokio::test] + async fn test_dependence_chains_1_known_past_handle() { + let cache = ChainCache::new(lru::LruCache::new( + std::num::NonZeroUsize::new(100).unwrap(), + )); + let mut logs = vec![]; + let past_handle = new_handle(); + let past_tx = TransactionHash::with_last_byte(0); + cache.write().await.put(past_handle, past_tx); + let tx1 = TransactionHash::with_last_byte(1); + let _va_1 = op1(past_handle, &mut logs, tx1); + let chains = dependence_chains(&mut logs, &cache).await; + assert_eq!(chains.len(), 1); + assert!(chains.iter().all(|chain| chain == &past_tx)); + assert!(logs.iter().all(|log| log.dependence_chain == past_tx)); + assert_eq!(cache.read().await.len(), 2); + } + + #[tokio::test] + async fn test_dependence_chains_1_unknown_past_handle() { + let cache = ChainCache::new(lru::LruCache::new( + std::num::NonZeroUsize::new(100).unwrap(), + )); + let mut logs = vec![]; + let past_handle = new_handle(); + let tx1 = TransactionHash::with_last_byte(1); + let _va_1 = op1(past_handle, &mut logs, tx1); + let chains = dependence_chains(&mut logs, &cache).await; + assert_eq!(chains.len(), 1); + assert!(chains.iter().all(|chain| chain == &tx1)); + assert!(logs.iter().all(|log| log.dependence_chain == tx1)); + assert_eq!(cache.read().await.len(), 1); + } + + #[tokio::test] + async fn test_dependence_chains_1_local_and_known_past_handle() { + let cache = ChainCache::new(lru::LruCache::new( + std::num::NonZeroUsize::new(100).unwrap(), + )); + let past_handle = new_handle(); + let past_tx = TransactionHash::with_last_byte(0); + cache.write().await.put(past_handle, past_tx); + let tx1 = TransactionHash::with_last_byte(1); + let mut logs = vec![]; + let va_1 = input_handle(&mut logs, tx1); + let _vb_1 = op2(past_handle, va_1, &mut logs, tx1); + let chains = dependence_chains(&mut logs, &cache).await; + assert_eq!(chains.len(), 1); + assert!(chains.iter().all(|chain| chain == &past_tx)); + assert!(logs.iter().all(|log| log.dependence_chain == past_tx)); + assert_eq!(cache.read().await.len(), 2); + } +} diff --git a/coprocessor/fhevm-engine/host-listener/src/database/ingest.rs b/coprocessor/fhevm-engine/host-listener/src/database/ingest.rs index a6c47b0d5..0e3f46227 100644 --- a/coprocessor/fhevm-engine/host-listener/src/database/ingest.rs +++ b/coprocessor/fhevm-engine/host-listener/src/database/ingest.rs @@ -9,6 +9,7 @@ use tracing::{error, info}; use crate::cmd::block_history::BlockSummary; use crate::contracts::{AclContract, TfheContract}; +use crate::database::dependence_chains::dependence_chains; use crate::database::tfhe_event_propagate::{ acl_result_handles, tfhe_result_handle, Database, LogTfhe, }; @@ -100,9 +101,11 @@ pub async fn ingest_block_logs( let log = LogTfhe { event, transaction_hash: log.transaction_hash, - is_allowed: false, // updated in the next loop block_number, block_timestamp, + // updated in the next loop and dependence_chains + is_allowed: false, + dependence_chain: Default::default(), }; tfhe_event_log.push(log); continue; @@ -119,18 +122,19 @@ pub async fn ingest_block_logs( ); } } - - for tfhe_log in tfhe_event_log { - let is_allowed = + for tfhe_log in tfhe_event_log.iter_mut() { + tfhe_log.is_allowed = if let Some(result_handle) = tfhe_result_handle(&tfhe_log.event) { is_allowed.contains(&result_handle.to_vec()) } else { false }; - let tfhe_log = LogTfhe { - is_allowed, - ..tfhe_log - }; + } + + let chains = + dependence_chains(&mut tfhe_event_log, &db.dependence_chain).await; + + for tfhe_log in tfhe_event_log { let inserted = db.insert_tfhe_event(&mut tx, &tfhe_log).await?; if block_logs.catchup && inserted { info!(tfhe_log = ?tfhe_log, "TFHE event missed before"); @@ -147,5 +151,6 @@ pub async fn ingest_block_logs( } db.mark_block_as_valid(&mut tx, &block_logs.summary).await?; + db.update_dependence_chain(&mut tx, chains).await?; tx.commit().await } diff --git a/coprocessor/fhevm-engine/host-listener/src/database/mod.rs b/coprocessor/fhevm-engine/host-listener/src/database/mod.rs index 59887c36b..d9e5f70e4 100644 --- a/coprocessor/fhevm-engine/host-listener/src/database/mod.rs +++ b/coprocessor/fhevm-engine/host-listener/src/database/mod.rs @@ -1,2 +1,3 @@ +pub mod dependence_chains; pub mod ingest; pub mod tfhe_event_propagate; diff --git a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs index 60f048f22..c8f107002 100644 --- a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs +++ b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs @@ -14,6 +14,7 @@ use sqlx::postgres::PgPoolOptions; use sqlx::types::Uuid; use sqlx::Error as SqlxError; use sqlx::{PgPool, Postgres}; +use std::collections::HashSet; use std::ops::DerefMut; use std::sync::Arc; use std::time::Duration; @@ -36,6 +37,9 @@ pub type ChainId = u64; pub type ToType = u8; pub type ScalarByte = FixedBytes<1>; pub type ClearConst = Uint<256, 4>; +pub type ChainDependency = TransactionHash; +pub type ChainCache = RwLock>; +pub type Chains = HashSet; const MINIMUM_BUCKET_CACHE_SIZE: u16 = 16; @@ -77,7 +81,7 @@ pub struct Database { pub pool: Arc>>, pub tenant_id: TenantId, pub chain_id: ChainId, - bucket_cache: tokio::sync::RwLock>, + pub dependence_chain: ChainCache, pub tick: HeartBeat, } @@ -88,6 +92,7 @@ pub struct LogTfhe { pub is_allowed: bool, pub block_number: u64, pub block_timestamp: sqlx::types::time::PrimitiveDateTime, + pub dependence_chain: TransactionHash, } pub type Transaction<'l> = sqlx::Transaction<'l, Postgres>; @@ -96,14 +101,14 @@ impl Database { pub async fn new( url: &DatabaseURL, coprocessor_api_key: &CoprocessorApiKey, - bucket_cache_size: u16, + dependence_cache_size: u16, ) -> Result { let pool = Self::new_pool(url).await; let (tenant_id, chain_id) = Self::find_tenant_id(&pool, coprocessor_api_key).await?; let bucket_cache = tokio::sync::RwLock::new(lru::LruCache::new( std::num::NonZeroU16::new( - bucket_cache_size.max(MINIMUM_BUCKET_CACHE_SIZE), + dependence_cache_size.max(MINIMUM_BUCKET_CACHE_SIZE), ) .unwrap() .into(), @@ -113,7 +118,7 @@ impl Database { tenant_id, chain_id, pool: Arc::new(RwLock::new(pool)), - bucket_cache, + dependence_chain: bucket_cache, tick: HeartBeat::default(), }) } @@ -208,13 +213,6 @@ impl Database { scalar_byte: &FixedBytes<1>, log: &LogTfhe, ) -> Result { - let bucket = self - .sort_computation_into_bucket( - result, - dependencies_handles, - &log.transaction_hash, - ) - .await; let dependencies_handles = dependencies_handles .iter() .map(|d| d.to_vec()) @@ -228,7 +226,6 @@ impl Database { fhe_operation, scalar_byte, log, - &bucket, ) .await } @@ -244,13 +241,6 @@ impl Database { scalar_byte: &FixedBytes<1>, log: &LogTfhe, ) -> Result { - let bucket = self - .sort_computation_into_bucket( - result, - dependencies, - &log.transaction_hash, - ) - .await; let dependencies = dependencies.iter().map(|d| d.to_vec()).collect::>(); self.insert_computation_inner( @@ -261,7 +251,6 @@ impl Database { fhe_operation, scalar_byte, log, - &bucket, ) .await } @@ -276,7 +265,6 @@ impl Database { fhe_operation: FheOperation, scalar_byte: &FixedBytes<1>, log: &LogTfhe, - bucket: &Handle, ) -> Result { let is_scalar = !scalar_byte.is_zero(); let output_handle = result.to_vec(); @@ -302,7 +290,7 @@ impl Database { &dependencies, fhe_operation as i16, is_scalar, - bucket.to_vec(), + log.dependence_chain.to_vec(), log.transaction_hash.map(|txh| txh.to_vec()), log.is_allowed, log.block_timestamp, @@ -313,54 +301,6 @@ impl Database { .map(|result| result.rows_affected() > 0) } - async fn sort_computation_into_bucket( - &self, - output: &Handle, - dependencies: &[&Handle], - transaction_hash: &Option, - ) -> Handle { - // If the transaction ID is a hit in the cache, update its - // last use and add the output handle in the bucket - if let Some(txh) = transaction_hash { - // We need a write access here as get updates the LRUcache - let mut bucket_cache_write = self.bucket_cache.write().await; - if let Some(ce) = bucket_cache_write.get(txh).cloned() { - bucket_cache_write.put(*output, ce); - return ce; - } - } - // If any input dependence is a match, return its bucket. This - // computation is in a connected component with other ops in - // this bucket - let bucket_cache_read = self.bucket_cache.read().await; - for d in dependencies { - // We peek here as the reuse is less likely than the use - // of the new handle which we add - because handles - // operate under single assinment - if let Some(ce) = bucket_cache_read.peek(*d).cloned() { - drop(bucket_cache_read); - let mut bucket_cache_write = self.bucket_cache.write().await; - bucket_cache_write.put(*output, ce); - // As the transaction hash was not in the cache, add - // it to this bucket as well - if let Some(txh) = transaction_hash { - bucket_cache_write.put(*txh, ce); - } - return ce; - } - } - drop(bucket_cache_read); - // If this computation is not linked to any others, assign it - // to a new empty bucket and add output handle and transaction - // hash where relevant - let mut bucket_cache_write = self.bucket_cache.write().await; - bucket_cache_write.put(*output, *output); - if let Some(txh) = transaction_hash { - bucket_cache_write.put(*txh, *output); - } - *output - } - #[rustfmt::skip] pub async fn insert_tfhe_event( &self, @@ -809,6 +749,33 @@ impl Database { query.execute(&self.pool().await).await?; Ok(()) } + + pub async fn update_dependence_chain( + &self, + tx: &mut Transaction<'_>, + chains: Chains, + ) -> Result<(), SqlxError> { + if chains.is_empty() { + return Ok(()); + } + let chains = chains.iter().map(|d| d.to_vec()).collect::>(); + let query = sqlx::query!( + r#" + INSERT INTO dependence_chain( + dependence_chain_id, + status, + last_updated_at + ) + VALUES (unnest($1::bytea[]), 'updated', statement_timestamp()) + ON CONFLICT (dependence_chain_id) DO UPDATE + SET status = EXCLUDED.status, + last_updated_at = EXCLUDED.last_updated_at + "#, + &chains, + ); + query.execute(tx.deref_mut()).await?; + Ok(()) + } } fn event_to_op_int(op: &TfheContractEvents) -> FheOperation { @@ -938,3 +905,47 @@ pub fn acl_result_handles(event: &Log) -> Vec { | AclContractEvents::UnblockedAccount(_) => vec![], } } + +pub fn tfhe_inputs_handle(op: &TfheContractEvents) -> Vec { + use TfheContract as C; + use TfheContractEvents as E; + match op { + E::Cast(C::Cast { ct, .. }) + | E::FheNeg(C::FheNeg { ct, .. }) + | E::FheNot(C::FheNot { ct, .. }) => vec![*ct], + + E::FheAdd(C::FheAdd { lhs, rhs, .. }) + | E::FheBitAnd(C::FheBitAnd { lhs, rhs, .. }) + | E::FheBitOr(C::FheBitOr { lhs, rhs, .. }) + | E::FheBitXor(C::FheBitXor { lhs, rhs, .. }) + | E::FheDiv(C::FheDiv { lhs, rhs, .. }) + | E::FheMax(C::FheMax { lhs, rhs, .. }) + | E::FheMin(C::FheMin { lhs, rhs, .. }) + | E::FheMul(C::FheMul { lhs, rhs, .. }) + | E::FheRem(C::FheRem { lhs, rhs, .. }) + | E::FheRotl(C::FheRotl { lhs, rhs, .. }) + | E::FheRotr(C::FheRotr { lhs, rhs, .. }) + | E::FheShl(C::FheShl { lhs, rhs, .. }) + | E::FheShr(C::FheShr { lhs, rhs, .. }) + | E::FheSub(C::FheSub { lhs, rhs, .. }) + | E::FheEq(C::FheEq { lhs, rhs, .. }) + | E::FheGe(C::FheGe { lhs, rhs, .. }) + | E::FheGt(C::FheGt { lhs, rhs, .. }) + | E::FheLe(C::FheLe { lhs, rhs, .. }) + | E::FheLt(C::FheLt { lhs, rhs, .. }) + | E::FheNe(C::FheNe { lhs, rhs, .. }) => vec![*lhs, *rhs], + + E::FheIfThenElse(C::FheIfThenElse { + control, + ifTrue, + ifFalse, + .. + }) => { + vec![*control, *ifTrue, *ifFalse] + } + + E::FheRand(_) | E::FheRandBounded(_) | E::TrivialEncrypt(_) => vec![], + + E::Initialized(_) | E::Upgraded(_) | E::VerifyInput(_) => vec![], + } +} diff --git a/coprocessor/fhevm-engine/stress-test-generator/src/utils.rs b/coprocessor/fhevm-engine/stress-test-generator/src/utils.rs index 704076b45..3287048f6 100644 --- a/coprocessor/fhevm-engine/stress-test-generator/src/utils.rs +++ b/coprocessor/fhevm-engine/stress-test-generator/src/utils.rs @@ -233,6 +233,7 @@ pub async fn generate_trivial_encrypt( is_allowed, block_number: 1, block_timestamp: PrimitiveDateTime::MAX, + dependence_chain: transaction_hash.clone(), }; let mut tx = listener_event_to_db.new_transaction().await?; listener_event_to_db @@ -415,6 +416,7 @@ pub async fn insert_tfhe_event( is_allowed, block_number: 1, block_timestamp: PrimitiveDateTime::MAX, + dependence_chain: transaction_hash.clone(), }; listener_event_to_db .insert_tfhe_event(&mut tx, &log) diff --git a/coprocessor/fhevm-engine/tfhe-worker/Cargo.toml b/coprocessor/fhevm-engine/tfhe-worker/Cargo.toml index 296789895..a4ba8bcb4 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/Cargo.toml +++ b/coprocessor/fhevm-engine/tfhe-worker/Cargo.toml @@ -34,6 +34,7 @@ opentelemetry = { workspace = true } opentelemetry-otlp = { workspace = true } opentelemetry_sdk = { workspace = true } opentelemetry-semantic-conventions = { workspace = true } +chrono = { workspace = true } # crates.io dependencies actix-web = "4.9.0" @@ -43,6 +44,7 @@ regex = "1.10.6" tonic-health = "0.12.3" tonic-types = "0.12.3" tonic-web = "0.12.3" +uuid = { version = "1", features = ["v4"] } # local dependencies fhevm-engine-common = { path = "../fhevm-engine-common" } @@ -61,6 +63,7 @@ host-listener = { path = "../host-listener" } testcontainers = { workspace = true } test-harness = { path = "../test-harness" } serde = { workspace = true } +serial_test = { workspace = true } [build-dependencies] tonic-build = { workspace = true } diff --git a/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs b/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs index 191ce2077..9ca3a68ca 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs @@ -108,6 +108,7 @@ async fn start_coprocessor(rx: Receiver, app_port: u16, db_url: &str) { health_check_port: 8080, metric_rerand_batch_latency: MetricsConfig::default(), metric_fhe_batch_latency: MetricsConfig::default(), + worker_id: None, }; std::thread::spawn(move || { diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs b/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs index 430d5b5e5..4b43dd62c 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs @@ -2,6 +2,7 @@ use clap::Parser; use fhevm_engine_common::telemetry::MetricsConfig; use fhevm_engine_common::utils::DatabaseURL; use tracing::Level; +use uuid::Uuid; #[derive(Parser, Debug, Clone)] #[command(version, about, long_about = None)] @@ -83,6 +84,12 @@ pub struct Args { #[arg(long, default_value = "tfhe-worker")] pub service_name: String, + /// Worker/replica ID for this worker instance + /// If not provided, a random UUID will be generated + /// Used to identify the worker in the dependence_chain table + #[arg(long, value_parser = clap::value_parser!(Uuid))] + pub worker_id: Option, + /// Log level for the application #[arg( long, diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs b/coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs new file mode 100644 index 000000000..490a34d52 --- /dev/null +++ b/coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs @@ -0,0 +1,304 @@ +use chrono::{DateTime, Utc}; +use sqlx::Postgres; +use std::fmt; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum LockingReason { + UpdatedUnowned, // Normal lock acquisition + ExpiredLock, // Work-stealing + ExtendedLock, // Lock extension + Missing, // No lock acquired +} + +impl From<&str> for LockingReason { + fn from(s: &str) -> Self { + match s { + "updated_unowned" => LockingReason::UpdatedUnowned, + "expired_lock" => LockingReason::ExpiredLock, + "extended_lock" => LockingReason::ExtendedLock, + _ => LockingReason::Missing, + } + } +} + +/// Manages a non-blocking, distributed locking mechanism +/// that coordinates dependence-chain processing across multiple workers +#[derive(Clone)] +pub struct LockMngr { + pool: sqlx::Pool, + worker_id: Uuid, + lock: Option, + expiration_duration_secs: i64, +} + +/// Dependence chain lock data +#[derive(sqlx::FromRow, Clone)] +pub struct DatabaseChainLock { + pub dependence_chain_id: Vec, + pub worker_id: Option, + pub lock_acquired_at: Option>, + pub lock_expires_at: Option>, + pub last_updated_at: DateTime, + pub match_reason: String, +} + +#[derive(Debug, sqlx::FromRow)] +struct LockExpiresAt { + lock_expires_at: Option>, +} + +impl fmt::Debug for DatabaseChainLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DatabaseChainLock") + .field("dcid", &hex::encode(&self.dependence_chain_id)) + .field("worker_id", &self.worker_id) + .field("lock_acquired_at", &self.lock_acquired_at) + .field("lock_expires_at", &self.lock_expires_at) + .field("last_updated_at", &self.last_updated_at) + .field("match_reason", &self.match_reason) + .finish() + } +} + +impl LockMngr { + pub fn new(worker_id: Uuid, pool: sqlx::Pool) -> Self { + Self { + worker_id, + pool, + lock: None, + expiration_duration_secs: 30, + } + } + + pub fn new_with_expiry( + worker_id: Uuid, + pool: sqlx::Pool, + expiration_duration_secs: i64, + ) -> Self { + let mut mgr = Self::new(worker_id, pool); + mgr.expiration_duration_secs = expiration_duration_secs; + mgr + } + + /// Acquire the next available dependence-chain entry for processing + /// sorted by last_updated_at (FIFO). + /// Returns the dependence_chain_id if a lock was acquired + pub async fn acquire_next_lock( + &mut self, + ) -> Result<(Option>, LockingReason), sqlx::Error> { + let row = sqlx::query_as::<_, DatabaseChainLock>( + r#" + WITH candidate AS ( + SELECT dependence_chain_id, + CASE + WHEN status = 'updated' AND worker_id IS NULL + THEN 'updated_unowned' + WHEN lock_expires_at < NOW() + THEN 'expired_lock' + END AS match_reason + FROM dependence_chain + WHERE + ( + status = 'updated' -- Marked as updated by host-listener + AND + worker_id IS NULL -- Ensure no other workers own it + ) + OR ( + lock_expires_at < NOW() -- Work-stealing of expired locks + ) + ORDER BY last_updated_at ASC -- FIFO + FOR UPDATE SKIP LOCKED -- Ensure no other worker is currently trying to lock it + LIMIT 1 + ) + UPDATE dependence_chain AS dc + SET + worker_id = $1, + status = 'processing', + lock_acquired_at = NOW(), + lock_expires_at = NOW() + make_interval(secs => $2) + FROM candidate + WHERE dc.dependence_chain_id = candidate.dependence_chain_id + RETURNING dc.*, candidate.match_reason; + "#, + ) + .bind(self.worker_id) + .bind(self.expiration_duration_secs) + .fetch_optional(&self.pool) + .await?; + + let row = if let Some(row) = row { + row + } else { + return Ok((None, LockingReason::Missing)); + }; + + self.lock.replace(row.clone()); + info!(?row, "Acquired lock"); + + Ok(( + Some(row.dependence_chain_id), + LockingReason::from(row.match_reason.as_str()), + )) + } + + /// Release all locks held by this worker + /// + /// If host-listener has marked the dependence chain as 'updated' in the meantime, + /// we don't overwrite its status + pub async fn release_all_owned_locks(&mut self) -> Result { + // Since UPDATE always aquire a row-level lock internally, + // this acts as atomic_exchange + let rows = sqlx::query!( + r#" + UPDATE dependence_chain + SET + worker_id = NULL, + lock_acquired_at = NULL, + lock_expires_at = NULL, + status = CASE + WHEN status = 'processing' THEN 'processed' + ELSE status + END + WHERE worker_id = $1 + "#, + self.worker_id + ) + .execute(&self.pool) + .await?; + + self.lock.take(); + + info!(worker_id = %self.worker_id, + count = rows.rows_affected(), "Released all locks"); + + Ok(rows.rows_affected()) + } + + /// Release the lock held by this worker on the current dependence chain + /// If host-listener has marked the dependence chain as 'updated' in the meantime, + /// we don't overwrite its status + pub async fn release_current_lock(&mut self) -> Result { + let dep_chain_id = match &self.lock { + Some(lock) => lock.dependence_chain_id.clone(), + None => { + debug!("No lock to release"); + return Ok(0); + } + }; + + let rows = sqlx::query!( + r#" + UPDATE dependence_chain + SET + worker_id = NULL, + lock_acquired_at = NULL, + lock_expires_at = NULL, + status = CASE + WHEN status = 'processing' THEN 'processed' + ELSE status + END + WHERE worker_id = $1 AND dependence_chain_id = $2 + "#, + self.worker_id, + dep_chain_id, + ) + .execute(&self.pool) + .await?; + + self.lock.take(); + + info!(dcid = %hex::encode(&dep_chain_id), "Released lock"); + + Ok(rows.rows_affected()) + } + + /// Set error on the current dependence chain + /// If host-listener has marked the dependence chain as 'updated' in the meantime, + /// we don't overwrite its error + /// + /// The error is only informational and does not affect the processing status + pub async fn set_processing_error(&self, err: Option) -> Result { + let dep_chain_id: Vec = match &self.lock { + Some(lock) => lock.dependence_chain_id.clone(), + None => { + warn!("No lock to set error on"); + return Ok(0); + } + }; + + let rows = sqlx::query!( + r#" + UPDATE dependence_chain + SET + error_message = CASE + WHEN status = 'processing' THEN $3 + ELSE error_message + END + WHERE worker_id = $1 AND dependence_chain_id = $2 + "#, + self.worker_id, + dep_chain_id, + err + ) + .execute(&self.pool) + .await?; + + info!(dcid = %hex::encode(&dep_chain_id), error = ?err, "Set error on lock"); + Ok(rows.rows_affected()) + } + + /// Extend the lock expiration time on the current dependence chain + pub async fn extend_current_lock( + &mut self, + ) -> Result, LockingReason)>, sqlx::Error> { + let dependence_chain_id = match &self.lock { + Some(lock) => lock.dependence_chain_id.clone(), + None => { + debug!("No lock to extend"); + return Ok(None); + } + }; + + let row = sqlx::query_as!( + LockExpiresAt, + r#" + UPDATE dependence_chain AS dc + SET + lock_expires_at = NOW() + make_interval(secs => $3) + WHERE dependence_chain_id = $1 AND worker_id = $2 + RETURNING dc.lock_expires_at::timestamptz AS "lock_expires_at: chrono::DateTime"; + "#, + dependence_chain_id, + self.worker_id, + self.expiration_duration_secs as f64 + ) + .fetch_optional(&self.pool) + .await?; + + let lock_expires_at = match row { + Some(r) => r, + None => { + error!(dcid = %hex::encode(&dependence_chain_id), "No lock extended"); + return Ok(None); + } + }; + + // Update the in-memory lock + if let Some(lock) = self.lock.as_mut() { + lock.lock_expires_at = lock_expires_at.lock_expires_at; + info!(dcid = %hex::encode(&dependence_chain_id), expires_at = ?lock.lock_expires_at, "Extended lock"); + } + + Ok(Some((dependence_chain_id, LockingReason::ExtendedLock))) + } + + pub fn get_current_lock(&self) -> Option { + self.lock.clone() + } + + pub fn worker_id(&self) -> Uuid { + self.worker_id + } +} diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs b/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs index 398403bb5..729c9aa6b 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/lib.rs @@ -8,6 +8,7 @@ use tokio::task::JoinSet; pub mod daemon_cli; mod db_queries; +pub mod dependence_chain; pub mod health_check; pub mod server; diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tests/dependence_chain.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tests/dependence_chain.rs new file mode 100644 index 000000000..618397003 --- /dev/null +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tests/dependence_chain.rs @@ -0,0 +1,219 @@ +use crate::dependence_chain::{LockMngr, LockingReason}; +use serial_test::serial; +use sqlx::postgres::PgPoolOptions; +use test_harness::instance::ImportMode; +use tokio::time::{sleep, Duration}; +use tracing::info; +use uuid::Uuid; + +const NUM_SAMPLE_CHAINS: usize = 10; + +#[tokio::test] +#[serial(db)] +async fn test_acquire_next_lock() { + let pool = setup().await; + let dependence_chain_ids = insert_dependence_chains(&pool, NUM_SAMPLE_CHAINS) + .await + .expect("inserted chains"); + + let mut workers = vec![]; + + for dependence_chain_id in dependence_chain_ids.iter() { + info!(target: "deps_chain", ?dependence_chain_id, "Testing acquire_next_lock"); + let mut mgr = LockMngr::new_with_expiry(Uuid::new_v4(), pool.clone(), 3600); + + let (acquired, locking) = mgr.acquire_next_lock().await.unwrap(); + assert_eq!(acquired, Some(dependence_chain_id.clone())); + assert_eq!(locking, LockingReason::UpdatedUnowned); + + let row = sqlx::query!( + "SELECT status, worker_id FROM dependence_chain WHERE dependence_chain_id = $1", + dependence_chain_id + ) + .fetch_one(&pool) + .await + .unwrap(); + + assert_eq!(row.status, "processing".to_string()); + assert_eq!(row.worker_id, Some(mgr.worker_id())); + + workers.push(mgr); + } + + // Ensure no more locks available + assert_locks_available(&pool, 0).await; + + for worker in workers.iter_mut() { + assert_reacquire_lock(&pool, worker).await; + assert!(worker.get_current_lock().is_none()); + } +} + +#[tokio::test] +#[serial(db)] +async fn test_work_stealing() { + let pool = setup().await; + + let dependence_chain_ids = insert_dependence_chains(&pool, NUM_SAMPLE_CHAINS) + .await + .expect("inserted chains"); + + let mut workers = vec![]; + let expiration_duration_secs = 1; + + for dependence_chain_id in dependence_chain_ids.iter() { + info!(?dependence_chain_id, "Testing acquire_next_lock"); + + let worker = Uuid::new_v4(); + let mut mgr = LockMngr::new_with_expiry(worker, pool.clone(), expiration_duration_secs); + let acquired = mgr.acquire_next_lock().await.unwrap().0; + assert_eq!(acquired, Some(dependence_chain_id.clone())); + + // Verify DB state + let row = sqlx::query!( + "SELECT status, worker_id FROM dependence_chain WHERE dependence_chain_id = $1", + dependence_chain_id + ) + .fetch_one(&pool) + .await + .unwrap(); + + workers.push(mgr); + + assert_eq!(row.status, "processing".to_string()); + assert_eq!(row.worker_id, Some(worker)); + } + + // Make sure the locks have expired + tokio::time::sleep(std::time::Duration::from_secs( + 3 + expiration_duration_secs as u64, + )) + .await; + + // Assert that we can re-acquire all locks due to work-stealing + for _ in 0..NUM_SAMPLE_CHAINS { + let mut mgr = workers.pop().unwrap(); + let (acquired, locking_reason) = mgr.acquire_next_lock().await.unwrap(); + assert!(acquired.is_some()); + assert_eq!(locking_reason, LockingReason::ExpiredLock); + } + + assert_locks_available(&pool, 0).await; +} + +/// Asserts that after releasing a lock, it can be re-acquired by another worker +async fn assert_reacquire_lock(pool: &sqlx::PgPool, dependence_mgr: &mut LockMngr) { + let lock = dependence_mgr.get_current_lock().unwrap(); + let dependence_chain_id = lock.dependence_chain_id; + + let row = sqlx::query!( + "SELECT status, worker_id FROM dependence_chain WHERE dependence_chain_id = $1", + dependence_chain_id + ) + .fetch_one(pool) + .await + .unwrap(); + + assert_eq!(row.status, "processing".to_string()); + + // Update status for this dependence_chain_id + // to simulate host-listener marking it as updated again + sqlx::query!( + "UPDATE dependence_chain + SET status = 'updated', last_updated_at = NOW() + WHERE dependence_chain_id = $1", + dependence_chain_id + ) + .execute(pool) + .await + .unwrap(); + + // Assert that before releasing the lock, it cannot be re-acquired + assert_eq!( + LockMngr::new(Uuid::new_v4(), pool.clone()) + .acquire_next_lock() + .await + .unwrap() + .0, + None + ); + dependence_mgr.release_all_owned_locks().await.unwrap(); + + // Assert that after releasing or expiring, it can be re-acquired by another worker + assert_eq!( + LockMngr::new(Uuid::new_v4(), pool.clone()) + .acquire_next_lock() + .await + .unwrap() + .0, + Some(dependence_chain_id.clone()) + ); +} + +async fn assert_locks_available(pool: &sqlx::PgPool, expected_locks_count: usize) { + // Check DB state + let rows = sqlx::query!( + "SELECT COUNT(*) as count FROM dependence_chain + WHERE (status = 'updated' AND worker_id IS NULL) OR (lock_expires_at < NOW())", + ) + .fetch_one(pool) + .await + .unwrap(); + assert_eq!(rows.count, Some(expected_locks_count as i64)); + + if expected_locks_count == 0 { + // Check acquire_next_lock returns None + let worker = Uuid::new_v4(); + let mut mgr = LockMngr::new(worker, pool.clone()); + let acquired = mgr.acquire_next_lock().await.unwrap().0; + assert_eq!(acquired, None); + } +} + +async fn insert_dependence_chains( + pool: &sqlx::PgPool, + num_chains: usize, +) -> sqlx::Result>> { + let mut out = Vec::with_capacity(num_chains); + + for i in 0..num_chains { + let dependence_chain_id = i.to_le_bytes().to_vec(); + sqlx::query!( + r#" + INSERT INTO dependence_chain (dependence_chain_id, status, last_updated_at) + VALUES ($1, 'updated', NOW()) + "#, + dependence_chain_id, + ) + .execute(pool) + .await?; + + out.push(dependence_chain_id); + + sleep(Duration::from_millis(100)).await; + } + + assert_locks_available(pool, num_chains).await; + + Ok(out) +} + +async fn setup() -> sqlx::PgPool { + let _ = tracing_subscriber::fmt().json().with_level(true).try_init(); + let test_instance = test_harness::instance::setup_test_db(ImportMode::None) + .await + .expect("valid db instance"); + let pool = PgPoolOptions::new() + .max_connections(2) + .connect(test_instance.db_url.as_str()) + .await + .unwrap(); + + // Insert sample dependence-chain rows + sqlx::query!("TRUNCATE TABLE dependence_chain") + .execute(&pool) + .await + .unwrap(); + + pool +} diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tests/mod.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tests/mod.rs index 2cbe05bce..8e30e48f8 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/tests/mod.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tests/mod.rs @@ -13,6 +13,7 @@ use utils::{ decrypt_ciphertexts, default_api_key, random_handle, wait_until_all_allowed_handles_computed, }; +mod dependence_chain; mod errors; mod health_check; mod inputs; diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs index 887f0b66c..c5e4b31ef 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tests/operators_from_events.rs @@ -44,6 +44,7 @@ async fn insert_tfhe_event( is_allowed, block_number: log.block_number.unwrap_or(0), block_timestamp: PrimitiveDateTime::MAX, + dependence_chain: log.transaction_hash.unwrap_or_default(), }; db.insert_tfhe_event(tx, &event).await } diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs index b74fae881..867778a83 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs @@ -117,6 +117,7 @@ async fn start_coprocessor(rx: Receiver, app_port: u16, db_url: &str) { health_check_port: 8081, metric_rerand_batch_latency: MetricsConfig::default(), metric_fhe_batch_latency: MetricsConfig::default(), + worker_id: None, }; std::thread::spawn(move || { diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs index 2cd63b34c..5ad616e40 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs @@ -1,3 +1,4 @@ +use crate::dependence_chain::{self}; use crate::types::CoprocessorError; use crate::{db_queries::populate_cache_with_tenant_keys, types::TfheTenantKeys}; use fhevm_engine_common::tfhe_ops::check_fhe_operand_types; @@ -11,6 +12,7 @@ use prometheus::{register_int_counter, IntCounter}; use scheduler::dfg::types::{DFGTxInput, SchedulerError}; use scheduler::dfg::{build_component_nodes, ComponentNode, DFComponentGraph, DFGOp}; use scheduler::dfg::{scheduler::Scheduler, types::DFGTaskInput}; +use sqlx::types::Uuid; use sqlx::Postgres; use sqlx::{postgres::PgListener, query, Acquire}; use std::{ @@ -59,9 +61,11 @@ pub async fn run_tfhe_worker( args: crate::daemon_cli::Args, health_check: crate::health_check::HealthCheck, ) -> Result<(), Box> { + let worker_id = args.worker_id.unwrap_or(Uuid::new_v4()); + info!(target: "tfhe_worker", worker_id = %worker_id, "Starting tfhe-worker service"); loop { // here we log the errors and make sure we retry - if let Err(cycle_error) = tfhe_worker_cycle(&args, health_check.clone()).await { + if let Err(cycle_error) = tfhe_worker_cycle(&args, worker_id, health_check.clone()).await { WORKER_ERRORS_COUNTER.inc(); error!(target: "tfhe_worker", { error = cycle_error }, "Error in background worker, retrying shortly"); } @@ -71,6 +75,7 @@ pub async fn run_tfhe_worker( async fn tfhe_worker_cycle( args: &crate::daemon_cli::Args, + worker_id: Uuid, health_check: crate::health_check::HealthCheck, ) -> Result<(), Box> { let tracer = opentelemetry::global::tracer("tfhe_worker"); @@ -87,6 +92,11 @@ async fn tfhe_worker_cycle( let mut listener = PgListener::connect_with(&pool).await?; listener.listen("work_available").await?; + let mut deps_chain_mngr = dependence_chain::LockMngr::new(worker_id, pool.clone()); + + // Release all owned locks on startup to avoid stale locks + deps_chain_mngr.release_all_owned_locks().await?; + #[cfg(feature = "bench")] populate_cache_with_tenant_keys(vec![1i32], &pool, &tenant_key_cache).await?; let mut immedially_poll_more_work = false; @@ -104,7 +114,7 @@ async fn tfhe_worker_cycle( }, }; } - immedially_poll_more_work = false; + #[cfg(feature = "bench")] let now = std::time::SystemTime::now(); let loop_span = tracer.start("worker_iteration"); @@ -117,9 +127,23 @@ async fn tfhe_worker_cycle( s.end(); // Query for transactions to execute, and if relevant the associated keys - let (mut transactions, mut unneeded_handles) = - query_for_work(args, &health_check, &mut trx, &tracer, &loop_ctx).await?; + let (mut transactions, mut unneeded_handles) = query_for_work( + args, + &health_check, + &mut trx, + &mut deps_chain_mngr, + &tracer, + &loop_ctx, + ) + .await?; if transactions.is_empty() { + deps_chain_mngr.release_current_lock().await?; + + // Lock another dependence chain if available and + // continue processing without waiting for notification + let (lock, _) = deps_chain_mngr.acquire_next_lock().await?; + immedially_poll_more_work = lock.is_some(); + continue; } else { // We've fetched work, so we'll poll again without waiting @@ -137,6 +161,8 @@ async fn tfhe_worker_cycle( // Execute transactions segregated by tenant for (tenant_id, ref mut tenant_txs) in transactions.iter_mut() { + deps_chain_mngr.extend_current_lock().await?; + let mut tx_graph = build_transaction_graph_and_execute( tenant_id, tenant_txs, @@ -152,6 +178,7 @@ async fn tfhe_worker_cycle( &mut tx_graph, &mut unneeded_handles, &mut trx, + &mut deps_chain_mngr, &tracer, &loop_ctx, ) @@ -290,6 +317,7 @@ async fn query_for_work<'a>( args: &crate::daemon_cli::Args, health_check: &crate::health_check::HealthCheck, trx: &mut sqlx::Transaction<'a, Postgres>, + deps_chain_mngr: &mut dependence_chain::LockMngr, tracer: &opentelemetry::global::BoxedTracer, loop_ctx: &opentelemetry::Context, ) -> Result< @@ -298,6 +326,24 @@ async fn query_for_work<'a>( > { // This query locks our work items so other worker doesn't select them. let mut s = tracer.start_with_context("query_work_items", loop_ctx); + + // Lock dependence chain + let (dependence_chain_id, locking_reason) = match deps_chain_mngr.extend_current_lock().await? { + // If there is a current lock, we extend it and use its dependence_chain_id + Some((id, reason)) => (Some(id), reason), + None => deps_chain_mngr.acquire_next_lock().await?, + }; + + if dependence_chain_id.is_none() { + health_check.update_activity(); + return Ok((vec![], vec![])); + } + + s.set_attribute(KeyValue::new( + "dependence_chain_id", + format!("{:?}", dependence_chain_id.as_ref().map(hex::encode)), + )); + let the_work = query!( " WITH selected_computations AS ( @@ -310,8 +356,9 @@ WITH selected_computations AS ( WHERE is_completed = FALSE AND is_error = FALSE AND is_allowed = TRUE + AND ($1::bytea IS NULL OR dependence_chain_id = $1) ORDER BY created_at - LIMIT $1 + LIMIT $2 ) as c_creation_order UNION ALL SELECT DISTINCT @@ -322,8 +369,9 @@ WITH selected_computations AS ( WHERE is_completed = FALSE AND is_error = FALSE AND is_allowed = TRUE + AND ($1::bytea IS NULL OR dependence_chain_id = $1) ORDER BY schedule_order - LIMIT $1 + LIMIT $2 ) as c_schedule_order ) ) @@ -341,6 +389,7 @@ FROM computations c JOIN selected_computations sc ON c.transaction_id = sc.transaction_id FOR UPDATE SKIP LOCKED ", + dependence_chain_id, args.work_items_batch_size as i32, ) .fetch_all(trx.as_mut()) @@ -353,15 +402,20 @@ FOR UPDATE SKIP LOCKED ", s.end(); health_check.update_db_access(); if the_work.is_empty() { + if let Some(dependence_chain_id) = &dependence_chain_id { + warn!(target: "tfhe_worker", dcid = %hex::encode(dependence_chain_id), locking = ?locking_reason, "No work items found to process"); + } health_check.update_activity(); return Ok((vec![], vec![])); } WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64); - info!(target: "tfhe_worker", { count = the_work.len() }, "Processing work items"); + info!(target: "tfhe_worker", { count = the_work.len(), dcid = ?dependence_chain_id.as_ref().map(hex::encode), + locking = ?locking_reason }, "Processing work items"); // Make sure we process each tenant independently to avoid // setting different keys from different tenants in the worker // threads - let mut s_prep = tracer.start_with_context("prepare_dataflow_graphs", loop_ctx); + let mut s_prep: opentelemetry::global::BoxedSpan = + tracer.start_with_context("prepare_dataflow_graphs", loop_ctx); s_prep.set_attribute(KeyValue::new("work_items", the_work.len() as i64)); // Partition work by tenant let work_by_tenant = the_work.into_iter().into_group_map_by(|k| k.tenant_id); @@ -392,6 +446,7 @@ FOR UPDATE SKIP LOCKED ", tenant_id, &e, trx, + deps_chain_mngr, tracer, loop_ctx, ) @@ -506,6 +561,7 @@ async fn upload_transaction_graph_results<'a>( tx_graph: &mut DFComponentGraph, unneeded_handles: &mut Vec<(Handle, Handle)>, trx: &mut sqlx::Transaction<'a, Postgres>, + deps_mngr: &mut dependence_chain::LockMngr, tracer: &opentelemetry::global::BoxedTracer, loop_ctx: &opentelemetry::Context, ) -> Result<(), Box> { @@ -579,6 +635,7 @@ async fn upload_transaction_graph_results<'a>( tenant_id, &*cerr, trx, + deps_mngr, tracer, loop_ctx, ) @@ -657,12 +714,14 @@ async fn upload_transaction_graph_results<'a>( Ok(()) } +#[allow(clippy::too_many_arguments)] async fn set_computation_error<'a>( output_handle: &[u8], transaction_id: &[u8], tenant_id: &i32, cerr: &(dyn std::error::Error + Send + Sync), trx: &mut sqlx::Transaction<'a, Postgres>, + deps_mngr: &mut dependence_chain::LockMngr, tracer: &opentelemetry::global::BoxedTracer, loop_ctx: &opentelemetry::Context, ) -> Result<(), Box> { @@ -694,6 +753,8 @@ async fn set_computation_error<'a>( ) .execute(trx.as_mut()) .await?; + + deps_mngr.set_processing_error(Some(err_string)).await?; s.end(); Ok(()) }