diff --git a/stacks-node/src/event_dispatcher.rs b/stacks-node/src/event_dispatcher.rs
index e7aaff025db..d043da13d2f 100644
--- a/stacks-node/src/event_dispatcher.rs
+++ b/stacks-node/src/event_dispatcher.rs
@@ -16,6 +16,7 @@ use std::collections::hash_map::Entry;
 use std::collections::{HashMap, HashSet};
+use std::fmt;
 use std::path::PathBuf;
 #[cfg(test)]
 use std::sync::mpsc::channel;
@@ -84,6 +85,51 @@ lazy_static! {
     pub static ref TEST_SKIP_BLOCK_ANNOUNCEMENT: TestFlag<bool> = TestFlag::default();
 }

+#[derive(Debug)]
+enum EventDispatcherError {
+    SerializationError(serde_json::Error),
+    HttpError(std::io::Error),
+    DbError(stacks::util_lib::db::Error),
+}
+
+impl fmt::Display for EventDispatcherError {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        match *self {
+            EventDispatcherError::SerializationError(ref e) => fmt::Display::fmt(e, f),
+            EventDispatcherError::HttpError(ref e) => fmt::Display::fmt(e, f),
+            EventDispatcherError::DbError(ref e) => fmt::Display::fmt(e, f),
+        }
+    }
+}
+
+impl core::error::Error for EventDispatcherError {
+    fn cause(&self) -> Option<&dyn core::error::Error> {
+        match *self {
+            EventDispatcherError::SerializationError(ref e) => Some(e),
+            EventDispatcherError::HttpError(ref e) => Some(e),
+            EventDispatcherError::DbError(ref e) => Some(e),
+        }
+    }
+}
+
+impl From<serde_json::Error> for EventDispatcherError {
+    fn from(value: serde_json::Error) -> Self {
+        EventDispatcherError::SerializationError(value)
+    }
+}
+
+impl From<stacks::util_lib::db::Error> for EventDispatcherError {
+    fn from(value: stacks::util_lib::db::Error) -> Self {
+        EventDispatcherError::DbError(value)
+    }
+}
+
+impl From<std::io::Error> for EventDispatcherError {
+    fn from(value: std::io::Error) -> Self {
+        EventDispatcherError::HttpError(value)
+    }
+}
+
 #[derive(Debug, Clone)]
 struct EventObserver {
     /// URL to which events will be sent
@@ -157,16 +203,15 @@ pub struct EventDispatcher {
     block_proposal_observers_lookup: HashSet<u16>,
     /// Channel for sending StackerDB events to the miner coordinator
     pub stackerdb_channel: Arc<Mutex<StackerDBChannel>>,
-    /// Path to the database where pending payloads are stored. If `None`, then
-    /// the database is not used and events are not recoverable across restarts.
-    db_path: Option<PathBuf>,
+    /// Path to the database where pending payloads are stored.
+    db_path: PathBuf,
 }

 /// This struct is used specifically for receiving proposal responses.
 /// It's constructed separately to play nicely with threading.
 struct ProposalCallbackHandler {
     observers: Vec<EventObserver>,
-    db_path: Option<PathBuf>,
+    dispatcher: EventDispatcher,
 }

 impl ProposalCallbackReceiver for ProposalCallbackHandler {
@@ -183,13 +228,8 @@ impl ProposalCallbackReceiver for ProposalCallbackHandler {
         };
         for observer in self.observers.iter() {
-            EventDispatcher::send_payload_given_db_path(
-                &self.db_path,
-                observer,
-                &response,
-                PATH_PROPOSAL_RESPONSE,
-                None,
-            );
+            self.dispatcher
+                .dispatch_to_observer(observer, &response, PATH_PROPOSAL_RESPONSE);
         }
     }
 }
@@ -280,7 +320,7 @@ impl MemPoolEventDispatcher for EventDispatcher {
         }
         let handler = ProposalCallbackHandler {
             observers: callback_receivers,
-            db_path: self.db_path.clone(),
+            dispatcher: self.clone(),
         };
         Some(Box::new(handler))
     }
@@ -361,20 +401,12 @@ impl BlockEventDispatcher for EventDispatcher {
     }
 }

-impl Default for EventDispatcher {
-    fn default() -> Self {
-        EventDispatcher::new(None)
-    }
-}
-
 impl EventDispatcher {
-    pub fn new(working_dir: Option<PathBuf>) -> EventDispatcher {
-        let db_path = if let Some(mut db_path) = working_dir {
-            db_path.push("event_observers.sqlite");
-            Some(db_path)
-        } else {
-            None
-        };
+    pub fn new(working_dir: PathBuf) -> EventDispatcher {
+        let mut db_path = working_dir;
+        db_path.push("event_observers.sqlite");
+        EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize database");
+
         EventDispatcher {
             stackerdb_channel: Arc::new(Mutex::new(StackerDBChannel::new())),
             registered_observers: vec![],
@@ -603,11 +635,10 @@ impl EventDispatcher {
             );

             // Send payload
-            self.send_payload(
+            self.dispatch_to_observer(
                 &self.registered_observers[observer_id],
                 &payload,
                 PATH_BLOCK_PROCESSED,
-                None,
             );
         }
     }
@@ -1010,11 +1041,8 @@ impl EventDispatcher {
     /// Process any pending payloads in the database.
     /// This is called when the event dispatcher is first instantiated.
     pub fn process_pending_payloads(&self) {
-        let Some(db_path) = &self.db_path else {
-            return;
-        };
         let conn =
-            EventDispatcherDbConnection::new(db_path).expect("Failed to initialize database");
+            EventDispatcherDbConnection::new(&self.db_path).expect("Failed to initialize database");
         let pending_payloads = match conn.get_pending_payloads() {
             Ok(payloads) => payloads,
             Err(e) => {
@@ -1062,35 +1090,50 @@ impl EventDispatcher {
                 continue;
             };

-            Self::send_payload_with_bytes(
-                &self.db_path,
-                observer,
-                payload_bytes,
-                full_url.path(),
-                Some(id),
+            self.make_http_request_and_delete_from_db(
+                &payload_bytes,
+                full_url.as_str(),
+                observer.timeout,
+                observer.disable_retries,
+                id,
             );
+        }
+    }

-            #[cfg(test)]
-            if TEST_EVENT_OBSERVER_SKIP_RETRY.get() {
-                warn!("Fault injection: delete_payload");
-                return;
-            }
-
-            if let Err(e) = conn.delete_payload(id) {
+    fn dispatch_to_observer(
+        &self,
+        event_observer: &EventObserver,
+        payload: &serde_json::Value,
+        path: &str,
+    ) {
+        let full_url = Self::get_full_url(event_observer, path);
+        let bytes = match Self::get_payload_bytes(payload) {
+            Ok(bytes) => bytes,
+            Err(err) => {
                 error!(
-                    "Event observer: failed to delete pending payload from database";
-                    "error" => ?e
+                    "Event dispatcher: failed to serialize payload"; "path" => path, "error" => ?err
                 );
+                return;
             }
-        }
+        };
+
+        let id = self.save_to_db(&full_url, bytes.as_ref(), event_observer.timeout);
+
+        self.make_http_request_and_delete_from_db(
+            &bytes,
+            &full_url,
+            event_observer.timeout,
+            event_observer.disable_retries,
+            id,
+        );
     }

-    fn send_payload_directly(
+    fn make_http_request(
         payload_bytes: &Arc<[u8]>,
         full_url: &str,
         timeout: Duration,
         disable_retries: bool,
-    ) -> bool {
+    ) -> Result<(), EventDispatcherError> {
         debug!(
             "Event dispatcher: Sending payload"; "url" => %full_url, "bytes" => payload_bytes.len()
         );
@@ -1137,20 +1180,18 @@ impl EventDispatcher {
                     "backoff" => ?backoff,
                     "attempts" => attempts
                 );
+                if disable_retries {
+                    warn!("Observer is configured in disable_retries mode: skipping retry of payload");
+                    return Err(err.into());
+                }
+                #[cfg(test)]
+                if TEST_EVENT_OBSERVER_SKIP_RETRY.get() {
+                    warn!("Fault injection: skipping retry of payload");
+                    return Err(err.into());
+                }
             }
         }

-        if disable_retries {
-            warn!("Observer is configured in disable_retries mode: skipping retry of payload");
-            return false;
-        }
-
-        #[cfg(test)]
-        if TEST_EVENT_OBSERVER_SKIP_RETRY.get() {
-            warn!("Fault injection: skipping retry of payload");
-            return false;
-        }
-
         sleep(backoff);
         let jitter: u64 = rand::thread_rng().gen_range(0..100);
         backoff = std::cmp::min(
@@ -1159,105 +1200,86 @@ impl EventDispatcher {
             );
             attempts = attempts.saturating_add(1);
         }
-        true
-    }

-    fn send_payload(
-        &self,
-        event_observer: &EventObserver,
-        payload: &serde_json::Value,
-        path: &str,
-        id: Option<i64>,
-    ) {
-        Self::send_payload_given_db_path(&self.db_path, event_observer, payload, path, id);
+        Ok(())
     }

-    fn send_payload_given_db_path(
-        db_path: &Option<PathBuf>,
-        event_observer: &EventObserver,
-        payload: &serde_json::Value,
-        path: &str,
-        id: Option<i64>,
-    ) {
-        let payload_bytes = match serde_json::to_vec(payload) {
-            Ok(bytes) => Arc::<[u8]>::from(bytes),
-            Err(err) => {
-                error!(
-                    "Event dispatcher: failed to serialize payload"; "path" => path, "error" => ?err
-                );
-                return;
-            }
-        };
-        Self::send_payload_with_bytes(db_path, event_observer, payload_bytes, path, id);
+    fn get_payload_bytes(payload: &serde_json::Value) -> Result<Arc<[u8]>, EventDispatcherError> {
+        let payload_bytes = serde_json::to_vec(payload)?;
+        Ok(Arc::<[u8]>::from(payload_bytes))
     }

-    fn send_payload_with_bytes(
-        db_path: &Option<PathBuf>,
-        event_observer: &EventObserver,
-        payload_bytes: Arc<[u8]>,
-        path: &str,
-        id: Option<i64>,
-    ) {
-        // Construct the full URL
+    fn get_full_url(event_observer: &EventObserver, path: &str) -> String {
         let url_str = if path.starts_with('/') {
             format!("{}{path}", &event_observer.endpoint)
         } else {
             format!("{}/{path}", &event_observer.endpoint)
         };
-        let full_url = format!("http://{url_str}");
-
-        // if the observer is in "disable_retries" mode quickly send the payload without checking for the db
-        if event_observer.disable_retries {
-            Self::send_payload_directly(&payload_bytes, &full_url, event_observer.timeout, true);
-        } else if let Some(db_path) = db_path {
-            // Because the DB is initialized in the call to process_pending_payloads() during startup,
-            // it is *probably* ok to skip initialization here. That said, at the time of writing this is the
-            // only call to new_without_init(), and we might want to revisit the question whether it's
-            // really worth it.
-            let conn = EventDispatcherDbConnection::new_without_init(db_path)
-                .expect("Failed to open database for event observer");
-
-            let id = match id {
-                Some(id) => id,
-                None => {
-                    conn.insert_payload_with_retry(
-                        &full_url,
-                        payload_bytes.as_ref(),
-                        event_observer.timeout,
-                    );
-                    conn.last_insert_rowid()
-                }
-            };
+        format!("http://{url_str}")
     }

-            let success = Self::send_payload_directly(
-                &payload_bytes,
-                &full_url,
-                event_observer.timeout,
-                false,
-            );
-            // This is only `false` when the TestFlag is set to skip retries
-            if !success {
-                return;
-            }
+    fn save_to_db(&self, url: &str, payload_bytes: &[u8], timeout: Duration) -> i64 {
+        // Because the DB is initialized in the call to process_pending_payloads() during startup,
+        // it is *probably* ok to skip initialization here. That said, at the time of writing this is the
+        // only call to new_without_init(), and we might want to revisit the question whether it's
+        // really worth it.
+        let conn = EventDispatcherDbConnection::new_without_init(&self.db_path)
+            .expect("Failed to open database for event observer");

-            if let Err(e) = conn.delete_payload(id) {
-                error!(
-                    "Event observer: failed to delete pending payload from database";
-                    "error" => ?e
-                );
-            }
-        } else {
-            // No database, just send the payload
-            Self::send_payload_directly(&payload_bytes, &full_url, event_observer.timeout, false);
+        conn.insert_payload_with_retry(&url, payload_bytes.as_ref(), timeout);
+        conn.last_insert_rowid()
+    }
+
+    fn make_http_request_and_delete_from_db(
+        &self,
+        payload_bytes: &Arc<[u8]>,
+        full_url: &str,
+        timeout: Duration,
+        disable_retries: bool,
+        id: i64,
+    ) {
+        let http_result =
+            Self::make_http_request(payload_bytes, full_url, timeout, disable_retries);
+
+        if let Err(err) = http_result {
+            // log but continue
+            error!("EventDispatcher: dispatching failed"; "url" => &full_url, "error" => ?err);
         }
+
+        #[cfg(test)]
+        if TEST_EVENT_OBSERVER_SKIP_RETRY.get() {
+            warn!("Fault injection: skipping deletion of payload");
+            return;
+        }
+
+        // We're deleting regardless of result -- if retries are disabled, that means
+        // we're supposed to forget about it in case of failure. If they're not disabled,
+        // then we wouldn't be here in case of failure, because `make_http_request` retries
+        // until it's successful (with the exception of the above fault injection which
+        // simulates a shutdown).
+        let deletion_result = self.delete_from_db(id);
+
+        if let Err(e) = deletion_result {
+            error!(
+                "Event observer: failed to delete pending payload from database";
+                "error" => ?e
+            );
+        }
+    }
+
+    fn delete_from_db(&self, id: i64) -> Result<(), EventDispatcherError> {
+        let conn = EventDispatcherDbConnection::new_without_init(&self.db_path)
+            .expect("Failed to open database for event observer");
+        conn.delete_payload(id)?;
+        Ok(())
     }

     fn send_new_attachments(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
-        self.send_payload(event_observer, payload, PATH_ATTACHMENT_PROCESSED, None);
+        self.dispatch_to_observer(event_observer, payload, PATH_ATTACHMENT_PROCESSED);
     }

     fn send_new_mempool_txs(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
-        self.send_payload(event_observer, payload, PATH_MEMPOOL_TX_SUBMIT, None);
+        self.dispatch_to_observer(event_observer, payload, PATH_MEMPOOL_TX_SUBMIT);
     }

     /// Serializes new microblocks data into a JSON payload and sends it off to the correct path
@@ -1290,7 +1312,7 @@ impl EventDispatcher {
             "burn_block_timestamp": burn_block_timestamp,
         });

-        self.send_payload(event_observer, &payload, PATH_MICROBLOCK_SUBMIT, None);
+        self.dispatch_to_observer(event_observer, &payload, PATH_MICROBLOCK_SUBMIT);
     }

     fn send_dropped_mempool_txs(
@@ -1298,15 +1320,15 @@ impl EventDispatcher {
         event_observer: &EventObserver,
         payload: &serde_json::Value,
     ) {
-        self.send_payload(event_observer, payload, PATH_MEMPOOL_TX_DROP, None);
+        self.dispatch_to_observer(event_observer, payload, PATH_MEMPOOL_TX_DROP);
     }

     fn send_mined_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
-        self.send_payload(event_observer, payload, PATH_MINED_BLOCK, None);
+        self.dispatch_to_observer(event_observer, payload, PATH_MINED_BLOCK);
     }

     fn send_mined_microblock(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
-        self.send_payload(event_observer, payload, PATH_MINED_MICROBLOCK, None);
+        self.dispatch_to_observer(event_observer, payload, PATH_MINED_MICROBLOCK);
     }

     fn send_mined_nakamoto_block(
@@ -1314,15 +1336,15 @@ impl EventDispatcher {
         event_observer: &EventObserver,
         payload: &serde_json::Value,
     ) {
-        self.send_payload(event_observer, payload, PATH_MINED_NAKAMOTO_BLOCK, None);
+        self.dispatch_to_observer(event_observer, payload, PATH_MINED_NAKAMOTO_BLOCK);
     }

     fn send_stackerdb_chunks(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
-        self.send_payload(event_observer, payload, PATH_STACKERDB_CHUNKS, None);
+        self.dispatch_to_observer(event_observer, payload, PATH_STACKERDB_CHUNKS);
     }

     fn send_new_burn_block(&self, event_observer: &EventObserver, payload: &serde_json::Value) {
-        self.send_payload(event_observer, payload, PATH_BURN_BLOCK_SUBMIT, None);
+        self.dispatch_to_observer(event_observer, payload, PATH_BURN_BLOCK_SUBMIT);
     }
 }
diff --git a/stacks-node/src/event_dispatcher/tests.rs b/stacks-node/src/event_dispatcher/tests.rs
index da24ec8a73e..7908ca364f8 100644
--- a/stacks-node/src/event_dispatcher/tests.rs
+++ b/stacks-node/src/event_dispatcher/tests.rs
@@ -238,7 +238,7 @@ fn test_process_pending_payloads() {
     info!("endpoint: {}", endpoint);

     let timeout = Duration::from_secs(5);
-    let mut dispatcher = EventDispatcher::new(Some(dir.path().to_path_buf()));
+    let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf());

     dispatcher.register_observer(&EventObserverConfig {
         endpoint: endpoint.clone(),
@@ -290,7 +290,7 @@ fn pending_payloads_are_skipped_if_url_does_not_match() {
     let mut server = mockito::Server::new();
     let endpoint = server.host_with_port();
     let timeout = Duration::from_secs(5);
-    let mut dispatcher = EventDispatcher::new(Some(dir.path().to_path_buf()));
+    let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf());

     dispatcher.register_observer(&EventObserverConfig {
         endpoint: endpoint.clone(),
@@ -343,18 +343,13 @@ fn pending_payloads_are_skipped_if_url_does_not_match()
 fn test_new_event_dispatcher_with_db() {
     let dir = tempdir().unwrap();
     let working_dir = dir.path().to_path_buf();
-
-    let dispatcher = EventDispatcher::new(Some(working_dir.clone()));
-
     let expected_db_path = working_dir.join("event_observers.sqlite");
-    assert_eq!(dispatcher.db_path, Some(expected_db_path.clone()));
-    assert!(
-        !expected_db_path.exists(),
-        "Database file was created too soon"
-    );
+    assert!(!expected_db_path.exists(), "Database file already exists");

-    EventDispatcherDbConnection::new(&expected_db_path).expect("Failed to initialize the database");
+    let dispatcher = EventDispatcher::new(working_dir.clone());
+
+    assert_eq!(dispatcher.db_path, expected_db_path.clone());

     // Verify that the database was initialized
     assert!(expected_db_path.exists(), "Database file was not created");
@@ -373,13 +368,6 @@ fn test_new_event_observer() {
     assert_eq!(observer.disable_retries, false);
 }

-#[test]
-fn test_new_event_dispatcher_without_db() {
-    let dispatcher = EventDispatcher::new(None);
-
-    assert!(dispatcher.db_path.is_none(), "Expected db_path to be None");
-}
-
 #[test]
 #[serial]
 fn test_send_payload_with_db() {
@@ -389,9 +377,7 @@ fn test_send_payload_with_db() {
     let working_dir = dir.path().to_path_buf();
     let payload = json!({"key": "value"});

-    let dispatcher = EventDispatcher::new(Some(working_dir.clone()));
-    let db_path = dispatcher.clone().db_path.clone().unwrap();
-    EventDispatcherDbConnection::new(&db_path).expect("Failed to initialize the database");
+    let dispatcher = EventDispatcher::new(working_dir.clone());

     // Create a mock server
     let mut server = mockito::Server::new();
@@ -410,13 +396,13 @@ fn test_send_payload_with_db() {
     TEST_EVENT_OBSERVER_SKIP_RETRY.set(false);

     // Call send_payload
-    dispatcher.send_payload(&observer, &payload, "/test", None);
+    dispatcher.dispatch_to_observer(&observer, &payload, "/test");

     // Verify that the payload was sent and database is empty
     _m.assert();

     // Verify that the database is empty
-    let db_path = dispatcher.db_path.unwrap();
+    let db_path = dispatcher.db_path;
     let db_path_str = db_path.to_str().unwrap();
     let conn = Connection::open(db_path_str).expect("Failed to open database");
     let pending_payloads = EventDispatcherDbConnection::new_from_exisiting_connection(conn)
@@ -425,35 +411,6 @@ fn test_send_payload_with_db() {
     assert_eq!(pending_payloads.len(), 0, "Expected no pending payloads");
 }

-#[test]
-fn test_send_payload_without_db() {
-    use mockito::Matcher;
-
-    let timeout = Duration::from_secs(5);
-    let payload = json!({"key": "value"});
-
-    // Create a mock server
-    let mut server = mockito::Server::new();
-    let _m = server
-        .mock("POST", "/test")
-        .match_header("content-type", Matcher::Regex("application/json.*".into()))
-        .match_body(Matcher::Json(payload.clone()))
-        .with_status(200)
-        .create();
-
-    let endpoint = server.url().strip_prefix("http://").unwrap().to_string();
-
-    let observer = EventObserver::new(endpoint, timeout, false);
-
-    let dispatcher = EventDispatcher::new(None);
-
-    // Call send_payload
-    dispatcher.send_payload(&observer, &payload, "/test", None);
-
-    // Verify that the payload was sent
-    _m.assert();
-}
-
 #[test]
 fn test_send_payload_success() {
     let port = get_random_port();
@@ -480,9 +437,11 @@ fn test_send_payload_success() {

     let payload = json!({"key": "value"});

-    let dispatcher = EventDispatcher::new(None);
+    let dir = tempdir().unwrap();
+    let working_dir = dir.path().to_path_buf();
+    let dispatcher = EventDispatcher::new(working_dir);

-    dispatcher.send_payload(&observer, &payload, "/test", None);
+    dispatcher.dispatch_to_observer(&observer, &payload, "/test");

     // Wait for the server to process the request
     rx.recv_timeout(Duration::from_secs(5))
@@ -498,7 +457,7 @@ fn test_send_payload_retry() {

     // Start a mock server in a separate thread
     let server = Server::http(format!("127.0.0.1:{port}")).unwrap();
-    thread::spawn(move || {
+    let thread = thread::spawn(move || {
         let mut attempt = 0;
         while let Ok(request) = server.recv() {
             attempt += 1;
@@ -530,13 +489,17 @@ fn test_send_payload_retry() {

     let payload = json!({"key": "value"});

-    let dispatcher = EventDispatcher::new(None);
+    let dir = tempdir().unwrap();
+    let working_dir = dir.path().to_path_buf();
+    let dispatcher = EventDispatcher::new(working_dir);

-    dispatcher.send_payload(&observer, &payload, "/test", None);
+    dispatcher.dispatch_to_observer(&observer, &payload, "/test");

     // Wait for the server to process the request
     rx.recv_timeout(Duration::from_secs(5))
         .expect("Server did not receive request in time");
+
+    thread.join().unwrap();
 }

 #[test]
@@ -550,7 +513,7 @@ fn test_send_payload_timeout() {

     // Start a mock server in a separate thread
     let server = Server::http(format!("127.0.0.1:{port}")).unwrap();
-    thread::spawn(move || {
+    let thread = thread::spawn(move || {
         let mut attempt = 0;
         // This exists to only keep request from being dropped
         #[allow(clippy::collection_is_never_read)]
@@ -582,10 +545,12 @@ fn test_send_payload_timeout() {
     // Record the time before sending the payload
     let start_time = Instant::now();

-    let dispatcher = EventDispatcher::new(None);
+    let dir = tempdir().unwrap();
+    let working_dir = dir.path().to_path_buf();
+    let dispatcher = EventDispatcher::new(working_dir);

     // Call the function being tested
-    dispatcher.send_payload(&observer, &payload, "/test", None);
+    dispatcher.dispatch_to_observer(&observer, &payload, "/test");

     // Record the time after the function returns
     let elapsed_time = start_time.elapsed();
@@ -604,6 +569,8 @@ fn test_send_payload_timeout() {
     // Wait for the server to process the request
     rx.recv_timeout(Duration::from_secs(5))
         .expect("Server did not receive request in time");
+
+    thread.join().unwrap();
 }

 #[test]
@@ -620,7 +587,7 @@ fn test_send_payload_with_db_force_restart() {
     info!("Starting mock server on port {port}");
     // Start a mock server in a separate thread
     let server = Server::http(format!("127.0.0.1:{port}")).unwrap();
-    thread::spawn(move || {
+    let thread = thread::spawn(move || {
         let mut attempt = 0;
         // This exists to only keep request from being dropped
         #[allow(clippy::collection_is_never_read)]
@@ -670,7 +637,7 @@ fn test_send_payload_with_db_force_restart() {
         }
     });

-    let mut dispatcher = EventDispatcher::new(Some(working_dir.clone()));
+    let mut dispatcher = EventDispatcher::new(working_dir.clone());

     let observer = dispatcher.register_observer_private(&EventObserverConfig {
         endpoint: format!("127.0.0.1:{port}"),
@@ -679,7 +646,7 @@ fn test_send_payload_with_db_force_restart() {
         disable_retries: false,
     });

-    EventDispatcherDbConnection::new(&dispatcher.clone().db_path.unwrap()).unwrap();
+    EventDispatcherDbConnection::new(&dispatcher.clone().db_path).unwrap();

     let payload = json!({"key": "value"});
     let payload2 = json!({"key": "value2"});
@@ -691,7 +658,7 @@ fn test_send_payload_with_db_force_restart() {
     info!("Sending payload 1");

     // Send the payload
-    dispatcher.send_payload(&observer, &payload, "/test", None);
+    dispatcher.dispatch_to_observer(&observer, &payload, "/test");

     // Re-enable retrying
     TEST_EVENT_OBSERVER_SKIP_RETRY.set(false);
@@ -701,11 +668,13 @@ fn test_send_payload_with_db_force_restart() {
     info!("Sending payload 2");

     // Send another payload
-    dispatcher.send_payload(&observer, &payload2, "/test", None);
+    dispatcher.dispatch_to_observer(&observer, &payload2, "/test");

     // Wait for the server to process the requests
     rx.recv_timeout(Duration::from_secs(5))
         .expect("Server did not receive request in time");
+
+    thread.join().unwrap();
 }

 #[test]
@@ -721,10 +690,12 @@ fn test_event_dispatcher_disable_retries() {

     let observer = EventObserver::new(endpoint, timeout, true);

-    let dispatcher = EventDispatcher::new(None);
+    let dir = tempdir().unwrap();
+    let working_dir = dir.path().to_path_buf();
+    let dispatcher = EventDispatcher::new(working_dir);

     // in non "disable_retries" mode this will run forever
-    dispatcher.send_payload(&observer, &payload, "/test", None);
+    dispatcher.dispatch_to_observer(&observer, &payload, "/test");

     // Verify that the payload was sent
     _m.assert();
@@ -739,10 +710,12 @@ fn test_event_dispatcher_disable_retries_invalid_url() {

     let observer = EventObserver::new(endpoint, timeout, true);

-    let dispatcher = EventDispatcher::new(None);
+    let dir = tempdir().unwrap();
+    let working_dir = dir.path().to_path_buf();
+    let dispatcher = EventDispatcher::new(working_dir);

     // in non "disable_retries" mode this will run forever
-    dispatcher.send_payload(&observer, &payload, "/test", None);
+    dispatcher.dispatch_to_observer(&observer, &payload, "/test");
 }

 #[test]
@@ -752,7 +725,7 @@ fn block_event_with_disable_retries_observer() {
     let dir = tempdir().unwrap();
     let working_dir = dir.path().to_path_buf();

-    let mut event_dispatcher = EventDispatcher::new(Some(working_dir.clone()));
+    let mut event_dispatcher = EventDispatcher::new(working_dir.clone());
     let config = EventObserverConfig {
         endpoint: String::from("255.255.255.255"),
         events_keys: vec![EventKeyType::MinedBlocks],
@@ -952,8 +925,8 @@ fn test_block_proposal_validation_event() {
     let mock = server.mock("POST", "/proposal_response").create();

     let endpoint = server.url().strip_prefix("http://").unwrap().to_string();
-
-    let mut dispatcher = EventDispatcher::new(None);
+    let dir = tempdir().unwrap();
+    let mut dispatcher = EventDispatcher::new(dir.path().to_path_buf());
     dispatcher.register_observer(&EventObserverConfig {
         endpoint: endpoint.clone(),
         events_keys: vec![EventKeyType::BlockProposal],
diff --git a/stacks-node/src/nakamoto_node/miner.rs b/stacks-node/src/nakamoto_node/miner.rs
index e601ceb2558..40e77490a59 100644
--- a/stacks-node/src/nakamoto_node/miner.rs
+++ b/stacks-node/src/nakamoto_node/miner.rs
@@ -58,6 +58,8 @@ use stacks_common::types::{PrivateKey, StacksEpochId};
 #[cfg(test)]
 use stacks_common::util::tests::TestFlag;
 use stacks_common::util::vrf::VRFProof;
+#[cfg(test)]
+use tempfile::tempdir;

 use super::miner_db::MinerDB;
 use super::relayer::{MinerStopHandle, RelayerThread};
@@ -2055,6 +2057,8 @@ fn should_read_count_extend_units() {
     let (relay_sender, _rcv_2) = std::sync::mpsc::sync_channel(1);
     let (_coord_rcv, coord_comms) =
         stacks::chainstate::coordinator::comm::CoordinatorCommunication::instantiate();
+    let working_dir = tempdir().unwrap();
+
     let mut miner = BlockMinerThread {
         config: Config::default(),
         globals: Globals::new(
@@ -2087,7 +2091,7 @@ fn should_read_count_extend_units() {
         burn_election_block: BlockSnapshot::empty(),
         burn_block: BlockSnapshot::empty(),
         parent_tenure_id: StacksBlockId([0; 32]),
-        event_dispatcher: EventDispatcher::new(None),
+        event_dispatcher: EventDispatcher::new(working_dir.path().to_path_buf()),
         reason: MinerReason::Extended {
             burn_view_consensus_hash: ConsensusHash([0; 20]),
         },
diff --git a/stacks-node/src/node.rs b/stacks-node/src/node.rs
index 1711dc297a7..a150ec64fb1 100644
--- a/stacks-node/src/node.rs
+++ b/stacks-node/src/node.rs
@@ -338,7 +338,7 @@ impl Node {
         )
         .expect("FATAL: failed to initiate mempool");

-        let mut event_dispatcher = EventDispatcher::new(Some(config.get_working_dir()));
+        let mut event_dispatcher = EventDispatcher::new(config.get_working_dir());

         for observer in &config.events_observers {
             event_dispatcher.register_observer(observer);
diff --git a/stacks-node/src/run_loop/nakamoto.rs b/stacks-node/src/run_loop/nakamoto.rs
index 3f07ecc92d7..6c9e3ea1663 100644
--- a/stacks-node/src/run_loop/nakamoto.rs
+++ b/stacks-node/src/run_loop/nakamoto.rs
@@ -91,7 +91,7 @@ impl RunLoop {
             config.burnchain.burn_fee_cap,
         )));

-        let mut event_dispatcher = EventDispatcher::new(Some(config.get_working_dir()));
+        let mut event_dispatcher = EventDispatcher::new(config.get_working_dir());
         for observer in config.events_observers.iter() {
             event_dispatcher.register_observer(observer);
         }
diff --git a/stacks-node/src/run_loop/neon.rs b/stacks-node/src/run_loop/neon.rs
index 6ac6f4d9248..0a33aedc4bf 100644
--- a/stacks-node/src/run_loop/neon.rs
+++ b/stacks-node/src/run_loop/neon.rs
@@ -314,7 +314,7 @@ impl RunLoop {
             config.burnchain.burn_fee_cap,
         )));

-        let mut event_dispatcher = EventDispatcher::new(Some(config.get_working_dir()));
+        let mut event_dispatcher = EventDispatcher::new(config.get_working_dir());
         for observer in config.events_observers.iter() {
             event_dispatcher.register_observer(observer);
         }