Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.

Commit a9b48f5

Browse files
johnduroChristian Häusler
authored and
Christian Häusler
committed
Add priority to queue settings (#13)
1 parent f1023a3 commit a9b48f5

File tree

4 files changed

+40
-0
lines changed

4 files changed

+40
-0
lines changed

config/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type Config struct {
2929
MessageTTL int
3030
DeadLetterExchange string
3131
DeadLetterRoutingKey string
32+
Priority int
3233
}
3334
Exchange struct {
3435
Name string
@@ -136,6 +137,16 @@ func (c Config) DeadLetterRoutingKey() string {
136137
return transformToStringValue(c.QueueSettings.DeadLetterRoutingKey)
137138
}
138139

140+
// HasPriority checks if priority is configured
141+
func (c Config) HasPriority() bool {
142+
return c.QueueSettings.Priority > 0
143+
}
144+
145+
// Priority returns the priority
146+
func (c Config) Priority() int32 {
147+
return int32(c.QueueSettings.Priority)
148+
}
149+
139150
func LoadAndParse(location string) (*Config, error) {
140151
if !filepath.IsAbs(location) {
141152
location, err := filepath.Abs(location)

consumer/connection.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ func (c *rabbitMqConnection) queueArgs() amqp.Table {
157157
}
158158
}
159159

160+
if c.cfg.HasPriority() {
161+
args["x-max-priority"] = c.cfg.Priority()
162+
}
163+
160164
if len(args) > 0 {
161165
return args
162166
}

consumer/connection_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,16 @@ const ttlConfig = `[rabbitmq]
6363
error=a
6464
info=b`
6565

66+
const priorityConfig = `[rabbitmq]
67+
queue=worker
68+
69+
[queuesettings]
70+
priority=42
71+
72+
[exchange]
73+
name=worker
74+
type=test`
75+
6676
var amqpTable amqp.Table
6777

6878
var queueTests = []struct {
@@ -95,6 +105,18 @@ var queueTests = []struct {
95105
},
96106
nil,
97107
},
108+
// Define queue with Priority.
109+
{
110+
"queueWithPriority",
111+
priorityConfig,
112+
func(ch *TestChannel) {
113+
ch.On("Qos", 3, 0, false).Return(nil).Once()
114+
ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{"x-max-priority": int32(42)}).Return(amqp.Queue{}, nil).Once()
115+
ch.On("ExchangeDeclare", "worker", "test", false, false, false, false, amqp.Table{}).Return(nil).Once()
116+
ch.On("QueueBind", "worker", "", "worker", false, amqpTable).Return(nil).Once()
117+
},
118+
nil,
119+
},
98120
// Set QoS fails.
99121
{
100122
"setQosFail",

example.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ deadLetterExchange = someexchange
9999
# The routing key used when sending a message to the dead letter exchange.
100100
deadLetterroutingkey = someroutingkey
101101

102+
# The priority range for this queue.
103+
priority = 10
104+
102105
[logs]
103106
# Path to the log file where informational output is written to
104107
# #

0 commit comments

Comments
 (0)