diff --git a/config/config.go b/config/config.go index c5ac7f3..115d1ce 100644 --- a/config/config.go +++ b/config/config.go @@ -29,6 +29,7 @@ type Config struct { MessageTTL int DeadLetterExchange string DeadLetterRoutingKey string + Priority int } Exchange struct { Name string @@ -136,6 +137,16 @@ func (c Config) DeadLetterRoutingKey() string { return transformToStringValue(c.QueueSettings.DeadLetterRoutingKey) } +// HasPriority checks if priority is configured +func (c Config) HasPriority() bool { + return c.QueueSettings.Priority > 0 +} + +// Priority returns the priority +func (c Config) Priority() int32 { + return int32(c.QueueSettings.Priority) +} + func LoadAndParse(location string) (*Config, error) { if !filepath.IsAbs(location) { location, err := filepath.Abs(location) diff --git a/consumer/connection.go b/consumer/connection.go index 4890737..8b713a0 100644 --- a/consumer/connection.go +++ b/consumer/connection.go @@ -157,6 +157,10 @@ func (c *rabbitMqConnection) queueArgs() amqp.Table { } } + if c.cfg.HasPriority() { + args["x-max-priority"] = c.cfg.Priority() + } + if len(args) > 0 { return args } diff --git a/consumer/connection_test.go b/consumer/connection_test.go index 19fb701..6ff6af7 100644 --- a/consumer/connection_test.go +++ b/consumer/connection_test.go @@ -63,6 +63,16 @@ const ttlConfig = `[rabbitmq] error=a info=b` +const priorityConfig = `[rabbitmq] + queue=worker + + [queuesettings] + priority=42 + + [exchange] + name=worker + type=test` + var amqpTable amqp.Table var queueTests = []struct { @@ -95,6 +105,18 @@ var queueTests = []struct { }, nil, }, + // Define queue with Priority. + { + "queueWithPriority", + priorityConfig, + func(ch *TestChannel) { + ch.On("Qos", 3, 0, false).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{"x-max-priority": int32(42)}).Return(amqp.Queue{}, nil).Once() + ch.On("ExchangeDeclare", "worker", "test", false, false, false, false, amqp.Table{}).Return(nil).Once() + ch.On("QueueBind", "worker", "", "worker", false, amqpTable).Return(nil).Once() + }, + nil, + }, // Set QoS fails. { "setQosFail", diff --git a/example.conf b/example.conf index 5bfc164..1e6686d 100644 --- a/example.conf +++ b/example.conf @@ -99,6 +99,9 @@ deadLetterExchange = someexchange # The routing key used when sending a message to the dead letter exchange. deadLetterroutingkey = someroutingkey +# The priority range for this queue. +priority = 10 + [logs] # Path to the log file where informational output is written to # #