Skip to content
This repository was archived by the owner on Mar 17, 2025. It is now read-only.

Add priority to queue settings #13

Merged
merged 1 commit into from
Jan 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type Config struct {
MessageTTL int
DeadLetterExchange string
DeadLetterRoutingKey string
Priority int
}
Exchange struct {
Name string
Expand Down Expand Up @@ -136,6 +137,16 @@ func (c Config) DeadLetterRoutingKey() string {
return transformToStringValue(c.QueueSettings.DeadLetterRoutingKey)
}

// HasPriority checks if priority is configured
func (c Config) HasPriority() bool {
return c.QueueSettings.Priority > 0
}

// Priority returns the priority
func (c Config) Priority() int32 {
return int32(c.QueueSettings.Priority)
}

func LoadAndParse(location string) (*Config, error) {
if !filepath.IsAbs(location) {
location, err := filepath.Abs(location)
Expand Down
4 changes: 4 additions & 0 deletions consumer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ func (c *rabbitMqConnection) queueArgs() amqp.Table {
}
}

if c.cfg.HasPriority() {
args["x-max-priority"] = c.cfg.Priority()
}

if len(args) > 0 {
return args
}
Expand Down
22 changes: 22 additions & 0 deletions consumer/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,16 @@ const ttlConfig = `[rabbitmq]
error=a
info=b`

const priorityConfig = `[rabbitmq]
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recent changes made most of the config values optional. Please remove all config values which are not required for this test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like the other PR tell me if the deleted config was not what you had in mind

queue=worker

[queuesettings]
priority=42

[exchange]
name=worker
type=test`

var amqpTable amqp.Table

var queueTests = []struct {
Expand Down Expand Up @@ -95,6 +105,18 @@ var queueTests = []struct {
},
nil,
},
// Define queue with Priority.
{
"queueWithPriority",
priorityConfig,
func(ch *TestChannel) {
ch.On("Qos", 3, 0, false).Return(nil).Once()
ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{"x-max-priority": int32(42)}).Return(amqp.Queue{}, nil).Once()
ch.On("ExchangeDeclare", "worker", "test", false, false, false, false, amqp.Table{}).Return(nil).Once()
ch.On("QueueBind", "worker", "", "worker", false, amqpTable).Return(nil).Once()
},
nil,
},
// Set QoS fails.
{
"setQosFail",
Expand Down
3 changes: 3 additions & 0 deletions example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ deadLetterExchange = someexchange
# The routing key used when sending a message to the dead letter exchange.
deadLetterroutingkey = someroutingkey

# The priority range for this queue.
priority = 10

[logs]
# Path to the log file where informational output is written to
# #
Expand Down