diff --git a/CHANGELOG.md b/CHANGELOG.md index 804a5bc..a82a4ad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,18 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [2.0.1] +### Fixed +- `state` do not add dot to empty prefix at own metrics. + +### Removed +- **-TLS** argument in favor of **-tlsOutput** and **-tlsInput**. + +### Added +- Add compression for both `receiver` and `sender`. +- New arguments: **-tlsOutput**, **-tlsInput**, **-tlsInputCert**, **-tlsInputKey**. +- `state` new metrics: **OutBytes**, **OutBpm**. + ## [1.3.0] - 2021-06-03 ### Added - **hostname** argument. Allows setting hostname instead of getting a real one (good for Docker). diff --git a/README.md b/README.md index 840f4e4..d6b11a3 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,15 @@ # groxy Graphite Proxy in Golang +## Features +- TLS for input & output connections. +- Add configurable tenant. +- Add configurable prefix. +- Don't add prefix to metrics start with configurable strings. +- Send and receive metrics with **zlib** compression. +- Publish its own metrics by HTTP. +- Send its own metics to configurable tenant and prefix. + ## Build `go get -d` diff --git a/main.go b/main.go index 41cd9df..2f44763 100644 --- a/main.go +++ b/main.go @@ -1,29 +1,18 @@ package main import ( - "bufio" - "crypto/tls" - "encoding/json" "flag" - "fmt" log "github.com/sirupsen/logrus" - "net" - "net/http" "os" - "reflect" - "strconv" "strings" "sync/atomic" "time" - - "github.com/gorilla/mux" // "github.com/pkg/profile" ) -var version string = "1.3.0" +var version string = "2.0.1" var clog, slog, rlog, tlog, stlog *log.Entry -var hostname string type Metric struct { Prefix string `json:"prefix,omitempty"` @@ -33,341 +22,9 @@ type Metric struct { Tenant string `json:"tenant,omitempty"` } -type State struct { - Version string `json:"version"` - In int64 `json:"in"` - Bad int64 `json:"bad"` - Transformed int64 `json:"transformed"` - Out int64 `json:"out"` - ReadError int64 `json:"read_error"` - SendError int64 `json:"send_error"` - InMpm int64 `json:"in_mpm"` - BadMpm int64 `json:"bad_mpm"` - TransformedMpm int64 `json:"transformed_mpm"` - OutMpm int64 `json:"out_mpm"` - Connection int64 `json:"connection"` - ConnectionAlive int64 `json:"connection_alive"` - ConnectionError int64 `json:"connection_error"` - TransformQueue int64 `json:"transform_queue"` - OutQueue int64 `json:"out_queue"` - Queue int64 `json:"queue"` - NegativeQueueError int64 `json:"negative_queue_error"` - PacksOverflewError int64 `json:"packs_overflew_error"` -} - var emptyMetric *Metric var state State -func runRouter(address string, port int) { - stlog = clog.WithFields(log.Fields{ - "address": address, - "port": port, - "thread": "stats", - }) - netAddress := fmt.Sprintf("%s:%d", address, port) - stlog.Info("Starting Stats server.") - - // Create HTTP router - router := mux.NewRouter() - router.HandleFunc("/stats", getState).Methods("GET") - - stlog.Fatal(http.ListenAndServe(netAddress, router)) -} - -func getState(w http.ResponseWriter, req *http.Request) { - json.NewEncoder(w).Encode(state) -} - -func runReceiver(address string, port int, inputChan chan *Metric) { - rlog = clog.WithFields(log.Fields{ - "address": address, - "port": port, - "thread": "receiver", - }) - netAddress := fmt.Sprintf("%s:%d", address, port) - - rlog.Info("Starting Receiver.") - - ln, err := net.Listen("tcp4", netAddress) - if err != nil { - rlog.WithFields(log.Fields{"error": err}).Fatal("Listener error.") - } - defer ln.Close() - - for { - conn, err := ln.Accept() - if err != nil { - rlog.WithFields(log.Fields{"error": err}).Fatal("Reader accept error.") - return - } - go readMetric(conn, inputChan) - } -} - -func readMetric(connection net.Conn, inputChan chan *Metric) { - connection.SetReadDeadline(time.Now().Add(600 * time.Second)) - - scanner := bufio.NewScanner(connection) - for scanner.Scan() { - metricString := scanner.Text() - - rlog.WithFields(log.Fields{"metric": metricString}).Debug("Metric received.") - metricSlice := strings.Split(metricString, " ") - - metric := Metric{} - - switch len(metricSlice) { - case 3: - metric.Tenant = "" - case 4: - metric.Tenant = metricSlice[3] - default: - rlog.WithFields(log.Fields{"metric": metricString}).Error("Bad metric.") - atomic.AddInt64(&state.Bad, 1) - connection.Close() - return - } - - timestamp, err := strconv.ParseInt(metricSlice[2], 10, 64) - if err != nil { - rlog.WithFields(log.Fields{ - "metric": metricString, - "timestampString": metricSlice[2], - "error": err, - }).Error("Timestamp error.") - atomic.AddInt64(&state.Bad, 1) - return - } - - metric.Prefix = "" - metric.Path = metricSlice[0] - metric.Value = metricSlice[1] - metric.Timestamp = timestamp - - inputChan <- &metric - atomic.AddInt64(&state.In, 1) - } - if err := scanner.Err(); err != nil { - rlog.WithFields(log.Fields{"error": err}).Error("Error reading input.") - atomic.AddInt64(&state.ReadError, 1) - } - - connection.Close() -} - -func limitRefresher(limitPerSec *int) { - savedLimit := *limitPerSec - slog.WithFields(log.Fields{"limitPerSec": savedLimit}).Info("Starting Packs Limit Refresher.") - - for { - time.Sleep(1 * time.Second) - *limitPerSec = savedLimit - } -} - -func runSender(host string, port int, outputChan chan *Metric, TLS bool, ignoreCert bool, limitPerSec int) { - slog = clog.WithFields(log.Fields{ - "host": host, - "port": port, - "thread": "sender", - "tls": TLS, - "ignoreCert": ignoreCert, - }) - slog.Info("Starting Sender.") - - // Start limit refresher thread - go limitRefresher(&limitPerSec) - - for { - curLimit := limitPerSec - slog.WithFields(log.Fields{"limit": curLimit}).Debug("Current limit.") - if curLimit > 0 { - // Create output connection - connection, err := createConnection(host, port, TLS, ignoreCert) - if err != nil { - slog.WithFields(log.Fields{"error": err}).Error("Can't create connection.") - atomic.AddInt64(&state.ConnectionError, 1) - time.Sleep(5 * time.Second) - continue - } else { - atomic.AddInt64(&state.Connection, 1) - atomic.AddInt64(&state.ConnectionAlive, 1) - } - - // collect a pack of metrics - var metrics [10000]*Metric - - for i := 0; i < len(metrics); i++ { - select { - case metric := <-outputChan: - metrics[i] = metric - default: - metrics[i] = emptyMetric - time.Sleep(5 * time.Millisecond) - } - } - - // send the pack - if len(metrics) > 0 { - go sendMetric(&metrics, connection, outputChan) - limitPerSec-- - } - } else { - slog.Warning("Limit of metric packs per second overflew.") - atomic.AddInt64(&state.PacksOverflewError, 1) - time.Sleep(1 * time.Second) - } - } -} - -func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *Metric) { - sent := 0 - returned := 0 - connectionAlive := true - - for i := 0; i < len(metrics); i++ { - if metrics[i] == emptyMetric { - continue - } - - // If connection is dead - just return metrics to outputChan - if connectionAlive == false { - outputChan <- metrics[i] - returned++ - continue - } - - metricString := fmt.Sprintf( - "%s%s %s %d %s", - metrics[i].Prefix, - metrics[i].Path, - metrics[i].Value, - metrics[i].Timestamp, - metrics[i].Tenant, - ) - - dataLength, err := connection.Write([]byte(metricString + "\n")) - if err != nil { - slog.WithFields(log.Fields{"error": err}).Error("Connection write error.") - atomic.AddInt64(&state.SendError, 1) - - connectionAlive = false - - // Here we must return metric to out outputChan - outputChan <- metrics[i] - returned++ - } else { - slog.WithFields(log.Fields{ - "bytes": dataLength, - "metric": metricString, - "number": i, - }).Debug("Metric sent.") - sent++ - atomic.AddInt64(&state.Out, 1) - } - } - - connection.Close() - atomic.AddInt64(&state.ConnectionAlive, -1) - slog.WithFields(log.Fields{ - "sent": sent, - "returned": returned, - }).Info("Pack is finished.") -} - -func createConnection(host string, port int, TLS bool, ignoreCert bool) (net.Conn, error) { - netAddress := fmt.Sprintf("%s:%d", host, port) - slog.Debug("Connecting to Graphite.") - - timeout, _ := time.ParseDuration("10s") - dialer := &net.Dialer{Timeout: timeout} - - var connection net.Conn - var err error - - if TLS { - connection, err = tls.DialWithDialer( - dialer, - "tcp4", - netAddress, - &tls.Config{ - InsecureSkipVerify: ignoreCert, - MinVersion: tls.VersionTLS12, - MaxVersion: tls.VersionTLS12, - DynamicRecordSizingDisabled: false, - }, - ) - } else { - connection, err = dialer.Dial("tcp4", netAddress) - } - if err != nil { - slog.WithFields(log.Fields{"error": err}).Error("Dialer error.") - return connection, err - } - connection.SetDeadline(time.Now().Add(600 * time.Second)) - - slog.WithFields(log.Fields{ - "remoteAddress": connection.RemoteAddr(), - }).Info("Connection to Graphite established.") - return connection, err -} - -func runTransformer( - inputChan chan *Metric, - outputChan chan *Metric, - tenant string, - prefix string, - immutablePrefix []string) { - - tlog = clog.WithFields(log.Fields{ - "thread": "transformer", - "tenant": tenant, - "prefix": prefix, - "immutablePrefix": immutablePrefix, - }) - tlog.Info("Starting Transformer.") - - for { - select { - case metric := <-inputChan: - go transformMetric(metric, outputChan, tenant, prefix, immutablePrefix) - default: - time.Sleep(100 * time.Millisecond) - } - } -} - -func transformMetric( - metric *Metric, - outputChan chan *Metric, - tenant string, - prefix string, - immutablePrefix []string) { - - // Set tenant only if it is empty - if metric.Tenant == "" { - metric.Tenant = tenant - } - - if metric.Prefix == "" && prefix != "" { - mutate := true - for i := range immutablePrefix { - if strings.HasPrefix(metric.Path, immutablePrefix[i]) == true { - mutate = false - } - } - if mutate { - metric.Prefix = prefix + "." - } - } - - tlog.WithFields(log.Fields{"metric": metric}).Debug("Metric transformed.") - outputChan <- metric - - // Update state - atomic.AddInt64(&state.Transformed, 1) -} - type arrayFlags []string func (i *arrayFlags) String() string { @@ -379,22 +36,6 @@ func (i *arrayFlags) Set(value string) error { return nil } -func updateQueue(sleepSeconds int) { - clog.WithFields(log.Fields{"sleepSeconds": sleepSeconds}).Info("Starting queues update loop.") - for { - time.Sleep(time.Duration(sleepSeconds) * time.Second) - - state.TransformQueue = atomic.LoadInt64(&state.In) - atomic.LoadInt64(&state.Transformed) - state.OutQueue = atomic.LoadInt64(&state.Transformed) - atomic.LoadInt64(&state.Out) - state.Queue = atomic.LoadInt64(&state.In) - atomic.LoadInt64(&state.Out) - - if state.Queue < 0 { - clog.WithFields(log.Fields{"queue": state.Queue}).Error("Queue value is negative.") - atomic.AddInt64(&state.NegativeQueueError, 1) - } - } -} - func getHostname() string { hostname, err := os.Hostname() if err != nil { @@ -407,43 +48,6 @@ func getHostname() string { return hostname } -func sendStateMetrics(instance string, systemTenant string, systemPrefix string, inputChan chan *Metric) { - clog.Info("Sending state metrics.") - stateSnapshot := state - timestamp := time.Now().Unix() - - value := reflect.ValueOf(stateSnapshot) - for i := 0; i < value.NumField(); i++ { - structfield := value.Type().Field(i) - name := structfield.Name - value := value.Field(i) - tag := structfield.Tag.Get("json") - - metric := Metric{} - - if name == "Version" { - metric.Value = strings.Replace(value.String(), ".", "", 2) - } else { - metric.Value = fmt.Sprintf("%d", value.Int()) - } - - clog.WithFields(log.Fields{ - "name": name, - "tag": tag, - "value": metric.Value, - }).Debug("State field.") - - metric.Path = fmt.Sprintf("%s.groxy.%s.state.%s", hostname, instance, tag) - metric.Timestamp = timestamp - metric.Tenant = systemTenant - metric.Prefix = systemPrefix + "." - - // Pass to transformation - inputChan <- &metric - atomic.AddInt64(&state.In, 1) - } -} - func main() { // defer profile.Start().Stop() @@ -457,7 +61,10 @@ func main() { var statsPort int var address string var port int - var TLS bool + var tlsOutput bool + var tlsInput bool + var tlsInputCert string + var tlsInputKey string var ignoreCert bool var jsonLog bool var debug bool @@ -465,24 +72,32 @@ func main() { var limitPerSec int var systemTenant string var systemPrefix string + var hostname string + var compressedOutput bool + var compressedInput bool flag.StringVar(&instance, "instance", "default", "Groxy instance name (for log and metrics)") flag.StringVar(&tenant, "tenant", "", "Graphite project name to store metrics in") flag.StringVar(&prefix, "prefix", "", "Prefix to add to any metric") flag.Var(&immutablePrefix, "immutablePrefix", "Do not add prefix to metrics start with. Could be set many times") flag.StringVar(&graphiteAddress, "graphiteAddress", "", "Graphite server DNS name") - flag.IntVar(&graphitePort, "graphitePort", 2003, "Graphite server DNS name") + flag.IntVar(&graphitePort, "graphitePort", 2003, "Graphite server TCP port") flag.StringVar(&statsAddress, "statsAddress", "127.0.0.1", "Proxy stats bind address") flag.IntVar(&statsPort, "statsPort", 3003, "Proxy stats port") flag.StringVar(&address, "address", "127.0.0.1", "Proxy bind address") flag.IntVar(&port, "port", 2003, "Proxy bind port") - flag.BoolVar(&TLS, "TLS", false, "Use TLS encrypted connection") + flag.BoolVar(&tlsOutput, "tlsOutput", false, "Send metrics via TLS encrypted connection") + flag.BoolVar(&tlsInput, "tlsInput", false, "Receive metrics via TLS encrypted connection") + flag.StringVar(&tlsInputCert, "tlsInputCert", "groxy.crt", "TLS certificate for receiver") + flag.StringVar(&tlsInputKey, "tlsInputKey", "groxy.key", "TLS key for receiver") flag.BoolVar(&ignoreCert, "ignoreCert", false, "Do not verify Graphite server certificate") flag.BoolVar(&jsonLog, "jsonLog", false, "Log in JSON format") flag.BoolVar(&debug, "debug", false, "Log debug messages") flag.BoolVar(&logCaller, "logCaller", false, "Log message caller (file and line number)") + flag.BoolVar(&compressedOutput, "compressedOutput", false, "Compress messages when sending") + flag.BoolVar(&compressedInput, "compressedInput", false, "Read compressed messages when receiving") flag.IntVar(&limitPerSec, "limitPerSec", 2, "Maximum number of metric packs (<=10000 metrics per pack) sent per second") - flag.StringVar(&systemTenant, "systemTenant", "", "Graphite project name to store SELF metrics in. By default is equal to 'tennant'") + flag.StringVar(&systemTenant, "systemTenant", "", "Graphite project name to store SELF metrics in. By default is equal to 'tenant'") flag.StringVar(&systemPrefix, "systemPrefix", "", "Prefix to add to any SELF metric. By default is equal to 'prefix'") flag.StringVar(&hostname, "hostname", "", "Hostname of a computer running Groxy") @@ -537,9 +152,9 @@ func main() { inputChan := make(chan *Metric, 10000000) outputChan := make(chan *Metric, 10000000) - go runReceiver(address, port, inputChan) + go runReceiver(address, port, inputChan, tlsInput, tlsInputCert, tlsInputKey, ignoreCert, compressedInput) go runTransformer(inputChan, outputChan, tenant, prefix, immutablePrefix) - go runSender(graphiteAddress, graphitePort, outputChan, TLS, ignoreCert, limitPerSec) + go runSender(graphiteAddress, graphitePort, outputChan, tlsOutput, ignoreCert, limitPerSec, compressedOutput) go runRouter(statsAddress, statsPort) go updateQueue(1) @@ -550,6 +165,7 @@ func main() { out := atomic.LoadInt64(&state.Out) transformed := atomic.LoadInt64(&state.Transformed) bad := atomic.LoadInt64(&state.Bad) + out_bytes := atomic.LoadInt64(&state.OutBytes) time.Sleep(time.Duration(sleepSeconds) * time.Second) @@ -558,9 +174,10 @@ func main() { state.OutMpm = atomic.LoadInt64(&state.Out) - out state.TransformedMpm = atomic.LoadInt64(&state.Transformed) - transformed state.BadMpm = atomic.LoadInt64(&state.Bad) - bad + state.OutBpm = atomic.LoadInt64(&state.OutBytes) - out_bytes clog.WithFields(log.Fields{"state": state}).Info("Dumping state.") - sendStateMetrics(instance, systemTenant, systemPrefix, inputChan) + sendStateMetrics(instance, hostname, systemTenant, systemPrefix, inputChan) } } diff --git a/receiver.go b/receiver.go new file mode 100644 index 0000000..410418d --- /dev/null +++ b/receiver.go @@ -0,0 +1,141 @@ +package main + +import ( + "bufio" + "compress/zlib" + "crypto/tls" + "fmt" + log "github.com/sirupsen/logrus" + "io" + "net" + "strconv" + "strings" + "sync/atomic" + "time" + // "github.com/pkg/profile" +) + +func runReceiver( + address string, + port int, + inputChan chan *Metric, + TLS bool, + tlsCert string, + tlsKey string, + ignoreCert bool, + compress bool) { + rlog = clog.WithFields(log.Fields{ + "address": address, + "port": port, + "tls": TLS, + "ignoreCert": ignoreCert, + "compress": compress, + "thread": "receiver", + }) + netAddress := fmt.Sprintf("%s:%d", address, port) + + rlog.Info("Starting Receiver.") + + var listener net.Listener + var err error + + if TLS { + // pass + certs, err := tls.LoadX509KeyPair(tlsCert, tlsKey) + if err != nil { + rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Listener certificates error.") + } + + config := &tls.Config{ + Certificates: []tls.Certificate{certs}, + InsecureSkipVerify: ignoreCert, + MinVersion: tls.VersionTLS12, + MaxVersion: tls.VersionTLS12, + DynamicRecordSizingDisabled: false, + } + + listener, err = tls.Listen("tcp4", netAddress, config) + if err != nil { + rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Listener error.") + } + } else { + listener, err = net.Listen("tcp4", netAddress) + if err != nil { + rlog.WithFields(log.Fields{"error": err}).Fatal("Listener error.") + } + } + + defer listener.Close() + + for { + conn, err := listener.Accept() + if err != nil { + rlog.WithFields(log.Fields{"error": err}).Fatal("Reader connection acception error.") + } + go readMetric(conn, inputChan, compress) + } +} + +func readMetric(connection net.Conn, inputChan chan *Metric, compress bool) { + connection.SetReadDeadline(time.Now().Add(600 * time.Second)) + var uncompressedConn io.ReadCloser + var err error + + if compress == true { + uncompressedConn, err = zlib.NewReader(connection) + if err != nil { + rlog.WithFields(log.Fields{"error": err}).Error("Decompression error.") + atomic.AddInt64(&state.ReadError, 1) + return + } + } else { + uncompressedConn = connection + } + + scanner := bufio.NewScanner(uncompressedConn) + for scanner.Scan() { + metricString := scanner.Text() + + rlog.WithFields(log.Fields{"metric": metricString}).Debug("Metric received.") + metricSlice := strings.Split(metricString, " ") + + metric := Metric{} + + switch len(metricSlice) { + case 3: + metric.Tenant = "" + case 4: + metric.Tenant = metricSlice[3] + default: + rlog.WithFields(log.Fields{"metric": metricString}).Error("Bad metric.") + atomic.AddInt64(&state.Bad, 1) + connection.Close() + return + } + + timestamp, err := strconv.ParseInt(metricSlice[2], 10, 64) + if err != nil { + rlog.WithFields(log.Fields{ + "metric": metricString, + "timestampString": metricSlice[2], + "error": err, + }).Error("Timestamp error.") + atomic.AddInt64(&state.Bad, 1) + return + } + + metric.Prefix = "" + metric.Path = metricSlice[0] + metric.Value = metricSlice[1] + metric.Timestamp = timestamp + + inputChan <- &metric + atomic.AddInt64(&state.In, 1) + } + if err := scanner.Err(); err != nil { + rlog.WithFields(log.Fields{"error": err}).Error("Error reading input.") + atomic.AddInt64(&state.ReadError, 1) + } + + uncompressedConn.Close() +} diff --git a/sender.go b/sender.go new file mode 100644 index 0000000..f1117e9 --- /dev/null +++ b/sender.go @@ -0,0 +1,215 @@ +package main + +import ( + "bytes" + "compress/zlib" + "crypto/tls" + "fmt" + log "github.com/sirupsen/logrus" + "net" + "sync/atomic" + "time" + // "github.com/pkg/profile" +) + +func limitRefresher(limitPerSec *int) { + savedLimit := *limitPerSec + slog.WithFields(log.Fields{"limitPerSec": savedLimit}).Info("Starting Packs Limit Refresher.") + + for { + time.Sleep(1 * time.Second) + *limitPerSec = savedLimit + } +} + +func runSender( + host string, + port int, + outputChan chan *Metric, + TLS bool, + ignoreCert bool, + limitPerSec int, + compress bool) { + slog = clog.WithFields(log.Fields{ + "host": host, + "port": port, + "tls": TLS, + "ignoreCert": ignoreCert, + "compress": compress, + "thread": "sender", + }) + slog.Info("Starting Sender.") + + // Start limit refresher thread + go limitRefresher(&limitPerSec) + + for { + curLimit := limitPerSec + slog.WithFields(log.Fields{"limit": curLimit}).Debug("Current limit.") + if curLimit > 0 { + // Create output connection + connection, err := createConnection(host, port, TLS, ignoreCert) + if err != nil { + slog.WithFields(log.Fields{"error": err}).Error("Can't create connection.") + atomic.AddInt64(&state.ConnectionError, 1) + time.Sleep(5 * time.Second) + continue + } else { + atomic.AddInt64(&state.Connection, 1) + atomic.AddInt64(&state.ConnectionAlive, 1) + } + + // collect a pack of metrics + var metrics [10000]*Metric + + for i := 0; i < len(metrics); i++ { + select { + case metric := <-outputChan: + metrics[i] = metric + default: + metrics[i] = emptyMetric + time.Sleep(5 * time.Millisecond) + } + } + + // send the pack + if len(metrics) > 0 { + go sendMetric(&metrics, connection, outputChan, compress) + limitPerSec-- + } + } else { + slog.Warning("Limit of metric packs per second overflew.") + atomic.AddInt64(&state.PacksOverflewError, 1) + time.Sleep(1 * time.Second) + } + } +} + +func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *Metric, compress bool) { + buffered := 0 + sent := 0 + returned := 0 + connectionAlive := true + var buf bytes.Buffer + compressedBuf := zlib.NewWriter(&buf) + var dataLength int + var err error + + for i := 0; i < len(metrics); i++ { + if metrics[i] == emptyMetric { + continue + } + + // If connection is dead - just return metrics to outputChan + if connectionAlive == false { + outputChan <- metrics[i] + returned++ + continue + } + + metricString := fmt.Sprintf( + "%s%s %s %d %s\n", + metrics[i].Prefix, + metrics[i].Path, + metrics[i].Value, + metrics[i].Timestamp, + metrics[i].Tenant, + ) + + if compress == true { + dataLength, err = compressedBuf.Write([]byte(metricString)) + if err != nil { + slog.WithFields(log.Fields{"error": err}).Error("Compression buffer write error.") + } else { + buffered++ + } + } else { + dataLength, err = buf.Write([]byte(metricString)) + slog.WithFields(log.Fields{ + "bytes": dataLength, + "metric": metricString, + "number": i, + }).Debug("Metric buffered.") + buffered++ + } + } + + compressedBuf.Close() + + // Now the time to dump all buffer into connection + packDataLength, err := buf.WriteTo(connection) + + if err != nil { + slog.WithFields(log.Fields{"error": err}).Error("Connection write error.") + atomic.AddInt64(&state.SendError, 1) + + connectionAlive = false + + // Here we must return metric to out outputChan + for i := 0; i < len(metrics); i++ { + if metrics[i] == emptyMetric { + continue + } + + outputChan <- metrics[i] + returned += 1 + } + } else { + slog.WithFields(log.Fields{ + "bytes": packDataLength, + "number": buffered, + }).Debug("Metrics pack sent.") + + sent += buffered + + atomic.AddInt64(&state.OutBytes, int64(packDataLength)) + atomic.AddInt64(&state.Out, int64(buffered)) + } + + connection.Close() + atomic.AddInt64(&state.ConnectionAlive, -1) + slog.WithFields(log.Fields{ + "sent_bytes": packDataLength, + "sent_num": sent, + "returned_num": returned, + }).Info("Pack is finished.") +} + +func createConnection(host string, port int, TLS bool, ignoreCert bool) (net.Conn, error) { + netAddress := fmt.Sprintf("%s:%d", host, port) + slog.Debug("Connecting to Graphite.") + + timeout, _ := time.ParseDuration("10s") + dialer := &net.Dialer{Timeout: timeout} + + var connection net.Conn + var err error + + if TLS { + connection, err = tls.DialWithDialer( + dialer, + "tcp4", + netAddress, + &tls.Config{ + InsecureSkipVerify: ignoreCert, + MinVersion: tls.VersionTLS12, + MaxVersion: tls.VersionTLS12, + DynamicRecordSizingDisabled: false, + }, + ) + } else { + connection, err = dialer.Dial("tcp4", netAddress) + } + + if err != nil { + slog.WithFields(log.Fields{"error": err}).Error("Dialer error.") + } else { + connection.SetDeadline(time.Now().Add(600 * time.Second)) + + slog.WithFields(log.Fields{ + "remoteAddress": connection.RemoteAddr(), + }).Info("Connection to Graphite established.") + } + + return connection, err +} diff --git a/state.go b/state.go new file mode 100644 index 0000000..e2b0b3c --- /dev/null +++ b/state.go @@ -0,0 +1,116 @@ +package main + +import ( + "encoding/json" + "fmt" + log "github.com/sirupsen/logrus" + "net/http" + "reflect" + "strings" + "sync/atomic" + "time" + + "github.com/gorilla/mux" + // "github.com/pkg/profile" +) + +type State struct { + Version string `json:"version"` + In int64 `json:"in"` + Bad int64 `json:"bad"` + Transformed int64 `json:"transformed"` + Out int64 `json:"out"` + OutBytes int64 `json:"out_bytes"` + ReadError int64 `json:"read_error"` + SendError int64 `json:"send_error"` + InMpm int64 `json:"in_mpm"` + BadMpm int64 `json:"bad_mpm"` + TransformedMpm int64 `json:"transformed_mpm"` + OutMpm int64 `json:"out_mpm"` + OutBpm int64 `json:"out_bpm"` + Connection int64 `json:"connection"` + ConnectionAlive int64 `json:"connection_alive"` + ConnectionError int64 `json:"connection_error"` + TransformQueue int64 `json:"transform_queue"` + OutQueue int64 `json:"out_queue"` + Queue int64 `json:"queue"` + NegativeQueueError int64 `json:"negative_queue_error"` + PacksOverflewError int64 `json:"packs_overflew_error"` +} + +func runRouter(address string, port int) { + stlog = clog.WithFields(log.Fields{ + "address": address, + "port": port, + "thread": "stats", + }) + netAddress := fmt.Sprintf("%s:%d", address, port) + stlog.Info("Starting Stats server.") + + // Create HTTP router + router := mux.NewRouter() + router.HandleFunc("/stats", getState).Methods("GET") + + stlog.Fatal(http.ListenAndServe(netAddress, router)) +} + +func getState(w http.ResponseWriter, req *http.Request) { + json.NewEncoder(w).Encode(state) +} + +func updateQueue(sleepSeconds int) { + clog.WithFields(log.Fields{"sleepSeconds": sleepSeconds}).Info("Starting queues update loop.") + for { + time.Sleep(time.Duration(sleepSeconds) * time.Second) + + state.TransformQueue = atomic.LoadInt64(&state.In) - atomic.LoadInt64(&state.Transformed) + state.OutQueue = atomic.LoadInt64(&state.Transformed) - atomic.LoadInt64(&state.Out) + state.Queue = atomic.LoadInt64(&state.In) - atomic.LoadInt64(&state.Out) + + if state.Queue < 0 { + clog.WithFields(log.Fields{"queue": state.Queue}).Error("Queue value is negative.") + atomic.AddInt64(&state.NegativeQueueError, 1) + } + } +} + +func sendStateMetrics(instance string, hostname string, systemTenant string, systemPrefix string, inputChan chan *Metric) { + clog.Info("Sending state metrics.") + stateSnapshot := state + timestamp := time.Now().Unix() + + value := reflect.ValueOf(stateSnapshot) + for i := 0; i < value.NumField(); i++ { + structfield := value.Type().Field(i) + name := structfield.Name + value := value.Field(i) + tag := structfield.Tag.Get("json") + + metric := Metric{} + + if name == "Version" { + metric.Value = strings.Replace(value.String(), ".", "", 2) + } else { + metric.Value = fmt.Sprintf("%d", value.Int()) + } + + clog.WithFields(log.Fields{ + "name": name, + "tag": tag, + "value": metric.Value, + }).Debug("State field.") + + metric.Path = fmt.Sprintf("%s.groxy.%s.state.%s", hostname, instance, tag) + metric.Timestamp = timestamp + metric.Tenant = systemTenant + metric.Prefix = systemPrefix + + if metric.Prefix != "" { + metric.Prefix = metric.Prefix + "." + } + + // Pass to transformation + inputChan <- &metric + atomic.AddInt64(&state.In, 1) + } +} diff --git a/transformer.go b/transformer.go new file mode 100644 index 0000000..010bd68 --- /dev/null +++ b/transformer.go @@ -0,0 +1,65 @@ +package main + +import ( + log "github.com/sirupsen/logrus" + "strings" + "sync/atomic" + "time" + // "github.com/pkg/profile" +) + +func runTransformer( + inputChan chan *Metric, + outputChan chan *Metric, + tenant string, + prefix string, + immutablePrefix []string) { + + tlog = clog.WithFields(log.Fields{ + "thread": "transformer", + "tenant": tenant, + "prefix": prefix, + "immutablePrefix": immutablePrefix, + }) + tlog.Info("Starting Transformer.") + + for { + select { + case metric := <-inputChan: + go transformMetric(metric, outputChan, tenant, prefix, immutablePrefix) + default: + time.Sleep(100 * time.Millisecond) + } + } +} + +func transformMetric( + metric *Metric, + outputChan chan *Metric, + tenant string, + prefix string, + immutablePrefix []string) { + + // Set tenant only if it is empty + if metric.Tenant == "" { + metric.Tenant = tenant + } + + if metric.Prefix == "" && prefix != "" { + mutate := true + for i := range immutablePrefix { + if strings.HasPrefix(metric.Path, immutablePrefix[i]) == true { + mutate = false + } + } + if mutate { + metric.Prefix = prefix + "." + } + } + + tlog.WithFields(log.Fields{"metric": metric}).Debug("Metric transformed.") + outputChan <- metric + + // Update state + atomic.AddInt64(&state.Transformed, 1) +}