diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f51461..47e7eb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,21 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [3.1.0] - 2022-10-04 +### Fixed +- `main.go` Exit code after showing a version. + +### Changed +- `main.go` Catch both *SIGINT* and *SIGTERM* and exit with **0** code. + +## [3.0.0] - 2022-09-30 +### Changed +- **-graphiteAddress** takes `hostname:port` pair. +- Multiple senders allowed by repeating **-graphiteAddress** flag. + +### Removed +- **-graphitePort** flag. + ## [2.2.0] - 2022-06-07 ### Changed - Init as go module. diff --git a/README.md b/README.md index d6b11a3..1cc9e28 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,7 @@ Graphite Proxy in Golang ## Features +- Send metrics to multiple destinations (only host and port may differ but not TLS options). - TLS for input & output connections. - Add configurable tenant. - Add configurable prefix. @@ -22,7 +23,7 @@ Get help Run with some options -`./groxy -port 2004 -graphiteAddress localhost -tenant techops -prefix "groxy" -immutablePrefix "test." -TLS -ignoreCert true` +`./groxy -port 2004 -graphiteAddress localhost:2003 -tenant techops -prefix "groxy" -immutablePrefix "test." -TLS -ignoreCert true` ## Test Send metric like this diff --git a/main.go b/main.go index 43edb23..822c301 100644 --- a/main.go +++ b/main.go @@ -4,17 +4,25 @@ import ( "flag" "fmt" "os" + "os/signal" + "strconv" "strings" "sync/atomic" + "syscall" "time" log "github.com/sirupsen/logrus" // "github.com/pkg/profile" ) -var version string = "2.2.0" +var version string = "3.1.0" -var clog, slog, rlog, tlog, stlog *log.Entry +var clog, rlog, tlog, stlog *log.Entry + +var instance string +var hostname string +var systemTenant string +var systemPrefix string type Metric struct { Prefix string `json:"prefix,omitempty"` @@ -53,13 +61,11 @@ func getHostname() string { func main() { // defer profile.Start().Stop() - var instance string var tenant string var forceTenant bool var prefix string var immutablePrefix arrayFlags - var graphiteAddress string - var graphitePort int + var graphiteAddress arrayFlags var statsAddress string var statsPort int var address string @@ -79,9 +85,6 @@ func main() { var debug bool var logCaller bool var limitPerSec int - var systemTenant string - var systemPrefix string - var hostname string var compressedOutput bool var compressedInput bool var showVersion bool @@ -91,8 +94,7 @@ func main() { flag.BoolVar(&forceTenant, "forceTenant", false, "Overwrite metrics tenant even if it is already set") flag.StringVar(&prefix, "prefix", "", "Prefix to add to any metric") flag.Var(&immutablePrefix, "immutablePrefix", "Do not add prefix to metrics start with. Could be set many times") - flag.StringVar(&graphiteAddress, "graphiteAddress", "", "Graphite server DNS name") - flag.IntVar(&graphitePort, "graphitePort", 2003, "Graphite server TCP port") + flag.Var(&graphiteAddress, "graphiteAddress", "Graphite server DNS name : Graphite server TCP port") flag.StringVar(&statsAddress, "statsAddress", "127.0.0.1", "Proxy stats bind address") flag.IntVar(&statsPort, "statsPort", 3003, "Proxy stats port") flag.StringVar(&address, "address", "127.0.0.1", "Proxy bind address") @@ -126,7 +128,7 @@ func main() { if showVersion { fmt.Println(version) - os.Exit(1) + os.Exit(0) } if jsonLog == true { @@ -156,7 +158,7 @@ func main() { clog.Info("Groxy rocks!") // Validate variables - if graphiteAddress == "" { + if graphiteAddress == nil { clog.Fatal("You must set '-graphiteAddress'.") } @@ -173,7 +175,6 @@ func main() { } inputChan := make(chan *Metric, 10000000) - outputChan := make(chan *Metric, 10000000) go runReceiver( address, @@ -187,42 +188,100 @@ func main() { ignoreCert, compressedInput) - go runSender( - graphiteAddress, - graphitePort, - outputChan, - tlsOutput, - mtlsOutput, - tlsOutputCaCert, - tlsOutputCert, - tlsOutputKey, - ignoreCert, - limitPerSec, - compressedOutput) + var outputChans []chan *Metric + for id := range graphiteAddress { + outputChan := make(chan *Metric, 10000000) + outputChans = append(outputChans, outputChan) + + ga := strings.Split(graphiteAddress[id], ":") + host := ga[0] + port, err := strconv.Atoi(ga[1]) + if err != nil { + clog.Fatal("Can't get integer port from '-graphiteAddress'.") + } + + // Init sender counters that are slices + state.Destination = append(state.Destination, graphiteAddress[id]) + state.SendError = append(state.SendError, 0) + state.Out = append(state.Out, 0) + state.OutBytes = append(state.OutBytes, 0) + state.OutMpm = append(state.OutMpm, 0) + state.OutBpm = append(state.OutBpm, 0) + state.Returned = append(state.Returned, 0) + state.OutQueue = append(state.OutQueue, 0) + state.Queue = append(state.Queue, 0) + state.NegativeQueueError = append(state.NegativeQueueError, 0) + state.ConnectionError = append(state.ConnectionError, 0) + state.Connection = append(state.Connection, 0) + state.ConnectionAlive = append(state.ConnectionAlive, 0) + state.PacksOverflewError = append(state.PacksOverflewError, 0) + + go runSender( + id, + host, + port, + outputChan, + tlsOutput, + mtlsOutput, + tlsOutputCaCert, + tlsOutputCert, + tlsOutputKey, + ignoreCert, + limitPerSec, + compressedOutput) + + } - go runTransformer(inputChan, outputChan, tenant, forceTenant, prefix, immutablePrefix) + go runTransformer(inputChan, outputChans, tenant, forceTenant, prefix, immutablePrefix) go runRouter(statsAddress, statsPort) go updateQueue(1) + go waitForDeath() sleepSeconds := 60 clog.WithFields(log.Fields{"sleepSeconds": sleepSeconds}).Info("Starting a waiting loop.") for { + // Get initial values in := atomic.LoadInt64(&state.In) - out := atomic.LoadInt64(&state.Out) - transformed := atomic.LoadInt64(&state.Transformed) bad := atomic.LoadInt64(&state.Bad) - out_bytes := atomic.LoadInt64(&state.OutBytes) + transformed := atomic.LoadInt64(&state.Transformed) + var out, out_bytes []int64 + + // For multiple senders + for id := range graphiteAddress { + out = append(out, atomic.LoadInt64(&state.Out[id])) + out_bytes = append(out_bytes, atomic.LoadInt64(&state.OutBytes[id])) + } + // Sleep for a minute time.Sleep(time.Duration(sleepSeconds) * time.Second) + stlog.WithFields(log.Fields{"graphiteAddress": graphiteAddress}).Info("Updating per-minute metrics.") // Calculate MPMs - state.InMpm = atomic.LoadInt64(&state.In) - in - state.OutMpm = atomic.LoadInt64(&state.Out) - out - state.TransformedMpm = atomic.LoadInt64(&state.Transformed) - transformed - state.BadMpm = atomic.LoadInt64(&state.Bad) - bad - state.OutBpm = atomic.LoadInt64(&state.OutBytes) - out_bytes - - clog.WithFields(log.Fields{"state": state}).Info("Dumping state.") - sendStateMetrics(instance, hostname, systemTenant, systemPrefix, inputChan) + atomic.StoreInt64(&state.InMpm, atomic.LoadInt64(&state.In)-in) + atomic.StoreInt64(&state.BadMpm, atomic.LoadInt64(&state.Bad)-bad) + atomic.StoreInt64(&state.TransformedMpm, atomic.LoadInt64(&state.Transformed)-transformed) + + // For multiple senders + for id := range graphiteAddress { + atomic.StoreInt64(&state.OutMpm[id], atomic.LoadInt64(&state.Out[id])-out[id]) + atomic.StoreInt64(&state.OutBpm[id], atomic.LoadInt64(&state.OutBytes[id])-out_bytes[id]) + } + + // Send State metrics + sendStateMetrics(inputChan) + } +} + +func waitForDeath() { + clog.Info("Starting Wait For Death loop.") + cancelChan := make(chan os.Signal, 1) + signal.Notify(cancelChan, syscall.SIGTERM, syscall.SIGINT) + + for { + time.Sleep(time.Duration(1) * time.Second) + + sig := <-cancelChan + clog.WithFields(log.Fields{"signal": sig}).Info("Caught signal. Terminating.") + os.Exit(0) } } diff --git a/sender.go b/sender.go index 3d6c3d1..33fa84b 100644 --- a/sender.go +++ b/sender.go @@ -6,15 +6,16 @@ import ( "crypto/tls" "crypto/x509" "fmt" - log "github.com/sirupsen/logrus" "io/ioutil" "net" "sync/atomic" "time" + + log "github.com/sirupsen/logrus" // "github.com/pkg/profile" ) -func limitRefresher(limitPerSec *int) { +func limitRefresher(limitPerSec *int, slog *log.Entry) { savedLimit := *limitPerSec slog.WithFields(log.Fields{"limitPerSec": savedLimit}).Info("Starting Packs Limit Refresher.") @@ -25,6 +26,7 @@ func limitRefresher(limitPerSec *int) { } func runSender( + id int, host string, port int, outputChan chan *Metric, @@ -36,6 +38,8 @@ func runSender( ignoreCert bool, limitPerSec int, compress bool) { + + var slog *log.Entry slog = clog.WithFields(log.Fields{ "host": host, "port": port, @@ -44,11 +48,12 @@ func runSender( "ignoreCert": ignoreCert, "compress": compress, "thread": "sender", + "id": id, }) slog.Info("Starting Sender.") // Start limit refresher thread - go limitRefresher(&limitPerSec) + go limitRefresher(&limitPerSec, slog) for { curLimit := limitPerSec @@ -63,15 +68,16 @@ func runSender( tlsCaCert, tlsCert, tlsKey, - ignoreCert) + ignoreCert, + slog) if err != nil { slog.WithFields(log.Fields{"error": err}).Error("Can't create connection.") - atomic.AddInt64(&state.ConnectionError, 1) + atomic.AddInt64(&state.ConnectionError[id], 1) time.Sleep(5 * time.Second) continue } else { - atomic.AddInt64(&state.Connection, 1) - atomic.AddInt64(&state.ConnectionAlive, 1) + atomic.AddInt64(&state.Connection[id], 1) + atomic.AddInt64(&state.ConnectionAlive[id], 1) } // collect a pack of metrics @@ -89,12 +95,12 @@ func runSender( // send the pack if len(metrics) > 0 { - go sendMetric(&metrics, connection, outputChan, compress) + go sendMetric(&metrics, connection, outputChan, compress, slog, id) limitPerSec-- } } else { slog.Warning("Limit of metric packs per second overflew.") - atomic.AddInt64(&state.PacksOverflewError, 1) + atomic.AddInt64(&state.PacksOverflewError[id], 1) time.Sleep(1 * time.Second) } } @@ -108,7 +114,8 @@ func createConnection( tlsCaCert string, tlsCert string, tlsKey string, - ignoreCert bool) (net.Conn, error) { + ignoreCert bool, + slog *log.Entry) (net.Conn, error) { netAddress := fmt.Sprintf("%s:%d", host, port) slog.Debug("Connecting to Graphite.") @@ -169,7 +176,7 @@ func createConnection( return connection, err } -func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *Metric, compress bool) { +func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *Metric, compress bool, slog *log.Entry, id int) { buffered := 0 sent := 0 returned := 0 @@ -236,7 +243,7 @@ func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *M if err != nil { slog.WithFields(log.Fields{"error": err}).Error("Connection write error.") - atomic.AddInt64(&state.SendError, 1) + atomic.AddInt64(&state.SendError[id], 1) connectionAlive = false @@ -256,16 +263,18 @@ func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *M }).Debug("Metrics pack sent.") sent += buffered - - atomic.AddInt64(&state.OutBytes, int64(packDataLength)) - atomic.AddInt64(&state.Out, int64(buffered)) } connection.Close() - atomic.AddInt64(&state.ConnectionAlive, -1) + atomic.AddInt64(&state.ConnectionAlive[id], -1) slog.WithFields(log.Fields{ "sent_bytes": packDataLength, "sent_num": sent, "returned_num": returned, }).Info("Pack is finished.") + + // Increment per sender counters + atomic.AddInt64(&state.OutBytes[id], int64(packDataLength)) + atomic.AddInt64(&state.Out[id], int64(sent)) + atomic.AddInt64(&state.Returned[id], int64(returned)) } diff --git a/state.go b/state.go index e2b0b3c..5d0b626 100644 --- a/state.go +++ b/state.go @@ -3,39 +3,45 @@ package main import ( "encoding/json" "fmt" - log "github.com/sirupsen/logrus" "net/http" "reflect" "strings" "sync/atomic" "time" + log "github.com/sirupsen/logrus" + "github.com/gorilla/mux" // "github.com/pkg/profile" ) type State struct { - Version string `json:"version"` - In int64 `json:"in"` - Bad int64 `json:"bad"` - Transformed int64 `json:"transformed"` - Out int64 `json:"out"` - OutBytes int64 `json:"out_bytes"` - ReadError int64 `json:"read_error"` - SendError int64 `json:"send_error"` - InMpm int64 `json:"in_mpm"` - BadMpm int64 `json:"bad_mpm"` - TransformedMpm int64 `json:"transformed_mpm"` - OutMpm int64 `json:"out_mpm"` - OutBpm int64 `json:"out_bpm"` - Connection int64 `json:"connection"` - ConnectionAlive int64 `json:"connection_alive"` - ConnectionError int64 `json:"connection_error"` - TransformQueue int64 `json:"transform_queue"` - OutQueue int64 `json:"out_queue"` - Queue int64 `json:"queue"` - NegativeQueueError int64 `json:"negative_queue_error"` - PacksOverflewError int64 `json:"packs_overflew_error"` + Version string `json:"version" type:"string"` + // Reciever metrics + In int64 `json:"in" type:"int"` + Bad int64 `json:"bad" type:"int"` + Transformed int64 `json:"transformed" type:"int"` + ReadError int64 `json:"read_error" type:"int"` + InMpm int64 `json:"in_mpm" type:"int"` + BadMpm int64 `json:"bad_mpm" type:"int"` + // Transformer metrics + TransformedMpm int64 `json:"transformed_mpm" type:"int"` + TransformQueue int64 `json:"transform_queue" type:"int"` + // Sender metrics + Destination []string `json:"destination" type:"slice"` + Connection []int64 `json:"connection" type:"slice"` + ConnectionAlive []int64 `json:"connection_alive" type:"slice"` + ConnectionError []int64 `json:"connection_error" type:"slice"` + Out []int64 `json:"out" type:"slice"` + OutBytes []int64 `json:"out_bytes" type:"slice"` + Returned []int64 `json:"returned" type:"slice"` + SendError []int64 `json:"send_error" type:"slice"` + OutQueue []int64 `json:"out_queue" type:"slice"` + Queue []int64 `json:"queue" type:"slice"` + NegativeQueueError []int64 `json:"negative_queue_error" type:"slice"` + PacksOverflewError []int64 `json:"packs_overflew_error" type:"slice"` + OutMpm []int64 `json:"out_mpm" type:"slice"` + OutBpm []int64 `json:"out_bpm" type:"slice"` } func runRouter(address string, port int) { @@ -64,53 +70,90 @@ func updateQueue(sleepSeconds int) { time.Sleep(time.Duration(sleepSeconds) * time.Second) state.TransformQueue = atomic.LoadInt64(&state.In) - atomic.LoadInt64(&state.Transformed) - state.OutQueue = atomic.LoadInt64(&state.Transformed) - atomic.LoadInt64(&state.Out) - state.Queue = atomic.LoadInt64(&state.In) - atomic.LoadInt64(&state.Out) - if state.Queue < 0 { - clog.WithFields(log.Fields{"queue": state.Queue}).Error("Queue value is negative.") - atomic.AddInt64(&state.NegativeQueueError, 1) + // Work with multiple Out queues + for id := 0; id < len(state.Out); id++ { + state.OutQueue[id] = atomic.LoadInt64(&state.Transformed) - atomic.LoadInt64(&state.Out[id]) + state.Queue[id] = atomic.LoadInt64(&state.In) - atomic.LoadInt64(&state.Out[id]) + + if state.Queue[id] < 0 { + clog.WithFields(log.Fields{"queue": state.Queue[id]}).Error("Queue value is negative.") + atomic.AddInt64(&state.NegativeQueueError[id], 1) + } } } } -func sendStateMetrics(instance string, hostname string, systemTenant string, systemPrefix string, inputChan chan *Metric) { - clog.Info("Sending state metrics.") +func sendStateMetrics(inputChan chan *Metric) { + stlog.WithFields(log.Fields{"state": fmt.Sprintf("%+v", state)}).Info("Dumping and sending state as metrics.") stateSnapshot := state timestamp := time.Now().Unix() - value := reflect.ValueOf(stateSnapshot) - for i := 0; i < value.NumField(); i++ { - structfield := value.Type().Field(i) + values := reflect.ValueOf(stateSnapshot) + for i := 0; i < values.NumField(); i++ { + structfield := values.Type().Field(i) name := structfield.Name - value := value.Field(i) - tag := structfield.Tag.Get("json") - - metric := Metric{} + vtype := structfield.Tag.Get("type") + metricName := structfield.Tag.Get("json") + valueField := values.Field(i) + + switch name { + case "Version": + value := strings.Replace(valueField.String(), ".", "", 2) + doSend(name, metricName, value, timestamp, inputChan) + continue + case "Destination": + value := fmt.Sprintf("%d", valueField.Len()) + doSend(name, metricName, value, timestamp, inputChan) + continue + } - if name == "Version" { - metric.Value = strings.Replace(value.String(), ".", "", 2) - } else { - metric.Value = fmt.Sprintf("%d", value.Int()) + switch vtype { + case "int": + value := fmt.Sprintf("%d", valueField.Int()) + doSend(name, metricName, value, timestamp, inputChan) + case "slice": + for id := 0; id < valueField.Len(); id++ { + metricName = fmt.Sprintf("%s_%d", structfield.Tag.Get("json"), id) + value := fmt.Sprintf("%d", valueField.Index(id).Int()) + doSend(name, metricName, value, timestamp, inputChan) + } + default: + stlog.WithFields(log.Fields{"vtype": vtype}).Fatal("Unknown state value type.") } + } +} - clog.WithFields(log.Fields{ - "name": name, - "tag": tag, - "value": metric.Value, - }).Debug("State field.") +func doSend( + name string, + metricName string, + value string, + timestamp int64, + inputChan chan *Metric) { - metric.Path = fmt.Sprintf("%s.groxy.%s.state.%s", hostname, instance, tag) - metric.Timestamp = timestamp - metric.Tenant = systemTenant - metric.Prefix = systemPrefix + metric := Metric{} - if metric.Prefix != "" { - metric.Prefix = metric.Prefix + "." - } + metric.Path = fmt.Sprintf("%s.groxy.%s.state.%s", hostname, instance, metricName) + metric.Value = value + metric.Timestamp = timestamp + metric.Tenant = systemTenant + metric.Prefix = systemPrefix - // Pass to transformation - inputChan <- &metric - atomic.AddInt64(&state.In, 1) + if metric.Prefix != "" { + metric.Prefix = metric.Prefix + "." } + + stlog.WithFields(log.Fields{ + "stateName": name, + "metricName": metricName, + "metricValue": metric.Value, + "metricPath": metric.Path, + "metricTimestamp": metric.Timestamp, + "metricTenant": metric.Tenant, + "metricPrefix": metric.Prefix, + }).Debug("Sending state field.") + + // Pass to transformation + inputChan <- &metric + atomic.AddInt64(&state.In, 1) } diff --git a/transformer.go b/transformer.go index d2d628b..e47d0ff 100644 --- a/transformer.go +++ b/transformer.go @@ -1,16 +1,17 @@ package main import ( - log "github.com/sirupsen/logrus" "strings" "sync/atomic" "time" + + log "github.com/sirupsen/logrus" // "github.com/pkg/profile" ) func runTransformer( inputChan chan *Metric, - outputChan chan *Metric, + outputChans []chan *Metric, tenant string, forceTenant bool, prefix string, @@ -27,7 +28,7 @@ func runTransformer( for { select { case metric := <-inputChan: - go transformMetric(metric, outputChan, tenant, forceTenant, prefix, immutablePrefix) + go transformMetric(metric, outputChans, tenant, forceTenant, prefix, immutablePrefix) default: time.Sleep(100 * time.Millisecond) } @@ -36,7 +37,7 @@ func runTransformer( func transformMetric( metric *Metric, - outputChan chan *Metric, + outputChans []chan *Metric, tenant string, forceTenant bool, prefix string, @@ -64,7 +65,10 @@ func transformMetric( } tlog.WithFields(log.Fields{"metric": metric}).Debug("Metric transformed.") - outputChan <- metric + + for i := 0; i < len(outputChans); i++ { + outputChans[i] <- metric + } // Update state atomic.AddInt64(&state.Transformed, 1)