Conversation
Walkthrough

Introduces job scheduling metadata and queue pickup latency metrics. Jobs gain an `available_at` field recording their intended availability time, so workers can observe pickup latency that excludes intentional scheduling delay.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Job Producer
    participant Job Storage
    participant Worker
    participant Metrics
    Job Producer->>Job Producer: Create Job
    Job Producer->>Job Producer: Call with_scheduled_on(timestamp)
    Job Producer->>Job Storage: Store job with available_at set
    Note over Job Storage: Job remains queued until available_at passes
    Worker->>Job Storage: Poll for available jobs
    Job Storage->>Worker: Return job with available_at
    Note over Worker: Calculate latency<br/>(now - available_at)
    Worker->>Metrics: observe_queue_pickup_latency(queue_type, backend, latency)
    Metrics->>Metrics: Record to QUEUE_PICKUP_LATENCY histogram
    Worker->>Worker: Process job
```
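The "Calculate latency" step in the diagram boils down to a clamped subtraction. A minimal sketch (the function name `pickup_latency_secs` and the explicit `now` parameter are illustrative, not the PR's actual API):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Sketch of the diagram's latency step: latency = now - available_at,
// clamped at zero so clock skew or an early pickup never produces a
// negative observation.
fn pickup_latency_secs(available_at_epoch: i64, now_epoch: i64) -> f64 {
    (now_epoch - available_at_epoch).max(0) as f64
}

fn main() {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before Unix epoch")
        .as_secs() as i64;

    // Job became available 5 seconds ago: latency is 5s.
    assert_eq!(pickup_latency_secs(now - 5, now), 5.0);
    // Job scheduled for the future: clamped to 0 rather than negative.
    assert_eq!(pickup_latency_secs(now + 60, now), 0.0);
    println!("ok");
}
```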
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Pull request overview
Adds a new Prometheus histogram to measure queue pickup latency (time from message/job creation/scheduled-availability to consumer pickup) across both queue backends (SQS + Redis/Apalis), with logic intended to exclude intentional scheduling delay.
Changes:
- Add `queue_pickup_latency_seconds` histogram and `observe_queue_pickup_latency()` helper.
- Instrument SQS worker to compute pickup latency using `target_scheduled_on` (preferred) or SQS `SentTimestamp` (fallback), only on first delivery.
- Add `available_at` to `Job<T>` and populate it from producers so Redis workers can compute pickup latency excluding scheduling delay; instrument Redis worker adapters.
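The SQS instrumentation described above can be sketched as a baseline-selection function. Names and signatures here are assumptions for illustration, not the PR's actual code: the scheduled-on timestamp (seconds) is preferred so intentional delay is excluded, `SentTimestamp` (milliseconds) is the fallback for immediate messages, and redeliveries observe nothing.

```rust
// Sketch (assumed shapes) of the baseline selection in the SQS worker:
// prefer the producer-set scheduled time, fall back to SentTimestamp,
// and only observe on the first delivery.
fn baseline_ms(
    receive_count: usize,
    target_scheduled_on_secs: Option<i64>,
    sent_timestamp_ms: Option<i64>,
) -> Option<i64> {
    if receive_count != 1 {
        return None; // later deliveries would inflate the histogram
    }
    target_scheduled_on_secs
        .map(|secs| secs * 1000) // normalize seconds to milliseconds
        .or(sent_timestamp_ms)
}

fn main() {
    // Scheduled message on first delivery: scheduled baseline wins.
    assert_eq!(
        baseline_ms(1, Some(1_700_000_000), Some(1_699_999_000_000)),
        Some(1_700_000_000_000)
    );
    // Immediate message: fall back to SentTimestamp.
    assert_eq!(baseline_ms(1, None, Some(1_700_000_000_000)), Some(1_700_000_000_000));
    // Redelivery: no observation.
    assert_eq!(baseline_ms(2, Some(1_700_000_000), None), None);
    println!("ok");
}
```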
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| src/queues/sqs/worker.rs | Requests SentTimestamp and observes pickup latency on first SQS delivery (scheduled baseline preferred). |
| src/queues/redis/worker.rs | Adds Redis/Apalis pickup-latency observation helper and wires it into each queue handler adapter; splits status handlers by network. |
| src/metrics/mod.rs | Defines/registers queue_pickup_latency_seconds histogram and exposes an observation helper + tests. |
| src/jobs/job.rs | Adds available_at field and builder method to persist intended availability time for scheduled jobs. |
| src/jobs/job_producer.rs | Populates available_at via .with_scheduled_on(...) and extends tests to verify it's persisted for scheduled jobs. |
```rust
/// for immediate jobs. Only call on first attempt to avoid retry inflation.
fn observe_redis_pickup_latency(
    available_at: Option<&String>,
    job_timestamp: &str,
    queue_type: &str,
) {
```
observe_redis_pickup_latency docs say it should only run on the first attempt to avoid retry-inflated latency, but the helper has no way to enforce that and all adapter call sites invoke it unconditionally. This will over-report pickup latency for retried jobs. Consider passing attempt.current() into this helper (or checking at each call site) and only observing when the attempt is the initial one (Apalis attempts appear to start at 0 in this codebase).
```diff
-/// for immediate jobs. Only call on first attempt to avoid retry inflation.
-fn observe_redis_pickup_latency(
-    available_at: Option<&String>,
-    job_timestamp: &str,
-    queue_type: &str,
-) {
+/// for immediate jobs. Only records latency on the initial attempt to avoid
+/// retry inflation.
+fn observe_redis_pickup_latency(
+    attempt_current: usize,
+    available_at: Option<&String>,
+    job_timestamp: &str,
+    queue_type: &str,
+) {
+    if attempt_current != 0 {
+        return;
+    }
```
Codecov Report

❌ Patch coverage is
Additional details and impacted files
Flags with carried forward coverage won't be shown.

```
@@            Coverage Diff             @@
##             main     #745      +/-   ##
==========================================
- Coverage   90.22%   90.19%   -0.04%
==========================================
  Files         290      290
  Lines      122082   122299     +217
==========================================
+ Hits       110153   110303     +150
- Misses      11929    11996      +67
```
Actionable comments posted: 3
🧹 Nitpick comments (1)
src/jobs/job.rs (1)
48-51: Add a doc comment to `with_scheduled_on`.

This is now part of the public scheduling/metrics contract, so it should spell out the units and serialization expectations explicitly.
As per coding guidelines, "Include relevant doc comments (///) on public functions, structs, and modules."

💡 Suggested doc comment

```diff
+    /// Persists the scheduled availability time as a Unix epoch in seconds.
+    /// Consumers use it to exclude intentional scheduling delay from pickup latency.
     pub fn with_scheduled_on(mut self, scheduled_on: Option<i64>) -> Self {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/jobs/job.rs` around lines 48 - 51, Add a public doc comment (///) to the method with_scheduled_on explaining the expected units and format of the scheduled_on parameter and how it is serialized to available_at: state that scheduled_on is an Option<i64> representing an epoch timestamp (explicitly state whether seconds or milliseconds), that Some(ts) will be converted to a decimal string via ts.to_string() and stored in available_at, and that None clears/unsets available_at; include a short usage example and any downstream contract expectations (e.g., timezone/UTC) in the doc comment for the public scheduling/metrics contract.
ℹ️ Review info

⚙️ Run configuration
- Configuration used: Organization UI
- Review profile: CHILL
- Plan: Pro
- Run ID: 797e70e6-2327-424f-a7a5-fbb3ad78f65c

📒 Files selected for processing (5)
- src/jobs/job.rs
- src/jobs/job_producer.rs
- src/metrics/mod.rs
- src/queues/redis/worker.rs
- src/queues/sqs/worker.rs
```rust
/// Unix epoch (seconds) when this job is intended to become available for
/// processing. Set by the producer when `scheduled_on` is provided; absent
/// for immediate jobs. Consumers use this to compute queue pickup latency
/// that excludes intentional scheduling delay.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub available_at: Option<String>,
```
Don't bake second-only precision into the new latency baseline.

`available_at` is documented and serialized as epoch seconds, and the new Redis path consumes it directly for `queue_pickup_latency_seconds`. Together with the existing second-resolution `timestamp`, that means Redis can only emit 0/1/2... second observations, so the new subsecond buckets become misleading for both immediate and scheduled jobs. If parity with SQS matters here, this baseline needs millisecond precision.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/jobs/job.rs` around lines 24 - 29, available_at is currently
documented/serialized as epoch seconds which forces second-only resolution for
queue_pickup_latency_seconds; change available_at to carry millisecond precision
and propagate that precision through the consumer path that computes
queue_pickup_latency_seconds and any comparisons against timestamp. Concretely:
update the Job struct field available_at to an integer millisecond epoch type
(e.g. Option<i64> or Option<i128>) and adjust serde (and the field doc) to
serialize/deserialize milliseconds, update the consumer code that reads
available_at and the existing timestamp handling so both use
millisecond-resolution arithmetic when computing queue_pickup_latency_seconds,
and ensure any Redis serialization/deserialization paths that consume
available_at accept the millisecond format.
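One way to carry the millisecond precision this comment asks for, sketched with illustrative names (the actual field type and conversion path are up to the fix):

```rust
// Sketch of a millisecond-precision baseline: `available_at` carried as
// epoch milliseconds, latency reported as fractional seconds so subsecond
// histogram buckets stay meaningful.
fn latency_secs_from_ms(available_at_ms: i64, now_ms: i64) -> f64 {
    (now_ms - available_at_ms).max(0) as f64 / 1000.0
}

fn main() {
    // 250 ms of pickup delay shows up as 0.25s, not a rounded-down 0s.
    assert_eq!(latency_secs_from_ms(1_000, 1_250), 0.25);
    // Clock skew still clamps to zero.
    assert_eq!(latency_secs_from_ms(2_000, 1_000), 0.0);
    println!("ok");
}
```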
```rust
/// Observe queue pickup latency for Redis/Apalis workers.
///
/// Uses `available_at` (the intended availability time) when present to exclude
/// intentional scheduling delay. Falls back to `timestamp` (job creation time)
/// for immediate jobs. Only call on first attempt to avoid retry inflation.
fn observe_redis_pickup_latency(
    available_at: Option<&String>,
    job_timestamp: &str,
    queue_type: &str,
) {
    let fallback = job_timestamp.to_string();
    let baseline = available_at.unwrap_or(&fallback);
    if let Ok(baseline_epoch) = baseline.parse::<i64>() {
        let now = chrono::Utc::now().timestamp();
        let latency_secs = (now - baseline_epoch).max(0) as f64;
        observe_queue_pickup_latency(queue_type, "redis", latency_secs);
    }
}
```
Only emit Redis pickup latency on attempt 0.
Line 71 says retries should be excluded, but this helper never looks at the Apalis attempt, so every retry records another sample after backoff. That turns the histogram into “time until any attempt” instead of pickup latency.
🔧 Suggested direction

```diff
 fn observe_redis_pickup_latency(
-    available_at: Option<&String>,
+    attempt: usize,
+    available_at: Option<&str>,
     job_timestamp: &str,
     queue_type: &str,
 ) {
-    let fallback = job_timestamp.to_string();
-    let baseline = available_at.unwrap_or(&fallback);
+    if attempt != 0 {
+        return;
+    }
+
+    let baseline = available_at.unwrap_or(job_timestamp);
     if let Ok(baseline_epoch) = baseline.parse::<i64>() {
         let now = chrono::Utc::now().timestamp();
         let latency_secs = (now - baseline_epoch).max(0) as f64;
         observe_queue_pickup_latency(queue_type, "redis", latency_secs);
     }
 }
@@
     observe_redis_pickup_latency(
-        job.available_at.as_ref(),
+        attempt.current(),
+        job.available_at.as_deref(),
         &job.timestamp,
         "transaction-request",
     );
```

Apply the same call-site change everywhere this helper is used.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/queues/redis/worker.rs` around lines 67 - 84, The helper
observe_redis_pickup_latency currently ignores Apalis attempt count and records
latency on retries; modify it to accept an attempt parameter (e.g., attempt:
u32) and return early when attempt != 0 so only the first attempt emits a
sample, and update all call-sites that invoke observe_redis_pickup_latency to
pass the job attempt (from Apalis) into the new parameter; keep the function
name observe_redis_pickup_latency and ensure the early-return check is performed
before parsing timestamps or calling observe_queue_pickup_latency.
```rust
fn queue_pickup_baseline_ms(message: &Message, receive_count: usize) -> Option<i64> {
    if receive_count != 1 {
        return None;
    }

    parse_target_scheduled_on(message)
        .map(|ts_secs| ts_secs * 1000)
        .or_else(|| {
            message
                .attributes()
                .and_then(|a| a.get(&MessageSystemAttributeName::SentTimestamp))
                .and_then(|v| v.parse::<i64>().ok())
        })
}
```
`ApproximateReceiveCount` is the wrong gate for first-pickup metrics.

Standard-queue retries are re-enqueued as fresh messages with `retry_attempt` set, so they come back with `receive_count == 1` and get observed again. Scheduled FIFO messages hit the opposite bug: visibility-timeout deferral increments `receive_count`, so their first eligible pickup can be skipped entirely. This needs to key off logical retry state plus scheduling state, not `receive_count` alone, and it should get regression coverage for both paths.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/queues/sqs/worker.rs` around lines 759 - 772, The gate in
queue_pickup_baseline_ms incorrectly uses ApproximateReceiveCount
(receive_count) to detect a first logical pickup; instead, change the logic to
detect first pickup by checking the message's logical retry state (inspect the
message body/attributes for retry_attempt or equivalent) and scheduling state
(use parse_target_scheduled_on to detect scheduled messages) rather than only
receive_count; update queue_pickup_baseline_ms to return a baseline when either
the message has no retry_attempt (or retry_attempt == 0) and
parse_target_scheduled_on is present or when SentTimestamp should be used for
non-scheduled first pickups, and ensure you don’t treat re-enqueued retry
messages as first pickups; add regression tests covering (1) standard queue
retry messages that are re-enqueued with retry_attempt set and should not be
treated as first pickups, and (2) scheduled FIFO messages whose
visibility-timeout increments receive_count but whose first eligible pickup must
still be recognized via scheduling state (use the parse_target_scheduled_on and
MessageSystemAttributeName::SentTimestamp paths to assert correct behavior).
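The gating this comment describes can be sketched with hypothetical message metadata. The struct and field names below are assumptions for illustration; the real code would read these from the SQS message body and attributes:

```rust
// Hypothetical projection of the SQS message state the comment keys off.
struct MsgMeta {
    // Logical retry counter carried in the message body; None or 0 on the
    // original send, set when a retry is re-enqueued as a fresh message.
    retry_attempt: Option<u32>,
    // Producer-set scheduled availability (epoch seconds), if any.
    scheduled_on_secs: Option<i64>,
    // SQS SentTimestamp attribute (epoch milliseconds).
    sent_timestamp_ms: Option<i64>,
}

// First logical pickup = no logical retry state, regardless of how many
// visibility-timeout deferrals bumped ApproximateReceiveCount.
fn first_pickup_baseline_ms(m: &MsgMeta) -> Option<i64> {
    if m.retry_attempt.unwrap_or(0) != 0 {
        return None; // re-enqueued retry: never a first pickup
    }
    m.scheduled_on_secs
        .map(|secs| secs * 1000)
        .or(m.sent_timestamp_ms)
}

fn main() {
    // Re-enqueued retry arrives with receive_count == 1 but must not observe.
    let retry = MsgMeta {
        retry_attempt: Some(2),
        scheduled_on_secs: None,
        sent_timestamp_ms: Some(1_000),
    };
    assert_eq!(first_pickup_baseline_ms(&retry), None);

    // Scheduled FIFO message: deferrals bump receive_count, but the logical
    // first pickup is still recognized via its scheduling state.
    let scheduled = MsgMeta {
        retry_attempt: None,
        scheduled_on_secs: Some(10),
        sent_timestamp_ms: Some(1_000),
    };
    assert_eq!(first_pickup_baseline_ms(&scheduled), Some(10_000));
    println!("ok");
}
```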
Summary
backends (SQS/Redis)
Testing Process
Checklist
Note
If you are using Relayer in your stack, consider adding your team or organization to our list of Relayer Users in the Wild!
Summary by CodeRabbit
New Features
Tests