Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/jobs/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ pub struct Job<T> {
pub data: T,
#[serde(skip_serializing_if = "Option::is_none")]
pub request_id: Option<String>,
/// Unix epoch (seconds) when this job is intended to become available for
/// processing. Set by the producer when `scheduled_on` is provided; absent
/// for immediate jobs. Consumers use this to compute queue pickup latency
/// that excludes intentional scheduling delay.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub available_at: Option<String>,
Comment on lines +24 to +29
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't bake second-only precision into the new latency baseline.

available_at is documented and serialized as epoch seconds, and the new Redis path consumes it directly for queue_pickup_latency_seconds. Together with the existing second-resolution timestamp, that means Redis can only emit 0/1/2... second observations, so the new subsecond buckets become misleading for both immediate and scheduled jobs. If parity with SQS matters here, this baseline needs millisecond precision.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/jobs/job.rs` around lines 24 - 29, available_at is currently
documented/serialized as epoch seconds which forces second-only resolution for
queue_pickup_latency_seconds; change available_at to carry millisecond precision
and propagate that precision through the consumer path that computes
queue_pickup_latency_seconds and any comparisons against timestamp. Concretely:
update the Job struct field available_at to an integer millisecond epoch type
(e.g. Option<i64> or Option<i128>) and adjust serde (and the field doc) to
serialize/deserialize milliseconds, update the consumer code that reads
available_at and the existing timestamp handling so both use
millisecond-resolution arithmetic when computing queue_pickup_latency_seconds,
and ensure any Redis serialization/deserialization paths that consume
available_at accept the millisecond format.

}

impl<T> Job<T> {
Expand All @@ -32,12 +38,17 @@ impl<T> Job<T> {
job_type,
data,
request_id: None,
available_at: None,
}
}
pub fn with_request_id(mut self, id: Option<String>) -> Self {
self.request_id = id;
self
}
/// Records the producer-side scheduling intent on the job.
///
/// `scheduled_on` is a Unix epoch in seconds; when present it is stored in
/// `available_at` as a decimal string, and `None` leaves the job immediate.
/// NOTE(review): second resolution caps downstream pickup-latency precision;
/// confirm whether a millisecond baseline is required for subsecond buckets.
pub fn with_scheduled_on(mut self, scheduled_on: Option<i64>) -> Self {
    self.available_at = scheduled_on.map(|epoch_secs| epoch_secs.to_string());
    self
}
}

// Enum to represent different message types
Expand Down Expand Up @@ -379,6 +390,35 @@ mod tests {
assert_eq!(deserialized.data.relayer_id, "relayer-1");
}

#[test]
fn test_job_with_scheduled_on_sets_available_at() {
    let request = TransactionRequest::new("tx123", "relayer-1");
    let scheduled_job =
        Job::new(JobType::TransactionRequest, request).with_scheduled_on(Some(12345));

    // available_at mirrors the scheduled timestamp as a decimal string.
    assert_eq!(scheduled_job.available_at, Some("12345".to_string()));
}

#[test]
fn test_job_serialization_preserves_available_at() {
    let request = TransactionRequest::new("tx123", "relayer-1");
    let original =
        Job::new(JobType::TransactionRequest, request).with_scheduled_on(Some(12345));

    // Round-trip through JSON and confirm the field survives intact.
    let json = serde_json::to_string(&original).unwrap();
    let round_tripped: Job<TransactionRequest> = serde_json::from_str(&json).unwrap();

    assert_eq!(round_tripped.available_at, Some("12345".to_string()));
}

#[test]
fn test_job_serialization_omits_available_at_when_not_scheduled() {
    let tx_request = TransactionRequest::new("tx123", "relayer-1");
    let job = Job::new(JobType::TransactionRequest, tx_request);

    let serialized = serde_json::to_string(&job).unwrap();

    // The field must be skipped entirely (not emitted as `null`), so the
    // wire format of immediate jobs is unchanged by the new field.
    assert!(!serialized.contains("available_at"));

    // Deserializing a payload without the field must default it to None
    // (exercises `#[serde(default)]`, covering jobs produced before the
    // field existed).
    let deserialized: Job<TransactionRequest> = serde_json::from_str(&serialized).unwrap();
    assert!(deserialized.available_at.is_none());
}

#[test]
fn test_notification_send_serialization() {
let payload = WebhookPayload::Transaction(TransactionResponse::Evm(Box::new(
Expand Down
105 changes: 90 additions & 15 deletions src/jobs/job_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ impl JobProducerTrait for JobProducer {
transaction_process_job
);
let job = Job::new(JobType::TransactionRequest, transaction_process_job)
.with_request_id(get_request_id());
.with_request_id(get_request_id())
.with_scheduled_on(scheduled_on);
let request_id = job.request_id.clone();
let tx_id = job.data.transaction_id.clone();
let relayer_id = job.data.relayer_id.clone();
Expand Down Expand Up @@ -155,7 +156,8 @@ impl JobProducerTrait for JobProducer {
scheduled_on: Option<i64>,
) -> Result<(), JobProducerError> {
let job = Job::new(JobType::TransactionSend, transaction_submit_job)
.with_request_id(get_request_id());
.with_request_id(get_request_id())
.with_scheduled_on(scheduled_on);
let request_id = job.request_id.clone();
let tx_id = job.data.transaction_id.clone();
let relayer_id = job.data.relayer_id.clone();
Expand Down Expand Up @@ -191,7 +193,8 @@ impl JobProducerTrait for JobProducer {
JobType::TransactionStatusCheck,
transaction_status_check_job.clone(),
)
.with_request_id(get_request_id());
.with_request_id(get_request_id())
.with_scheduled_on(scheduled_on);
let request_id = job.request_id.clone();
let tx_id = job.data.transaction_id.clone();
let relayer_id = job.data.relayer_id.clone();
Expand Down Expand Up @@ -222,7 +225,8 @@ impl JobProducerTrait for JobProducer {
scheduled_on: Option<i64>,
) -> Result<(), JobProducerError> {
let job = Job::new(JobType::NotificationSend, notification_send_job)
.with_request_id(get_request_id());
.with_request_id(get_request_id())
.with_scheduled_on(scheduled_on);
let request_id = job.request_id.clone();
let notification_id = job.data.notification_id.clone();

Expand All @@ -249,8 +253,9 @@ impl JobProducerTrait for JobProducer {
swap_request_job: TokenSwapRequest,
scheduled_on: Option<i64>,
) -> Result<(), JobProducerError> {
let job =
Job::new(JobType::TokenSwapRequest, swap_request_job).with_request_id(get_request_id());
let job = Job::new(JobType::TokenSwapRequest, swap_request_job)
.with_request_id(get_request_id())
.with_scheduled_on(scheduled_on);
let request_id = job.request_id.clone();
let relayer_id = job.data.relayer_id.clone();
let backend = self.queue_backend();
Expand Down Expand Up @@ -280,7 +285,8 @@ impl JobProducerTrait for JobProducer {
JobType::RelayerHealthCheck,
relayer_health_check_job.clone(),
)
.with_request_id(get_request_id());
.with_request_id(get_request_id())
.with_scheduled_on(scheduled_on);
let request_id = job.request_id.clone();
let relayer_id = job.data.relayer_id.clone();
let backend = self.queue_backend();
Expand Down Expand Up @@ -317,6 +323,8 @@ mod tests {
struct TestRedisStorage<T> {
pub push_called: bool,
pub schedule_called: bool,
pub last_job: Option<T>,
pub last_scheduled_timestamp: Option<i64>,
_phantom: std::marker::PhantomData<T>,
}

Expand All @@ -325,17 +333,24 @@ mod tests {
Self {
push_called: false,
schedule_called: false,
last_job: None,
last_scheduled_timestamp: None,
_phantom: std::marker::PhantomData,
}
}
}

async fn push(&mut self, _job: T) -> Result<(), JobProducerError> {
impl<T: Clone> TestRedisStorage<T> {
async fn push(&mut self, job: T) -> Result<(), JobProducerError> {
self.push_called = true;
self.last_job = Some(job);
Ok(())
}

async fn schedule(&mut self, _job: T, _timestamp: i64) -> Result<(), JobProducerError> {
async fn schedule(&mut self, job: T, timestamp: i64) -> Result<(), JobProducerError> {
self.schedule_called = true;
self.last_job = Some(job);
self.last_scheduled_timestamp = Some(timestamp);
Ok(())
}
}
Expand Down Expand Up @@ -410,7 +425,8 @@ mod tests {
scheduled_on: Option<i64>,
) -> Result<(), JobProducerError> {
let mut queue = self.queue.lock().await;
let job = Job::new(JobType::TransactionRequest, transaction_process_job);
let job = Job::new(JobType::TransactionRequest, transaction_process_job)
.with_scheduled_on(scheduled_on);

match scheduled_on {
Some(scheduled_on) => {
Expand All @@ -433,7 +449,8 @@ mod tests {
scheduled_on: Option<i64>,
) -> Result<(), JobProducerError> {
let mut queue = self.queue.lock().await;
let job = Job::new(JobType::TransactionSend, transaction_submit_job);
let job = Job::new(JobType::TransactionSend, transaction_submit_job)
.with_scheduled_on(scheduled_on);

match scheduled_on {
Some(on) => {
Expand All @@ -456,7 +473,8 @@ mod tests {
let job = Job::new(
JobType::TransactionStatusCheck,
transaction_status_check_job.clone(),
);
)
.with_scheduled_on(scheduled_on);

// Route to the appropriate queue based on network type
use crate::models::NetworkType;
Expand Down Expand Up @@ -485,7 +503,8 @@ mod tests {
scheduled_on: Option<i64>,
) -> Result<(), JobProducerError> {
let mut queue = self.queue.lock().await;
let job = Job::new(JobType::NotificationSend, notification_send_job);
let job = Job::new(JobType::NotificationSend, notification_send_job)
.with_scheduled_on(scheduled_on);

match scheduled_on {
Some(on) => {
Expand All @@ -505,7 +524,8 @@ mod tests {
scheduled_on: Option<i64>,
) -> Result<(), JobProducerError> {
let mut queue = self.queue.lock().await;
let job = Job::new(JobType::TokenSwapRequest, swap_request_job);
let job = Job::new(JobType::TokenSwapRequest, swap_request_job)
.with_scheduled_on(scheduled_on);

match scheduled_on {
Some(on) => {
Expand All @@ -525,7 +545,8 @@ mod tests {
scheduled_on: Option<i64>,
) -> Result<(), JobProducerError> {
let mut queue = self.queue.lock().await;
let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job);
let job = Job::new(JobType::RelayerHealthCheck, relayer_health_check_job)
.with_scheduled_on(scheduled_on);

match scheduled_on {
Some(scheduled_on) => {
Expand Down Expand Up @@ -841,6 +862,36 @@ mod tests {
assert!(!queue.transaction_status_queue_evm.push_called);
}

#[tokio::test]
async fn test_scheduled_status_check_persists_available_at() {
    use crate::models::NetworkType;

    let producer = TestJobProducer::new();
    let check = TransactionStatusCheck::new("tx-evm-scheduled", "relayer-1", NetworkType::Evm);
    let when = calculate_scheduled_timestamp(30);

    producer
        .produce_check_transaction_status_job(check, Some(when))
        .await
        .unwrap();

    let queue = producer.get_queue().await;
    let stored = queue
        .transaction_status_queue_evm
        .last_job
        .expect("scheduled status check should be stored");

    // The persisted job carries the schedule time as a string...
    assert_eq!(stored.available_at, Some(when.to_string()));
    // ...and the backend was asked to schedule at exactly that timestamp.
    assert_eq!(
        queue.transaction_status_queue_evm.last_scheduled_timestamp,
        Some(when)
    );
}

#[tokio::test]
async fn test_submit_transaction_scheduled() {
let producer = TestJobProducer::new();
Expand All @@ -857,6 +908,30 @@ mod tests {
assert!(!queue.transaction_submission_queue.push_called);
}

#[tokio::test]
async fn test_scheduled_submit_job_persists_available_at() {
    let producer = TestJobProducer::new();
    let when = calculate_scheduled_timestamp(15);

    producer
        .produce_submit_transaction_job(
            TransactionSend::submit("tx-scheduled", "relayer-1"),
            Some(when),
        )
        .await
        .unwrap();

    let queue = producer.get_queue().await;
    let stored = queue
        .transaction_submission_queue
        .last_job
        .expect("scheduled submission should be stored");

    // available_at must reflect the producer-provided schedule timestamp.
    assert_eq!(stored.available_at, Some(when.to_string()));
}

#[tokio::test]
async fn test_notification_job_scheduled() {
let producer = TestJobProducer::new();
Expand Down
58 changes: 58 additions & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ pub fn observe_processing_time(relayer_id: &str, network_type: &str, stage: &str
.with_label_values(&[relayer_id, network_type, stage])
.observe(secs);
}

/// Observe queue pickup latency (time from send to consumer pickup).
///
/// `queue_type` and `backend` become Prometheus label values on the
/// `queue_pickup_latency_seconds` histogram; `secs` is one latency sample.
pub fn observe_queue_pickup_latency(queue_type: &str, backend: &str, secs: f64) {
    let labels = [queue_type, backend];
    QUEUE_PICKUP_LATENCY.with_label_values(&labels).observe(secs);
}
use sysinfo::{Disks, System};

lazy_static! {
Expand Down Expand Up @@ -206,6 +213,15 @@ lazy_static! {
histogram_vec
};

// Histogram for queue pickup latency (time from send to consumer pickup),
// labeled by queue type and backend. Buckets span 10ms to 10min.
// NOTE(review): subsecond buckets are only meaningful if the send-time
// baseline carries subsecond precision — confirm against `available_at`,
// which is documented as epoch seconds.
pub static ref QUEUE_PICKUP_LATENCY: HistogramVec = {
    let histogram_opts = HistogramOpts::new("queue_pickup_latency_seconds", "Queue pickup latency in seconds (send to consumer pickup)")
        .buckets(vec![0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0, 600.0]);
    // Registration panics on duplicate metric names; acceptable at startup.
    let histogram_vec = HistogramVec::new(histogram_opts, &["queue_type", "backend"]).unwrap();
    REGISTRY.register(Box::new(histogram_vec.clone())).unwrap();
    histogram_vec
};

// Histogram for RPC call latency.
pub static ref RPC_CALL_LATENCY: HistogramVec = {
let histogram_opts = HistogramOpts::new("rpc_call_latency_seconds", "RPC call latency in seconds")
Expand Down Expand Up @@ -515,6 +531,9 @@ mod actix_tests {
.with_label_values(&["test-relayer", "stellar"])
.inc();

// Queue pickup latency
observe_queue_pickup_latency("transaction-request", "sqs", 0.5);

let metrics = gather_metrics().expect("failed to gather metrics");
let output = String::from_utf8(metrics).expect("metrics output is not valid UTF-8");

Expand All @@ -541,6 +560,9 @@ mod actix_tests {
// TRY_AGAIN_LATER metrics
assert!(output.contains("transactions_try_again_later_success_total"));
assert!(output.contains("transactions_try_again_later_failed_total"));

// Queue pickup latency
assert!(output.contains("queue_pickup_latency_seconds"));
}

#[actix_rt::test]
Expand Down Expand Up @@ -786,4 +808,40 @@ mod processing_time_tests {
let unique: std::collections::HashSet<&str> = stages.iter().copied().collect();
assert_eq!(stages.len(), unique.len(), "stage constants must be unique");
}

#[test]
fn test_observe_queue_pickup_latency_records_to_histogram() {
    let labels = ["notification", "sqs"];
    let count_before = QUEUE_PICKUP_LATENCY
        .with_label_values(&labels)
        .get_sample_count();

    observe_queue_pickup_latency("notification", "sqs", 1.5);

    let count_after = QUEUE_PICKUP_LATENCY
        .with_label_values(&labels)
        .get_sample_count();

    // Delta-based check so the test is robust to other tests touching
    // the same (queue_type, backend) label pair.
    assert_eq!(count_after, count_before + 1, "sample count should increase by 1");
}

#[test]
fn test_observe_queue_pickup_latency_both_backends() {
    // The metric must accept samples under both supported backend labels.
    for backend in ["sqs", "redis"] {
        let labels = ["relayer-health-check", backend];
        let count_before = QUEUE_PICKUP_LATENCY
            .with_label_values(&labels)
            .get_sample_count();

        observe_queue_pickup_latency("relayer-health-check", backend, 0.25);

        let count_after = QUEUE_PICKUP_LATENCY
            .with_label_values(&labels)
            .get_sample_count();

        assert_eq!(
            count_after,
            count_before + 1,
            "sample count should increase by 1 for backend {backend}"
        );
    }
}
}
Loading
Loading