diff --git a/src/jobs/job.rs b/src/jobs/job.rs index 8cbe2cdfe..396c1511a 100644 --- a/src/jobs/job.rs +++ b/src/jobs/job.rs @@ -21,6 +21,12 @@ pub struct Job { pub data: T, #[serde(skip_serializing_if = "Option::is_none")] pub request_id: Option, + /// Unix epoch (seconds) when this job is intended to become available for + /// processing. Set by the producer when `scheduled_on` is provided; absent + /// for immediate jobs. Consumers use this to compute queue pickup latency + /// that excludes intentional scheduling delay. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub available_at: Option, } impl Job { @@ -32,12 +38,17 @@ impl Job { job_type, data, request_id: None, + available_at: None, } } pub fn with_request_id(mut self, id: Option) -> Self { self.request_id = id; self } + pub fn with_scheduled_on(mut self, scheduled_on: Option) -> Self { + self.available_at = scheduled_on.map(|ts| ts.to_string()); + self + } } // Enum to represent different message types @@ -379,6 +390,35 @@ mod tests { assert_eq!(deserialized.data.relayer_id, "relayer-1"); } + #[test] + fn test_job_with_scheduled_on_sets_available_at() { + let tx_request = TransactionRequest::new("tx123", "relayer-1"); + let job = Job::new(JobType::TransactionRequest, tx_request).with_scheduled_on(Some(12345)); + + assert_eq!(job.available_at.as_deref(), Some("12345")); + } + + #[test] + fn test_job_serialization_preserves_available_at() { + let tx_request = TransactionRequest::new("tx123", "relayer-1"); + let job = Job::new(JobType::TransactionRequest, tx_request).with_scheduled_on(Some(12345)); + + let serialized = serde_json::to_string(&job).unwrap(); + let deserialized: Job = serde_json::from_str(&serialized).unwrap(); + + assert_eq!(deserialized.available_at.as_deref(), Some("12345")); + } + + #[test] + fn test_job_serialization_omits_available_at_when_not_scheduled() { + let tx_request = TransactionRequest::new("tx123", "relayer-1"); + let job = Job::new(JobType::TransactionRequest, tx_request); + + let serialized = serde_json::to_string(&job).unwrap(); + + assert!(!serialized.contains("available_at")); + } + #[test] fn test_notification_send_serialization() { let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new( diff --git a/src/jobs/job_producer.rs b/src/jobs/job_producer.rs index a2415309b..aad6cb029 100644 --- a/src/jobs/job_producer.rs +++ b/src/jobs/job_producer.rs @@ -124,7 +124,8 @@ impl JobProducerTrait for JobProducer { transaction_process_job ); let job = Job::new(JobType::TransactionRequest, transaction_process_job) - .with_request_id(get_request_id()); + .with_request_id(get_request_id()) + .with_scheduled_on(scheduled_on); let request_id = job.request_id.clone(); let tx_id = job.data.transaction_id.clone(); let relayer_id = job.data.relayer_id.clone(); @@ -155,7 +156,8 @@ impl JobProducerTrait for JobProducer { scheduled_on: Option, ) -> Result<(), JobProducerError> { let job = Job::new(JobType::TransactionSend, transaction_submit_job) - .with_request_id(get_request_id()); + .with_request_id(get_request_id()) + .with_scheduled_on(scheduled_on); let request_id = job.request_id.clone(); let tx_id = job.data.transaction_id.clone(); let relayer_id = job.data.relayer_id.clone(); @@ -191,7 +193,8 @@ impl JobProducerTrait for JobProducer { JobType::TransactionStatusCheck, transaction_status_check_job.clone(), ) - .with_request_id(get_request_id()); + .with_request_id(get_request_id()) + .with_scheduled_on(scheduled_on); let request_id = job.request_id.clone(); let tx_id = job.data.transaction_id.clone(); let relayer_id = job.data.relayer_id.clone(); @@ -222,7 +225,8 @@ impl JobProducerTrait for JobProducer { scheduled_on: Option, ) -> Result<(), JobProducerError> { let job = Job::new(JobType::NotificationSend, notification_send_job) - .with_request_id(get_request_id()); + .with_request_id(get_request_id()) + .with_scheduled_on(scheduled_on); let request_id = job.request_id.clone(); let notification_id = job.data.notification_id.clone(); @@ -249,8 +253,9 @@ impl JobProducerTrait for JobProducer { swap_request_job: TokenSwapRequest, scheduled_on: Option, ) -> Result<(), JobProducerError> { - let job = - Job::new(JobType::TokenSwapRequest, swap_request_job).with_request_id(get_request_id()); + let job = Job::new(JobType::TokenSwapRequest, swap_request_job) + .with_request_id(get_request_id()) + .with_scheduled_on(scheduled_on); let request_id = job.request_id.clone(); let relayer_id = job.data.relayer_id.clone(); let backend = self.queue_backend(); @@ -280,7 +285,8 @@ impl JobProducerTrait for JobProducer { JobType::RelayerHealthCheck, relayer_health_check_job.clone(), ) - .with_request_id(get_request_id()); + .with_request_id(get_request_id()) + .with_scheduled_on(scheduled_on); let request_id = job.request_id.clone(); let relayer_id = job.data.relayer_id.clone(); let backend = self.queue_backend(); @@ -317,6 +323,8 @@ mod tests { struct TestRedisStorage { pub push_called: bool, pub schedule_called: bool, + pub last_job: Option, + pub last_scheduled_timestamp: Option, _phantom: std::marker::PhantomData, } @@ -325,17 +333,24 @@ mod tests { Self { push_called: false, schedule_called: false, + last_job: None, + last_scheduled_timestamp: None, _phantom: std::marker::PhantomData, } } + } - async fn push(&mut self, _job: T) -> Result<(), JobProducerError> { + impl TestRedisStorage { + async fn push(&mut self, job: T) -> Result<(), JobProducerError> { self.push_called = true; + self.last_job = Some(job); Ok(()) } - async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> { + async fn schedule(&mut self, job: T, timestamp: i64) -> Result<(), JobProducerError> { self.schedule_called = true; + self.last_job = Some(job); + self.last_scheduled_timestamp = Some(timestamp); Ok(()) } } @@ -410,7 +425,8 @@ mod tests { scheduled_on: Option, ) -> Result<(), JobProducerError> { let mut queue = self.queue.lock().await; - let job = Job::new(JobType::TransactionRequest, transaction_process_job); + let job = Job::new(JobType::TransactionRequest, transaction_process_job) + .with_scheduled_on(scheduled_on); match scheduled_on { Some(scheduled_on) => { @@ -433,7 +449,8 @@ mod tests { scheduled_on: Option, ) -> Result<(), JobProducerError> { let mut queue = self.queue.lock().await; - let job = Job::new(JobType::TransactionSend, transaction_submit_job); + let job = Job::new(JobType::TransactionSend, transaction_submit_job) + .with_scheduled_on(scheduled_on); match scheduled_on { Some(on) => { @@ -456,7 +473,8 @@ mod tests { let job = Job::new( JobType::TransactionStatusCheck, transaction_status_check_job.clone(), - ); + ) + .with_scheduled_on(scheduled_on); // Route to the appropriate queue based on network type use crate::models::NetworkType; @@ -485,7 +503,8 @@ mod tests { scheduled_on: Option, ) -> Result<(), JobProducerError> { let mut queue = self.queue.lock().await; - let job = Job::new(JobType::NotificationSend, notification_send_job); + let job = Job::new(JobType::NotificationSend, notification_send_job) + .with_scheduled_on(scheduled_on); match scheduled_on { Some(on) => { @@ -505,7 +524,8 @@ mod tests { scheduled_on: Option, ) -> Result<(), JobProducerError> { let mut queue = self.queue.lock().await; - let job = Job::new(JobType::TokenSwapRequest, swap_request_job); + let job = Job::new(JobType::TokenSwapRequest, swap_request_job) + .with_scheduled_on(scheduled_on); match scheduled_on { Some(on) => { @@ -525,7 +545,8 @@ mod tests { scheduled_on: Option, ) -> Result<(), JobProducerError> { let mut queue = self.queue.lock().await; - let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job); + let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job) + .with_scheduled_on(scheduled_on); match scheduled_on { Some(scheduled_on) => { @@ -841,6 +862,36 @@ mod tests { assert!(!queue.transaction_status_queue_evm.push_called); } + #[tokio::test] + async fn test_scheduled_status_check_persists_available_at() { + use crate::models::NetworkType; + let producer = TestJobProducer::new(); + + let status_job = + TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm); + let scheduled_timestamp = calculate_scheduled_timestamp(30); + producer + .produce_check_transaction_status_job(status_job, Some(scheduled_timestamp)) + .await + .unwrap(); + + let queue = producer.get_queue().await; + let stored_job = queue + .transaction_status_queue_evm + .last_job + .expect("scheduled status check should be stored"); + let expected_available_at = scheduled_timestamp.to_string(); + + assert_eq!( + stored_job.available_at.as_deref(), + Some(expected_available_at.as_str()) + ); + assert_eq!( + queue.transaction_status_queue_evm.last_scheduled_timestamp, + Some(scheduled_timestamp) + ); + } + #[tokio::test] async fn test_submit_transaction_scheduled() { let producer = TestJobProducer::new(); @@ -857,6 +908,30 @@ mod tests { assert!(!queue.transaction_submission_queue.push_called); } + #[tokio::test] + async fn test_scheduled_submit_job_persists_available_at() { + let producer = TestJobProducer::new(); + + let submit_job = TransactionSend::submit("tx-scheduled", "relayer-1"); + let scheduled_timestamp = calculate_scheduled_timestamp(15); + producer + .produce_submit_transaction_job(submit_job, Some(scheduled_timestamp)) + .await + .unwrap(); + + let queue = producer.get_queue().await; + let stored_job = queue + .transaction_submission_queue + .last_job + .expect("scheduled submission should be stored"); + let expected_available_at = scheduled_timestamp.to_string(); + + assert_eq!( + stored_job.available_at.as_deref(), + Some(expected_available_at.as_str()) + ); + } + #[tokio::test] async fn test_notification_job_scheduled() { let producer = TestJobProducer::new(); diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 6cea71c4f..f5d10a826 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -21,6 +21,13 @@ pub fn observe_processing_time(relayer_id: &str, network_type: &str, stage: &str .with_label_values(&[relayer_id, network_type, stage]) .observe(secs); } + +/// Observe queue pickup latency (time from send to consumer pickup). +pub fn observe_queue_pickup_latency(queue_type: &str, backend: &str, secs: f64) { + QUEUE_PICKUP_LATENCY + .with_label_values(&[queue_type, backend]) + .observe(secs); +} use sysinfo::{Disks, System}; lazy_static! { @@ -206,6 +213,15 @@ lazy_static! { histogram_vec }; + // Histogram for queue pickup latency (time from send to consumer pickup). + pub static ref QUEUE_PICKUP_LATENCY: HistogramVec = { + let histogram_opts = HistogramOpts::new("queue_pickup_latency_seconds", "Queue pickup latency in seconds (send to consumer pickup)") + .buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0]); + let histogram_vec = HistogramVec::new(histogram_opts, &["queue_type", "backend"]).unwrap(); + REGISTRY.register(Box::new(histogram_vec.clone())).unwrap(); + histogram_vec + }; + // Histogram for RPC call latency. pub static ref RPC_CALL_LATENCY: HistogramVec = { let histogram_opts = HistogramOpts::new("rpc_call_latency_seconds", "RPC call latency in seconds") @@ -515,6 +531,9 @@ mod actix_tests { .with_label_values(&["test-relayer", "stellar"]) .inc(); + // Queue pickup latency + observe_queue_pickup_latency("transaction-request", "sqs", 0.5); + let metrics = gather_metrics().expect("failed to gather metrics"); let output = String::from_utf8(metrics).expect("metrics output is not valid UTF-8"); @@ -541,6 +560,9 @@ mod actix_tests { // TRY_AGAIN_LATER metrics assert!(output.contains("transactions_try_again_later_success_total")); assert!(output.contains("transactions_try_again_later_failed_total")); + + // Queue pickup latency + assert!(output.contains("queue_pickup_latency_seconds")); } #[actix_rt::test] @@ -786,4 +808,40 @@ mod processing_time_tests { let unique: std::collections::HashSet<&str> = stages.iter().copied().collect(); assert_eq!(stages.len(), unique.len(), "stage constants must be unique"); } + + #[test] + fn test_observe_queue_pickup_latency_records_to_histogram() { + let before = QUEUE_PICKUP_LATENCY + .with_label_values(&["notification", "sqs"]) + .get_sample_count(); + + observe_queue_pickup_latency("notification", "sqs", 1.5); + + let after = QUEUE_PICKUP_LATENCY + .with_label_values(&["notification", "sqs"]) + .get_sample_count(); + + assert_eq!(after, before + 1, "sample count should increase by 1"); + } + + #[test] + fn test_observe_queue_pickup_latency_both_backends() { + for backend in &["sqs", "redis"] { + let before = QUEUE_PICKUP_LATENCY + .with_label_values(&["relayer-health-check", backend]) + .get_sample_count(); + + observe_queue_pickup_latency("relayer-health-check", backend, 0.25); + + let after = QUEUE_PICKUP_LATENCY + .with_label_values(&["relayer-health-check", backend]) + .get_sample_count(); + + assert_eq!( + after, + before + 1, + "sample count should increase by 1 for backend {backend}" + ); + } + } } diff --git a/src/queues/redis/worker.rs b/src/queues/redis/worker.rs index 8a216a628..97703b5ab 100644 --- a/src/queues/redis/worker.rs +++ b/src/queues/redis/worker.rs @@ -45,6 +45,8 @@ use std::{str::FromStr, time::Duration}; use tokio::signal::unix::SignalKind; use tracing::{debug, error, info}; +use crate::metrics::observe_queue_pickup_latency; + use super::{filter_relayers_for_swap, QueueType, WorkerContext}; use crate::queues::retry_config::{ RetryBackoffConfig, NOTIFICATION_BACKOFF, RELAYER_HEALTH_BACKOFF, STATUS_EVM_BACKOFF, @@ -62,12 +64,36 @@ use crate::queues::retry_config::{ // keeping all handler business logic backend-neutral. // --------------------------------------------------------------------------- +/// Observe queue pickup latency for Redis/Apalis workers. +/// +/// Uses `available_at` (the intended availability time) when present to exclude +/// intentional scheduling delay. Falls back to `timestamp` (job creation time) +/// for immediate jobs. Only call on first attempt to avoid retry inflation. +fn observe_redis_pickup_latency( + available_at: Option<&String>, + job_timestamp: &str, + queue_type: &str, +) { + let fallback = job_timestamp.to_string(); + let baseline = available_at.unwrap_or(&fallback); + if let Ok(baseline_epoch) = baseline.parse::() { + let now = chrono::Utc::now().timestamp(); + let latency_secs = (now - baseline_epoch).max(0) as f64; + observe_queue_pickup_latency(queue_type, "redis", latency_secs); + } +} + async fn apalis_transaction_request_handler( job: Job, state: Data>, attempt: Attempt, task_id: TaskId, ) -> Result<(), apalis::prelude::Error> { + observe_redis_pickup_latency( + job.available_at.as_ref(), + &job.timestamp, + "transaction-request", + ); let ctx = WorkerContext::new(attempt.current(), task_id.to_string()); transaction_request_handler(job, (*state).clone(), ctx) .await @@ -80,6 +106,11 @@ async fn apalis_transaction_submission_handler( attempt: Attempt, task_id: TaskId, ) -> Result<(), apalis::prelude::Error> { + observe_redis_pickup_latency( + job.available_at.as_ref(), + &job.timestamp, + "transaction-submission", + ); let ctx = WorkerContext::new(attempt.current(), task_id.to_string()); transaction_submission_handler(job, (*state).clone(), ctx) .await @@ -92,6 +123,41 @@ async fn apalis_transaction_status_handler( attempt: Attempt, task_id: TaskId, ) -> Result<(), apalis::prelude::Error> { + observe_redis_pickup_latency(job.available_at.as_ref(), &job.timestamp, "status-check"); + let ctx = WorkerContext::new(attempt.current(), task_id.to_string()); + transaction_status_handler(job, (*state).clone(), ctx) + .await + .map_err(Into::into) +} + +async fn apalis_transaction_status_evm_handler( + job: Job, + state: Data>, + attempt: Attempt, + task_id: TaskId, +) -> Result<(), apalis::prelude::Error> { + observe_redis_pickup_latency( + job.available_at.as_ref(), + &job.timestamp, + "status-check-evm", + ); + let ctx = WorkerContext::new(attempt.current(), task_id.to_string()); + transaction_status_handler(job, (*state).clone(), ctx) + .await + .map_err(Into::into) +} + +async fn apalis_transaction_status_stellar_handler( + job: Job, + state: Data>, + attempt: Attempt, + task_id: TaskId, +) -> Result<(), apalis::prelude::Error> { + observe_redis_pickup_latency( + job.available_at.as_ref(), + &job.timestamp, + "status-check-stellar", + ); let ctx = WorkerContext::new(attempt.current(), task_id.to_string()); transaction_status_handler(job, (*state).clone(), ctx) .await @@ -104,6 +170,7 @@ async fn apalis_notification_handler( attempt: Attempt, task_id: TaskId, ) -> Result<(), apalis::prelude::Error> { + observe_redis_pickup_latency(job.available_at.as_ref(), &job.timestamp, "notification"); let ctx = WorkerContext::new(attempt.current(), task_id.to_string()); notification_handler(job, (*state).clone(), ctx) .await @@ -116,6 +183,11 @@ async fn apalis_token_swap_request_handler( attempt: Attempt, task_id: TaskId, ) -> Result<(), apalis::prelude::Error> { + observe_redis_pickup_latency( + job.available_at.as_ref(), + &job.timestamp, + "token-swap-request", + ); let ctx = WorkerContext::new(attempt.current(), task_id.to_string()); token_swap_request_handler(job, (*state).clone(), ctx) .await @@ -128,6 +200,11 @@ async fn apalis_relayer_health_check_handler( attempt: Attempt, task_id: TaskId, ) -> Result<(), apalis::prelude::Error> { + observe_redis_pickup_latency( + job.available_at.as_ref(), + &job.timestamp, + "relayer-health-check", + ); let ctx = WorkerContext::new(attempt.current(), task_id.to_string()); relayer_health_check_handler(job, (*state).clone(), ctx) .await @@ -306,7 +383,7 @@ where )) .data(app_state.clone()) .backend(queue.transaction_status_queue_evm.clone()) - .build_fn(apalis_transaction_status_handler); + .build_fn(apalis_transaction_status_evm_handler); // Stellar status checker - fast retries for fast finality // Stellar has sub-second finality, needs more frequent status checks @@ -326,7 +403,7 @@ where )) .data(app_state.clone()) .backend(queue.transaction_status_queue_stellar.clone()) - .build_fn(apalis_transaction_status_handler); + .build_fn(apalis_transaction_status_stellar_handler); let notification_queue_worker = WorkerBuilder::new(NOTIFICATION_SENDER) .layer(ErrorHandlingLayer::new()) diff --git a/src/queues/sqs/worker.rs b/src/queues/sqs/worker.rs index 8b92644e2..144cfb4e2 100644 --- a/src/queues/sqs/worker.rs +++ b/src/queues/sqs/worker.rs @@ -19,6 +19,7 @@ use tokio::sync::watch; use tokio::task::{JoinHandle, JoinSet}; use tracing::{debug, error, info, warn}; +use crate::metrics::observe_queue_pickup_latency; use crate::queues::{backoff_config_for_queue, retry_delay_secs}; use crate::{ config::ServerConfig, @@ -231,6 +232,7 @@ async fn run_poll_loop( .visibility_timeout(visibility_timeout as i32) .message_system_attribute_names(MessageSystemAttributeName::ApproximateReceiveCount) .message_system_attribute_names(MessageSystemAttributeName::MessageGroupId) + .message_system_attribute_names(MessageSystemAttributeName::SentTimestamp) .message_attribute_names("target_scheduled_on") .message_attribute_names("retry_attempt") .send() => result, @@ -485,6 +487,16 @@ async fn process_message( // attempt when attribute is missing. let logical_retry_attempt = parse_retry_attempt(&message).unwrap_or(attempt_number); + // Observe queue pickup latency on first delivery only. + // For scheduled messages, measure from `target_scheduled_on` (the intended + // availability time) to exclude intentional scheduling delay. + // For immediate messages, fall back to SQS `SentTimestamp` (millis). + if let Some(baseline) = queue_pickup_baseline_ms(&message, receive_count) { + let now_ms = chrono::Utc::now().timestamp_millis(); + let latency_secs = (now_ms - baseline).max(0) as f64 / 1000.0; + observe_queue_pickup_latency(queue_type.queue_name(), "sqs", latency_secs); + } + // Use SQS MessageId as the worker task_id for log correlation. let sqs_message_id = message.message_id().unwrap_or("unknown").to_string(); @@ -744,6 +756,21 @@ fn parse_retry_attempt(message: &Message) -> Option { .and_then(|value| value.parse::().ok()) } +fn queue_pickup_baseline_ms(message: &Message, receive_count: usize) -> Option { + if receive_count != 1 { + return None; + } + + parse_target_scheduled_on(message) + .map(|ts_secs| ts_secs * 1000) + .or_else(|| { + message + .attributes() + .and_then(|a| a.get(&MessageSystemAttributeName::SentTimestamp)) + .and_then(|v| v.parse::().ok()) + }) +} + fn is_fifo_queue_url(queue_url: &str) -> bool { queue_url.ends_with(".fifo") } @@ -1419,6 +1446,58 @@ mod tests { assert_eq!(parse_retry_attempt(&message), Some(999999)); } + #[test] + fn test_queue_pickup_baseline_ms_uses_scheduled_time_on_first_delivery() { + let message = Message::builder() + .message_attributes( + "target_scheduled_on", + MessageAttributeValue::builder() + .data_type("Number") + .string_value("123") + .build() + .unwrap(), + ) + .set_attributes(Some(std::collections::HashMap::from([( + MessageSystemAttributeName::SentTimestamp, + "999999".to_string(), + )]))) + .build(); + + assert_eq!(queue_pickup_baseline_ms(&message, 1), Some(123_000)); + } + + #[test] + fn test_queue_pickup_baseline_ms_falls_back_to_sent_timestamp() { + let message = Message::builder() + .set_attributes(Some(std::collections::HashMap::from([( + MessageSystemAttributeName::SentTimestamp, + "123456".to_string(), + )]))) + .build(); + + assert_eq!(queue_pickup_baseline_ms(&message, 1), Some(123456)); + } + + #[test] + fn test_queue_pickup_baseline_ms_skips_retries() { + let message = Message::builder() + .message_attributes( + "target_scheduled_on", + MessageAttributeValue::builder() + .data_type("Number") + .string_value("123") + .build() + .unwrap(), + ) + .set_attributes(Some(std::collections::HashMap::from([( + MessageSystemAttributeName::SentTimestamp, + "123456".to_string(), + )]))) + .build(); + + assert_eq!(queue_pickup_baseline_ms(&message, 2), None); + } + // ── is_fifo_queue_url: comprehensive cases ──────────────────────── #[test]