Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
bf0d31a
feat: Add `Queue` (SQS)
sebst Dec 29, 2025
3d6eee3
Merge branch 'main' into feature/sqs
sebst Dec 30, 2025
d1e2bf4
subscribe()
sebst Dec 30, 2025
12b7034
Merge branch 'feature/sqs' of https://github.com/stelviodev/stelvio i…
sebst Dec 30, 2025
4c6f9e9
fmt
sebst Dec 30, 2025
8446d78
subscribe in create_resources
sebst Dec 30, 2025
0b36dba
Merge branch 'main' into feature/sqs
sebst Jan 2, 2026
51892a0
wip
sebst Jan 3, 2026
ad3afe9
...
sebst Jan 4, 2026
db0abe5
WIP
sebst Jan 5, 2026
fe547b8
wip2
sebst Jan 5, 2026
ec5b34c
docs
sebst Jan 5, 2026
2d64b39
docs improvements
sebst Jan 5, 2026
b700c1c
Merge branch 'main' into feature/sqs
sebst Jan 9, 2026
3a747b2
make subscription protected
sebst Jan 5, 2026
7c4aba6
..
sebst Jan 7, 2026
fd78de2
new linking mechanism
sebst Jan 9, 2026
3dc79ff
dlq
sebst Jan 9, 2026
6bcfc13
docs
sebst Jan 9, 2026
028041d
docs
sebst Jan 9, 2026
cea0ba8
i
sebst Jan 9, 2026
6e2573a
docs
sebst Jan 9, 2026
d8d7c4a
Merge branch 'main' into feature/sqs
sebst Jan 9, 2026
0988072
pr1
sebst Jan 10, 2026
62f618a
pr2
sebst Jan 10, 2026
2461ff7
pr3
sebst Jan 10, 2026
27c98da
pr4
sebst Jan 10, 2026
d3e6ffa
tests
sebst Jan 10, 2026
c72e0fe
docs
sebst Jan 10, 2026
1dd689f
...
sebst Jan 10, 2026
3888e36
prefix added
sebst Jan 10, 2026
8827cd6
Merge branch 'main' into feature/sqs
sebst Jan 11, 2026
52cb119
improve docs
sebst Jan 12, 2026
1039a76
dlq
sebst Jan 12, 2026
0893bf6
Revert "dlq"
sebst Jan 12, 2026
bfb2d1f
dlq config stuff
sebst Jan 12, 2026
1073452
naming stuff
sebst Jan 12, 2026
e6e9ad9
add test
sebst Jan 13, 2026
f7a4038
Merge branch 'main' into feature/sqs
sebst Jan 13, 2026
66e5945
typing
sebst Jan 13, 2026
96e2f1d
Revert "typing"
sebst Jan 13, 2026
c4d8027
typing
sebst Jan 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@

## 0.7.0a10 (2025-MM-DD)

### Queues

Stelvio now supports a `Queue` component to work with SQS Queues.

→ [Queues Guide](guides/queues.md)

### Email sending

Stelvio now offers an `Email` component to send emails using Amazon SES.

→ [Email Guide](guides/email.md)

### Function-to-Function Linking

Functions can now link to other functions, enabling Lambda-to-Lambda invocation. When you link a function to another, Stelvio automatically grants `lambda:InvokeFunction` permission and provides `function_arn` and `function_name` via the generated `Resources` object.
Expand Down
352 changes: 352 additions & 0 deletions docs/guides/queues.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,352 @@
# Working with Queues in Stelvio

Stelvio supports creating and managing [Amazon SQS (Simple Queue Service)](https://aws.amazon.com/sqs/) queues using the `Queue` component. This allows you to build decoupled, event-driven architectures with reliable message delivery.

## Creating a Queue

Create a queue by instantiating the `Queue` component in your `stlv_app.py`:

```python
from stelvio.aws.queue import Queue
from stelvio.aws.function import Function

@app.run
def run() -> None:
# Create a standard queue
orders_queue = Queue("orders")

# Link it to a function
order_processor = Function(
"process-orders",
handler="functions/orders.handler",
links=[orders_queue],
)
```

## Queue Configuration

Configure your queue with custom settings:

```python
from stelvio.aws.queue import Queue, QueueConfig

# Using keyword arguments
orders_queue = Queue(
"orders",
delay=5, # Delay delivery by 5 seconds
visibility_timeout=60, # Message hidden for 60 seconds after read
)

# Or using QueueConfig
orders_queue = Queue(
"orders",
config=QueueConfig(
delay=5,
visibility_timeout=60,
)
)
```

### Configuration Options

| Option | Default | Description |
|----------------------|----------|-----------------------------------------------------------|
| `fifo` | `False` | Enable FIFO (First-In-First-Out) queue ordering |
| `delay` | `0` | Default delay (in seconds) before messages become visible |
| `visibility_timeout` | `30` | Time (in seconds) a message is hidden after being read |
| `retention` | `345600` | Message retention period in seconds (default: 4 days) |
| `dlq` | `None` | Dead-letter queue configuration |

## FIFO Queues

FIFO queues guarantee exactly-once processing and preserve message order:

```python
orders_queue = Queue("orders", fifo=True)
```

When you create a FIFO queue, Stelvio automatically:

- Adds the `.fifo` suffix to the queue name ([required by AWS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-fifo-queue-message-identifiers.html))
- Enables content-based deduplication

!!! info "FIFO Queue Naming"
AWS requires FIFO queue names to end with `.fifo`. Stelvio handles this automatically when you set `fifo=True`.

!!! warning "FIFO Throughput"
FIFO queues have lower [throughput](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/quotas-messages.html) than standard queues (300 messages/second without batching, 3,000 with high-throughput mode). Use standard queues when message order isn't critical.

## Dead-Letter Queues

Configure a dead-letter queue (DLQ) to capture messages that fail processing:

```python
from stelvio.aws.queue import Queue, DlqConfig

# First, create the DLQ
orders_dlq = Queue("orders-dlq")

# Reference DLQ by Queue component
orders_queue = Queue("orders", dlq=DlqConfig(queue=orders_dlq))

# Custom retry count (default is 3)
orders_queue = Queue(
"orders",
dlq=DlqConfig(queue=orders_dlq, retry=5)
)

# Using dictionary syntax
orders_queue = Queue(
"orders",
dlq={"queue": orders_dlq, "retry": 5}
)
```

!!! tip "DLQ Best Practices"
- Always configure a DLQ for production queues to capture failed messages
- Set up alerts on your DLQ to detect processing failures
- Choose retry counts based on your use case (typically 3-5 retries)

## Queue Subscriptions

Subscribe Lambda functions to process messages from your queue:

```python
orders_queue = Queue("orders")

# Simple subscription
orders_queue.subscribe("processor", "functions/orders.process")

# Multiple subscriptions with different names
orders_queue.subscribe("analytics", "functions/analytics.track_order")
```

Each subscription creates a separate Lambda function, so you can subscribe the same handler multiple times with different configurations:

```python
orders_queue = Queue("orders")

# Same handler, different batch sizes for different throughput needs
orders_queue.subscribe("fast-processor", "functions/orders.process", batch_size=1)
orders_queue.subscribe("batch-processor", "functions/orders.process", batch_size=100)
```

### Lambda Configuration

Customize the Lambda function for your subscription:

```python
# With direct options
orders_queue.subscribe(
"processor",
"functions/orders.process",
memory=512,
timeout=60,
)

# With FunctionConfig
from stelvio.aws.function import FunctionConfig

orders_queue.subscribe(
"processor",
FunctionConfig(
handler="functions/orders.process",
memory=512,
timeout=60,
)
)

# With dictionary
orders_queue.subscribe(
"processor",
{"handler": "functions/orders.process", "memory": 256}
)
```

### Batch Size

Control how many messages Lambda receives per invocation:

```python
orders_queue.subscribe(
"batch-processor",
"functions/orders.process",
batch_size=5, # Process 5 messages at a time (default: 10)
)
```

!!! tip "Choosing Batch Size"
- Smaller batches (1-5): Lower latency, faster processing of individual messages
- Larger batches (10+): Higher throughput, more efficient for high-volume queues
- Consider your Lambda timeout when choosing batch size

### Subscription Permissions

Stelvio automatically configures the necessary IAM permissions for queue subscriptions:

- **EventSourceMapping**: Connects the SQS queue to your Lambda function
- **SQS IAM permissions**: Grants read access (`sqs:ReceiveMessage`, `sqs:DeleteMessage`, `sqs:GetQueueAttributes`)

## Sending Messages

Use the [linking mechanism](linking.md) to send messages to your queue from Lambda functions:

```python
import boto3
import json
from stlv_resources import Resources

def handler(event, context):
sqs = boto3.client('sqs')

# Access the linked queue URL
queue_url = Resources.orders.queue_url

# Send a message
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({
"order_id": "12345",
"customer": "[email protected]",
"items": [{"sku": "WIDGET-001", "qty": 2}]
})
)

return {"statusCode": 200, "body": "Message sent!"}
```

### Sending to FIFO Queues

FIFO queues require additional parameters when sending messages:

```python
import boto3
import json
from stlv_resources import Resources

def handler(event, context):
sqs = boto3.client('sqs')

queue_url = Resources.orders.queue_url

response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps({"order_id": "12345"}),
# Required for FIFO queues - messages with same group ID are processed in order
MessageGroupId="order-processing",
# Optional if content-based deduplication is enabled (Stelvio enables this by default)
# MessageDeduplicationId="unique-id-12345",
)

return {"statusCode": 200, "body": "Message sent!"}
```

!!! info "FIFO Message Parameters"
- **MessageGroupId** (required): Messages with the same group ID are processed in order. Use different group IDs for messages that can be processed in parallel.
- **MessageDeduplicationId** (optional): When content-based deduplication is enabled (default in Stelvio), SQS uses a hash of the message body. Provide this explicitly if you need custom deduplication logic.


### Link Properties

When you link a queue to a Lambda function, these properties are available:

| Property | Description |
|--------------|------------------------------------|
| `queue_url` | The queue URL for sending messages |
| `queue_arn` | The queue ARN |
| `queue_name` | The queue name |

### Link Permissions

Linked Lambda functions receive these SQS permissions for sending messages:

- `sqs:SendMessage` - Send messages to the queue
- `sqs:GetQueueAttributes` - Read queue metadata

!!! note "Receiving Messages"
For processing messages from a queue, use `queue.subscribe()` instead of linking.
Subscriptions automatically configure the necessary permissions (`sqs:ReceiveMessage`,
`sqs:DeleteMessage`, `sqs:GetQueueAttributes`) for the Lambda event source mapping.

## Processing Messages

Your Lambda function receives SQS events with batched messages:

```python
import json

def process(event, context):
"""Process SQS messages."""
for record in event.get('Records', []):
# Parse the message body
body = json.loads(record['body'])

order_id = body.get('order_id')
customer = body.get('customer')

print(f"Processing order {order_id} for {customer}")

# Process the order...

# Return success - SQS will delete processed messages
return {"statusCode": 200}
```

!!! important "Error Handling"
- If your Lambda raises an exception, SQS will retry the message after the visibility timeout
- Successfully processed messages are automatically deleted
- Failed messages eventually move to the DLQ (if configured)

## Linking vs Subscriptions

| Use Case | Approach |
|-----------------------------|-------------------------------------------------------------|
| **Process queue messages** | Use `queue.subscribe()` - creates Lambda triggered by queue |
| **Send messages to queue** | Use `links=[queue]` - grants permissions to send messages |
| **Pipeline (read → write)** | Subscribe to one queue, link to another for forwarding |


A common pattern is to combine subscriptions and linking to create multi-stage processing pipelines.
With this pattern, one Lambda function subscribes to a queue to process incoming messages, then forwards transformed data to a second queue.
Another Lambda function subscribes to that second queue to handle the next stage of processing.

Here is an example:

```python
# Example: Multi-stage pipeline with two Lambda functions
orders_queue = Queue("orders")
fulfillment_queue = Queue("fulfillment")

# Lambda #1: Process incoming orders and forward to fulfillment
orders_queue.subscribe(
"process-orders",
"functions/orders.process",
links=[fulfillment_queue], # Grant permission to send to fulfillment queue
)

# Lambda #2: Process fulfillment tasks
fulfillment_queue.subscribe(
"fulfill-orders",
"functions/fulfillment.fulfill",
)
```

This creates a two-stage processing pipeline:

1. **Order Processing Lambda** (`functions/orders.process`):
- Triggered by messages in `orders_queue`
- Validates and processes incoming orders
- Sends fulfillment tasks to `fulfillment_queue`

2. **Fulfillment Lambda** (`functions/fulfillment.fulfill`):
- Triggered by messages in `fulfillment_queue`
- Handles order fulfillment (shipping, inventory, etc.)


## Next Steps

Now that you understand SQS queues, you might want to explore:

- [Working with Lambda Functions](lambda.md) - Learn more about Lambda configuration
- [Working with DynamoDB](dynamo-db.md) - Store processed message data
- [Linking](linking.md) - Understand how Stelvio automates IAM permissions
1 change: 1 addition & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ Stelvio is released under the Apache 2.0 License. See the LICENSE file for detai
- [Using CLI](guides/using-cli.md) - The `stlv` CLI
- [API Gateway](guides/api-gateway.md) - Build REST APIs
- [Lambda Functions](guides/lambda.md) - Serverless functions with Python
- [Queues](guides/queues.md) - Serverless queues with SQS
- [Dynamo DB](guides/dynamo-db.md) - Serverless NoSQL Database
- [S3 Buckets](guides/s3.md) - AWS S3 (Object Storage)
- [Cron](guides/cron.md) - Scheduled tasks with EventBridge
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ nav:
- Environments: guides/environments.md
- API Gateway: guides/api-gateway.md
- Lambda Functions: guides/lambda.md
- Queues: guides/queues.md
- Dynamo DB: guides/dynamo-db.md
- S3 Buckets: guides/s3.md
- AWS SES (Email): guides/email.md
Expand Down
Loading