From 2aeb815987ce6e302cb418127b12e39c46298030 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Fri, 22 May 2015 21:19:22 +0200 Subject: [PATCH 01/36] Added instructions on how to start RabbitMQ in docker container --- README.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/README.md b/README.md index 7cc2b9a..bfa82e5 100644 --- a/README.md +++ b/README.md @@ -278,4 +278,21 @@ exit(1); Missing anything? Found a bug? I love to see your PR. +## Setup development environment +It can be quite difficult to get an development environment up & running. I'll hope to +expand the instructions a bit in the future. + +### RabbitMQ + +Start by installing docker. + +Then: + + $ docker run -d -P -e RABBITMQ_NODENAME=rabbitmq-cli-consumer --name rabbitmq-cli-consumer rabbitmq:3-management + +To see which ports are available: + + $ docker port rabbitmq-cli-consumer + +You can login with guest / guest. From b3f3b4ae9c147d103782902458ab34fbf46d25f2 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Sat, 23 May 2015 22:27:09 +0200 Subject: [PATCH 02/36] Added some more instructions about docker --- README.md | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index bfa82e5..de0787f 100644 --- a/README.md +++ b/README.md @@ -283,16 +283,29 @@ Missing anything? Found a bug? I love to see your PR. It can be quite difficult to get an development environment up & running. I'll hope to expand the instructions a bit in the future. +### Go and gvm + +Todo. + ### RabbitMQ Start by installing docker. Then: - $ docker run -d -P -e RABBITMQ_NODENAME=rabbitmq-cli-consumer --name rabbitmq-cli-consumer rabbitmq:3-management + $ docker run -d -P -e RABBITMQ_NODENAME=rabbitmq-cli-consumer --name rabbitmq-cli-consumer rabbitmq:3-management To see which ports are available: $ docker port rabbitmq-cli-consumer -You can login with guest / guest. +You can login with guest / guest. +If you want stop the container: + + $ docker stop rabbitmq-cli-consumer + +And to restart the container: + + # docker start rabbitmq-cli-consumer + + From 44aacd8fe4c6cb31ef14de40a872a479b1d8fd15 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Sun, 16 Aug 2015 22:19:26 +0200 Subject: [PATCH 03/36] Changed version to 2.0.0-dev --- main.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index c84a3c6..cb1737a 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,14 @@ package main import ( + "io" + "log" + "os" + "github.com/codegangsta/cli" "github.com/ricbra/rabbitmq-cli-consumer/command" "github.com/ricbra/rabbitmq-cli-consumer/config" "github.com/ricbra/rabbitmq-cli-consumer/consumer" - "io" - "log" - "os" ) func main() { @@ -16,7 +17,7 @@ func main() { app.Usage = "Consume RabbitMQ easily to any cli program" app.Author = "Richard van den Brand" app.Email = "richard@vandenbrand.org" - app.Version = "1.1.0" + app.Version = "2.0.0-dev" app.Flags = []cli.Flag{ cli.StringFlag{ Name: "executable, e", From 3dcc503abc5e7aebe656e719b4a412a58d738879 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Sun, 16 Aug 2015 22:20:20 +0200 Subject: [PATCH 04/36] Fixed identation in README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index de0787f..c3be5be 100644 --- a/README.md +++ b/README.md @@ -297,7 +297,7 @@ Then: To see which ports are available: - $ docker port rabbitmq-cli-consumer + $ docker port rabbitmq-cli-consumer You can login with guest / guest. If you want stop the container: From 0c49850ef105dc8882592bf5b0a3e20c42416660 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Mon, 17 Aug 2015 21:52:14 +0200 Subject: [PATCH 05/36] Added FileLocator and tests --- config/locator.go | 39 +++++++++++++++++++++++++++++++ config/locator_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 config/locator.go create mode 100644 config/locator_test.go diff --git a/config/locator.go b/config/locator.go new file mode 100644 index 0000000..676fb32 --- /dev/null +++ b/config/locator.go @@ -0,0 +1,39 @@ +package config + +import ( + "fmt" + "os/user" + + "github.com/spf13/afero" +) + +type FileLocator struct { + Paths []string + Filesystem afero.Fs +} + +func (r FileLocator) Locate() []string { + exists := []string{} + for _, path := range r.Paths { + if _, err := r.Filesystem.Stat(path); err == nil { + exists = append(exists, path) + } + } + + return exists +} + +type Locator interface { + Locate() []string +} + +func NewLocator(paths []string, filesystem afero.Fs, user *user.User) Locator { + if user != nil { + paths = append(paths, fmt.Sprintf("%s/.rabbitmq-cli-consumer.conf", user.HomeDir)) + } + + return FileLocator{ + Paths: paths, + Filesystem: filesystem, + } +} diff --git a/config/locator_test.go b/config/locator_test.go new file mode 100644 index 0000000..8f905e5 --- /dev/null +++ b/config/locator_test.go @@ -0,0 +1,52 @@ +package config + +import ( + "os/user" + "testing" + + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" +) + +func TestFindsFullPathFiles(t *testing.T) { + fs := &afero.MemMapFs{} + fs.Create("/etc/rabbitmq-cli-consmer/rabbitmq-cli-consumer.conf") + + u := NewLocator([]string{ + "/etc/rabbitmq-cli-consmer/rabbitmq-cli-consumer.conf", + "/home/test/my-config.conf", + }, fs, nil) + + assert.Equal( + t, + []string{"/etc/rabbitmq-cli-consmer/rabbitmq-cli-consumer.conf"}, + u.Locate(), + ) +} + +func TestFindsConfigInHomedir(t *testing.T) { + fs := &afero.MemMapFs{} + fs.Create("/home/fakeuser/.rabbitmq-cli-consumer.conf") + user := createUser() + + u := NewLocator([]string{ + "/etc/rabbitmq-cli-consmer/rabbitmq-cli-consumer.conf", + "/home/test/my-config.conf", + }, fs, user) + + assert.Equal( + t, + []string{"/home/fakeuser/.rabbitmq-cli-consumer.conf"}, + u.Locate(), + ) +} + +func createUser() *user.User { + return &user.User{ + Uid: "1", + Gid: "1", + Username: "fakeuser", + Name: "Foo Bar", + HomeDir: "/home/fakeuser", + } +} From 09cd3ab95e5f9940fca8dfdc0f99381c9010d2cf Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Tue, 18 Aug 2015 08:28:04 +0200 Subject: [PATCH 06/36] First take on config merger --- config/config_merger.go | 25 +++++++++++++++++++++++++ config/config_merger_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 config/config_merger.go create mode 100644 config/config_merger_test.go diff --git a/config/config_merger.go b/config/config_merger.go new file mode 100644 index 0000000..7e3d583 --- /dev/null +++ b/config/config_merger.go @@ -0,0 +1,25 @@ +package config + +import ( + "fmt" + + "github.com/imdario/mergo" +) + +type Merger interface { + Merge() +} + +type ConfigMerger struct { +} + +func (m ConfigMerger) Merge(configs []Config) { + dest := Config{} + for _, config := range configs { + if err := mergo.Merge(&dest, config); err != nil { + fmt.Println(err) + } + fmt.Println(dest) + + } +} diff --git a/config/config_merger_test.go b/config/config_merger_test.go new file mode 100644 index 0000000..bd2494d --- /dev/null +++ b/config/config_merger_test.go @@ -0,0 +1,28 @@ +package config + +import ( + "fmt" + "testing" + + "code.google.com/p/gcfg" +) + +func TestMergesConfigs(t *testing.T) { + configs := []Config{ + createConfig("localhost", "queue1"), + createConfig("test.host.com", "queue2"), + } + + merger := ConfigMerger{} + merger.Merge(configs) +} + +func createConfig(host, queue string) Config { + cfg := Config{} + cfgStr := fmt.Sprintf(`[rabbitmq] +host=%s +queue=%s`, host, queue) + gcfg.ReadStringInto(&cfg, cfgStr) + + return cfg +} From ad00379ec7c375d315407c935aadcbdba3861fd6 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Mon, 24 Aug 2015 21:54:54 +0200 Subject: [PATCH 07/36] Added config merger and test --- config/config_merger.go | 10 +++++----- config/config_merger_test.go | 24 +++++++++++++++--------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/config/config_merger.go b/config/config_merger.go index 7e3d583..423e81e 100644 --- a/config/config_merger.go +++ b/config/config_merger.go @@ -13,13 +13,13 @@ type Merger interface { type ConfigMerger struct { } -func (m ConfigMerger) Merge(configs []Config) { +func (m ConfigMerger) Merge(configs []Config) (Config, error) { dest := Config{} for _, config := range configs { - if err := mergo.Merge(&dest, config); err != nil { - fmt.Println(err) + if err := mergo.MergeWithOverwrite(&dest, config); err != nil { + return dest, fmt.Errorf("Could not merge config: %s", err.Error()) } - fmt.Println(dest) - } + + return dest, nil } diff --git a/config/config_merger_test.go b/config/config_merger_test.go index bd2494d..95ae067 100644 --- a/config/config_merger_test.go +++ b/config/config_merger_test.go @@ -1,28 +1,34 @@ package config import ( - "fmt" "testing" "code.google.com/p/gcfg" + "github.com/stretchr/testify/assert" ) func TestMergesConfigs(t *testing.T) { configs := []Config{ - createConfig("localhost", "queue1"), - createConfig("test.host.com", "queue2"), + createConfig(`[rabbitmq] + host=rabbitmq.provider.com + password=123pass + vhost=test`), + createConfig(`[rabbitmq] + host=localhost + queue=testqueue`), } merger := ConfigMerger{} - merger.Merge(configs) + config, _ := merger.Merge(configs) + + assert.Equal(t, config.RabbitMq.Host, "localhost") + assert.Equal(t, config.RabbitMq.Password, "123pass") + assert.Equal(t, config.RabbitMq.Vhost, "test") } -func createConfig(host, queue string) Config { +func createConfig(config string) Config { cfg := Config{} - cfgStr := fmt.Sprintf(`[rabbitmq] -host=%s -queue=%s`, host, queue) - gcfg.ReadStringInto(&cfg, cfgStr) + gcfg.ReadStringInto(&cfg, config) return cfg } From 868e5d61520957a256063da5b1064a09e60212a2 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Mon, 31 Aug 2015 22:14:20 +0200 Subject: [PATCH 08/36] Rewrote config --- config/config.go | 59 +++++++++++++++++++------------------- config/config_test.go | 64 ++++++++++++++++++++++++++++++++++++++++++ config/locator.go | 13 ++++++--- config/locator_test.go | 14 ++++----- 4 files changed, 110 insertions(+), 40 deletions(-) create mode 100644 config/config_test.go diff --git a/config/config.go b/config/config.go index 71bf8c6..1b67d09 100644 --- a/config/config.go +++ b/config/config.go @@ -1,51 +1,52 @@ package config import ( - "code.google.com/p/gcfg" - "path/filepath" + "log" + "strings" + + "gopkg.in/validator.v2" ) type Config struct { RabbitMq struct { - Host string - Username string - Password string - Port string - Vhost string - Queue string + Host string `validate:"nonzero"` + Username string `validate:"nonzero"` + Password string `validate:"nonzero"` + Port string `validate:"nonzero"` + Vhost string `validate:"nonzero"` + Queue string `validate:"nonzero"` Compression bool } Prefetch struct { - Count int - Global bool + Count int `validate:"nonzero"` + Global bool } Exchange struct { - Name string - Autodelete bool - Type string - Durable bool + Name string `validate:"nonzero"` + Autodelete bool + Type string `validate:"nonzero"` + Durable bool } Logs struct { - Error string - Info string + Error string `validate:"nonzero"` + Info string `validate:"nonzero"` } } -func LoadAndParse(location string) (*Config, error) { - if !filepath.IsAbs(location) { - location, err := filepath.Abs(location) +func Validate(config Config, logger *log.Logger) bool { + if err := validator.Validate(config); err != nil { + for f, e := range err.(validator.ErrorMap) { + split := strings.Split(strings.ToLower(f), ".") + msg := e.Error() + switch msg { + case "zero value": + msg = "This option is required" + } - if err != nil { - return nil, err + logger.Printf("The option \"%s\" under section \"%s\" is invalid: %s\n", split[1], split[0], msg) } - - location = location - } - - cfg := Config{} - if err := gcfg.ReadFileInto(&cfg, location); err != nil { - return nil, err + return false } - return &cfg, nil + return true } diff --git a/config/config_test.go b/config/config_test.go new file mode 100644 index 0000000..8569c95 --- /dev/null +++ b/config/config_test.go @@ -0,0 +1,64 @@ +package config + +import ( + "bytes" + "log" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestFailsOnRequiredFields(t *testing.T) { + config := createConfig(`[rabbitmq] + host=`) + + var b bytes.Buffer + logger := log.New(&b, "", 0) + + valid := Validate(config, logger) + out := b.String() + + assert.Equal(t, false, valid) + assert.Contains(t, out, "The option \"queue\" under section \"rabbitmq\" is invalid: This option is required") + assert.Contains(t, out, "The option \"port\" under section \"rabbitmq\" is invalid: This option is required") + assert.Contains(t, out, "The option \"username\" under section \"rabbitmq\" is invalid: This option is required") + assert.Contains(t, out, "The option \"password\" under section \"rabbitmq\" is invalid: This option is required") + assert.Contains(t, out, "The option \"vhost\" under section \"rabbitmq\" is invalid: This option is required") + assert.Contains(t, out, "The option \"host\" under section \"rabbitmq\" is invalid: This option is required") + assert.Contains(t, out, "The option \"count\" under section \"prefetch\" is invalid: This option is required") + assert.Contains(t, out, "The option \"name\" under section \"exchange\" is invalid: This option is required") + assert.Contains(t, out, "The option \"type\" under section \"exchange\" is invalid: This option is required") + assert.Contains(t, out, "The option \"error\" under section \"logs\" is invalid: This option is required") + assert.Contains(t, out, "The option \"info\" under section \"logs\" is invalid: This option is required") +} + +func TestPassOnValidConfig(t *testing.T) { + config := createConfig( + `[rabbitmq] + host=localhost + username=test + password=t3st + vhost=test + queue=test + port=123 + + [prefetch] + count=3 + global=On + + [exchange] + name=test + autodelete=Off + type=test + durable=On + + [logs] + error=a + info=b + `) + + var b bytes.Buffer + logger := log.New(&b, "", 0) + valid := Validate(config, logger) + assert.Equal(t, true, valid) +} diff --git a/config/locator.go b/config/locator.go index 676fb32..fd28919 100644 --- a/config/locator.go +++ b/config/locator.go @@ -1,6 +1,7 @@ package config import ( + "errors" "fmt" "os/user" @@ -12,25 +13,29 @@ type FileLocator struct { Filesystem afero.Fs } -func (r FileLocator) Locate() []string { +func (r FileLocator) Locate() (error, []string) { exists := []string{} for _, path := range r.Paths { if _, err := r.Filesystem.Stat(path); err == nil { exists = append(exists, path) } } + if len(exists) == 0 { + return errors.New("No configuration files found, exiting"), exists + } - return exists + return nil, exists } type Locator interface { - Locate() []string + Locate() (error, []string) } func NewLocator(paths []string, filesystem afero.Fs, user *user.User) Locator { if user != nil { - paths = append(paths, fmt.Sprintf("%s/.rabbitmq-cli-consumer.conf", user.HomeDir)) + paths = append([]string{fmt.Sprintf("%s/.rabbitmq-cli-consumer.conf", user.HomeDir)}, paths...) } + paths = append([]string{"/etc/rabbitmq-cli-consumer/rabbitmq-cli-consumer.conf"}, paths...) return FileLocator{ Paths: paths, diff --git a/config/locator_test.go b/config/locator_test.go index 8f905e5..143c152 100644 --- a/config/locator_test.go +++ b/config/locator_test.go @@ -8,19 +8,19 @@ import ( "github.com/stretchr/testify/assert" ) -func TestFindsFullPathFiles(t *testing.T) { +func TestFindsDefauktFullpathConfig(t *testing.T) { fs := &afero.MemMapFs{} - fs.Create("/etc/rabbitmq-cli-consmer/rabbitmq-cli-consumer.conf") + fs.Create("/etc/rabbitmq-cli-consumer/rabbitmq-cli-consumer.conf") u := NewLocator([]string{ - "/etc/rabbitmq-cli-consmer/rabbitmq-cli-consumer.conf", "/home/test/my-config.conf", }, fs, nil) + _, paths := u.Locate() assert.Equal( t, - []string{"/etc/rabbitmq-cli-consmer/rabbitmq-cli-consumer.conf"}, - u.Locate(), + []string{"/etc/rabbitmq-cli-consumer/rabbitmq-cli-consumer.conf"}, + paths, ) } @@ -30,14 +30,14 @@ func TestFindsConfigInHomedir(t *testing.T) { user := createUser() u := NewLocator([]string{ - "/etc/rabbitmq-cli-consmer/rabbitmq-cli-consumer.conf", "/home/test/my-config.conf", }, fs, user) + _, paths := u.Locate() assert.Equal( t, []string{"/home/fakeuser/.rabbitmq-cli-consumer.conf"}, - u.Locate(), + paths, ) } From 97c5b7513a96c712c0ad97595dbfbd05bfd3d0f5 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 2 Sep 2015 22:01:54 +0200 Subject: [PATCH 09/36] Added default config --- config/config.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/config/config.go b/config/config.go index 1b67d09..49f58cb 100644 --- a/config/config.go +++ b/config/config.go @@ -4,9 +4,12 @@ import ( "log" "strings" + "code.google.com/p/gcfg" + "gopkg.in/validator.v2" ) +// Config contains all config values type Config struct { RabbitMq struct { Host string `validate:"nonzero"` @@ -33,6 +36,7 @@ type Config struct { } } +// Validate validtes Config and prints errors. func Validate(config Config, logger *log.Logger) bool { if err := validator.Validate(config); err != nil { for f, e := range err.(validator.ErrorMap) { @@ -50,3 +54,25 @@ func Validate(config Config, logger *log.Logger) bool { return true } + +// Default returns Config with default defined +func Default() Config { + return CreateFromString( + `[prefetch] + count=3 + global=Off + + [exchange] + autodelete=Off + type=direct + durable=On + `) +} + +// CreateFromString creates Config from string +func CreateFromString(config string) Config { + cfg := Config{} + gcfg.ReadStringInto(&cfg, config) + + return cfg +} From ed14f0a1887f6e5f3ab0968c9f3469cc84d9330d Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 2 Sep 2015 22:05:01 +0200 Subject: [PATCH 10/36] Added Travis CI config --- .travis.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..74f31b4 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,10 @@ +language: go + +go: + - 1.4 + - 1.5 + - tip + +matrix: + allow_failures: + - go: tip From 38c742e84320947e3b68a802dae8cf3fc727c30f Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 2 Sep 2015 22:07:46 +0200 Subject: [PATCH 11/36] Added Travis CI button --- README.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index c3be5be..f3daf5f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ RabbitMQ cli consumer --------------------- +[![Build Status](https://travis-ci.org/ricbra/rabbitmq-cli-consumer.svg)](https://travis-ci.org/ricbra/rabbitmq-cli-consumer) + If you are a fellow PHP developer just like me you're probably aware of the following fact: PHP really SUCKS in long running tasks. @@ -294,18 +296,16 @@ Start by installing docker. Then: $ docker run -d -P -e RABBITMQ_NODENAME=rabbitmq-cli-consumer --name rabbitmq-cli-consumer rabbitmq:3-management - + To see which ports are available: - + $ docker port rabbitmq-cli-consumer - + You can login with guest / guest. If you want stop the container: $ docker stop rabbitmq-cli-consumer - + And to restart the container: - - # docker start rabbitmq-cli-consumer - + # docker start rabbitmq-cli-consumer From 66ce75fc485773a374a79c9ff85b4c2f17852a61 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 2 Sep 2015 22:11:45 +0200 Subject: [PATCH 12/36] Fixed app after config refactoring --- main.go | 40 +++++++++++++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 7 deletions(-) diff --git a/main.go b/main.go index cb1737a..eb04699 100644 --- a/main.go +++ b/main.go @@ -4,11 +4,15 @@ import ( "io" "log" "os" + "os/user" + + "code.google.com/p/gcfg" "github.com/codegangsta/cli" "github.com/ricbra/rabbitmq-cli-consumer/command" "github.com/ricbra/rabbitmq-cli-consumer/config" "github.com/ricbra/rabbitmq-cli-consumer/consumer" + "github.com/spf13/afero" ) func main() { @@ -33,20 +37,42 @@ func main() { }, } app.Action = func(c *cli.Context) { - if c.String("configuration") == "" && c.String("executable") == "" { + if c.String("executable") == "" { cli.ShowAppHelp(c) os.Exit(1) } verbose := c.Bool("verbose") - - logger := log.New(os.Stderr, "", log.Ldate|log.Ltime) - cfg, err := config.LoadAndParse(c.String("configuration")) - + logger := log.New(os.Stderr, "", 0) + + // Config finding and parsing + // Perhaps refactor into something more elegant + user, _ := user.Current() + locator := config.NewLocator([]string{c.String("configuration")}, &afero.OsFs{}, user) + configs := []config.Config{} + err, locations := locator.Locate() if err != nil { - logger.Fatalf("Failed parsing configuration: %s\n", err) + logger.Fatalf("Failed locating configuration: %s\n", err) } + for _, path := range locations { + logger.Printf("Found config: %s", path) + cfg := config.Config{} + if err := gcfg.ReadFileInto(&cfg, path); err == nil { + configs = append(configs, cfg) + } else { + logger.Printf("Could not parse config: %s", err) + } + } + merger := config.ConfigMerger{} + cfg, _ := merger.Merge(configs) + if !config.Validate(cfg, logger) { + logger.Fatalf("Please fix configuration issues.") + } + + // fmt.Println(config) + // os.Exit(0) + errLogger, err := createLogger(cfg.Logs.Error, verbose, os.Stderr) if err != nil { logger.Fatalf("Failed creating error log: %s", err) @@ -59,7 +85,7 @@ func main() { factory := command.Factory(c.String("executable")) - client, err := consumer.New(cfg, factory, errLogger, infLogger) + client, err := consumer.New(&cfg, factory, errLogger, infLogger) if err != nil { errLogger.Fatalf("Failed creating consumer: %s", err) } From 770a1e4fe51d4a0a1960f6be35c9b1dd605f3ae5 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 2 Sep 2015 23:37:11 +0200 Subject: [PATCH 13/36] Changed verbose to quiet --- main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/main.go b/main.go index eb04699..b0d78d5 100644 --- a/main.go +++ b/main.go @@ -32,8 +32,8 @@ func main() { Usage: "Location of configuration file", }, cli.BoolFlag{ - Name: "verbose, V", - Usage: "Enable verbose mode (logs to stdout and stderr)", + Name: "quiet, Q", + Usage: "Enable quite mode (disables loggging to stdout and stderr)", }, } app.Action = func(c *cli.Context) { @@ -42,7 +42,7 @@ func main() { os.Exit(1) } - verbose := c.Bool("verbose") + verbose := !c.Bool("quiet") logger := log.New(os.Stderr, "", 0) // Config finding and parsing From efb33c22e809a878fa07554e987118bb8b220939 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Thu, 3 Sep 2015 21:26:40 +0200 Subject: [PATCH 14/36] Started rewrite of consumer --- consumer/consumer.go | 102 ++++++++++++++++++++++++-------------- consumer/consumer_test.go | 23 +++++++++ 2 files changed, 87 insertions(+), 38 deletions(-) create mode 100644 consumer/consumer_test.go diff --git a/consumer/consumer.go b/consumer/consumer.go index f53e3d7..34112bc 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -4,15 +4,16 @@ import ( "bytes" "compress/zlib" "encoding/base64" - "errors" "fmt" + "log" + "net/url" + "github.com/ricbra/rabbitmq-cli-consumer/command" "github.com/ricbra/rabbitmq-cli-consumer/config" "github.com/streadway/amqp" - "log" - "net/url" ) +// Consumer represents a consumer type Consumer struct { Channel *amqp.Channel Connection *amqp.Connection @@ -24,6 +25,7 @@ type Consumer struct { Compression bool } +// Consume starts consuming messages from RabbitMQ func (c *Consumer) Consume() { c.InfLogger.Println("Registering consumer... ") msgs, err := c.Channel.Consume(c.Queue, "", false, false, false, false, nil) @@ -66,50 +68,54 @@ func (c *Consumer) Consume() { <-forever } +// New returns a initialized consumer based on config func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogger *log.Logger) (*Consumer, error) { - uri := fmt.Sprintf( - "amqp://%s:%s@%s:%s%s", - url.QueryEscape(cfg.RabbitMq.Username), - url.QueryEscape(cfg.RabbitMq.Password), - cfg.RabbitMq.Host, - cfg.RabbitMq.Port, - cfg.RabbitMq.Vhost, - ) + uri := ParseURI(cfg.RabbitMq.Username, cfg.RabbitMq.Password, cfg.RabbitMq.Host, cfg.RabbitMq.Port, cfg.RabbitMq.Vhost) infLogger.Println("Connecting RabbitMQ...") - conn, err := amqp.Dial(uri) + conn, err := Connect(uri) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed connecting RabbitMQ: %s", err.Error())) + return nil, fmt.Errorf("Failed connecting RabbitMQ: %s", err.Error()) } infLogger.Println("Connected.") infLogger.Println("Opening channel...") ch, err := conn.Channel() if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to open a channel: %s", err.Error())) + return nil, fmt.Errorf("Failed to open a channel: %s", err.Error()) } infLogger.Println("Done.") - infLogger.Println("Setting QoS... ") - // Attempt to preserve BC here - if cfg.Prefetch.Count == 0 { - cfg.Prefetch.Count = 3 + if err := Initialize(cfg, ch, infLogger, errLogger); err != nil { + return nil, err } + + return &Consumer{ + Channel: ch, + Connection: conn, + Queue: cfg.RabbitMq.Queue, + Factory: factory, + ErrLogger: errLogger, + InfLogger: infLogger, + Executer: command.New(errLogger, infLogger), + Compression: cfg.RabbitMq.Compression, + }, nil +} + +// Initialize channel according to config +func Initialize(cfg *config.Config, ch Channel, errLogger, infLogger *log.Logger) error { + infLogger.Println("Setting QoS... ") + if err := ch.Qos(cfg.Prefetch.Count, 0, cfg.Prefetch.Global); err != nil { - return nil, errors.New(fmt.Sprintf("Failed to set QoS: %s", err.Error())) + return fmt.Errorf("Failed to set QoS: %s", err.Error()) } infLogger.Println("Succeeded setting QoS.") infLogger.Printf("Declaring queue \"%s\"...", cfg.RabbitMq.Queue) - _, err = ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, nil) + _, err := ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, nil) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to declare queue: %s", err.Error())) - } - - // Check for missing exchange settings to preserve BC - if "" == cfg.Exchange.Name && "" == cfg.Exchange.Type && !cfg.Exchange.Durable && !cfg.Exchange.Autodelete { - cfg.Exchange.Type = "direct" + return fmt.Errorf("Failed to declare queue: %s", err.Error()) } // Empty Exchange name means default, no need to declare @@ -118,7 +124,7 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg err = ch.ExchangeDeclare(cfg.Exchange.Name, cfg.Exchange.Type, cfg.Exchange.Durable, cfg.Exchange.Autodelete, false, false, amqp.Table{}) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to declare exchange: %s", err.Error())) + return fmt.Errorf("Failed to declare exchange: %s", err.Error()) } // Bind queue @@ -126,18 +132,38 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg err = ch.QueueBind(cfg.RabbitMq.Queue, "", cfg.Exchange.Name, false, nil) if nil != err { - return nil, errors.New(fmt.Sprintf("Failed to bind queue to exchange: %s", err.Error())) + return fmt.Errorf("Failed to bind queue to exchange: %s", err.Error()) } } - return &Consumer{ - Channel: ch, - Connection: conn, - Queue: cfg.RabbitMq.Queue, - Factory: factory, - ErrLogger: errLogger, - InfLogger: infLogger, - Executer: command.New(errLogger, infLogger), - Compression: cfg.RabbitMq.Compression, - }, nil + return nil +} + +// Connect opens a connection to the given uri +func Connect(uri string) (*amqp.Connection, error) { + return amqp.Dial(uri) +} + +// ParseURI parses the URI based on config +func ParseURI(username, password, host, port, vhost string) string { + if start := string(vhost[0]); start != "/" { + vhost = fmt.Sprintf("/%s", vhost) + } + + return fmt.Sprintf( + "amqp://%s:%s@%s:%s%s", + url.QueryEscape(username), + url.QueryEscape(password), + host, + port, + vhost, + ) +} + +// Channel is the interface +type Channel interface { + ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error + QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) + Qos(prefetchCount, prefetchSize int, global bool) error + QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error } diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go new file mode 100644 index 0000000..ec89559 --- /dev/null +++ b/consumer/consumer_test.go @@ -0,0 +1,23 @@ +package consumer + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestOpensChannel(t *testing.T) { + t.Log("test") +} + +func TestParseAndEscapesParamsInURI(t *testing.T) { + uri := ParseURI("richard", "my@:secr%t", "localhost", "123", "/vhost") + + assert.Equal(t, "amqp://richard:my%40%3Asecr%25t@localhost:123/vhost", uri) +} + +func TestAddsSlashWhenMissingInVhost(t *testing.T) { + uri := ParseURI("richard", "secret", "localhost", "123", "vhost") + + assert.Equal(t, "amqp://richard:secret@localhost:123/vhost", uri) +} From fdbc2de84c1e9f3a9adbfa404063b26b128a14c9 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 23 Sep 2015 21:21:01 +0200 Subject: [PATCH 15/36] Added QoS tests --- consumer/consumer.go | 5 ++- consumer/consumer_test.go | 95 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 1 deletion(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 34112bc..9abcad5 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -109,10 +109,13 @@ func Initialize(cfg *config.Config, ch Channel, errLogger, infLogger *log.Logger if err := ch.Qos(cfg.Prefetch.Count, 0, cfg.Prefetch.Global); err != nil { return fmt.Errorf("Failed to set QoS: %s", err.Error()) } + infLogger.Println("Succeeded setting QoS.") infLogger.Printf("Declaring queue \"%s\"...", cfg.RabbitMq.Queue) - _, err := ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, nil) + + table := amqp.Table{} + _, err := ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, table) if nil != err { return fmt.Errorf("Failed to declare queue: %s", err.Error()) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index ec89559..066b122 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -1,9 +1,15 @@ package consumer import ( + "bytes" + "errors" + "log" "testing" + "github.com/ricbra/rabbitmq-cli-consumer/config" + "github.com/streadway/amqp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) func TestOpensChannel(t *testing.T) { @@ -21,3 +27,92 @@ func TestAddsSlashWhenMissingInVhost(t *testing.T) { assert.Equal(t, "amqp://richard:secret@localhost:123/vhost", uri) } + +func TestSetQosFails(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(errors.New("Error occured")).Once() + + err := Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) + ch.AssertNotCalled(t, "ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}) + assert.NotNil(t, err) +} + +func TestSetQosSucceeds(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, errors.New("error")).Once() + + Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) +} + +type TestChannel struct { + mock.Mock +} + +func (t *TestChannel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error { + argsT := t.Called(name, kind, durable, autoDelete, internal, noWait, args) + + return argsT.Error(0) +} + +func (t *TestChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) { + // fmt.Println("TTEESSTT") + // fmt.Printf("%v", args) + // os.Exit(1) + argsT := t.Called(name, durable, autoDelete, exclusive, noWait, args) + + return argsT.Get(0).(amqp.Queue), argsT.Error(1) +} + +func (t *TestChannel) Qos(prefetchCount, prefetchSize int, global bool) error { + argsT := t.Called(prefetchCount, prefetchSize, global) + + return argsT.Error(0) +} + +func (t *TestChannel) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error { + argsT := t.Called(name, key, exchange, noWait, args) + + return argsT.Error(0) +} + +func createConfig() config.Config { + return config.CreateFromString(`[rabbitmq] + host=localhost + username=ricbra + password=t3st + vhost=staging + queue=worker + port=123 + + [prefetch] + count=3 + global=On + + [exchange] + name=worker + autodelete=Off + type=test + durable=On + + [logs] + error=a + info=b + `) +} From c97af6ede91b696dd8d6cd81f38076ad126bc35d Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 23 Sep 2015 21:28:10 +0200 Subject: [PATCH 16/36] Added TestDeclareQueueFails --- consumer/consumer_test.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 066b122..ba0b5bc 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -41,7 +41,7 @@ func TestSetQosFails(t *testing.T) { err := Initialize(&config, ch, errLogger, infLogger) ch.AssertExpectations(t) - ch.AssertNotCalled(t, "ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}) + ch.AssertNotCalled(t, "QueueDeclare", "worker", true, false, false, false, amqp.Table{}) assert.NotNil(t, err) } @@ -61,6 +61,24 @@ func TestSetQosSucceeds(t *testing.T) { ch.AssertExpectations(t) } +func TestDeclareQueueFails(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, errors.New("error")).Once() + + err := Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) + ch.AssertNotCalled(t, "ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}) + assert.NotNil(t, err) +} + type TestChannel struct { mock.Mock } From 05e44142a848a84058572acec62d3b3ea9b03781 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 23 Sep 2015 21:30:45 +0200 Subject: [PATCH 17/36] Added TestDeclareQueueSucceeds --- consumer/consumer_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index ba0b5bc..407bfcc 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -79,6 +79,23 @@ func TestDeclareQueueFails(t *testing.T) { assert.NotNil(t, err) } +func TestDeclareQueueSucceeds(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, nil).Once() + ch.On("ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}).Return(errors.New("error")).Once() + + Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) +} + type TestChannel struct { mock.Mock } From bb7c46414969ab4ffa246b62c9db6c7a21aae3d7 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 23 Sep 2015 21:32:12 +0200 Subject: [PATCH 18/36] Removed debug info --- consumer/consumer_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 407bfcc..5c1b2ff 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -107,9 +107,6 @@ func (t *TestChannel) ExchangeDeclare(name, kind string, durable, autoDelete, in } func (t *TestChannel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) { - // fmt.Println("TTEESSTT") - // fmt.Printf("%v", args) - // os.Exit(1) argsT := t.Called(name, durable, autoDelete, exclusive, noWait, args) return argsT.Get(0).(amqp.Queue), argsT.Error(1) From 13f6b1b5b5e7ef9a214a3ac2d84e707182f30d20 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 23 Sep 2015 21:35:19 +0200 Subject: [PATCH 19/36] Added TestBindQueueFails --- consumer/consumer.go | 2 +- consumer/consumer_test.go | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index 9abcad5..2438c45 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -132,7 +132,7 @@ func Initialize(cfg *config.Config, ch Channel, errLogger, infLogger *log.Logger // Bind queue infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name) - err = ch.QueueBind(cfg.RabbitMq.Queue, "", cfg.Exchange.Name, false, nil) + err = ch.QueueBind(cfg.RabbitMq.Queue, "", cfg.Exchange.Name, false, amqp.Table{}) if nil != err { return fmt.Errorf("Failed to bind queue to exchange: %s", err.Error()) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 5c1b2ff..b6e0703 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -96,6 +96,25 @@ func TestDeclareQueueSucceeds(t *testing.T) { ch.AssertExpectations(t) } +func TestBindQueueFails(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, nil).Once() + ch.On("ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}).Return(nil).Once() + ch.On("QueueBind", "worker", "", "worker", false, amqp.Table{}).Return(errors.New("error")).Once() + + err := Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) + assert.NotNil(t, err) +} + type TestChannel struct { mock.Mock } From 11c4f0b1ebc06d40d7cb97e07522a3c1aa66a9be Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 23 Sep 2015 21:36:58 +0200 Subject: [PATCH 20/36] Added TestBindQueueSucceeds --- consumer/consumer_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index b6e0703..7ae4465 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -115,6 +115,25 @@ func TestBindQueueFails(t *testing.T) { assert.NotNil(t, err) } +func TestBindQueueSucceeds(t *testing.T) { + config := createConfig() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, nil).Once() + ch.On("ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}).Return(nil).Once() + ch.On("QueueBind", "worker", "", "worker", false, amqp.Table{}).Return(nil).Once() + + err := Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) + assert.Nil(t, err) +} + type TestChannel struct { mock.Mock } From 637d73a2d17ddb0a1c7939647d79b286497860f8 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 23 Sep 2015 21:50:39 +0200 Subject: [PATCH 21/36] Cleanup of tests --- consumer/consumer_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 7ae4465..159d434 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -12,10 +12,6 @@ import ( "github.com/stretchr/testify/mock" ) -func TestOpensChannel(t *testing.T) { - t.Log("test") -} - func TestParseAndEscapesParamsInURI(t *testing.T) { uri := ParseURI("richard", "my@:secr%t", "localhost", "123", "/vhost") From 30c8d3a614a199ff6c80aa3e61c658e1c6d307f3 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Tue, 13 Oct 2015 22:04:27 +0200 Subject: [PATCH 22/36] Added tests for CommandExecuter --- command/command_executer.go | 12 ++++++----- command/command_executer_test.go | 35 ++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) create mode 100644 command/command_executer_test.go diff --git a/command/command_executer.go b/command/command_executer.go index 3c3bfd8..0bb473d 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -1,15 +1,16 @@ package command -import ( - "log" - "os/exec" -) +import "log" type CommandExecuter struct { errLogger *log.Logger infLogger *log.Logger } +type Command interface { + CombinedOutput() (out []byte, err error) +} + func New(errLogger, infLogger *log.Logger) *CommandExecuter { return &CommandExecuter{ errLogger: errLogger, @@ -17,8 +18,9 @@ func New(errLogger, infLogger *log.Logger) *CommandExecuter { } } -func (me CommandExecuter) Execute(cmd *exec.Cmd) bool { +func (me CommandExecuter) Execute(cmd Command) bool { me.infLogger.Println("Processing message...") + out, err := cmd.CombinedOutput() if err != nil { diff --git a/command/command_executer_test.go b/command/command_executer_test.go new file mode 100644 index 0000000..41f80b1 --- /dev/null +++ b/command/command_executer_test.go @@ -0,0 +1,35 @@ +package command + +import ( + "bytes" + "log" + "testing" + + "github.com/stretchr/testify/mock" +) + +func TestExecutesCommand(t *testing.T) { + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + cmd := new(TestCommand) + + cmd.On("CombinedOutput").Return(make([]byte, 0), nil).Once() + + executer := New(errLogger, infLogger) + executer.Execute(cmd) + + cmd.AssertExpectations(t) + +} + +type TestCommand struct { + mock.Mock +} + +func (t *TestCommand) CombinedOutput() (out []byte, err error) { + argsT := t.Called() + + return argsT.Get(0).([]byte), argsT.Error(1) +} From 55115659f5b018425b87f2536d977fd815954d2e Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Sun, 18 Oct 2015 20:10:18 +0200 Subject: [PATCH 23/36] Added test for processing of messages --- command/command_executer.go | 4 ++ command/command_factory.go | 4 +- consumer/consumer.go | 78 +++++++++++++++++++++--------- consumer/consumer_test.go | 95 +++++++++++++++++++++++++++++++++++++ 4 files changed, 156 insertions(+), 25 deletions(-) diff --git a/command/command_executer.go b/command/command_executer.go index 0bb473d..43f6d46 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -2,6 +2,10 @@ package command import "log" +type Executer interface { + Execute(cmd Command) bool +} + type CommandExecuter struct { errLogger *log.Logger infLogger *log.Logger diff --git a/command/command_factory.go b/command/command_factory.go index f2e1fc8..4503343 100644 --- a/command/command_factory.go +++ b/command/command_factory.go @@ -1,8 +1,6 @@ package command -import ( - "os/exec" -) +import "os/exec" type CommandFactory struct { Cmd string diff --git a/consumer/consumer.go b/consumer/consumer.go index 2438c45..5917c99 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -21,7 +21,7 @@ type Consumer struct { Factory *command.CommandFactory ErrLogger *log.Logger InfLogger *log.Logger - Executer *command.CommandExecuter + Executer command.Executer Compression bool } @@ -38,36 +38,46 @@ func (c *Consumer) Consume() { defer c.Channel.Close() forever := make(chan bool) - go func() { for d := range msgs { - input := d.Body - if c.Compression { - var b bytes.Buffer - w, err := zlib.NewWriterLevel(&b, zlib.BestCompression) - if err != nil { - c.ErrLogger.Println("Could not create zlib handler") - d.Nack(true, true) - } - c.InfLogger.Println("Compressed message") - w.Write(input) - w.Close() - - input = b.Bytes() - } - - cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) - if c.Executer.Execute(cmd) { - d.Ack(true) - } else { - d.Nack(true, true) + delivery := RabbitMqDelivery{ + delivery: d, + body: d.Body, } + c.ProcessMessage(delivery) } }() c.InfLogger.Println("Waiting for messages...") <-forever } +// ProcessMessage content of one single message +func (c *Consumer) ProcessMessage(msg Delivery) { + input := msg.Body() + if c.Compression { + var b bytes.Buffer + w, err := zlib.NewWriterLevel(&b, zlib.BestCompression) + if err != nil { + c.ErrLogger.Println("Could not create zlib handler") + return + } + c.InfLogger.Println("Decompressed message") + w.Write(input) + w.Close() + + input = b.Bytes() + } + // fmt.Printf("%v", input) + + cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) + + if c.Executer.Execute(cmd) { + msg.Ack(true) + } else { + msg.Nack(true, true) + } +} + // New returns a initialized consumer based on config func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogger *log.Logger) (*Consumer, error) { uri := ParseURI(cfg.RabbitMq.Username, cfg.RabbitMq.Password, cfg.RabbitMq.Host, cfg.RabbitMq.Port, cfg.RabbitMq.Vhost) @@ -170,3 +180,27 @@ type Channel interface { Qos(prefetchCount, prefetchSize int, global bool) error QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error } + +// Delivery interface describes interface for messages +type Delivery interface { + Ack(multiple bool) error + Nack(multiple, requeue bool) error + Body() []byte +} + +type RabbitMqDelivery struct { + body []byte + delivery amqp.Delivery +} + +func (r RabbitMqDelivery) Ack(multiple bool) error { + return r.delivery.Ack(multiple) +} + +func (r RabbitMqDelivery) Nack(multiple, requeue bool) error { + return r.delivery.Nack(multiple, requeue) +} + +func (r RabbitMqDelivery) Body() []byte { + return r.body +} diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 159d434..2ca129b 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -2,10 +2,12 @@ package consumer import ( "bytes" + "encoding/base64" "errors" "log" "testing" + "github.com/ricbra/rabbitmq-cli-consumer/command" "github.com/ricbra/rabbitmq-cli-consumer/config" "github.com/streadway/amqp" "github.com/stretchr/testify/assert" @@ -130,6 +132,99 @@ func TestBindQueueSucceeds(t *testing.T) { assert.Nil(t, err) } +func TestProcessingMessageWithSuccess(t *testing.T) { + msg := new(TestDelivery) + executer := new(TestExecuter) + factory := &command.CommandFactory{ + Cmd: "test", + Args: []string{"aa"}, + } + consumer := Consumer{ + Executer: executer, + Factory: factory, + Compression: false, + } + body := []byte("the_body") + args := base64.StdEncoding.EncodeToString(body) + cmd := factory.Create(args) + executer.On("Execute", cmd).Return(true).Once() + msg.On("Body").Return(body).Once() + msg.On("Ack", true).Return(nil).Once() + + consumer.ProcessMessage(msg) + + executer.AssertExpectations(t) + msg.AssertExpectations(t) +} + +func TestProcessingMessageWithFailure(t *testing.T) { + msg := new(TestDelivery) + executer := new(TestExecuter) + factory := &command.CommandFactory{ + Cmd: "test", + Args: []string{"aa"}, + } + consumer := Consumer{ + Executer: executer, + Factory: factory, + Compression: false, + } + body := []byte("the_body") + args := base64.StdEncoding.EncodeToString(body) + cmd := factory.Create(args) + executer.On("Execute", cmd).Return(false).Once() + msg.On("Body").Return(body).Once() + msg.On("Nack", true, true).Return(nil).Once() + + consumer.ProcessMessage(msg) + + executer.AssertExpectations(t) + msg.AssertExpectations(t) +} + +type TestCommand struct { + mock.Mock +} + +func (t *TestCommand) CombinedOutput() (out []byte, err error) { + argsT := t.Called() + + return argsT.Get(0).([]byte), argsT.Error(1) +} + +type TestExecuter struct { + mock.Mock +} + +func (t *TestExecuter) Execute(cmd command.Command) bool { + argsT := t.Called(cmd) + + return argsT.Get(0).(bool) +} + +type TestDelivery struct { + mock.Mock + body []byte +} + +func (t *TestDelivery) Ack(multiple bool) error { + argstT := t.Called(multiple) + + return argstT.Error(0) +} + +func (t *TestDelivery) Nack(multiple, requeue bool) error { + argsT := t.Called(multiple, requeue) + + return argsT.Error(0) +} + +func (t *TestDelivery) Body() []byte { + argsT := t.Called() + + return argsT.Get(0).([]byte) +} + type TestChannel struct { mock.Mock } From 7ff6b59869c8e8f03d7a2f8b1e385d3ebc570ebc Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Mon, 26 Oct 2015 21:36:19 +0100 Subject: [PATCH 24/36] Refactored queue to its own config section --- config/config.go | 6 +++++- config/config_test.go | 6 ++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index 49f58cb..fe7bf2f 100644 --- a/config/config.go +++ b/config/config.go @@ -17,13 +17,17 @@ type Config struct { Password string `validate:"nonzero"` Port string `validate:"nonzero"` Vhost string `validate:"nonzero"` - Queue string `validate:"nonzero"` Compression bool } Prefetch struct { Count int `validate:"nonzero"` Global bool } + Queue struct { + Name string `validate:"nonzero"` + Durable bool + Autodelete bool + } Exchange struct { Name string `validate:"nonzero"` Autodelete bool diff --git a/config/config_test.go b/config/config_test.go index 8569c95..69ddea7 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -19,7 +19,6 @@ func TestFailsOnRequiredFields(t *testing.T) { out := b.String() assert.Equal(t, false, valid) - assert.Contains(t, out, "The option \"queue\" under section \"rabbitmq\" is invalid: This option is required") assert.Contains(t, out, "The option \"port\" under section \"rabbitmq\" is invalid: This option is required") assert.Contains(t, out, "The option \"username\" under section \"rabbitmq\" is invalid: This option is required") assert.Contains(t, out, "The option \"password\" under section \"rabbitmq\" is invalid: This option is required") @@ -30,6 +29,7 @@ func TestFailsOnRequiredFields(t *testing.T) { assert.Contains(t, out, "The option \"type\" under section \"exchange\" is invalid: This option is required") assert.Contains(t, out, "The option \"error\" under section \"logs\" is invalid: This option is required") assert.Contains(t, out, "The option \"info\" under section \"logs\" is invalid: This option is required") + assert.Contains(t, out, "The option \"name\" under section \"queue\" is invalid: This option is required") } func TestPassOnValidConfig(t *testing.T) { @@ -39,13 +39,15 @@ func TestPassOnValidConfig(t *testing.T) { username=test password=t3st vhost=test - queue=test port=123 [prefetch] count=3 global=On + [queue] + name=test + [exchange] name=test autodelete=Off From 7a5876ecc3370ad6c094384d78d07f50ee002077 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Mon, 26 Oct 2015 21:45:13 +0100 Subject: [PATCH 25/36] Made all queue params configurable while we're at it --- config/config.go | 2 ++ consumer/consumer.go | 10 +++++----- consumer/consumer_test.go | 8 +++++++- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/config/config.go b/config/config.go index fe7bf2f..d0554ad 100644 --- a/config/config.go +++ b/config/config.go @@ -27,6 +27,8 @@ type Config struct { Name string `validate:"nonzero"` Durable bool Autodelete bool + Exclusive bool + Nowait bool } Exchange struct { Name string `validate:"nonzero"` diff --git a/consumer/consumer.go b/consumer/consumer.go index 5917c99..94fac3d 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -103,7 +103,7 @@ func New(cfg *config.Config, factory *command.CommandFactory, errLogger, infLogg return &Consumer{ Channel: ch, Connection: conn, - Queue: cfg.RabbitMq.Queue, + Queue: cfg.Queue.Name, Factory: factory, ErrLogger: errLogger, InfLogger: infLogger, @@ -122,10 +122,10 @@ func Initialize(cfg *config.Config, ch Channel, errLogger, infLogger *log.Logger infLogger.Println("Succeeded setting QoS.") - infLogger.Printf("Declaring queue \"%s\"...", cfg.RabbitMq.Queue) + infLogger.Printf("Declaring queue \"%s\"...", cfg.Queue.Name) table := amqp.Table{} - _, err := ch.QueueDeclare(cfg.RabbitMq.Queue, true, false, false, false, table) + _, err := ch.QueueDeclare(cfg.Queue.Name, cfg.Queue.Durable, cfg.Queue.Autodelete, cfg.Queue.Exclusive, cfg.Queue.Nowait, table) if nil != err { return fmt.Errorf("Failed to declare queue: %s", err.Error()) @@ -141,8 +141,8 @@ func Initialize(cfg *config.Config, ch Channel, errLogger, infLogger *log.Logger } // Bind queue - infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.RabbitMq.Queue, cfg.Exchange.Name) - err = ch.QueueBind(cfg.RabbitMq.Queue, "", cfg.Exchange.Name, false, amqp.Table{}) + infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.Queue.Name, cfg.Exchange.Name) + err = ch.QueueBind(cfg.Queue.Name, "", cfg.Exchange.Name, false, amqp.Table{}) if nil != err { return fmt.Errorf("Failed to bind queue to exchange: %s", err.Error()) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 2ca129b..ce48777 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -259,13 +259,19 @@ func createConfig() config.Config { username=ricbra password=t3st vhost=staging - queue=worker port=123 [prefetch] count=3 global=On + [queue] + name=worker + durable=On + autodelete=Off + exclusive=Off + nowait=Off + [exchange] name=worker autodelete=Off From 7fa1fa9c8786e45d640bc9164902b17c1a2ec97b Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Mon, 23 Nov 2015 21:02:05 +0100 Subject: [PATCH 26/36] Added RPC support --- command/command_executer.go | 17 ++++--- command/command_executer_test.go | 8 ++- consumer/consumer.go | 57 +++++++++++++++++++-- consumer/consumer_test.go | 87 ++++++++++++++++++++++++++++++-- 4 files changed, 153 insertions(+), 16 deletions(-) diff --git a/command/command_executer.go b/command/command_executer.go index 43f6d46..de76a3e 100644 --- a/command/command_executer.go +++ b/command/command_executer.go @@ -1,9 +1,12 @@ package command -import "log" +import ( + "fmt" + "log" +) type Executer interface { - Execute(cmd Command) bool + Execute(cmd Command) (result []byte, err error) } type CommandExecuter struct { @@ -13,6 +16,7 @@ type CommandExecuter struct { type Command interface { CombinedOutput() (out []byte, err error) + Output() (out []byte, err error) } func New(errLogger, infLogger *log.Logger) *CommandExecuter { @@ -22,19 +26,20 @@ func New(errLogger, infLogger *log.Logger) *CommandExecuter { } } -func (me CommandExecuter) Execute(cmd Command) bool { +func (me CommandExecuter) Execute(cmd Command) (result []byte, err error) { me.infLogger.Println("Processing message...") - out, err := cmd.CombinedOutput() + out, err := cmd.Output() if err != nil { me.infLogger.Println("Failed. Check error log for details.") me.errLogger.Printf("Failed: %s\n", string(out[:])) me.errLogger.Printf("Error: %s\n", err) - return false + + return out, fmt.Errorf("Error occured during execution of command: %s", err) } me.infLogger.Println("Processed!") - return true + return out, nil } diff --git a/command/command_executer_test.go b/command/command_executer_test.go index 41f80b1..53d69af 100644 --- a/command/command_executer_test.go +++ b/command/command_executer_test.go @@ -15,7 +15,7 @@ func TestExecutesCommand(t *testing.T) { cmd := new(TestCommand) - cmd.On("CombinedOutput").Return(make([]byte, 0), nil).Once() + cmd.On("Output").Return(make([]byte, 0), nil).Once() executer := New(errLogger, infLogger) executer.Execute(cmd) @@ -33,3 +33,9 @@ func (t *TestCommand) CombinedOutput() (out []byte, err error) { return argsT.Get(0).([]byte), argsT.Error(1) } + +func (t *TestCommand) Output() (out []byte, err error) { + argsT := t.Called() + + return argsT.Get(0).([]byte), argsT.Error(1) +} diff --git a/consumer/consumer.go b/consumer/consumer.go index 94fac3d..44cae85 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -15,7 +15,7 @@ import ( // Consumer represents a consumer type Consumer struct { - Channel *amqp.Channel + Channel Channel Connection *amqp.Connection Queue string Factory *command.CommandFactory @@ -67,15 +67,44 @@ func (c *Consumer) ProcessMessage(msg Delivery) { input = b.Bytes() } - // fmt.Printf("%v", input) cmd := c.Factory.Create(base64.StdEncoding.EncodeToString(input)) + out, err := c.Executer.Execute(cmd) - if c.Executer.Execute(cmd) { - msg.Ack(true) - } else { + if err != nil { msg.Nack(true, true) + return } + + if msg.IsRpcMessage() { + c.InfLogger.Println("Message is RPC message, trying to send reply...") + + if err := c.Reply(msg, out); err != nil { + c.InfLogger.Println("Sending RPC reply failed. Check error log.") + c.ErrLogger.Printf("Error occured during send RPC reply: %s", err) + msg.Nack(true, true) + + return + } + c.InfLogger.Println("RPC reply send.") + } + + // All went fine, ack message + msg.Ack(true) +} + +func (c *Consumer) Reply(msg Delivery, out []byte) error { + return c.Channel.Publish( + "", + msg.ReplyTo(), + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + CorrelationId: msg.CorrelationId(), + Body: out, + }, + ) } // New returns a initialized consumer based on config @@ -179,6 +208,9 @@ type Channel interface { QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) Qos(prefetchCount, prefetchSize int, global bool) error QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error + Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) + Close() error + Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error } // Delivery interface describes interface for messages @@ -186,6 +218,9 @@ type Delivery interface { Ack(multiple bool) error Nack(multiple, requeue bool) error Body() []byte + IsRpcMessage() bool + CorrelationId() string + ReplyTo() string } type RabbitMqDelivery struct { @@ -204,3 +239,15 @@ func (r RabbitMqDelivery) Nack(multiple, requeue bool) error { func (r RabbitMqDelivery) Body() []byte { return r.body } + +func (r RabbitMqDelivery) IsRpcMessage() bool { + return r.delivery.ReplyTo != "" && r.delivery.CorrelationId != "" +} + +func (r RabbitMqDelivery) CorrelationId() string { + return r.delivery.CorrelationId +} + +func (r RabbitMqDelivery) ReplyTo() string { + return r.delivery.ReplyTo +} diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index ce48777..cbd272c 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/base64" "errors" + "fmt" "log" "testing" @@ -147,9 +148,10 @@ func TestProcessingMessageWithSuccess(t *testing.T) { body := []byte("the_body") args := base64.StdEncoding.EncodeToString(body) cmd := factory.Create(args) - executer.On("Execute", cmd).Return(true).Once() + executer.On("Execute", cmd).Return([]byte(""), nil).Once() msg.On("Body").Return(body).Once() msg.On("Ack", true).Return(nil).Once() + msg.On("IsRpcMessage").Return(false).Once() consumer.ProcessMessage(msg) @@ -172,7 +174,7 @@ func TestProcessingMessageWithFailure(t *testing.T) { body := []byte("the_body") args := base64.StdEncoding.EncodeToString(body) cmd := factory.Create(args) - executer.On("Execute", cmd).Return(false).Once() + executer.On("Execute", cmd).Return([]byte(""), fmt.Errorf("Test")).Once() msg.On("Body").Return(body).Once() msg.On("Nack", true, true).Return(nil).Once() @@ -182,6 +184,47 @@ func TestProcessingMessageWithFailure(t *testing.T) { msg.AssertExpectations(t) } +func TestProcessingRpcMessageWithSuccess(t *testing.T) { + msg := new(TestDelivery) + ch := new(TestChannel) + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + executer := new(TestExecuter) + factory := &command.CommandFactory{ + Cmd: "test", + Args: []string{"aa"}, + } + consumer := Consumer{ + Executer: executer, + Factory: factory, + Compression: false, + Channel: ch, + ErrLogger: errLogger, + InfLogger: infLogger, + } + body := []byte("the_body") + args := base64.StdEncoding.EncodeToString(body) + cmd := factory.Create(args) + out := []byte("msg") + executer.On("Execute", cmd).Return(out, nil).Once() + ch.On("Publish", "", "queue_name", false, false, amqp.Publishing{ + ContentType: "text/plain", + CorrelationId: "123456", + Body: out, + }).Return(nil).Once() + msg.On("Body").Return(body).Once() + msg.On("IsRpcMessage").Return(true).Once() + msg.On("ReplyTo").Return("queue_name").Once() + msg.On("CorrelationId").Return("123456").Once() + msg.On("Ack", true).Return(nil).Once() + + consumer.ProcessMessage(msg) + + executer.AssertExpectations(t) + msg.AssertExpectations(t) +} + type TestCommand struct { mock.Mock } @@ -196,10 +239,10 @@ type TestExecuter struct { mock.Mock } -func (t *TestExecuter) Execute(cmd command.Command) bool { +func (t *TestExecuter) Execute(cmd command.Command) (result []byte, err error) { argsT := t.Called(cmd) - return argsT.Get(0).(bool) + return argsT.Get(0).([]byte), argsT.Error(1) } type TestDelivery struct { @@ -225,6 +268,24 @@ func (t *TestDelivery) Body() []byte { return argsT.Get(0).([]byte) } +func (t *TestDelivery) CorrelationId() string { + argsT := t.Called() + + return argsT.Get(0).(string) +} + +func (t *TestDelivery) IsRpcMessage() bool { + argsT := t.Called() + + return argsT.Get(0).(bool) +} + +func (t *TestDelivery) ReplyTo() string { + argsT := t.Called() + + return argsT.Get(0).(string) +} + type TestChannel struct { mock.Mock } @@ -253,6 +314,24 @@ func (t *TestChannel) QueueBind(name, key, exchange string, noWait bool, args am return argsT.Error(0) } +func (t *TestChannel) Close() error { + argsT := t.Called() + + return argsT.Error(0) +} + +func (t *TestChannel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) { + argsT := t.Called(queue, consumer, autoAck, exclusive, noLocal, noWait, args) + + return argsT.Get(0).(<-chan amqp.Delivery), argsT.Error(0) +} + +func (t *TestChannel) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error { + argsT := t.Called(exchange, key, mandatory, immediate, msg) + + return argsT.Error(0) +} + func createConfig() config.Config { return config.CreateFromString(`[rabbitmq] host=localhost From a5af4d2e87248970c0cd792d7be9a246abbc3940 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 25 Nov 2015 20:59:19 +0100 Subject: [PATCH 27/36] Added RPC example and more stuff in docs --- README.md | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index f3daf5f..7cce839 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,59 @@ Run without arguments or with --help switch to show the helptext: --help, -h show help --version, -v print the version +## Fanout + +Todo. + +## Remote Procedure Call + +No special configuration is required for enabling RPC mode. You should be aware +that any output on STDOUT will be returned as reply to the requesting client. To +demonstrate how RPC works, we'll implement [the example](https://www.rabbitmq.com/tutorials/tutorial-six-php.html) on the RabbitMQ site +using rabbitmq-cli-consumer. + +We don't change the rpc_client.php, only the rpc_server.php: + +```php + Date: Wed, 25 Nov 2015 21:22:21 +0100 Subject: [PATCH 28/36] Routing key is now configurable --- config/config.go | 1 + consumer/consumer.go | 2 +- consumer/consumer_test.go | 5 +++-- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index d0554ad..a1bbb1d 100644 --- a/config/config.go +++ b/config/config.go @@ -29,6 +29,7 @@ type Config struct { Autodelete bool Exclusive bool Nowait bool + Key string } Exchange struct { Name string `validate:"nonzero"` diff --git a/consumer/consumer.go b/consumer/consumer.go index 44cae85..1803fea 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -171,7 +171,7 @@ func Initialize(cfg *config.Config, ch Channel, errLogger, infLogger *log.Logger // Bind queue infLogger.Printf("Binding queue \"%s\" to exchange \"%s\"...", cfg.Queue.Name, cfg.Exchange.Name) - err = ch.QueueBind(cfg.Queue.Name, "", cfg.Exchange.Name, false, amqp.Table{}) + err = ch.QueueBind(cfg.Queue.Name, cfg.Queue.Key, cfg.Exchange.Name, false, amqp.Table{}) if nil != err { return fmt.Errorf("Failed to bind queue to exchange: %s", err.Error()) diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index cbd272c..6155315 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -106,7 +106,7 @@ func TestBindQueueFails(t *testing.T) { ch.On("Qos", 3, 0, true).Return(nil).Once() ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}).Return(nil).Once() - ch.On("QueueBind", "worker", "", "worker", false, amqp.Table{}).Return(errors.New("error")).Once() + ch.On("QueueBind", "worker", "foo", "worker", false, amqp.Table{}).Return(errors.New("error")).Once() err := Initialize(&config, ch, errLogger, infLogger) @@ -125,7 +125,7 @@ func TestBindQueueSucceeds(t *testing.T) { ch.On("Qos", 3, 0, true).Return(nil).Once() ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{}).Return(amqp.Queue{}, nil).Once() ch.On("ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}).Return(nil).Once() - ch.On("QueueBind", "worker", "", "worker", false, amqp.Table{}).Return(nil).Once() + ch.On("QueueBind", "worker", "foo", "worker", false, amqp.Table{}).Return(nil).Once() err := Initialize(&config, ch, errLogger, infLogger) @@ -350,6 +350,7 @@ func createConfig() config.Config { autodelete=Off exclusive=Off nowait=Off + key=foo [exchange] name=worker From 97b908f255809a6363588e76586fa85009fb2c35 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Mon, 30 Nov 2015 20:16:45 +0100 Subject: [PATCH 29/36] Allow all options to be configured from cli --- config/config.go | 54 ++++++++++++++++++++++++++++++++ main.go | 80 +++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 130 insertions(+), 4 deletions(-) diff --git a/config/config.go b/config/config.go index a1bbb1d..a917409 100644 --- a/config/config.go +++ b/config/config.go @@ -1,9 +1,12 @@ package config import ( + "fmt" "log" "strings" + "github.com/codegangsta/cli" + "code.google.com/p/gcfg" "gopkg.in/validator.v2" @@ -83,3 +86,54 @@ func CreateFromString(config string) Config { return cfg } + +// CreateFromCliContext creates config from options passed to cli +func CreateFromCliContext(c *cli.Context) Config { + str := fmt.Sprintf( + `[rabbitmq] + host=%s + username=%s + password=%s + vhost=%s + port=%s + compression=%s + + [prefetch] + count=%s + global=%s + + [queue] + name=%s + durable=%s + autodelete=%s + exclusive=%s + nowait=%s + key=%s + + [exchange] + name=%s + autodelete=%s + type=%s + durable=%s`, + c.String("host"), + c.String("username"), + c.String("password"), + c.String("vhost"), + c.String("port"), + c.String("compression"), + c.String("prefetch-count"), + c.String("prefetch-global"), + c.String("queue-name"), + c.String("queue-durable"), + c.String("queue-autodelete"), + c.String("queue-exclusive"), + c.String("queue-nowait"), + c.String("queue-key"), + c.String("exchange-name"), + c.String("exchange-autodelete"), + c.String("exchange-type"), + c.String("exchange-durable"), + ) + + return CreateFromString(str) +} diff --git a/main.go b/main.go index b0d78d5..7a2f5e6 100644 --- a/main.go +++ b/main.go @@ -31,10 +31,82 @@ func main() { Name: "configuration, c", Usage: "Location of configuration file", }, - cli.BoolFlag{ + cli.StringFlag{ Name: "quiet, Q", Usage: "Enable quite mode (disables loggging to stdout and stderr)", }, + cli.StringFlag{ + Name: "host, H", + Usage: "IP or hostname of RabbitMQ server", + }, + cli.StringFlag{ + Name: "username, u", + Usage: "RabbitMQ username", + }, + cli.StringFlag{ + Name: "password, p", + Usage: "RabbitMQ password", + }, + cli.StringFlag{ + Name: "vhost, V", + Usage: "RabbitMQ vhost", + }, + cli.StringFlag{ + Name: "port, P", + Usage: "RabbitMQ port", + }, + cli.StringFlag{ + Name: "compression, o", + Usage: "Enable compression of messages", + }, + cli.StringFlag{ + Name: "prefetch-count, C", + Usage: "Prefetch count", + }, + cli.StringFlag{ + Name: "prefetch-global, G", + Usage: "Set prefetch count as global", + }, + cli.StringFlag{ + Name: "queue-name,q", + Usage: "Queue name", + }, + cli.StringFlag{ + Name: "queue-durable,D", + Usage: "Mark queue as durable", + }, + cli.StringFlag{ + Name: "queue-autodelete,a", + Usage: "Autodelete queue", + }, + cli.StringFlag{ + Name: "queue-exlusive,E", + Usage: "Mark queue as exclusive", + }, + cli.StringFlag{ + Name: "queue-nowait, T", + Usage: "Do not wait for the server to confirm the binding", + }, + cli.StringFlag{ + Name: "queue-key, k", + Usage: "Routing key to bind the queue on", + }, + cli.StringFlag{ + Name: "exchange-name, X", + Usage: "Exchange name", + }, + cli.StringFlag{ + Name: "exchange-autodelete, t", + Usage: "Autodelete exchange", + }, + cli.StringFlag{ + Name: "exchange-type, y", + Usage: "Exchange type (direct, fanout, topic or headers)", + }, + cli.StringFlag{ + Name: "exchange-durable, j", + Usage: "Mark exchange as durable", + }, } app.Action = func(c *cli.Context) { if c.String("executable") == "" { @@ -64,15 +136,15 @@ func main() { logger.Printf("Could not parse config: %s", err) } } + + // Read config settings passed in as option to the command + configs = append(configs, config.CreateFromCliContext(c)) merger := config.ConfigMerger{} cfg, _ := merger.Merge(configs) if !config.Validate(cfg, logger) { logger.Fatalf("Please fix configuration issues.") } - // fmt.Println(config) - // os.Exit(0) - errLogger, err := createLogger(cfg.Logs.Error, verbose, os.Stderr) if err != nil { logger.Fatalf("Failed creating error log: %s", err) From afae7c5706d6491c9d543985b51f929d1fe41e0a Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Mon, 30 Nov 2015 20:29:51 +0100 Subject: [PATCH 30/36] Updated README.md with new config options --- README.md | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 7cce839..72b84a3 100644 --- a/README.md +++ b/README.md @@ -158,7 +158,7 @@ type=direct durable=On ``` -## Configuration +# Configuration A configuration file is required. Example: @@ -187,10 +187,27 @@ Run without -V to get rid of the output: $ rabbitmq-cli-consumer -e "/path/to/your/app argument --flag" -c /path/to/your/configuration.conf +## Configuration inheritance + +By default rabbitmq-cli-consumer looks for configuration files in the following +order: + +1. /etc/rabbitmq-cli-consumer/rabbitmq-cli-consumer.conf +2. ~/.rabbitmq-cli-consumer.conf +3. location passed in via -c option + +## Override options on the command line: + +Every option in the configuration file can be overwritten by passing them on the +cli. Use On and Off for true and false +respectively. Example: + + $ rabbitmq-cli-consumer -c /you/config.conf -e "/path/to/executble" --queue-key=custom-key + ### Prefetch count It's possible to configure the prefetch count and if you want set it as global. Add the following section to your -configuration to confol these values: +configuration to control these values: ```ini [prefetch] @@ -224,8 +241,6 @@ exclusive=Off nowait=Off ``` - - ## The executable Your executable receives the message as the last argument. So consider the following: From bda2b1913dd9f8c4f7a8b19b31f722c76497f627 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 2 Dec 2015 20:40:19 +0100 Subject: [PATCH 31/36] Queue TTL is now configurable --- config/config.go | 1 + consumer/consumer.go | 3 +++ consumer/consumer_test.go | 50 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+) diff --git a/config/config.go b/config/config.go index a917409..6c08f85 100644 --- a/config/config.go +++ b/config/config.go @@ -33,6 +33,7 @@ type Config struct { Exclusive bool Nowait bool Key string + TTL int } Exchange struct { Name string `validate:"nonzero"` diff --git a/consumer/consumer.go b/consumer/consumer.go index 1803fea..40f632a 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -154,6 +154,9 @@ func Initialize(cfg *config.Config, ch Channel, errLogger, infLogger *log.Logger infLogger.Printf("Declaring queue \"%s\"...", cfg.Queue.Name) table := amqp.Table{} + if cfg.Queue.TTL > 0 { + table["x-message-ttl"] = cfg.Queue.TTL + } _, err := ch.QueueDeclare(cfg.Queue.Name, cfg.Queue.Durable, cfg.Queue.Autodelete, cfg.Queue.Exclusive, cfg.Queue.Nowait, table) if nil != err { diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go index 6155315..52c85c1 100644 --- a/consumer/consumer_test.go +++ b/consumer/consumer_test.go @@ -95,6 +95,23 @@ func TestDeclareQueueSucceeds(t *testing.T) { ch.AssertExpectations(t) } +func TestDeclareQueueWithTTL(t *testing.T) { + config := createConfigWithTTL() + ch := new(TestChannel) + + var b bytes.Buffer + errLogger := log.New(&b, "", 0) + infLogger := log.New(&b, "", 0) + + ch.On("Qos", 3, 0, true).Return(nil).Once() + ch.On("QueueDeclare", "worker", true, false, false, false, amqp.Table{"x-message-ttl": 1200}).Return(amqp.Queue{}, nil).Once() + ch.On("ExchangeDeclare", "worker", "test", true, false, false, false, amqp.Table{}).Return(errors.New("error")).Once() + + Initialize(&config, ch, errLogger, infLogger) + + ch.AssertExpectations(t) +} + func TestBindQueueFails(t *testing.T) { config := createConfig() ch := new(TestChannel) @@ -363,3 +380,36 @@ func createConfig() config.Config { info=b `) } + +func createConfigWithTTL() config.Config { + return config.CreateFromString(`[rabbitmq] + host=localhost + username=ricbra + password=t3st + vhost=staging + port=123 + + [prefetch] + count=3 + global=On + + [queue] + name=worker + durable=On + autodelete=Off + exclusive=Off + nowait=Off + key=foo + ttl=1200 + + [exchange] + name=worker + autodelete=Off + type=test + durable=On + + [logs] + error=a + info=b + `) +} From f070bd4b85d1544f2b533d42f9f15eadeea7d3e8 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Wed, 2 Dec 2015 20:58:38 +0100 Subject: [PATCH 32/36] Added some more docs --- README.md | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 72b84a3..7d31d05 100644 --- a/README.md +++ b/README.md @@ -230,7 +230,7 @@ durable=On ### Configuring the queue -All queue options are configurable. Example: +All queue options are configurable. Full example: ```ini [queue] @@ -241,6 +241,30 @@ exclusive=Off nowait=Off ``` +If you want to define a TTL for your queue: + +```ini +[queue] +name=rpc_queue +durable=On +autodelete=Off +exclusive=Off +nowait=Off +ttl=1200 +``` + +When you want to configure the routing key: + +```ini +[queue] +name=rpc_queue +durable=On +autodelete=Off +exclusive=Off +nowait=Off +key=your-routing-key +``` + ## The executable Your executable receives the message as the last argument. So consider the following: From 09f2fbfe524525a188e431aca6e5bd51ea16d6f9 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Fri, 4 Dec 2015 22:39:19 +0100 Subject: [PATCH 33/36] Implemented logrotate on USR1 signal --- main.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/main.go b/main.go index 7a2f5e6..88b135e 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,13 @@ package main import ( + "fmt" "io" "log" "os" + "os/signal" "os/user" + "syscall" "code.google.com/p/gcfg" @@ -15,6 +18,8 @@ import ( "github.com/spf13/afero" ) +var files []*os.File + func main() { app := cli.NewApp() app.Name = "rabbitmq-cli-consumer" @@ -162,6 +167,25 @@ func main() { errLogger.Fatalf("Failed creating consumer: %s", err) } + // Reopen logs on USR1 + sigs := make(chan os.Signal) + signal.Notify(sigs, syscall.SIGUSR1) + + go func() { + for _ = range sigs { + for _, file := range files { + filename := file.Name() + file.Close() + new, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0660) + + if err != nil { + panic(fmt.Sprintf("Failed reopeing log file: %s", err)) + } + file = new + } + } + }() + client.Consume() } @@ -175,6 +199,8 @@ func createLogger(filename string, verbose bool, out io.Writer) (*log.Logger, er return nil, err } + files = append(files, file) + var writers = []io.Writer{ file, } From aa94c90b02f14bfb7c5f3021b7a1cbe765dc22c1 Mon Sep 17 00:00:00 2001 From: Richard van den Brand Date: Fri, 4 Dec 2015 22:55:08 +0100 Subject: [PATCH 34/36] Added small section in README.md for log rotation --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 7d31d05..c4c6b45 100644 --- a/README.md +++ b/README.md @@ -385,6 +385,12 @@ exit(1); ``` +## Log rotation + +To close and reopen the logs send the USR1 signal: + + $ kill -s USR1 pid_of_process + # Developing Missing anything? Found a bug? I love to see your PR. From f6aad0fbefad7b0571d42896c10fdfe654055b90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20Sim=C3=B5es?= Date: Tue, 23 Aug 2016 14:36:13 +0100 Subject: [PATCH 35/36] Fixed the import statements on the main.go and config.go files since code.google.com is no longer available --- config/config.go | 2 +- main.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/config.go b/config/config.go index 6c08f85..d97f56f 100644 --- a/config/config.go +++ b/config/config.go @@ -7,7 +7,7 @@ import ( "github.com/codegangsta/cli" - "code.google.com/p/gcfg" + "gopkg.in/gcfg.v1" "gopkg.in/validator.v2" ) diff --git a/main.go b/main.go index 88b135e..78f59f0 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,7 @@ import ( "os/user" "syscall" - "code.google.com/p/gcfg" + "gopkg.in/gcfg.v1" "github.com/codegangsta/cli" "github.com/ricbra/rabbitmq-cli-consumer/command" From 76d75e5a4c29f625c041e35737274a959559238d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20Sim=C3=B5es?= Date: Tue, 13 Sep 2016 15:59:53 +0100 Subject: [PATCH 36/36] Added the .gitignore file to the repository to ensure we do not commit the binary after building the consumer --- .gitignore | 1 + 1 file changed, 1 insertion(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..26d0cdb --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/rabbitmq-cli-consumer