Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.

Commit aa00d56

Browse files
committed
Add priority to queue settings
1 parent cd20a74 commit aa00d56

File tree

4 files changed

+56
-0
lines changed

4 files changed

+56
-0
lines changed

config/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type Config struct {
2929
MessageTTL int
3030
DeadLetterExchange string
3131
DeadLetterRoutingKey string
32+
Priority int
3233
}
3334
Exchange struct {
3435
Name string
@@ -136,6 +137,16 @@ func (c Config) DeadLetterRoutingKey() string {
136137
return transformToStringValue(c.QueueSettings.DeadLetterRoutingKey)
137138
}
138139

140+
// HasPriority checks if priority is configured
141+
func (c Config) HasPriority() bool {
142+
return c.QueueSettings.Priority > 0
143+
}
144+
145+
// Priority returns the priority
146+
func (c Config) Priority() int32 {
147+
return int32(c.QueueSettings.Priority)
148+
}
149+
139150
func LoadAndParse(location string) (*Config, error) {
140151
if !filepath.IsAbs(location) {
141152
location, err := filepath.Abs(location)

consumer/connection.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ func (c *rabbitMqConnection) queueArgs() amqp.Table {
157157
}
158158
}
159159

160+
if c.cfg.HasPriority() {
161+
args["x-max-priority"] = c.cfg.Priority()
162+
}
163+
160164
if len(args) > 0 {
161165
return args
162166
}

consumer/connection_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,32 @@ const ttlConfig = `[rabbitmq]
6363
error=a
6464
info=b`
6565

66+
const priorityConfig = `[rabbitmq]
67+
host=localhost
68+
username=ricbra
69+
password=t3st
70+
vhost=staging
71+
port=123
72+
queue=worker
73+
74+
[prefetch]
75+
count=3
76+
global=On
77+
78+
[queuesettings]
79+
routingkey=foo
80+
priority=42
81+
82+
[exchange]
83+
name=worker
84+
autodelete=Off
85+
type=test
86+
durable=On
87+
88+
[logs]
89+
error=a
90+
info=b`
91+
6692
var amqpTable amqp.Table
6793

6894
var queueTests = []struct {
@@ -95,6 +121,18 @@ var queueTests = []struct {
95121
},
96122
nil,
97123
},
124+
// Define queue with Priority.
125+
{
126+
"queueWithPriority",
127+
priorityConfig,
128+
func(ch *TestChannel) {
129+
ch.On("Qos", 3, 0, true).Return(nil).Once()
130+
ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{"x-max-priority": int32(42)}).Return(amqp.Queue{}, nil).Once()
131+
ch.On("ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}).Return(nil).Once()
132+
ch.On("QueueBind", "worker", "foo", "worker", false, amqpTable).Return(nil).Once()
133+
},
134+
nil,
135+
},
98136
// Set QoS fails.
99137
{
100138
"setQosFail",

example.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ deadLetterExchange = someexchange
9999
# The routing key used when sending a message to the dead letter exchange.
100100
deadLetterroutingkey = someroutingkey
101101

102+
# The priority range for this queue.
103+
priority = 10
104+
102105
[logs]
103106
# Path to the log file where informational output is written to
104107
# #

0 commit comments

Comments
 (0)