diff --git a/CHANGELOG.md b/CHANGELOG.md index adc282f18ba..a3cea7ed17f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to the versioning scheme outlined in the [README.md](RE - In the `/v3/transaction/{txid}` RPC endpoint, added `block_height` and `is_canonical` to the response. - Improved block validation in `stacks-inspect`. +- Allow non-blocking event dispatching. This is off by default, but can be enabled in the node configuration. ### Changed diff --git a/docs/event-dispatcher.md b/docs/event-dispatcher.md index 293ca5b003e..d93a798d04f 100644 --- a/docs/event-dispatcher.md +++ b/docs/event-dispatcher.md @@ -22,6 +22,20 @@ disable_retries = false # Optional: If true, failed deliveries w The `stacks-node` will then execute HTTP POST requests with JSON payloads to the configured `endpoint` for the subscribed events. +By default, when sending a payload the event dispatcher will block node operation until it has received a successful response from your observer. Your event observer should therefore be quick to respond, and offload any expensive computation to an asynchronous background task. Alternatively, you can configure the events to be delivered in a non-blocking fashion like this: + +```toml +[node] +... +event_dispatcher_blocking = false +# By default, up to 1,000 requests can be held in a queue before the event dispatcher will start blocking +# again. If you expect bigger bursts than that, you can further tweak this value. +# +# event_dispatcher_queue_size = 1_000 +``` + +Note that this is only meant to deal with bursts of events. If your event observer is continuously slower than the stream of incoming events, it will fall behind more and more, and the dispatcher will eventually start blocking again to catch up. + ## Important Notes * **`/new_microblocks` Endpoint Limitation:** Event delivery via the `/new_microblocks` endpoint (and by extension, events sourced from microblocks delivered to `/new_block`) is **only supported until epoch 2.5**. After this epoch, observers will no longer receive events on this path for new microblocks. diff --git a/stacks-node/src/event_dispatcher.rs b/stacks-node/src/event_dispatcher.rs index d043da13d2f..8f0afd15e6b 100644 --- a/stacks-node/src/event_dispatcher.rs +++ b/stacks-node/src/event_dispatcher.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2025 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -16,22 +16,19 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; -use std::fmt; use std::path::PathBuf; #[cfg(test)] use std::sync::mpsc::channel; -#[cfg(test)] -use std::sync::LazyLock; use std::sync::{Arc, Mutex}; -use std::thread::sleep; -use std::time::Duration; +#[cfg(test)] +use std::sync::{LazyLock, Weak}; +use std::time::{Duration, SystemTime}; use clarity::vm::costs::ExecutionCost; use clarity::vm::events::{FTEventType, NFTEventType, STXEventType}; use clarity::vm::types::{AssetIdentifier, QualifiedContractIdentifier}; #[cfg(any(test, feature = "testing"))] use lazy_static::lazy_static; -use rand::Rng; use serde_json::json; use stacks::burnchains::{PoxConstants, Txid}; use stacks::chainstate::burn::ConsensusHash; @@ -54,19 +51,17 @@ use stacks::net::api::postblock_proposal::{ BlockValidateOk, BlockValidateReject, BlockValidateResponse, }; use stacks::net::atlas::{Attachment, AttachmentInstance}; -use stacks::net::http::HttpRequestContents; -use stacks::net::httpcore::{send_http_request, StacksHttpRequest}; use stacks::net::stackerdb::StackerDBEventDispatcher; #[cfg(any(test, feature = "testing"))] use stacks::util::tests::TestFlag; use stacks_common::bitvec::BitVec; use stacks_common::types::chainstate::{BlockHeaderHash, BurnchainHeaderHash, StacksBlockId}; -use stacks_common::types::net::PeerHost; use url::Url; mod db; mod payloads; mod stacker_db; +mod worker; use db::EventDispatcherDbConnection; use payloads::*; @@ -76,6 +71,9 @@ pub use payloads::{ }; pub use stacker_db::StackerDBChannel; +use crate::event_dispatcher::db::PendingPayload; +use crate::event_dispatcher::worker::{EventDispatcherResult, EventDispatcherWorker}; + #[cfg(test)] mod tests; @@ -85,48 +83,23 @@ lazy_static! { pub static ref TEST_SKIP_BLOCK_ANNOUNCEMENT: TestFlag = TestFlag::default(); } -#[derive(Debug)] +#[derive(Debug, thiserror::Error)] enum EventDispatcherError { - SerializationError(serde_json::Error), - HttpError(std::io::Error), - DbError(stacks::util_lib::db::Error), -} - -impl fmt::Display for EventDispatcherError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match *self { - EventDispatcherError::SerializationError(ref e) => fmt::Display::fmt(e, f), - EventDispatcherError::HttpError(ref e) => fmt::Display::fmt(e, f), - EventDispatcherError::DbError(ref e) => fmt::Display::fmt(e, f), - } - } -} - -impl core::error::Error for EventDispatcherError { - fn cause(&self) -> Option<&dyn core::error::Error> { - match *self { - EventDispatcherError::SerializationError(ref e) => Some(e), - EventDispatcherError::HttpError(ref e) => Some(e), - EventDispatcherError::DbError(ref e) => Some(e), - } - } + #[error("Serialization error: {0}")] + SerializationError(#[from] serde_json::Error), + #[error("HTTP error: {0}")] + HttpError(#[from] std::io::Error), + #[error("Database error: {0}")] + DbError(#[from] stacks::util_lib::db::Error), + #[error("Channel receive error: {0}")] + RecvError(#[from] std::sync::mpsc::RecvError), + #[error("Channel send error: {0}")] + SendError(String), // not capturing the underlying because it's a generic type } -impl From for EventDispatcherError { - fn from(value: serde_json::Error) -> Self { - EventDispatcherError::SerializationError(value) - } -} - -impl From for EventDispatcherError { - fn from(value: stacks::util_lib::db::Error) -> Self { - EventDispatcherError::DbError(value) - } -} - -impl From for EventDispatcherError { - fn from(value: std::io::Error) -> Self { - EventDispatcherError::HttpError(value) +impl From> for EventDispatcherError { + fn from(value: std::sync::mpsc::SendError) -> Self { + EventDispatcherError::SendError(format!("{value}")) } } @@ -167,6 +140,12 @@ impl EventObserver { } } +struct EventRequestData { + pub url: String, + pub payload_bytes: Arc<[u8]>, + pub timeout: Duration, +} + /// Events received from block-processing. /// Stacks events are structured as JSON, and are grouped by topic. An event observer can /// subscribe to one or more specific event streams, or the "any" stream to receive all of them. @@ -205,6 +184,10 @@ pub struct EventDispatcher { pub stackerdb_channel: Arc>, /// Path to the database where pending payloads are stored. db_path: PathBuf, + /// The worker thread that performs the actuall HTTP requests so that they don't block + /// the main operation of the node. It's wrapped in an `Arc` only to make some test helpers + /// work (see `ALL_WORKERS`); in release code it wouldn't be necessary. + worker: Arc, } /// This struct is used specifically for receiving proposal responses. @@ -229,7 +212,9 @@ impl ProposalCallbackReceiver for ProposalCallbackHandler { for observer in self.observers.iter() { self.dispatcher - .dispatch_to_observer(observer, &response, PATH_PROPOSAL_RESPONSE); + .dispatch_to_observer(observer, &response, PATH_PROPOSAL_RESPONSE) + .unwrap() + .wait_until_complete(); } } } @@ -401,12 +386,64 @@ impl BlockEventDispatcher for EventDispatcher { } } +/// During integration tests, the `test_observer` needs to ensure that all events +/// that were triggered have actually been delivered, before it can pass on the +/// captured data. To make that work, during test we store weak references to +/// all the workers and make it possible to wait for all of them to catch up +/// in a single function call (see `catch_up_all_event_dispatchers`). +#[cfg(test)] +static ALL_WORKERS: Mutex>> = Mutex::new(Vec::new()); + +#[cfg(test)] +pub fn catch_up_all_event_dispatchers() { + let mut results = Vec::new(); + let mut guard = ALL_WORKERS.lock().unwrap(); + + // remove all items that have been dropped; call .noop() the rest + guard.retain_mut(|w| { + let Some(worker) = w.upgrade() else { + return false; + }; + results.push(worker.noop().unwrap()); + return true; + }); + // unlock the mutex + drop(guard); + + // block until all workers have caught up + for result in results { + result.wait_until_complete(); + } +} + impl EventDispatcher { + /// The default behavior is to create a non-blocking dispatcher with a + /// queue size of 1,000. Note however that the default *node* configuration + /// is to always block (i.e. an effective queue size of 0). + /// + /// See the `event_dispatcher_blocking` and `event_dispatcher_queue_size` + /// config values. pub fn new(working_dir: PathBuf) -> EventDispatcher { + Self::new_with_custom_queue_size(working_dir, 1_000) + } + /// The queue size specifies how many events may be in-flight without + /// blocking the calling thread when sending additional events. A value + /// of 0 means they always block. + pub fn new_with_custom_queue_size(working_dir: PathBuf, queue_size: usize) -> EventDispatcher { let mut db_path = working_dir; db_path.push("event_observers.sqlite"); EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize database"); + let worker = EventDispatcherWorker::new(db_path.clone(), queue_size) + .expect("Failed to start worker thread"); + + let worker = Arc::new(worker); + + #[cfg(test)] + { + ALL_WORKERS.lock().unwrap().push(Arc::downgrade(&worker)); + } + EventDispatcher { stackerdb_channel: Arc::new(Mutex::new(StackerDBChannel::new())), registered_observers: vec![], @@ -422,9 +459,19 @@ impl EventDispatcher { stackerdb_observers_lookup: HashSet::new(), block_proposal_observers_lookup: HashSet::new(), db_path, + worker, } } + /// Sends a noop task to the worker and waits until its completion is acknowledged. + /// This has the effect that all payloads that have been submitted before this point + /// are also done, which is a useful thing to wait for in some tests where you want + /// to assert on certain event deliveries. + #[cfg(test)] + pub fn catch_up(&self) { + self.worker.noop().unwrap().wait_until_complete(); + } + pub fn process_burn_block( &self, burn_block: &BurnchainHeaderHash, @@ -635,7 +682,7 @@ impl EventDispatcher { ); // Send payload - self.dispatch_to_observer( + self.dispatch_to_observer_or_log_error( &self.registered_observers[observer_id], &payload, PATH_BLOCK_PROCESSED, @@ -1038,8 +1085,9 @@ impl EventDispatcher { event_observer } - /// Process any pending payloads in the database. - /// This is called when the event dispatcher is first instantiated. + /// Process any pending payloads in the database. This is meant to be called at startup, in order to + /// handle anything that was enqueued but not sent before shutdown. This method blocks until all + /// requests are made (or, if the observer is no longer registered, removed from the DB). pub fn process_pending_payloads(&self) { let conn = EventDispatcherDbConnection::new(&self.db_path).expect("Failed to initialize database"); @@ -1059,10 +1107,20 @@ impl EventDispatcher { pending_payloads.len() ); - for (id, url, payload_bytes, _timeout_ms) in pending_payloads { - info!("Event dispatcher: processing pending payload: {url}"); - let full_url = Url::parse(url.as_str()) - .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {url} as a URL")); + for PendingPayload { + id, request_data, .. + } in pending_payloads + { + info!( + "Event dispatcher: processing pending payload: {}", + request_data.url + ); + let full_url = Url::parse(request_data.url.as_str()).unwrap_or_else(|_| { + panic!( + "Event dispatcher: unable to parse {} as a URL", + request_data.url + ) + }); // find the right observer let observer = self.registered_observers.iter().find(|observer| { let endpoint_url = Url::parse(format!("http://{}", &observer.endpoint).as_str()) @@ -1079,7 +1137,7 @@ impl EventDispatcher { // This observer is no longer registered, skip and delete info!( "Event dispatcher: observer {} no longer registered, skipping", - url + request_data.url ); if let Err(e) = conn.delete_payload(id) { error!( @@ -1090,22 +1148,24 @@ impl EventDispatcher { continue; }; - self.make_http_request_and_delete_from_db( - &payload_bytes, - full_url.as_str(), - observer.timeout, - observer.disable_retries, - id, - ); + // If the timeout configuration for this observer is different from what it was + // originally, the updated config wins. + self.worker + .initiate_send(id, observer.disable_retries, Some(observer.timeout)) + .expect("failed to dispatch pending event payload to worker thread") + .wait_until_complete(); } } + /// A successful result from this method only indicates that that payload was successfully + /// enqueued, not that the HTTP request was actually made. If you need to wait until that's + /// the case, call `wait_until_complete()` on the `EventDispatcherResult`. fn dispatch_to_observer( &self, event_observer: &EventObserver, payload: &serde_json::Value, path: &str, - ) { + ) -> Result { let full_url = Self::get_full_url(event_observer, path); let bytes = match Self::get_payload_bytes(payload) { Ok(bytes) => bytes, @@ -1113,95 +1173,37 @@ impl EventDispatcher { error!( "Event dispatcher: failed to serialize payload"; "path" => path, "error" => ?err ); - return; + return Err(err); } }; - let id = self.save_to_db(&full_url, bytes.as_ref(), event_observer.timeout); - - self.make_http_request_and_delete_from_db( - &bytes, - &full_url, - event_observer.timeout, - event_observer.disable_retries, - id, - ); - } + let data = EventRequestData { + payload_bytes: bytes, + url: full_url, + timeout: event_observer.timeout, + }; - fn make_http_request( - payload_bytes: &Arc<[u8]>, - full_url: &str, - timeout: Duration, - disable_retries: bool, - ) -> Result<(), EventDispatcherError> { - debug!( - "Event dispatcher: Sending payload"; "url" => %full_url, "bytes" => payload_bytes.len() - ); + let id = self.save_to_db(&data); - let url = Url::parse(full_url) - .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {full_url} as a URL")); - - let host = url.host_str().expect("Invalid URL: missing host"); - let port = url.port_or_known_default().unwrap_or(80); - let peerhost: PeerHost = format!("{host}:{port}") - .parse() - .unwrap_or(PeerHost::DNS(host.to_string(), port)); - - let mut backoff = Duration::from_millis(100); - let mut attempts: i32 = 0; - // Cap the backoff at 3x the timeout - let max_backoff = timeout.saturating_mul(3); - - loop { - let mut request = StacksHttpRequest::new_for_peer( - peerhost.clone(), - "POST".into(), - url.path().into(), - HttpRequestContents::new().payload_json_bytes(Arc::clone(payload_bytes)), - ) - .unwrap_or_else(|_| panic!("FATAL: failed to encode infallible data as HTTP request")); - request.add_header("Connection".into(), "close".into()); - match send_http_request(host, port, request, timeout) { - Ok(response) => { - if response.preamble().status_code == 200 { - debug!( - "Event dispatcher: Successful POST"; "url" => %url - ); - break; - } else { - error!( - "Event dispatcher: Failed POST"; "url" => %url, "response" => ?response.preamble() - ); - } - } - Err(err) => { - warn!( - "Event dispatcher: connection or request failed to {host}:{port} - {err:?}"; - "backoff" => ?backoff, - "attempts" => attempts - ); - if disable_retries { - warn!("Observer is configured in disable_retries mode: skipping retry of payload"); - return Err(err.into()); - } - #[cfg(test)] - if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { - warn!("Fault injection: skipping retry of payload"); - return Err(err.into()); - } - } - } + self.worker + .initiate_send(id, event_observer.disable_retries, None) + } - sleep(backoff); - let jitter: u64 = rand::thread_rng().gen_range(0..100); - backoff = std::cmp::min( - backoff.saturating_mul(2) + Duration::from_millis(jitter), - max_backoff, - ); - attempts = attempts.saturating_add(1); + /// This fire-and-forget version of `dispatch_to_observer` logs any error from enqueueing the + /// request, and does not give you a way to wait for blocking until it's sent. If you need + /// more control, use `dispatch_to_observer()` directly and handle the result yourself. + /// + /// This method exists because we generally don't want the event dispatcher to interrupt the node's + /// processing. + fn dispatch_to_observer_or_log_error( + &self, + event_observer: &EventObserver, + payload: &serde_json::Value, + path: &str, + ) { + if let Err(err) = self.dispatch_to_observer(event_observer, payload, path) { + error!("Event dispatcher: Failed to enqueue payload for sending to observer: {err:?}"); } - - Ok(()) } fn get_payload_bytes(payload: &serde_json::Value) -> Result, EventDispatcherError> { @@ -1218,7 +1220,7 @@ impl EventDispatcher { format!("http://{url_str}") } - fn save_to_db(&self, url: &str, payload_bytes: &[u8], timeout: Duration) -> i64 { + fn save_to_db(&self, data: &EventRequestData) -> i64 { // Because the DB is initialized in the call to process_pending_payloads() during startup, // it is *probably* ok to skip initialization here. That said, at the time of writing this is the // only call to new_without_init(), and we might want to revisit the question whether it's @@ -1226,60 +1228,15 @@ impl EventDispatcher { let conn = EventDispatcherDbConnection::new_without_init(&self.db_path) .expect("Failed to open database for event observer"); - conn.insert_payload_with_retry(&url, payload_bytes.as_ref(), timeout); - conn.last_insert_rowid() - } - - fn make_http_request_and_delete_from_db( - &self, - payload_bytes: &Arc<[u8]>, - full_url: &str, - timeout: Duration, - disable_retries: bool, - id: i64, - ) { - let http_result = - Self::make_http_request(payload_bytes, full_url, timeout, disable_retries); - - if let Err(err) = http_result { - // log but continue - error!("EventDispatcher: dispatching failed"; "url" => &full_url, "error" => ?err); - } - - #[cfg(test)] - if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { - warn!("Fault injection: skipping deletion of payload"); - return; - } - - // We're deleting regardless of result -- if retries are disabled, that means - // we're supposed to forget about it in case of failure. If they're not disabled, - // then we wouldn't be here in case of failue, because `make_http_request` retries - // until it's successful (with the exception of the above fault injection which - // simulates a shutdown). - let deletion_result = self.delete_from_db(id); - - if let Err(e) = deletion_result { - error!( - "Event observer: failed to delete pending payload from database"; - "error" => ?e - ); - } - } - - fn delete_from_db(&self, id: i64) -> Result<(), EventDispatcherError> { - let conn = EventDispatcherDbConnection::new_without_init(&self.db_path) - .expect("Failed to open database for event observer"); - conn.delete_payload(id)?; - Ok(()) + conn.insert_payload_with_retry(data, SystemTime::now()) } fn send_new_attachments(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.dispatch_to_observer(event_observer, payload, PATH_ATTACHMENT_PROCESSED); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_ATTACHMENT_PROCESSED); } fn send_new_mempool_txs(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.dispatch_to_observer(event_observer, payload, PATH_MEMPOOL_TX_SUBMIT); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MEMPOOL_TX_SUBMIT); } /// Serializes new microblocks data into a JSON payload and sends it off to the correct path @@ -1312,7 +1269,7 @@ impl EventDispatcher { "burn_block_timestamp": burn_block_timestamp, }); - self.dispatch_to_observer(event_observer, &payload, PATH_MICROBLOCK_SUBMIT); + self.dispatch_to_observer_or_log_error(event_observer, &payload, PATH_MICROBLOCK_SUBMIT); } fn send_dropped_mempool_txs( @@ -1320,15 +1277,15 @@ impl EventDispatcher { event_observer: &EventObserver, payload: &serde_json::Value, ) { - self.dispatch_to_observer(event_observer, payload, PATH_MEMPOOL_TX_DROP); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MEMPOOL_TX_DROP); } fn send_mined_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.dispatch_to_observer(event_observer, payload, PATH_MINED_BLOCK); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_BLOCK); } fn send_mined_microblock(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.dispatch_to_observer(event_observer, payload, PATH_MINED_MICROBLOCK); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_MICROBLOCK); } fn send_mined_nakamoto_block( @@ -1336,15 +1293,15 @@ impl EventDispatcher { event_observer: &EventObserver, payload: &serde_json::Value, ) { - self.dispatch_to_observer(event_observer, payload, PATH_MINED_NAKAMOTO_BLOCK); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_MINED_NAKAMOTO_BLOCK); } fn send_stackerdb_chunks(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.dispatch_to_observer(event_observer, payload, PATH_STACKERDB_CHUNKS); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_STACKERDB_CHUNKS); } fn send_new_burn_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) { - self.dispatch_to_observer(event_observer, payload, PATH_BURN_BLOCK_SUBMIT); + self.dispatch_to_observer_or_log_error(event_observer, payload, PATH_BURN_BLOCK_SUBMIT); } } diff --git a/stacks-node/src/event_dispatcher/db.rs b/stacks-node/src/event_dispatcher/db.rs index 122812592c8..14e4fc5676d 100644 --- a/stacks-node/src/event_dispatcher/db.rs +++ b/stacks-node/src/event_dispatcher/db.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2025 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -14,15 +14,26 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use std::fmt::Debug; use std::path::PathBuf; use std::sync::Arc; use std::thread::sleep; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use rusqlite::{params, Connection}; -use stacks::util_lib::db::Error as db_error; +use rusqlite::{params, Connection, Row}; +use stacks::util_lib::db::{table_exists, Error as db_error}; + +use crate::event_dispatcher::EventRequestData; + +pub struct PendingPayload { + pub request_data: EventRequestData, + #[allow(dead_code)] // will be used in a follow-up commit + pub timestamp: SystemTime, + pub id: i64, +} /// Wraps a SQlite connection to the database in which pending event payloads are stored +#[derive(Debug)] pub struct EventDispatcherDbConnection { connection: Connection, } @@ -40,17 +51,15 @@ impl EventDispatcherDbConnection { id INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT NOT NULL, payload BLOB NOT NULL, - timeout INTEGER NOT NULL + timeout INTEGER NOT NULL, + timestamp INTEGER NOT NULL )", [], )?; let mut connection = EventDispatcherDbConnection { connection }; - if let Some(col_type) = connection.get_payload_column_type()? { - if col_type.eq_ignore_ascii_case("TEXT") { - info!("Event observer: migrating pending_payloads.payload from TEXT to BLOB"); - connection.migrate_payload_column_to_blob()?; - } - } + + connection.run_necessary_migrations()?; + Ok(connection) } @@ -59,71 +68,58 @@ impl EventDispatcherDbConnection { EventDispatcherDbConnection { connection } } - /// Insert a payload into the database, retrying on failure. - pub fn insert_payload_with_retry(&self, url: &str, payload_bytes: &[u8], timeout: Duration) { - let mut attempts = 0i64; - let mut backoff = Duration::from_millis(100); // Initial backoff duration - let max_backoff = Duration::from_secs(5); // Cap the backoff duration - - loop { - match self.insert_payload(url, payload_bytes, timeout) { - Ok(_) => { - // Successful insert, break the loop - return; - } - Err(err) => { - // Log the error, then retry after a delay - warn!("Failed to insert payload into event observer database: {err:?}"; - "backoff" => ?backoff, - "attempts" => attempts - ); - - // Wait for the backoff duration - sleep(backoff); - - // Increase the backoff duration (with exponential backoff) - backoff = std::cmp::min(backoff.saturating_mul(2), max_backoff); - - attempts = attempts.saturating_add(1); - } - } - } + /// Insert a payload into the database, retrying on failure. Returns the id of of the inserted record. + pub fn insert_payload_with_retry(&self, data: &EventRequestData, timestamp: SystemTime) -> i64 { + with_retry( + || self.insert_payload(data, timestamp), + "Failed to insert payload into event observer database".to_string(), + ) + } + + pub fn get_payload_with_retry(&self, id: i64) -> PendingPayload { + with_retry( + || self.get_payload(id), + "Failed to retrieve payload {id} from event observer database".to_string(), + ) } pub fn insert_payload( &self, - url: &str, - payload_bytes: &[u8], - timeout: Duration, - ) -> Result<(), db_error> { - let timeout_ms: u64 = timeout.as_millis().try_into().expect("Timeout too large"); - self.connection.execute( - "INSERT INTO pending_payloads (url, payload, timeout) VALUES (?1, ?2, ?3)", - params![url, payload_bytes, timeout_ms], + data: &EventRequestData, + timestamp: SystemTime, + ) -> Result { + let timeout_ms: u64 = data + .timeout + .as_millis() + .try_into() + .expect("Timeout too large"); + + let timestamp_s = timestamp + .duration_since(UNIX_EPOCH) + .expect("system clock is multiple decades slow") + .as_secs(); + + let id: i64 = self.connection.query_row( + "INSERT INTO pending_payloads (url, payload, timeout, timestamp) VALUES (?1, ?2, ?3, ?4) RETURNING id", + params![data.url, data.payload_bytes, timeout_ms, timestamp_s], + |row| row.get(0), )?; - Ok(()) + Ok(id) } - // TODO: change this to get the id from the insertion directly, because that's more reliable - pub fn last_insert_rowid(&self) -> i64 { - self.connection.last_insert_rowid() + pub fn get_payload(&self, id: i64) -> Result { + self.connection.query_row_and_then( + &format!("SELECT {PAYLOAD_FIELDS} FROM pending_payloads WHERE id = ?1"), + [id], + row_to_pending_payload, + ) } - pub fn get_pending_payloads(&self) -> Result, u64)>, db_error> { - let mut stmt = self - .connection - .prepare("SELECT id, url, payload, timeout FROM pending_payloads ORDER BY id")?; - let payload_iter = stmt.query_and_then( - [], - |row| -> Result<(i64, String, Arc<[u8]>, u64), db_error> { - let id: i64 = row.get(0)?; - let url: String = row.get(1)?; - let payload_bytes: Vec = row.get(2)?; - let payload_bytes = Arc::<[u8]>::from(payload_bytes); - let timeout_ms: u64 = row.get(3)?; - Ok((id, url, payload_bytes, timeout_ms)) - }, - )?; + pub fn get_pending_payloads(&self) -> Result, db_error> { + let mut stmt = self.connection.prepare(&format!( + "SELECT {PAYLOAD_FIELDS} FROM pending_payloads ORDER BY id" + ))?; + let payload_iter = stmt.query_and_then([], row_to_pending_payload)?; payload_iter.collect() } @@ -133,25 +129,54 @@ impl EventDispatcherDbConnection { Ok(()) } - fn get_payload_column_type(&self) -> Result, db_error> { - let mut stmt = self - .connection - .prepare("PRAGMA table_info(pending_payloads)")?; - - let rows = stmt.query_map([], |row| { - let name: String = row.get(1)?; - let col_type: String = row.get(2)?; - Ok((name, col_type)) - })?; - - for row in rows { - let (name, col_type) = row?; - if name == "payload" { - return Ok(Some(col_type)); - } + /// The initial schema of the database when this code was first created + const DB_VERSION_INITIAL_SCHEMA: u32 = 0; + /// The `payload`` column type changed from TEXT to BLOB + const DB_VERSION_PAYLOAD_IS_BLOB: u32 = 1; + /// Column `timestamp` and table `db_config` added + const DB_VERSION_VERSIONING_AND_TIMESTAMP_COLUMN: u32 = 2; + + fn run_necessary_migrations(&mut self) -> Result<(), db_error> { + let current_schema = self.get_schema_version()?; + + if current_schema < Self::DB_VERSION_PAYLOAD_IS_BLOB { + info!("Event observer: migrating pending_payloads.payload from TEXT to BLOB"); + self.migrate_payload_column_to_blob()?; + } + + if current_schema < Self::DB_VERSION_VERSIONING_AND_TIMESTAMP_COLUMN { + info!("Event observer: adding timestamp to pending_payloads"); + self.add_versioning_and_timestamp_column()?; } - Ok(None) + Ok(()) + } + + fn get_schema_version(&self) -> Result { + let has_db_config = table_exists(&self.connection, "db_config")?; + + if has_db_config { + let version = + self.connection + .query_row("SELECT MAX(version) FROM db_config", [], |r| { + r.get::<_, u32>(0) + })?; + return Ok(version); + } + + let payload_type = self.connection.query_row( + "SELECT type FROM pragma_table_info('pending_payloads') WHERE name='payload'", + [], + |r| r.get::<_, String>(0), + )?; + + let payload_is_blob = payload_type.eq_ignore_ascii_case("BLOB"); + + if payload_is_blob { + Ok(Self::DB_VERSION_PAYLOAD_IS_BLOB) + } else { + Ok(Self::DB_VERSION_INITIAL_SCHEMA) + } } fn migrate_payload_column_to_blob(&mut self) -> Result<(), db_error> { @@ -178,10 +203,118 @@ impl EventDispatcherDbConnection { tx.commit()?; Ok(()) } + + fn add_versioning_and_timestamp_column(&mut self) -> Result<(), db_error> { + let tx = self.connection.transaction()?; + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time travel to pre-1970 is not supported") + .as_secs(); + + tx.execute( + "ALTER TABLE pending_payloads RENAME TO pending_payloads_old", + [], + )?; + tx.execute( + "CREATE TABLE pending_payloads ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT NOT NULL, + payload BLOB NOT NULL, + timeout INTEGER NOT NULL, + timestamp INTEGER NOT NULL + )", + [], + )?; + tx.execute( + "INSERT INTO pending_payloads (id, url, payload, timeout, timestamp) + SELECT id, url, CAST(payload AS BLOB), timeout, ?1 FROM pending_payloads_old", + [now], + )?; + tx.execute("DROP TABLE pending_payloads_old", [])?; + + tx.execute("CREATE TABLE db_config (version INTEGER)", [])?; + tx.execute( + "INSERT INTO db_config (version) VALUES (?1)", + params![Self::DB_VERSION_VERSIONING_AND_TIMESTAMP_COLUMN], + )?; + + tx.commit()?; + Ok(()) + } +} + +// If you change this, make sure to change `row_to_pending_payload` in sync. +const PAYLOAD_FIELDS: &str = "id, url, payload, timeout, timestamp"; + +/// This function should only be used with rows that were SELECTed using the +/// `PAYLOAD_FIELDS` constant. +fn row_to_pending_payload(row: &Row) -> Result { + let id: i64 = row.get(0)?; + let url: String = row.get(1)?; + let payload_bytes: Vec = row.get(2)?; + let payload_bytes = Arc::<[u8]>::from(payload_bytes); + let timeout_ms: u64 = row.get(3)?; + let timestamp_s: u64 = row.get(4)?; + let request_data = EventRequestData { + url, + payload_bytes, + timeout: Duration::from_millis(timeout_ms), + }; + + Ok(PendingPayload { + id, + request_data, + timestamp: UNIX_EPOCH + Duration::from_secs(timestamp_s), + }) +} + +/// Calls the given function, repeatedly if necessary, until it doesn't fail, and then +/// returns the result from the successful call. Initially backs off for 0.1s and increases +/// backoff exponentially up to a max of five seconds. If the function never returns a +/// success result, `with_retry` will block forever. +/// +/// # Example +/// +/// let response = with_retry(|| perform_db_op(42), "database operation 42 failed"); +fn with_retry(f: F, error_log_text: String) -> T +where + F: Fn() -> Result, + E: Debug, +{ + let mut attempts = 0i64; + let mut backoff = Duration::from_millis(100); // Initial backoff duration + let max_backoff = Duration::from_secs(5); // Cap the backoff duration + + loop { + match f() { + Ok(thing) => { + // Successful operation, break the loop + return thing; + } + Err(err) => { + // Log the error, then retry after a delay + warn!("{error_log_text}: {err:?}"; + "backoff" => ?backoff, + "attempts" => attempts + ); + + // Wait for the backoff duration + sleep(backoff); + + // Increase the backoff duration (with exponential backoff) + backoff = std::cmp::min(backoff.saturating_mul(2), max_backoff); + + attempts = attempts.saturating_add(1); + } + } + } } #[cfg(test)] mod test { + use std::cell::RefCell; + use std::time::Instant; + use serde_json::json; use tempfile::tempdir; @@ -212,7 +345,7 @@ mod test { } #[test] - fn test_migrate_payload_column_to_blob() { + fn test_migration() { let dir = tempdir().unwrap(); let db_path = dir.path().join("test_payload_migration.sqlite"); @@ -252,12 +385,35 @@ mod test { "Payload column was not migrated to BLOB" ); + let insertion_info_col_count: i64 = conn + .connection + .query_row( + "SELECT COUNT(*) FROM pragma_table_info('pending_payloads') WHERE name = 'timestamp'", + [], + |row| row.get(0), + ) + .unwrap(); + assert!( + insertion_info_col_count == 1, + "timestamp column was not added" + ); + + let version: u32 = conn + .connection + .query_row("SELECT MAX(version) FROM db_config", [], |r| r.get(0)) + .expect("db_config was not added"); + assert_eq!( + version, + EventDispatcherDbConnection::DB_VERSION_VERSIONING_AND_TIMESTAMP_COLUMN, + "Unexpected version number. Did you add a migration? Update this test." + ); + let pending_payloads = conn .get_pending_payloads() .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); assert_eq!( - pending_payloads[0].2.as_ref(), + pending_payloads[0].request_data.payload_bytes.as_ref(), payload_str.as_bytes(), "Payload contents did not survive migration" ); @@ -271,14 +427,20 @@ mod test { let conn = EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize the database"); - let url = "http://example.com/api"; + let url = "http://example.com/api".to_string(); let payload = json!({"key": "value"}); let timeout = Duration::from_secs(5); + let timestamp_sentinel = UNIX_EPOCH + Duration::from_hours(24 * 20000); let payload_bytes = serde_json::to_vec(&payload).expect("Failed to serialize payload"); + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; + // Insert payload - let insert_result = conn.insert_payload(url, payload_bytes.as_slice(), timeout); - assert!(insert_result.is_ok(), "Failed to insert payload"); + let id = conn.insert_payload_with_retry(&data, timestamp_sentinel); // Get pending payloads let pending_payloads = conn @@ -286,17 +448,23 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); - let (_id, retrieved_url, stored_bytes, timeout_ms) = &pending_payloads[0]; - assert_eq!(retrieved_url, url, "URL does not match"); + let PendingPayload { + id: retrieved_id, + timestamp: retrieved_timestamp, + request_data: retrieved_data, + } = &pending_payloads[0]; + + assert_eq!(*retrieved_id, id, "ID does not match"); + assert_eq!(retrieved_data.url, data.url, "URL does not match"); assert_eq!( - stored_bytes.as_ref(), - payload_bytes.as_slice(), + retrieved_data.payload_bytes.as_ref(), + data.payload_bytes.as_ref(), "Serialized payload does not match" ); + assert_eq!(retrieved_data.timeout, timeout, "Timeout does not match"); assert_eq!( - *timeout_ms, - timeout.as_millis() as u64, - "Timeout does not match" + *retrieved_timestamp, timestamp_sentinel, + "Time stamp does not match" ); } @@ -308,13 +476,19 @@ mod test { let conn = EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize the database"); - let url = "http://example.com/api"; + let url = "http://example.com/api".to_string(); let payload = json!({"key": "value"}); let timeout = Duration::from_secs(5); let payload_bytes = serde_json::to_vec(&payload).expect("Failed to serialize payload"); + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; + // Insert payload - conn.insert_payload(url, payload_bytes.as_slice(), timeout) + conn.insert_payload(&data, SystemTime::now()) .expect("Failed to insert payload"); // Get pending payloads @@ -323,7 +497,7 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 1, "Expected one pending payload"); - let (id, _, _, _) = pending_payloads[0]; + let PendingPayload { id, .. } = pending_payloads[0]; // Delete payload let delete_result = conn.delete_payload(id); @@ -335,4 +509,36 @@ mod test { .expect("Failed to get pending payloads"); assert_eq!(pending_payloads.len(), 0, "Expected no pending payloads"); } + + #[test] + fn test_with_retry_returns_original_result() { + let f = || Result::::Ok(6_7); + let result = with_retry(f, "failed".to_string()); + assert_eq!(result, 67); + } + + #[test] + fn test_with_retry_retries_as_often_as_necessary() { + let call_count = RefCell::new(0); + let f = || { + *call_count.borrow_mut() += 1; + if *call_count.borrow() < 5 { + return Err("keep trying"); + } else { + return Ok("you did it"); + } + }; + let now = Instant::now(); + let result = with_retry(f, "failed".to_string()); + let elapsed_millis = now.elapsed().as_millis(); + assert_eq!(result, "you did it"); + let count = *call_count.borrow(); + assert_eq!( + count, 5, + "inner function was not called the expected number of times" + ); + // We retry 4 times, with delays of 100, 200, 400, and 800 ms, respectively, + // for a total of 1,500. + assert!(1_450 < elapsed_millis && elapsed_millis < 1_550); + } } diff --git a/stacks-node/src/event_dispatcher/tests.rs b/stacks-node/src/event_dispatcher/tests.rs index 7908ca364f8..42e51e93fb9 100644 --- a/stacks-node/src/event_dispatcher/tests.rs +++ b/stacks-node/src/event_dispatcher/tests.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2025 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -15,8 +15,9 @@ // along with this program. If not, see . use std::net::TcpListener; +use std::sync::atomic::{AtomicU32, Ordering}; use std::thread; -use std::time::Instant; +use std::time::{Instant, SystemTime}; use clarity::boot_util::boot_code_id; use clarity::vm::costs::ExecutionCost; @@ -37,9 +38,12 @@ use stacks::chainstate::stacks::{ TransactionPayload, TransactionPostConditionMode, TransactionPublicKeyEncoding, TransactionSpendingCondition, TransactionVersion, }; +use stacks::net::http::HttpRequestContents; +use stacks::net::httpcore::{send_http_request, StacksHttpRequest}; use stacks::types::chainstate::{ BlockHeaderHash, StacksAddress, StacksPrivateKey, StacksPublicKey, }; +use stacks::types::net::PeerHost; use stacks::util::hash::{Hash160, Sha512Trunc256Sum}; use stacks::util::secp256k1::MessageSignature; use stacks_common::bitvec::BitVec; @@ -238,7 +242,7 @@ fn test_process_pending_payloads() { info!("endpoint: {}", endpoint); let timeout = Duration::from_secs(5); - let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); + let mut dispatcher = EventDispatcher::new_with_custom_queue_size(dir.path().to_path_buf(), 0); dispatcher.register_observer(&EventObserverConfig { endpoint: endpoint.clone(), @@ -261,12 +265,18 @@ fn test_process_pending_payloads() { .with_status(200) .create(); - let url = &format!("{}/api", &server.url()); + let url = format!("{}/api", &server.url()); + + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); // Insert payload - conn.insert_payload(url, payload_bytes.as_slice(), timeout) + conn.insert_payload(&data, SystemTime::now()) .expect("Failed to insert payload"); // Process pending payloads @@ -318,9 +328,15 @@ fn pending_payloads_are_skipped_if_url_does_not_match() { .create(); // Use a different URL than the observer's endpoint - let url = "http://different-domain.com/api"; + let url = "http://different-domain.com/api".to_string(); + + let data = EventRequestData { + url, + payload_bytes: payload_bytes.into(), + timeout, + }; - conn.insert_payload(url, payload_bytes.as_slice(), timeout) + conn.insert_payload(&data, SystemTime::now()) .expect("Failed to insert payload"); dispatcher.process_pending_payloads(); @@ -396,7 +412,10 @@ fn test_send_payload_with_db() { TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); // Call send_payload - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Verify that the payload was sent and database is empty _m.assert(); @@ -441,7 +460,7 @@ fn test_send_payload_success() { let working_dir = dir.path().to_path_buf(); let dispatcher = EventDispatcher::new(working_dir); - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher.dispatch_to_observer_or_log_error(&observer, &payload, "/test"); // Wait for the server to process the request rx.recv_timeout(Duration::from_secs(5)) @@ -493,7 +512,7 @@ fn test_send_payload_retry() { let working_dir = dir.path().to_path_buf(); let dispatcher = EventDispatcher::new(working_dir); - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher.dispatch_to_observer_or_log_error(&observer, &payload, "/test"); // Wait for the server to process the request rx.recv_timeout(Duration::from_secs(5)) @@ -550,7 +569,10 @@ fn test_send_payload_timeout() { let dispatcher = EventDispatcher::new(working_dir); // Call the function being tested - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Record the time after the function returns let elapsed_time = start_time.elapsed(); @@ -658,7 +680,10 @@ fn test_send_payload_with_db_force_restart() { info!("Sending payload 1"); // Send the payload - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Re-enable retrying TEST_EVENT_OBSERVER_SKIP_RETRY.set(false); @@ -668,7 +693,7 @@ fn test_send_payload_with_db_force_restart() { info!("Sending payload 2"); // Send another payload - dispatcher.dispatch_to_observer(&observer, &payload2, "/test"); + dispatcher.dispatch_to_observer_or_log_error(&observer, &payload2, "/test"); // Wait for the server to process the requests rx.recv_timeout(Duration::from_secs(5)) @@ -695,7 +720,10 @@ fn test_event_dispatcher_disable_retries() { let dispatcher = EventDispatcher::new(working_dir); // in non "disable_retries" mode this will run forever - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); // Verify that the payload was sent _m.assert(); @@ -715,7 +743,10 @@ fn test_event_dispatcher_disable_retries_invalid_url() { let dispatcher = EventDispatcher::new(working_dir); // in non "disable_retries" mode this will run forever - dispatcher.dispatch_to_observer(&observer, &payload, "/test"); + dispatcher + .dispatch_to_observer(&observer, &payload, "/test") + .unwrap() + .wait_until_complete(); } #[test] @@ -927,6 +958,7 @@ fn test_block_proposal_validation_event() { let endpoint = server.url().strip_prefix("http://").unwrap().to_string(); let dir = tempdir().unwrap(); let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); + dispatcher.register_observer(&EventObserverConfig { endpoint: endpoint.clone(), events_keys: vec![EventKeyType::BlockProposal], @@ -958,3 +990,253 @@ fn test_block_proposal_validation_event() { mock.assert(); } + +#[test] +fn test_http_delivery_non_blocking() { + let mut slow_server = mockito::Server::new(); + + let start_count = Arc::new(AtomicU32::new(0)); + let end_count = Arc::new(AtomicU32::new(0)); + + let start_count2 = start_count.clone(); + let end_count2 = end_count.clone(); + + let mock = slow_server + .mock("POST", "/mined_nakamoto_block") + .with_body_from_request(move |_| { + start_count2.fetch_add(1, Ordering::SeqCst); + thread::sleep(Duration::from_secs(2)); + end_count2.fetch_add(1, Ordering::SeqCst); + "".into() + }) + .create(); + + let endpoint = slow_server + .url() + .strip_prefix("http://") + .unwrap() + .to_string(); + + let dir = tempdir().unwrap(); + let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf()); + + dispatcher.register_observer(&EventObserverConfig { + endpoint: endpoint.clone(), + events_keys: vec![EventKeyType::MinedBlocks], + timeout_ms: 3_000, + disable_retries: false, + }); + + let nakamoto_block = NakamotoBlock { + header: NakamotoBlockHeader::empty(), + txs: vec![], + }; + + let start = Instant::now(); + + dispatcher.process_mined_nakamoto_block_event( + 0, + &nakamoto_block, + 0, + &ExecutionCost::max_value(), + vec![], + ); + + assert!( + start.elapsed() < Duration::from_millis(100), + "dispatcher blocked while sending event" + ); + + thread::sleep(Duration::from_secs(1)); + + assert!(start_count.load(Ordering::SeqCst) == 1); + assert!(end_count.load(Ordering::SeqCst) == 0); + + thread::sleep(Duration::from_secs(2)); + + assert!(start_count.load(Ordering::SeqCst) == 1); + assert!(end_count.load(Ordering::SeqCst) == 1); + + mock.assert(); +} + +#[test] +fn test_http_delivery_blocks_once_queue_is_full() { + let mut slow_server = mockito::Server::new(); + + let start_count = Arc::new(AtomicU32::new(0)); + let end_count = Arc::new(AtomicU32::new(0)); + + let start_count2 = start_count.clone(); + let end_count2 = end_count.clone(); + + // this server takes 2 seconds until it finally responds + let mock = slow_server + .mock("POST", "/mined_nakamoto_block") + .expect(4) + .with_body_from_request(move |_| { + start_count2.fetch_add(1, Ordering::SeqCst); + thread::sleep(Duration::from_secs(2)); + end_count2.fetch_add(1, Ordering::SeqCst); + "".into() + }) + .create(); + + let endpoint = slow_server + .url() + .strip_prefix("http://") + .unwrap() + .to_string(); + + let dir = tempdir().unwrap(); + + // Create a dispatcher with a queue size of 3, so that three pending requests + // don't block, but the fourth one does. + let mut dispatcher = EventDispatcher::new_with_custom_queue_size(dir.path().to_path_buf(), 3); + + dispatcher.register_observer(&EventObserverConfig { + endpoint: endpoint.clone(), + events_keys: vec![EventKeyType::MinedBlocks], + timeout_ms: 3_000, + disable_retries: false, + }); + + let nakamoto_block = NakamotoBlock { + header: NakamotoBlockHeader::empty(), + txs: vec![], + }; + + let start = Instant::now(); + + // send the first three requests + for _ in 1..=3 { + dispatcher.process_mined_nakamoto_block_event( + 0, + &nakamoto_block, + 0, + &ExecutionCost::max_value(), + vec![], + ); + } + + let elapsed = start.elapsed(); + // this shouldn't block because they fit in the queue + assert!( + elapsed < Duration::from_millis(500), + "dispatcher blocked while sending first three events" + ); + + thread::sleep(Duration::from_millis(500) - elapsed); + + assert_eq!(start_count.load(Ordering::SeqCst), 1); + assert_eq!(end_count.load(Ordering::SeqCst), 0); + + let start = Instant::now(); + + // send the fourth request -- this should now block until the first request is complete + dispatcher.process_mined_nakamoto_block_event( + 0, + &nakamoto_block, + 0, + &ExecutionCost::max_value(), + vec![], + ); + + // we waited 500ms previously, so it should take on the order of 1.5s until + // the first request is complete + assert!( + start.elapsed() > Duration::from_millis(1000), + "dispatcher did not block when sending fourth event" + ); + + assert!( + start.elapsed() < Duration::from_millis(2000), + "dispatcher blocked unexpectedly long after sending fourth event" + ); + + thread::sleep(Duration::from_millis(100)); + + assert_eq!(start_count.load(Ordering::SeqCst), 2); + assert_eq!(end_count.load(Ordering::SeqCst), 1); + + thread::sleep(Duration::from_secs(2)); + + assert_eq!(start_count.load(Ordering::SeqCst), 3); + assert_eq!(end_count.load(Ordering::SeqCst), 2); + + thread::sleep(Duration::from_secs(2)); + + assert_eq!(start_count.load(Ordering::SeqCst), 4); + assert_eq!(end_count.load(Ordering::SeqCst), 3); + + thread::sleep(Duration::from_secs(2)); + + assert_eq!(start_count.load(Ordering::SeqCst), 4); + assert_eq!(end_count.load(Ordering::SeqCst), 4); + + mock.assert(); +} + +#[test] +fn test_http_delivery_always_blocks_if_queue_size_is_zero() { + let mut slow_server = mockito::Server::new(); + + let start_count = Arc::new(AtomicU32::new(0)); + let end_count = Arc::new(AtomicU32::new(0)); + + let start_count2 = start_count.clone(); + let end_count2 = end_count.clone(); + + let mock = slow_server + .mock("POST", "/mined_nakamoto_block") + .with_body_from_request(move |_| { + start_count2.fetch_add(1, Ordering::SeqCst); + thread::sleep(Duration::from_secs(2)); + end_count2.fetch_add(1, Ordering::SeqCst); + "".into() + }) + .create(); + + let endpoint = slow_server + .url() + .strip_prefix("http://") + .unwrap() + .to_string(); + + let dir = tempdir().unwrap(); + let mut dispatcher = EventDispatcher::new_with_custom_queue_size(dir.path().to_path_buf(), 0); + + dispatcher.register_observer(&EventObserverConfig { + endpoint: endpoint.clone(), + events_keys: vec![EventKeyType::MinedBlocks], + timeout_ms: 3_000, + disable_retries: false, + }); + + let nakamoto_block = NakamotoBlock { + header: NakamotoBlockHeader::empty(), + txs: vec![], + }; + + let start = Instant::now(); + + dispatcher.process_mined_nakamoto_block_event( + 0, + &nakamoto_block, + 0, + &ExecutionCost::max_value(), + vec![], + ); + + assert!( + start.elapsed() > Duration::from_millis(1900), + "dispatcher did not block while sending event" + ); + + thread::sleep(Duration::from_millis(100)); + + assert!(start_count.load(Ordering::SeqCst) == 1); + assert!(end_count.load(Ordering::SeqCst) == 1); + + mock.assert(); +} diff --git a/stacks-node/src/event_dispatcher/worker.rs b/stacks-node/src/event_dispatcher/worker.rs new file mode 100644 index 00000000000..985e91ecc6b --- /dev/null +++ b/stacks-node/src/event_dispatcher/worker.rs @@ -0,0 +1,391 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::path::PathBuf; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender}; +use std::sync::Arc; +use std::thread::{self, sleep}; +use std::time::{Duration, SystemTime}; + +use rand::Rng; +use stacks::net::http::HttpRequestContents; +use stacks::net::httpcore::{send_http_request, StacksHttpRequest}; +use stacks::types::net::PeerHost; +use url::Url; + +use crate::event_dispatcher::db::EventDispatcherDbConnection; +#[cfg(test)] +use crate::event_dispatcher::TEST_EVENT_OBSERVER_SKIP_RETRY; +use crate::event_dispatcher::{EventDispatcherError, EventRequestData}; + +#[allow(dead_code)] // NoOp is only used in test configurations +enum WorkerTask { + Payload { + /// The id of the payload data in the event observer DB. It must exist. + id: i64, + /// If true, the HTTP request is only attempted once. + disable_retries: bool, + /// A value for the HTTP timeout is stored in the DB, but can optionally be overridden. + timeout_override: Option, + }, + NoOp, +} +struct WorkerMessage { + task: WorkerTask, + /// The worker thread will send a message on this channel once it's done with this request. + completion: Sender<()>, +} + +/// The return type of `initiate_send()`. If the caller of that method just wishes to move +/// on, they can happily drop this result. This is the behavior for most event deliveries. +/// +/// On the other hand, if they wish to block until the HTTP request was successfully sent +/// (or, in the case of `disable_retries`, at least attempted), they can call +/// `.wait_until_complete()`. This is what happens during `process_pending_payloads()` at +/// startup. Note that it's possible that other requests are in the queue, so the blocking +/// may take longer than only the handling of this very request. +pub struct EventDispatcherResult { + /// The worker thread will send a one-time message to this channel to notify of completion. + /// Afterwards, it will drop the sender and thus close the channel. If this is `None`, then + /// the operation is already complete, and `wait_until_complete` returns immediately. + receiver: Option>, +} + +impl EventDispatcherResult { + pub fn wait_until_complete(self) { + let Some(receiver) = self.receiver else { + return; + }; + // There is no codepath that would drop the sender without sending the acknowledgenent + // first. And this method consumes `self`, so it can only be called once. + // So if despite all that, `recv()` returns an error, that means the worker thread panicked. + receiver + .recv() + .expect("EventDispatcherWorker thread has terminated mid-operation"); + } +} + +/// This worker is responsible for making the actual HTTP requests that ultimately result +/// from dispatching events to observers. It makes those requests on a dedicated separate +/// thread so that e.g. a slow event observer doesn't block a node from continuing its work. +/// +/// Call `EventDispatcherWorker::new()` to create. +/// +/// Call `initiate_send()` with the id of the payload (in the event oberserver DB) to enqueue. +/// +/// Cloning the `EventDispatcherWorker` does *not* create a new thread -- both the original and +/// the clone will share a single queue and worker thread. +/// +/// Once the `EventDispatcherWorker` (including any clones) is dropped, the worker thread will +/// finish any enqueued work and then shut down. +#[derive(Clone)] +pub struct EventDispatcherWorker { + sender: SyncSender, + must_block: bool, +} + +static NEXT_THREAD_NUM: AtomicU64 = AtomicU64::new(1); + +impl EventDispatcherWorker { + /// The custom queue size indicates how many requests are allowed to be pending (i.e. not + /// complete) before a call to initiate_send starts blocking. A value of 0 thus means that + /// every request blocks. + pub fn new( + db_path: PathBuf, + queue_size: usize, + ) -> Result { + // If the channel bound is 0, then the send operation will + // block until the *message was received*, not until the + // payload was sent. Only the next send will then block (until + // the previous request was completed). + // + // In other words, the channel's bound has to be one less than + // the desired queue size. If the queue size is 0, then the bound + // is also set to zero, and *in addition* we block until the + // request has happend. + + let channel_bound = if queue_size > 0 { queue_size - 1 } else { 0 }; + let must_block = queue_size == 0; + + let (message_tx, message_rx) = sync_channel(channel_bound); + let (ready_tx, ready_rx) = channel(); + + let thread_num = NEXT_THREAD_NUM.fetch_add(1, Ordering::SeqCst); + + thread::Builder::new() + .name(format!("event-dispatcher-{thread_num}").to_string()) + .spawn(move || { + let conn = match EventDispatcherDbConnection::new(&db_path) { + Ok(conn) => conn, + Err(err) => { + error!("Event Dispatcher Worker: Unable to open DB, terminating worker thread: {err}"); + ready_tx.send(Err(err)).unwrap(); + return; + } + }; + + if let Err(err) = ready_tx.send(Ok(())) { + // If the sending fails (i.e. the receiver has been dropped), that means a logic bug + // has been introduced to the code -- at time of writing, the main function is waiting + // for this message a few lines down, outside the thread closure. + // We log this, but we still start the loop. + error!( + "Event Dispatcher Worker: Unable to send ready state. This is a bug. {err}" + ); + } + + // this will run forever until the messaging channel is closed + Self::main_thread_loop(conn, message_rx); + }) + .unwrap(); + + // note double question mark, deals with both the channel RecvError and whatever error + // might be sent across that channel + ready_rx.recv()??; + + Ok(EventDispatcherWorker { + sender: message_tx, + must_block, + }) + } + + /// Let the worker know that it should send the request that is stored in the DB under the given + /// ID, and delete that DB entry once it's done. + /// + /// A successful result only means that the request was successfully enqueued, not that it was + /// actually made. If you need to wait until the latter has happened, call `wait_until_complete()` + /// on the returned `EventDispatcherResult`. + /// + /// The worker has a limited queue size (1000 by default). If the queue is already full, the + /// call to `initiate_send()` will block until space has become available. + pub fn initiate_send( + &self, + id: i64, + disable_retries: bool, + timeout_override: Option, + ) -> Result { + let (sender, receiver) = channel(); + debug!("Event Dispatcher Worker: sending payload {id}"); + + self.sender.send(WorkerMessage { + task: WorkerTask::Payload { + id, + disable_retries, + timeout_override, + }, + completion: sender, + })?; + + let result = EventDispatcherResult { + receiver: Some(receiver), + }; + + if !self.must_block { + return Ok(result); + } + + result.wait_until_complete(); + + let result = EventDispatcherResult { receiver: None }; + Ok(result) + } + + #[cfg(test)] + pub fn noop(&self) -> Result { + let (sender, receiver) = channel(); + debug!("Event Dispatcher Worker: sending no-op"); + + self.sender.send(WorkerMessage { + task: WorkerTask::NoOp, + completion: sender, + })?; + + // we're ignoring `must_block` here -- the only point of `noop()` is to call + // `wait_until_complete()` immedately anyway, so we'll let the caller do that. + + Ok(EventDispatcherResult { + receiver: Some(receiver), + }) + } + + fn main_thread_loop(conn: EventDispatcherDbConnection, message_rx: Receiver) { + // main loop of the thread -- get message from channel, grab data from DB, send request, + // delete from DB, acknowledge + loop { + let Ok(WorkerMessage { task, completion }) = message_rx.recv() else { + info!("Event Dispatcher Worker: channel closed, terminating worker thread."); + return; + }; + + let WorkerTask::Payload { + id, + disable_retries, + timeout_override, + } = task + else { + // no-op -- just ack and move on + debug!("Event Dispatcher Worker: doing no-op"); + let _ = completion.send(()); + continue; + }; + + debug!("Event Dispatcher Worker: doing payload {id}"); + + // This will block forever if we were passed a non-existing ID. Don't do that. + let mut payload = conn.get_payload_with_retry(id); + + // Deliberately not handling the error case of `duration_since()` -- if the `timestamp` + // is *after* `now` (which should be extremely rare), the most likely reason is a *slight* + // adjustment to the the system clock (e.g. NTP sync) that happened between storing the + // entity and retrieving it, and that should be fine. + // If there was a *major* adjustment, all bets are off anyway. You shouldn't mess with your + // clock on a server running a node. + if let Ok(age) = SystemTime::now().duration_since(payload.timestamp) { + if age.as_secs() > 5 * 60 { + warn!( + "Event Dispatcher Worker: Event payload transmitting more than 5 minutes after event"; + "age_ms" => age.as_millis(), + "id"=> id + ); + } + } + + if let Some(timeout_override) = timeout_override { + payload.request_data.timeout = timeout_override; + } + + Self::make_http_request_and_delete_from_db( + &payload.request_data, + disable_retries, + id, + &conn, + ); + + // We're ignoring the result of this call -- if the requester has dropped the receiver + // in the meantime, that's fine. That is the usual case of fire-and-forget calls. + let _ = completion.send(()); + } + } + + fn make_http_request_and_delete_from_db( + data: &EventRequestData, + disable_retries: bool, + id: i64, + conn: &EventDispatcherDbConnection, + ) { + let http_result = Self::make_http_request(data, disable_retries); + + if let Err(err) = http_result { + // log but continue + error!("EventDispatcher: dispatching failed"; "url" => data.url.clone(), "error" => ?err); + } + + #[cfg(test)] + if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { + warn!("Fault injection: skipping deletion of payload"); + return; + } + + // We're deleting regardless of result -- if retries are disabled, that means + // we're supposed to forget about it in case of failure. If they're not disabled, + // then we wouldn't be here in case of failue, because `make_http_request` retries + // until it's successful (with the exception of the above fault injection which + // simulates a shutdown). + let deletion_result = conn.delete_payload(id); + + if let Err(e) = deletion_result { + error!( + "Event observer: failed to delete pending payload from database"; + "error" => ?e + ); + } + } + + fn make_http_request( + data: &EventRequestData, + disable_retries: bool, + ) -> Result<(), EventDispatcherError> { + debug!( + "Event dispatcher: Sending payload"; "url" => &data.url, "bytes" => data.payload_bytes.len() + ); + + let url = Url::parse(&data.url) + .unwrap_or_else(|_| panic!("Event dispatcher: unable to parse {} as a URL", data.url)); + + let host = url.host_str().expect("Invalid URL: missing host"); + let port = url.port_or_known_default().unwrap_or(80); + let peerhost: PeerHost = format!("{host}:{port}") + .parse() + .unwrap_or(PeerHost::DNS(host.to_string(), port)); + + let mut backoff = Duration::from_millis(100); + let mut attempts: i32 = 0; + // Cap the backoff at 3x the timeout + let max_backoff = data.timeout.saturating_mul(3); + + loop { + let mut request = StacksHttpRequest::new_for_peer( + peerhost.clone(), + "POST".into(), + url.path().into(), + HttpRequestContents::new().payload_json_bytes(Arc::clone(&data.payload_bytes)), + ) + .unwrap_or_else(|_| panic!("FATAL: failed to encode infallible data as HTTP request")); + request.add_header("Connection".into(), "close".into()); + match send_http_request(host, port, request, data.timeout) { + Ok(response) => { + if response.preamble().status_code == 200 { + debug!( + "Event dispatcher: Successful POST"; "url" => %url + ); + break; + } else { + error!( + "Event dispatcher: Failed POST"; "url" => %url, "response" => ?response.preamble() + ); + } + } + Err(err) => { + warn!( + "Event dispatcher: connection or request failed to {host}:{port} - {err:?}"; + "backoff" => ?backoff, + "attempts" => attempts + ); + if disable_retries { + warn!("Observer is configured in disable_retries mode: skipping retry of payload"); + return Err(err.into()); + } + #[cfg(test)] + if TEST_EVENT_OBSERVER_SKIP_RETRY.get() { + warn!("Fault injection: skipping retry of payload"); + return Err(err.into()); + } + } + } + + sleep(backoff); + let jitter: u64 = rand::thread_rng().gen_range(0..100); + backoff = std::cmp::min( + backoff.saturating_mul(2) + Duration::from_millis(jitter), + max_backoff, + ); + attempts = attempts.saturating_add(1); + } + + Ok(()) + } +} diff --git a/stacks-node/src/main.rs b/stacks-node/src/main.rs index c1b7c9e99ad..f660fef12f5 100644 --- a/stacks-node/src/main.rs +++ b/stacks-node/src/main.rs @@ -1,3 +1,19 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + #[macro_use] extern crate serde_derive; #[macro_use] @@ -249,6 +265,20 @@ fn cli_get_miner_spend( spend_amount } +/// If the previous session was terminated before all the pending events had been sent, +/// the DB will still contain them. Work through that before doing anything new. +/// Pending events for observers that are no longer registered will be discarded. +fn send_pending_event_payloads(conf: &Config) { + // This dispatcher gets a queue size of 0 to ensure that it blocks. Technically + // process_pending_payloads() always blocks; this is just and additional safeguard. + let mut event_dispatcher = + EventDispatcher::new_with_custom_queue_size(conf.get_working_dir(), 0); + for observer in &conf.events_observers { + event_dispatcher.register_observer(observer); + } + event_dispatcher.process_pending_payloads(); +} + fn main() { panic::set_hook(Box::new(|panic_info| { error!("Process abort due to thread panic: {panic_info}"); @@ -411,6 +441,8 @@ fn main() { debug!("burnchain configuration {:?}", &conf.burnchain); debug!("connection configuration {:?}", &conf.connection_options); + send_pending_event_payloads(&conf); + let num_round: u64 = 0; // Infinite number of rounds if conf.burnchain.mode == "helium" || conf.burnchain.mode == "mocknet" { diff --git a/stacks-node/src/node.rs b/stacks-node/src/node.rs index a150ec64fb1..7f560423410 100644 --- a/stacks-node/src/node.rs +++ b/stacks-node/src/node.rs @@ -1,3 +1,19 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + use std::collections::{HashMap, HashSet}; use std::net::SocketAddr; use std::thread::JoinHandle; @@ -338,12 +354,14 @@ impl Node { ) .expect("FATAL: failed to initiate mempool"); - let mut event_dispatcher = EventDispatcher::new(config.get_working_dir()); + let mut event_dispatcher = EventDispatcher::new_with_custom_queue_size( + config.get_working_dir(), + config.node.effective_event_dispatcher_queue_size(), + ); for observer in &config.events_observers { event_dispatcher.register_observer(observer); } - event_dispatcher.process_pending_payloads(); let burnchain_config = config.get_burnchain(); diff --git a/stacks-node/src/run_loop/boot_nakamoto.rs b/stacks-node/src/run_loop/boot_nakamoto.rs index e2644f1f5e4..cfd80317507 100644 --- a/stacks-node/src/run_loop/boot_nakamoto.rs +++ b/stacks-node/src/run_loop/boot_nakamoto.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2023 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -83,7 +83,7 @@ impl BootRunLoop { InnerLoops::Epoch2(neon), ) } else { - let naka = NakaRunLoop::new(config.clone(), None, None, None); + let naka = NakaRunLoop::new(config.clone(), None, None, None, None); ( naka.get_coordinator_channel().unwrap(), InnerLoops::Epoch3(naka), @@ -184,6 +184,7 @@ impl BootRunLoop { Some(termination_switch), Some(counters), monitoring_thread, + Some(neon_loop.get_event_dispatcher()), ); let new_coord_channels = naka .get_coordinator_channel() diff --git a/stacks-node/src/run_loop/mod.rs b/stacks-node/src/run_loop/mod.rs index 3fc5195c27c..5a903aec113 100644 --- a/stacks-node/src/run_loop/mod.rs +++ b/stacks-node/src/run_loop/mod.rs @@ -171,7 +171,7 @@ pub struct RegisteredKey { } pub fn announce_boot_receipts( - event_dispatcher: &mut EventDispatcher, + event_dispatcher: &EventDispatcher, chainstate: &StacksChainState, pox_constants: &PoxConstants, boot_receipts: &[StacksTransactionReceipt], diff --git a/stacks-node/src/run_loop/nakamoto.rs b/stacks-node/src/run_loop/nakamoto.rs index 6c9e3ea1663..3983e8300be 100644 --- a/stacks-node/src/run_loop/nakamoto.rs +++ b/stacks-node/src/run_loop/nakamoto.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2023 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -77,11 +77,16 @@ pub struct RunLoop { impl RunLoop { /// Sets up a runloop and node, given a config. + /// + /// If no event_dispatcher is passed, a new one is created. Allowing one to be passed in + /// allows the nakamoto runloop to continue using the same event dispatcher as the + /// neon runloop at the epoch 2->3 transition. pub fn new( config: Config, should_keep_running: Option>, counters: Option, monitoring_thread: Option>>, + event_dispatcher: Option, ) -> Self { let channels = CoordinatorCommunication::instantiate(); let should_keep_running = @@ -91,11 +96,16 @@ impl RunLoop { config.burnchain.burn_fee_cap, ))); - let mut event_dispatcher = EventDispatcher::new(config.get_working_dir()); - for observer in config.events_observers.iter() { - event_dispatcher.register_observer(observer); - } - event_dispatcher.process_pending_payloads(); + let event_dispatcher = event_dispatcher.unwrap_or_else(|| { + let mut event_dispatcher = EventDispatcher::new_with_custom_queue_size( + config.get_working_dir(), + config.node.event_dispatcher_queue_size, + ); + for observer in config.events_observers.iter() { + event_dispatcher.register_observer(observer); + } + event_dispatcher + }); Self { config, diff --git a/stacks-node/src/run_loop/neon.rs b/stacks-node/src/run_loop/neon.rs index 0a33aedc4bf..2eaf993814c 100644 --- a/stacks-node/src/run_loop/neon.rs +++ b/stacks-node/src/run_loop/neon.rs @@ -1,3 +1,19 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + #[cfg(test)] use std::sync::atomic::AtomicU64; use std::sync::atomic::{AtomicBool, Ordering}; @@ -314,11 +330,13 @@ impl RunLoop { config.burnchain.burn_fee_cap, ))); - let mut event_dispatcher = EventDispatcher::new(config.get_working_dir()); + let mut event_dispatcher = EventDispatcher::new_with_custom_queue_size( + config.get_working_dir(), + config.node.effective_event_dispatcher_queue_size(), + ); for observer in config.events_observers.iter() { event_dispatcher.register_observer(observer); } - event_dispatcher.process_pending_payloads(); Self { config, diff --git a/stacks-node/src/tests/neon_integrations.rs b/stacks-node/src/tests/neon_integrations.rs index fb6a860fb4f..58510f42a22 100644 --- a/stacks-node/src/tests/neon_integrations.rs +++ b/stacks-node/src/tests/neon_integrations.rs @@ -1,3 +1,19 @@ +// Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicU64, Ordering}; @@ -292,7 +308,9 @@ pub mod test_observer { use warp::Filter; use {tokio, warp}; - use crate::event_dispatcher::{MinedBlockEvent, MinedMicroblockEvent, MinedNakamotoBlockEvent}; + use crate::event_dispatcher::{ + self, MinedBlockEvent, MinedMicroblockEvent, MinedNakamotoBlockEvent, + }; use crate::Config; pub const EVENT_OBSERVER_PORT: u16 = 50303; @@ -520,51 +538,70 @@ pub mod test_observer { Ok(warp::http::StatusCode::OK) } + /// Waits until all events that have been queued for dispatch until now + /// have actually been delivered. This ensures the tests have all the data + /// they need to assert on. + fn catch_up() { + event_dispatcher::catch_up_all_event_dispatchers(); + } + pub fn get_stacker_sets() -> Vec<(StacksBlockId, u64, RewardSet)> { + catch_up(); STACKER_SETS.lock().unwrap().clone() } pub fn get_memtxs() -> Vec { + catch_up(); MEMTXS.lock().unwrap().clone() } pub fn get_memtx_drops() -> Vec<(String, String)> { + catch_up(); MEMTXS_DROPPED.lock().unwrap().clone() } pub fn get_blocks() -> Vec { + catch_up(); NEW_BLOCKS.lock().unwrap().clone() } pub fn get_microblocks() -> Vec { + catch_up(); NEW_MICROBLOCKS.lock().unwrap().clone() } pub fn get_burn_blocks() -> Vec { + catch_up(); BURN_BLOCKS.lock().unwrap().clone() } pub fn get_attachments() -> Vec { + catch_up(); ATTACHMENTS.lock().unwrap().clone() } pub fn get_mined_blocks() -> Vec { + catch_up(); MINED_BLOCKS.lock().unwrap().clone() } pub fn get_mined_microblocks() -> Vec { + catch_up(); MINED_MICROBLOCKS.lock().unwrap().clone() } pub fn get_mined_nakamoto_blocks() -> Vec { + catch_up(); MINED_NAKAMOTO_BLOCKS.lock().unwrap().clone() } pub fn get_stackerdb_chunks() -> Vec { + catch_up(); NEW_STACKERDB_CHUNKS.lock().unwrap().clone() } pub fn get_proposal_responses() -> Vec { + catch_up(); PROPOSAL_RESPONSES.lock().unwrap().clone() } @@ -655,6 +692,7 @@ pub mod test_observer { } pub fn clear() { + catch_up(); NEW_BLOCKS.lock().unwrap().clear(); MINED_BLOCKS.lock().unwrap().clear(); MINED_MICROBLOCKS.lock().unwrap().clear(); diff --git a/stackslib/src/config/mod.rs b/stackslib/src/config/mod.rs index 4305a1bcc89..395194f6221 100644 --- a/stackslib/src/config/mod.rs +++ b/stackslib/src/config/mod.rs @@ -1,5 +1,5 @@ // Copyright (C) 2013-2020 Blockstack PBC, a public benefit corporation -// Copyright (C) 2020-2024 Stacks Open Internet Foundation +// Copyright (C) 2020-2026 Stacks Open Internet Foundation // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -2145,6 +2145,30 @@ pub struct NodeConfig { /// @default: `300` (5 minutes) /// @units: seconds pub chain_liveness_poll_time_secs: u64, + /// By default, HTTP requests to event observers block the operation of the node + /// until a successful response is received from the observer. This creates a + /// predictable order of operations, but it also means that an event observer that + /// is slow to respond might stall the node. This can be prevented by changing this + /// setting to `false`, in which case those requests are enqueued to be delivered + /// on a background thread. Only if the queue is full will new requests cause + /// blocking again. The size of that queue can be controlled with the + /// `event_dispatcher_queue_size` setting. + /// + /// Pending requests are persisted across restarts of the node. On restart, the + /// node will deliver all remaining event payloads before resuming normal operations. + /// --- + /// @default: `true` + pub event_dispatcher_blocking: bool, + /// This setting does nothing if the `event_dispatcher_blocking` has its default + /// value of `true`. But if the event dispatcher is set to be non-blocking, this + /// queue size controls how many events can be in-flight (not yet delivered) before + /// the event dispatcher becomes blocking again until space becomes available. + /// + /// Setting this value to `0` is equivalent to setting `event_dispatcher_blocking` + /// to `true`, as no in-flight requests are allowed. + /// --- + /// @default: `1_000` + pub event_dispatcher_queue_size: usize, /// A list of specific StackerDB contracts (identified by their qualified contract /// identifiers, e.g., "SP000000000000000000002Q6VF78.pox-3") that this node /// should actively replicate. @@ -2429,6 +2453,8 @@ impl Default for NodeConfig { fault_injection_block_push_fail_probability: None, fault_injection_hide_blocks: false, chain_liveness_poll_time_secs: 300, + event_dispatcher_blocking: true, + event_dispatcher_queue_size: 1000, stacker_dbs: vec![], txindex: false, } @@ -2590,6 +2616,14 @@ impl NodeConfig { false, ) } + + pub fn effective_event_dispatcher_queue_size(&self) -> usize { + if self.event_dispatcher_blocking { + 0 + } else { + self.event_dispatcher_queue_size + } + } } #[derive(Clone, Debug, PartialEq)] @@ -3841,6 +3875,9 @@ pub struct NodeConfigFile { /// At most, how often should the chain-liveness thread /// wake up the chains-coordinator. Defaults to 300s (5 min). pub chain_liveness_poll_time_secs: Option, + pub event_dispatcher_blocking: Option, + /// Only relevant if `event_dispatcher_blocking` is false + pub event_dispatcher_queue_size: Option, /// Stacker DBs we replicate pub stacker_dbs: Option>, /// fault injection: fail to push blocks with this probability (0-100) @@ -3921,6 +3958,12 @@ impl NodeConfigFile { chain_liveness_poll_time_secs: self .chain_liveness_poll_time_secs .unwrap_or(default_node_config.chain_liveness_poll_time_secs), + event_dispatcher_blocking: self + .event_dispatcher_blocking + .unwrap_or(default_node_config.event_dispatcher_blocking), + event_dispatcher_queue_size: self + .event_dispatcher_queue_size + .unwrap_or(default_node_config.event_dispatcher_queue_size), stacker_dbs: self .stacker_dbs .unwrap_or_default()