diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..26d0cdb --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/rabbitmq-cli-consumer diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..74f31b4 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,10 @@ +language: go + +go: + - 1.4 + - 1.5 + - tip + +matrix: + allow_failures: + - go: tip diff --git a/README.md b/README.md index 7cc2b9a..c4c6b45 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ RabbitMQ cli consumer --------------------- +[![Build Status](https://travis-ci.org/ricbra/rabbitmq-cli-consumer.svg)](https://travis-ci.org/ricbra/rabbitmq-cli-consumer) + If you are a fellow PHP developer just like me you're probably aware of the following fact: PHP really SUCKS in long running tasks. @@ -103,7 +105,60 @@ Run without arguments or with --help switch to show the helptext: --help, -h show help --version, -v print the version -## Configuration +## Fanout + +Todo. + +## Remote Procedure Call + +No special configuration is required for enabling RPC mode. You should be aware +that any output on STDOUT will be returned as reply to the requesting client. To +demonstrate how RPC works, we'll implement [the example](https://www.rabbitmq.com/tutorials/tutorial-six-php.html) on the RabbitMQ site +using rabbitmq-cli-consumer. + +We don't change the rpc_client.php, only the rpc_server.php: + +```php +-V to get rid of the output: $ rabbitmq-cli-consumer -e "/path/to/your/app argument --flag" -c /path/to/your/configuration.conf +## Configuration inheritance + +By default rabbitmq-cli-consumer looks for configuration files in the following +order: + +1. /etc/rabbitmq-cli-consumer/rabbitmq-cli-consumer.conf +2. ~/.rabbitmq-cli-consumer.conf +3. location passed in via -c option + +## Override options on the command line: + +Every option in the configuration file can be overwritten by passing them on the +cli. Use On and Off for true and false +respectively. Example: + + $ rabbitmq-cli-consumer -c /you/config.conf -e "/path/to/executble" --queue-key=custom-key + ### Prefetch count It's possible to configure the prefetch count and if you want set it as global. Add the following section to your -configuration to confol these values: +configuration to control these values: ```ini [prefetch] @@ -154,6 +228,43 @@ type=direct durable=On ``` +### Configuring the queue + +All queue options are configurable. Full example: + +```ini +[queue] +name=rpc_queue +durable=On +autodelete=Off +exclusive=Off +nowait=Off +``` + +If you want to define a TTL for your queue: + +```ini +[queue] +name=rpc_queue +durable=On +autodelete=Off +exclusive=Off +nowait=Off +ttl=1200 +``` + +When you want to configure the routing key: + +```ini +[queue] +name=rpc_queue +durable=On +autodelete=Off +exclusive=Off +nowait=Off +key=your-routing-key +``` + ## The executable Your executable receives the message as the last argument. So consider the following: @@ -274,8 +385,42 @@ exit(1); ``` +## Log rotation + +To close and reopen the logs send the USR1 signal: + + $ kill -s USR1 pid_of_process + # Developing Missing anything? Found a bug? I love to see your PR. +## Setup development environment + +It can be quite difficult to get an development environment up & running. I'll hope to +expand the instructions a bit in the future. + +### Go and gvm + +Todo. + +### RabbitMQ + +Start by installing docker. + +Then: + + $ docker run -d -P -e RABBITMQ_NODENAME=rabbitmq-cli-consumer --name rabbitmq-cli-consumer rabbitmq:3-management + +To see which ports are available: + + $ docker port rabbitmq-cli-consumer + +You can login with guest / guest. +If you want stop the container: + + $ docker stop rabbitmq-cli-consumer + +And to restart the container: + # docker start rabbitmq-cli-consumer diff --git a/command/command_executer.go b/command/command_executer.go index 3c3bfd8..de76a3e 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -1,15 +1,24 @@ package command import ( + "fmt" "log" - "os/exec" ) +type Executer interface { + Execute(cmd Command) (result []byte, err error) +} + type CommandExecuter struct { errLogger *log.Logger infLogger *log.Logger } +type Command interface { + CombinedOutput() (out []byte, err error) + Output() (out []byte, err error) +} + func New(errLogger, infLogger *log.Logger) *CommandExecuter { return &CommandExecuter{ errLogger: errLogger, @@ -17,18 +26,20 @@ func New(errLogger, infLogger *log.Logger) *CommandExecuter { } } -func (me CommandExecuter) Execute(cmd *exec.Cmd) bool { +func (me CommandExecuter) Execute(cmd Command) (result []byte, err error) { me.infLogger.Println("Processing message...") - out, err := cmd.CombinedOutput() + + out, err := cmd.Output() if err != nil { me.infLogger.Println("Failed. Check error log for details.") me.errLogger.Printf("Failed: %s\n", string(out[:])) me.errLogger.Printf("Error: %s\n", err) - return false + + return out, fmt.Errorf("Error occured during execution of command: %s", err) } me.infLogger.Println("Processed!") - return true + return out, nil } diff --git a/command/command_executer_test.go b/command/command_executer_test.go new file mode 100644 index 0000000..53d69af --- /dev/null +++ b/command/command_executer_test.go @@ -0,0 +1,41 @@ +package command + +import ( + "bytes" + "log" + "testing" + + "github.com/stretchr/testify/mock" +) + +func TestExecutesCommand(t *testing.T) { + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + cmd := new(TestCommand) + + cmd.On("Output").Return(make([]byte, 0), nil).Once() + + executer := New(errLogger, infLogger) + executer.Execute(cmd) + + cmd.AssertExpectations(t) + +} + +type TestCommand struct { + mock.Mock +} + +func (t *TestCommand) CombinedOutput() (out []byte, err error) { + argsT := t.Called() + + return argsT.Get(0).([]byte), argsT.Error(1) +} + +func (t *TestCommand) Output() (out []byte, err error) { + argsT := t.Called() + + return argsT.Get(0).([]byte), argsT.Error(1) +} diff --git a/command/command_factory.go b/command/command_factory.go index f2e1fc8..4503343 100644 --- a/command/command_factory.go +++ b/command/command_factory.go @@ -1,8 +1,6 @@ package command -import ( - "os/exec" -) +import "os/exec" type CommandFactory struct { Cmd string diff --git a/config/config.go b/config/config.go index 71bf8c6..d97f56f 100644 --- a/config/config.go +++ b/config/config.go @@ -1,51 +1,140 @@ package config import ( - "code.google.com/p/gcfg" - "path/filepath" + "fmt" + "log" + "strings" + + "github.com/codegangsta/cli" + + "gopkg.in/gcfg.v1" + + "gopkg.in/validator.v2" ) +// Config contains all config values type Config struct { RabbitMq struct { - Host string - Username string - Password string - Port string - Vhost string - Queue string + Host string `validate:"nonzero"` + Username string `validate:"nonzero"` + Password string `validate:"nonzero"` + Port string `validate:"nonzero"` + Vhost string `validate:"nonzero"` Compression bool } Prefetch struct { - Count int - Global bool + Count int `validate:"nonzero"` + Global bool + } + Queue struct { + Name string `validate:"nonzero"` + Durable bool + Autodelete bool + Exclusive bool + Nowait bool + Key string + TTL int } Exchange struct { - Name string - Autodelete bool - Type string - Durable bool + Name string `validate:"nonzero"` + Autodelete bool + Type string `validate:"nonzero"` + Durable bool } Logs struct { - Error string - Info string + Error string `validate:"nonzero"` + Info string `validate:"nonzero"` } } -func LoadAndParse(location string) (*Config, error) { - if !filepath.IsAbs(location) { - location, err := filepath.Abs(location) +// Validate validtes Config and prints errors. +func Validate(config Config, logger *log.Logger) bool { + if err := validator.Validate(config); err != nil { + for f, e := range err.(validator.ErrorMap) { + split := strings.Split(strings.ToLower(f), ".") + msg := e.Error() + switch msg { + case "zero value": + msg = "This option is required" + } - if err != nil { - return nil, err + logger.Printf("The option \"%s\" under section \"%s\" is invalid: %s\n", split[1], split[0], msg) } - - location = location + return false } + return true +} + +// Default returns Config with default defined +func Default() Config { + return CreateFromString( + `[prefetch] + count=3 + global=Off + + [exchange] + autodelete=Off + type=direct + durable=On + `) +} + +// CreateFromString creates Config from string +func CreateFromString(config string) Config { cfg := Config{} - if err := gcfg.ReadFileInto(&cfg, location); err != nil { - return nil, err - } + gcfg.ReadStringInto(&cfg, config) + + return cfg +} + +// CreateFromCliContext creates config from options passed to cli +func CreateFromCliContext(c *cli.Context) Config { + str := fmt.Sprintf( + `[rabbitmq] + host=%s + username=%s + password=%s + vhost=%s + port=%s + compression=%s + + [prefetch] + count=%s + global=%s + + [queue] + name=%s + durable=%s + autodelete=%s + exclusive=%s + nowait=%s + key=%s + + [exchange] + name=%s + autodelete=%s + type=%s + durable=%s`, + c.String("host"), + c.String("username"), + c.String("password"), + c.String("vhost"), + c.String("port"), + c.String("compression"), + c.String("prefetch-count"), + c.String("prefetch-global"), + c.String("queue-name"), + c.String("queue-durable"), + c.String("queue-autodelete"), + c.String("queue-exclusive"), + c.String("queue-nowait"), + c.String("queue-key"), + c.String("exchange-name"), + c.String("exchange-autodelete"), + c.String("exchange-type"), + c.String("exchange-durable"), + ) - return &cfg, nil + return CreateFromString(str) } diff --git a/config/config_merger.go b/config/config_merger.go new file mode 100644 index 0000000..423e81e --- /dev/null +++ b/config/config_merger.go @@ -0,0 +1,25 @@ +package config + +import ( + "fmt" + + "github.com/imdario/mergo" +) + +type Merger interface { + Merge() +} + +type ConfigMerger struct { +} + +func (m ConfigMerger) Merge(configs []Config) (Config, error) { + dest := Config{} + for _, config := range configs { + if err := mergo.MergeWithOverwrite(&dest, config); err != nil { + return dest, fmt.Errorf("Could not merge config: %s", err.Error()) + } + } + + return dest, nil +} diff --git a/config/config_merger_test.go b/config/config_merger_test.go new file mode 100644 index 0000000..95ae067 --- /dev/null +++ b/config/config_merger_test.go @@ -0,0 +1,34 @@ +package config + +import ( + "testing" + + "code.google.com/p/gcfg" + "github.com/stretchr/testify/assert" +) + +func TestMergesConfigs(t *testing.T) { + configs := []Config{ + createConfig(`[rabbitmq] + host=rabbitmq.provider.com + password=123pass + vhost=test`), + createConfig(`[rabbitmq] + host=localhost + queue=testqueue`), + } + + merger := ConfigMerger{} + config, _ := merger.Merge(configs) + + assert.Equal(t, config.RabbitMq.Host, "localhost") + assert.Equal(t, config.RabbitMq.Password, "123pass") + assert.Equal(t, config.RabbitMq.Vhost, "test") +} + +func createConfig(config string) Config { + cfg := Config{} + gcfg.ReadStringInto(&cfg, config) + + return cfg +} diff --git a/config/config_test.go b/config/config_test.go new file mode 100644 index 0000000..69ddea7 --- /dev/null +++ b/config/config_test.go @@ -0,0 +1,66 @@ +package config + +import ( + "bytes" + "log" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFailsOnRequiredFields(t *testing.T) { + config := createConfig(`[rabbitmq] + host=`) + + var b bytes.Buffer + logger := log.New(&b, "", 0) + + valid := Validate(config, logger) + out := b.String() + + assert.Equal(t, false, valid) + assert.Contains(t, out, "The option \"port\" under section \"rabbitmq\" is invalid: This option is required") + assert.Contains(t, out, "The option \"username\" under section \"rabbitmq\" is invalid: This option is required") + assert.Contains(t, out, "The option \"password\" under section \"rabbitmq\" is invalid: This option is required") + assert.Contains(t, out, "The option \"vhost\" under section \"rabbitmq\" is invalid: This option is required") + assert.Contains(t, out, "The option \"host\" under section \"rabbitmq\" is invalid: This option is required") + assert.Contains(t, out, "The option \"count\" under section \"prefetch\" is invalid: This option is required") + assert.Contains(t, out, "The option \"name\" under section \"exchange\" is invalid: This option is required") + assert.Contains(t, out, "The option \"type\" under section \"exchange\" is invalid: This option is required") + assert.Contains(t, out, "The option \"error\" under section \"logs\" is invalid: This option is required") + assert.Contains(t, out, "The option \"info\" under section \"logs\" is invalid: This option is required") + assert.Contains(t, out, "The option \"name\" under section \"queue\" is invalid: This option is required") +} + +func TestPassOnValidConfig(t *testing.T) { + config := createConfig( + `[rabbitmq] + host=localhost + username=test + password=t3st + vhost=test + port=123 + + [prefetch] + count=3 + global=On + + [queue] + name=test + + [exchange] + name=test + autodelete=Off + type=test + durable=On + + [logs] + error=a + info=b + `) + + var b bytes.Buffer + logger := log.New(&b, "", 0) + valid := Validate(config, logger) + assert.Equal(t, true, valid) +} diff --git a/config/locator.go b/config/locator.go new file mode 100644 index 0000000..fd28919 --- /dev/null +++ b/config/locator.go @@ -0,0 +1,44 @@ +package config + +import ( + "errors" + "fmt" + "os/user" + + "github.com/spf13/afero" +) + +type FileLocator struct { + Paths []string + Filesystem afero.Fs +} + +func (r FileLocator) Locate() (error, []string) { + exists := []string{} + for _, path := range r.Paths { + if _, err := r.Filesystem.Stat(path); err == nil { + exists = append(exists, path) + } + } + if len(exists) == 0 { + return errors.New("No configuration files found, exiting"), exists + } + + return nil, exists +} + +type Locator interface { + Locate() (error, []string) +} + +func NewLocator(paths []string, filesystem afero.Fs, user *user.User) Locator { + if user != nil { + paths = append([]string{fmt.Sprintf("%s/.rabbitmq-cli-consumer.conf", user.HomeDir)}, paths...) + } + paths = append([]string{"/etc/rabbitmq-cli-consumer/rabbitmq-cli-consumer.conf"}, paths...) + + return FileLocator{ + Paths: paths, + Filesystem: filesystem, + } +} diff --git a/config/locator_test.go b/config/locator_test.go new file mode 100644 index 0000000..143c152 --- /dev/null +++ b/config/locator_test.go @@ -0,0 +1,52 @@ +package config + +import ( + "os/user" + "testing" + + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" +) + +func TestFindsDefauktFullpathConfig(t *testing.T) { + fs := &afero.MemMapFs{} + fs.Create("/etc/rabbitmq-cli-consumer/rabbitmq-cli-consumer.conf") + + u := NewLocator([]string{ + "/home/test/my-config.conf", + }, fs, nil) + + _, paths := u.Locate() + assert.Equal( + t, + []string{"/etc/rabbitmq-cli-consumer/rabbitmq-cli-consumer.conf"}, + paths, + ) +} + +func TestFindsConfigInHomedir(t *testing.T) { + fs := &afero.MemMapFs{} + fs.Create("/home/fakeuser/.rabbitmq-cli-consumer.conf") + user := createUser() + + u := NewLocator([]string{ + "/home/test/my-config.conf", + }, fs, user) + + _, paths := u.Locate() + assert.Equal( + t, + []string{"/home/fakeuser/.rabbitmq-cli-consumer.conf"}, + paths, + ) +} + +func createUser() *user.User { + return &user.User{ + Uid: "1", + Gid: "1", + Username: "fakeuser", + Name: "Foo Bar", + HomeDir: "/home/fakeuser", + } +} diff --git a/consumer/consumer.go b/consumer/consumer.go index f53e3d7..40f632a 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -4,26 +4,28 @@ import ( "bytes" "compress/zlib" "encoding/base64" - "errors" "fmt" + "log" + "net/url" + "github.com/ricbra/rabbitmq-cli-consumer/command" "github.com/ricbra/rabbitmq-cli-consumer/config" "github.com/streadway/amqp" - "log" - "net/url" ) +// Consumer represents a consumer type Consumer struct { - Channel *amqp.Channel + Channel Channel Connection *amqp.Connection Queue string Factory *command.CommandFactory ErrLogger *log.Logger InfLogger *log.Logger - Executer *command.CommandExecuter + Executer command.Executer Compression bool } +// Consume starts consuming messages from RabbitMQ func (c *Consumer) Consume() { c.InfLogger.Println("Registering consumer... ") msgs, err := c.Channel.Consume(c.Queue, "", false, false, false, false, nil) @@ -36,80 +38,129 @@ func (c *Consumer) Consume() { defer c.Channel.Close() forever := make(chan bool) - go func() { for d := range msgs { - input := d.Body - if c.Compression { - var b bytes.Buffer - w, err := zlib.NewWriterLevel(&b, zlib.BestCompression) - if err != nil { - c.ErrLogger.Println("Could not create zlib handler") - d.Nack(true, true) - } - c.InfLogger.Println("Compressed message") - w.Write(input) - w.Close() - - input = b.Bytes() - } - - cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) - if c.Executer.Execute(cmd) { - d.Ack(true) - } else { - d.Nack(true, true) + delivery := RabbitMqDelivery{ + delivery: d, + body: d.Body, } + c.ProcessMessage(delivery) } }() c.InfLogger.Println("Waiting for messages...") <-forever } -func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogger *log.Logger) (*Consumer, error) { - uri := fmt.Sprintf( - "amqp://%s:%s@%s:%s%s", - url.QueryEscape(cfg.RabbitMq.Username), - url.QueryEscape(cfg.RabbitMq.Password), - cfg.RabbitMq.Host, - cfg.RabbitMq.Port, - cfg.RabbitMq.Vhost, +// ProcessMessage content of one single message +func (c *Consumer) ProcessMessage(msg Delivery) { + input := msg.Body() + if c.Compression { + var b bytes.Buffer + w, err := zlib.NewWriterLevel(&b, zlib.BestCompression) + if err != nil { + c.ErrLogger.Println("Could not create zlib handler") + return + } + c.InfLogger.Println("Decompressed message") + w.Write(input) + w.Close() + + input = b.Bytes() + } + + cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) + out, err := c.Executer.Execute(cmd) + + if err != nil { + msg.Nack(true, true) + return + } + + if msg.IsRpcMessage() { + c.InfLogger.Println("Message is RPC message, trying to send reply...") + + if err := c.Reply(msg, out); err != nil { + c.InfLogger.Println("Sending RPC reply failed. Check error log.") + c.ErrLogger.Printf("Error occured during send RPC reply: %s", err) + msg.Nack(true, true) + + return + } + c.InfLogger.Println("RPC reply send.") + } + + // All went fine, ack message + msg.Ack(true) +} + +func (c *Consumer) Reply(msg Delivery, out []byte) error { + return c.Channel.Publish( + "", + msg.ReplyTo(), + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + CorrelationId: msg.CorrelationId(), + Body: out, + }, ) +} + +// New returns a initialized consumer based on config +func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogger *log.Logger) (*Consumer, error) { + uri := ParseURI(cfg.RabbitMq.Username, cfg.RabbitMq.Password, cfg.RabbitMq.Host, cfg.RabbitMq.Port, cfg.RabbitMq.Vhost) infLogger.Println("Connecting RabbitMQ...") - conn, err := amqp.Dial(uri) + conn, err := Connect(uri) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed connecting RabbitMQ: %s", err.Error())) + return nil, fmt.Errorf("Failed connecting RabbitMQ: %s", err.Error()) } infLogger.Println("Connected.") infLogger.Println("Opening channel...") ch, err := conn.Channel() if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to open a channel: %s", err.Error())) + return nil, fmt.Errorf("Failed to open a channel: %s", err.Error()) } infLogger.Println("Done.") - infLogger.Println("Setting QoS... ") - // Attempt to preserve BC here - if cfg.Prefetch.Count == 0 { - cfg.Prefetch.Count = 3 + if err := Initialize(cfg, ch, infLogger, errLogger); err != nil { + return nil, err } + + return &Consumer{ + Channel: ch, + Connection: conn, + Queue: cfg.Queue.Name, + Factory: factory, + ErrLogger: errLogger, + InfLogger: infLogger, + Executer: command.New(errLogger, infLogger), + Compression: cfg.RabbitMq.Compression, + }, nil +} + +// Initialize channel according to config +func Initialize(cfg *config.Config, ch Channel, errLogger, infLogger *log.Logger) error { + infLogger.Println("Setting QoS... ") + if err := ch.Qos(cfg.Prefetch.Count, 0, cfg.Prefetch.Global); err != nil { - return nil, errors.New(fmt.Sprintf("Failed to set QoS: %s", err.Error())) + return fmt.Errorf("Failed to set QoS: %s", err.Error()) } + infLogger.Println("Succeeded setting QoS.") - infLogger.Printf("Declaring queue \"%s\"...", cfg.RabbitMq.Queue) - _, err = ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, nil) + infLogger.Printf("Declaring queue \"%s\"...", cfg.Queue.Name) - if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) + table := amqp.Table{} + if cfg.Queue.TTL > 0 { + table["x-message-ttl"] = cfg.Queue.TTL } + _, err := ch.QueueDeclare(cfg.Queue.Name, cfg.Queue.Durable, cfg.Queue.Autodelete, cfg.Queue.Exclusive, cfg.Queue.Nowait, table) - // Check for missing exchange settings to preserve BC - if "" == cfg.Exchange.Name && "" == cfg.Exchange.Type && !cfg.Exchange.Durable && !cfg.Exchange.Autodelete { - cfg.Exchange.Type = "direct" + if nil != err { + return fmt.Errorf("Failed to declare queue: %s", err.Error()) } // Empty Exchange name means default, no need to declare @@ -118,26 +169,88 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg err = ch.ExchangeDeclare(cfg.Exchange.Name, cfg.Exchange.Type, cfg.Exchange.Durable, cfg.Exchange.Autodelete, false, false, amqp.Table{}) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) + return fmt.Errorf("Failed to declare exchange: %s", err.Error()) } // Bind queue - infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name) - err = ch.QueueBind(cfg.RabbitMq.Queue, "", cfg.Exchange.Name, false, nil) + infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.Queue.Name, cfg.Exchange.Name) + err = ch.QueueBind(cfg.Queue.Name, cfg.Queue.Key, cfg.Exchange.Name, false, amqp.Table{}) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to bind queue to exchange: %s", err.Error())) + return fmt.Errorf("Failed to bind queue to exchange: %s", err.Error()) } } - return &Consumer{ - Channel: ch, - Connection: conn, - Queue: cfg.RabbitMq.Queue, - Factory: factory, - ErrLogger: errLogger, - InfLogger: infLogger, - Executer: command.New(errLogger, infLogger), - Compression: cfg.RabbitMq.Compression, - }, nil + return nil +} + +// Connect opens a connection to the given uri +func Connect(uri string) (*amqp.Connection, error) { + return amqp.Dial(uri) +} + +// ParseURI parses the URI based on config +func ParseURI(username, password, host, port, vhost string) string { + if start := string(vhost[0]); start != "/" { + vhost = fmt.Sprintf("/%s", vhost) + } + + return fmt.Sprintf( + "amqp://%s:%s@%s:%s%s", + url.QueryEscape(username), + url.QueryEscape(password), + host, + port, + vhost, + ) +} + +// Channel is the interface +type Channel interface { + ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error + QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) + Qos(prefetchCount, prefetchSize int, global bool) error + QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error + Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) + Close() error + Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error +} + +// Delivery interface describes interface for messages +type Delivery interface { + Ack(multiple bool) error + Nack(multiple, requeue bool) error + Body() []byte + IsRpcMessage() bool + CorrelationId() string + ReplyTo() string +} + +type RabbitMqDelivery struct { + body []byte + delivery amqp.Delivery +} + +func (r RabbitMqDelivery) Ack(multiple bool) error { + return r.delivery.Ack(multiple) +} + +func (r RabbitMqDelivery) Nack(multiple, requeue bool) error { + return r.delivery.Nack(multiple, requeue) +} + +func (r RabbitMqDelivery) Body() []byte { + return r.body +} + +func (r RabbitMqDelivery) IsRpcMessage() bool { + return r.delivery.ReplyTo != "" && r.delivery.CorrelationId != "" +} + +func (r RabbitMqDelivery) CorrelationId() string { + return r.delivery.CorrelationId +} + +func (r RabbitMqDelivery) ReplyTo() string { + return r.delivery.ReplyTo } diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go new file mode 100644 index 0000000..52c85c1 --- /dev/null +++ b/consumer/consumer_test.go @@ -0,0 +1,415 @@ +package consumer + +import ( + "bytes" + "encoding/base64" + "errors" + "fmt" + "log" + "testing" + + "github.com/ricbra/rabbitmq-cli-consumer/command" + "github.com/ricbra/rabbitmq-cli-consumer/config" + "github.com/streadway/amqp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestParseAndEscapesParamsInURI(t *testing.T) { + uri := ParseURI("richard", "my@:secr%t", "localhost", "123", "/vhost") + + assert.Equal(t, "amqp://richard:my%40%3Asecr%25t@localhost:123/vhost", uri) +} + +func TestAddsSlashWhenMissingInVhost(t *testing.T) { + uri := ParseURI("richard", "secret", "localhost", "123", "vhost") + + assert.Equal(t, "amqp://richard:secret@localhost:123/vhost", uri) +} + +func TestSetQosFails(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(errors.New("Error occured")).Once() + + err := Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) + ch.AssertNotCalled(t, "QueueDeclare", "worker", true, false, false, false, amqp.Table{}) + assert.NotNil(t, err) +} + +func TestSetQosSucceeds(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, errors.New("error")).Once() + + Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) +} + +func TestDeclareQueueFails(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, errors.New("error")).Once() + + err := Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) + ch.AssertNotCalled(t, "ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}) + assert.NotNil(t, err) +} + +func TestDeclareQueueSucceeds(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, nil).Once() + ch.On("ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}).Return(errors.New("error")).Once() + + Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) +} + +func TestDeclareQueueWithTTL(t *testing.T) { + config := createConfigWithTTL() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{"x-message-ttl": 1200}).Return(amqp.Queue{}, nil).Once() + ch.On("ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}).Return(errors.New("error")).Once() + + Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) +} + +func TestBindQueueFails(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, nil).Once() + ch.On("ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}).Return(nil).Once() + ch.On("QueueBind", "worker", "foo", "worker", false, amqp.Table{}).Return(errors.New("error")).Once() + + err := Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) + assert.NotNil(t, err) +} + +func TestBindQueueSucceeds(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, nil).Once() + ch.On("ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}).Return(nil).Once() + ch.On("QueueBind", "worker", "foo", "worker", false, amqp.Table{}).Return(nil).Once() + + err := Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) + assert.Nil(t, err) +} + +func TestProcessingMessageWithSuccess(t *testing.T) { + msg := new(TestDelivery) + executer := new(TestExecuter) + factory := &command.CommandFactory{ + Cmd: "test", + Args: []string{"aa"}, + } + consumer := Consumer{ + Executer: executer, + Factory: factory, + Compression: false, + } + body := []byte("the_body") + args := base64.StdEncoding.EncodeToString(body) + cmd := factory.Create(args) + executer.On("Execute", cmd).Return([]byte(""), nil).Once() + msg.On("Body").Return(body).Once() + msg.On("Ack", true).Return(nil).Once() + msg.On("IsRpcMessage").Return(false).Once() + + consumer.ProcessMessage(msg) + + executer.AssertExpectations(t) + msg.AssertExpectations(t) +} + +func TestProcessingMessageWithFailure(t *testing.T) { + msg := new(TestDelivery) + executer := new(TestExecuter) + factory := &command.CommandFactory{ + Cmd: "test", + Args: []string{"aa"}, + } + consumer := Consumer{ + Executer: executer, + Factory: factory, + Compression: false, + } + body := []byte("the_body") + args := base64.StdEncoding.EncodeToString(body) + cmd := factory.Create(args) + executer.On("Execute", cmd).Return([]byte(""), fmt.Errorf("Test")).Once() + msg.On("Body").Return(body).Once() + msg.On("Nack", true, true).Return(nil).Once() + + consumer.ProcessMessage(msg) + + executer.AssertExpectations(t) + msg.AssertExpectations(t) +} + +func TestProcessingRpcMessageWithSuccess(t *testing.T) { + msg := new(TestDelivery) + ch := new(TestChannel) + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + executer := new(TestExecuter) + factory := &command.CommandFactory{ + Cmd: "test", + Args: []string{"aa"}, + } + consumer := Consumer{ + Executer: executer, + Factory: factory, + Compression: false, + Channel: ch, + ErrLogger: errLogger, + InfLogger: infLogger, + } + body := []byte("the_body") + args := base64.StdEncoding.EncodeToString(body) + cmd := factory.Create(args) + out := []byte("msg") + executer.On("Execute", cmd).Return(out, nil).Once() + ch.On("Publish", "", "queue_name", false, false, amqp.Publishing{ + ContentType: "text/plain", + CorrelationId: "123456", + Body: out, + }).Return(nil).Once() + msg.On("Body").Return(body).Once() + msg.On("IsRpcMessage").Return(true).Once() + msg.On("ReplyTo").Return("queue_name").Once() + msg.On("CorrelationId").Return("123456").Once() + msg.On("Ack", true).Return(nil).Once() + + consumer.ProcessMessage(msg) + + executer.AssertExpectations(t) + msg.AssertExpectations(t) +} + +type TestCommand struct { + mock.Mock +} + +func (t *TestCommand) CombinedOutput() (out []byte, err error) { + argsT := t.Called() + + return argsT.Get(0).([]byte), argsT.Error(1) +} + +type TestExecuter struct { + mock.Mock +} + +func (t *TestExecuter) Execute(cmd command.Command) (result []byte, err error) { + argsT := t.Called(cmd) + + return argsT.Get(0).([]byte), argsT.Error(1) +} + +type TestDelivery struct { + mock.Mock + body []byte +} + +func (t *TestDelivery) Ack(multiple bool) error { + argstT := t.Called(multiple) + + return argstT.Error(0) +} + +func (t *TestDelivery) Nack(multiple, requeue bool) error { + argsT := t.Called(multiple, requeue) + + return argsT.Error(0) +} + +func (t *TestDelivery) Body() []byte { + argsT := t.Called() + + return argsT.Get(0).([]byte) +} + +func (t *TestDelivery) CorrelationId() string { + argsT := t.Called() + + return argsT.Get(0).(string) +} + +func (t *TestDelivery) IsRpcMessage() bool { + argsT := t.Called() + + return argsT.Get(0).(bool) +} + +func (t *TestDelivery) ReplyTo() string { + argsT := t.Called() + + return argsT.Get(0).(string) +} + +type TestChannel struct { + mock.Mock +} + +func (t *TestChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error { + argsT := t.Called(name, kind, durable, autoDelete, internal, noWait, args) + + return argsT.Error(0) +} + +func (t *TestChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) { + argsT := t.Called(name, durable, autoDelete, exclusive, noWait, args) + + return argsT.Get(0).(amqp.Queue), argsT.Error(1) +} + +func (t *TestChannel) Qos(prefetchCount, prefetchSize int, global bool) error { + argsT := t.Called(prefetchCount, prefetchSize, global) + + return argsT.Error(0) +} + +func (t *TestChannel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error { + argsT := t.Called(name, key, exchange, noWait, args) + + return argsT.Error(0) +} + +func (t *TestChannel) Close() error { + argsT := t.Called() + + return argsT.Error(0) +} + +func (t *TestChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) { + argsT := t.Called(queue, consumer, autoAck, exclusive, noLocal, noWait, args) + + return argsT.Get(0).(<-chan amqp.Delivery), argsT.Error(0) +} + +func (t *TestChannel) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + argsT := t.Called(exchange, key, mandatory, immediate, msg) + + return argsT.Error(0) +} + +func createConfig() config.Config { + return config.CreateFromString(`[rabbitmq] + host=localhost + username=ricbra + password=t3st + vhost=staging + port=123 + + [prefetch] + count=3 + global=On + + [queue] + name=worker + durable=On + autodelete=Off + exclusive=Off + nowait=Off + key=foo + + [exchange] + name=worker + autodelete=Off + type=test + durable=On + + [logs] + error=a + info=b + `) +} + +func createConfigWithTTL() config.Config { + return config.CreateFromString(`[rabbitmq] + host=localhost + username=ricbra + password=t3st + vhost=staging + port=123 + + [prefetch] + count=3 + global=On + + [queue] + name=worker + durable=On + autodelete=Off + exclusive=Off + nowait=Off + key=foo + ttl=1200 + + [exchange] + name=worker + autodelete=Off + type=test + durable=On + + [logs] + error=a + info=b + `) +} diff --git a/main.go b/main.go index c84a3c6..78f59f0 100644 --- a/main.go +++ b/main.go @@ -1,22 +1,32 @@ package main import ( + "fmt" + "io" + "log" + "os" + "os/signal" + "os/user" + "syscall" + + "gopkg.in/gcfg.v1" + "github.com/codegangsta/cli" "github.com/ricbra/rabbitmq-cli-consumer/command" "github.com/ricbra/rabbitmq-cli-consumer/config" "github.com/ricbra/rabbitmq-cli-consumer/consumer" - "io" - "log" - "os" + "github.com/spf13/afero" ) +var files []*os.File + func main() { app := cli.NewApp() app.Name = "rabbitmq-cli-consumer" app.Usage = "Consume RabbitMQ easily to any cli program" app.Author = "Richard van den Brand" app.Email = "richard@vandenbrand.org" - app.Version = "1.1.0" + app.Version = "2.0.0-dev" app.Flags = []cli.Flag{ cli.StringFlag{ Name: "executable, e", @@ -26,24 +36,118 @@ func main() { Name: "configuration, c", Usage: "Location of configuration file", }, - cli.BoolFlag{ - Name: "verbose, V", - Usage: "Enable verbose mode (logs to stdout and stderr)", + cli.StringFlag{ + Name: "quiet, Q", + Usage: "Enable quite mode (disables loggging to stdout and stderr)", + }, + cli.StringFlag{ + Name: "host, H", + Usage: "IP or hostname of RabbitMQ server", + }, + cli.StringFlag{ + Name: "username, u", + Usage: "RabbitMQ username", + }, + cli.StringFlag{ + Name: "password, p", + Usage: "RabbitMQ password", + }, + cli.StringFlag{ + Name: "vhost, V", + Usage: "RabbitMQ vhost", + }, + cli.StringFlag{ + Name: "port, P", + Usage: "RabbitMQ port", + }, + cli.StringFlag{ + Name: "compression, o", + Usage: "Enable compression of messages", + }, + cli.StringFlag{ + Name: "prefetch-count, C", + Usage: "Prefetch count", + }, + cli.StringFlag{ + Name: "prefetch-global, G", + Usage: "Set prefetch count as global", + }, + cli.StringFlag{ + Name: "queue-name,q", + Usage: "Queue name", + }, + cli.StringFlag{ + Name: "queue-durable,D", + Usage: "Mark queue as durable", + }, + cli.StringFlag{ + Name: "queue-autodelete,a", + Usage: "Autodelete queue", + }, + cli.StringFlag{ + Name: "queue-exlusive,E", + Usage: "Mark queue as exclusive", + }, + cli.StringFlag{ + Name: "queue-nowait, T", + Usage: "Do not wait for the server to confirm the binding", + }, + cli.StringFlag{ + Name: "queue-key, k", + Usage: "Routing key to bind the queue on", + }, + cli.StringFlag{ + Name: "exchange-name, X", + Usage: "Exchange name", + }, + cli.StringFlag{ + Name: "exchange-autodelete, t", + Usage: "Autodelete exchange", + }, + cli.StringFlag{ + Name: "exchange-type, y", + Usage: "Exchange type (direct, fanout, topic or headers)", + }, + cli.StringFlag{ + Name: "exchange-durable, j", + Usage: "Mark exchange as durable", }, } app.Action = func(c *cli.Context) { - if c.String("configuration") == "" && c.String("executable") == "" { + if c.String("executable") == "" { cli.ShowAppHelp(c) os.Exit(1) } - verbose := c.Bool("verbose") - - logger := log.New(os.Stderr, "", log.Ldate|log.Ltime) - cfg, err := config.LoadAndParse(c.String("configuration")) + verbose := !c.Bool("quiet") + logger := log.New(os.Stderr, "", 0) + // Config finding and parsing + // Perhaps refactor into something more elegant + user, _ := user.Current() + locator := config.NewLocator([]string{c.String("configuration")}, &afero.OsFs{}, user) + configs := []config.Config{} + err, locations := locator.Locate() if err != nil { - logger.Fatalf("Failed parsing configuration: %s\n", err) + logger.Fatalf("Failed locating configuration: %s\n", err) + } + + for _, path := range locations { + logger.Printf("Found config: %s", path) + cfg := config.Config{} + if err := gcfg.ReadFileInto(&cfg, path); err == nil { + configs = append(configs, cfg) + } else { + logger.Printf("Could not parse config: %s", err) + } + } + + // Read config settings passed in as option to the command + configs = append(configs, config.CreateFromCliContext(c)) + merger := config.ConfigMerger{} + cfg, _ := merger.Merge(configs) + if !config.Validate(cfg, logger) { + logger.Fatalf("Please fix configuration issues.") } errLogger, err := createLogger(cfg.Logs.Error, verbose, os.Stderr) @@ -58,11 +162,30 @@ func main() { factory := command.Factory(c.String("executable")) - client, err := consumer.New(cfg, factory, errLogger, infLogger) + client, err := consumer.New(&cfg, factory, errLogger, infLogger) if err != nil { errLogger.Fatalf("Failed creating consumer: %s", err) } + // Reopen logs on USR1 + sigs := make(chan os.Signal) + signal.Notify(sigs, syscall.SIGUSR1) + + go func() { + for _ = range sigs { + for _, file := range files { + filename := file.Name() + file.Close() + new, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660) + + if err != nil { + panic(fmt.Sprintf("Failed reopeing log file: %s", err)) + } + file = new + } + } + }() + client.Consume() } @@ -76,6 +199,8 @@ func createLogger(filename string, verbose bool, out io.Writer) (*log.Logger, er return nil, err } + files = append(files, file) + var writers = []io.Writer{ file, }