diff --git a/CHANGELOG.md b/CHANGELOG.md index a82a4ad..83fe5a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). -## [2.0.1] +## [2.1.0] - 2022-01-30 +### Added +- Mutual TLS support. +- New parameter **-forceTenant** that allows to revrite metric tenant if it is already set. + +## [2.0.1] - 2022-01-08 ### Fixed - `state` do not add dot to empty prefix at own metrics. diff --git a/main.go b/main.go index 2f44763..227ce52 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,7 @@ import ( // "github.com/pkg/profile" ) -var version string = "2.0.1" +var version string = "2.1.0" var clog, slog, rlog, tlog, stlog *log.Entry @@ -53,6 +53,7 @@ func main() { var instance string var tenant string + var forceTenant bool var prefix string var immutablePrefix arrayFlags var graphiteAddress string @@ -63,8 +64,14 @@ func main() { var port int var tlsOutput bool var tlsInput bool + var mtlsOutput bool + var mtlsInput bool + var tlsInputCaCert string var tlsInputCert string var tlsInputKey string + var tlsOutputCaCert string + var tlsOutputCert string + var tlsOutputKey string var ignoreCert bool var jsonLog bool var debug bool @@ -78,6 +85,7 @@ func main() { flag.StringVar(&instance, "instance", "default", "Groxy instance name (for log and metrics)") flag.StringVar(&tenant, "tenant", "", "Graphite project name to store metrics in") + flag.BoolVar(&forceTenant, "forceTenant", false, "Overwrite metrics tenant even if it is already set") flag.StringVar(&prefix, "prefix", "", "Prefix to add to any metric") flag.Var(&immutablePrefix, "immutablePrefix", "Do not add prefix to metrics start with. Could be set many times") flag.StringVar(&graphiteAddress, "graphiteAddress", "", "Graphite server DNS name") @@ -88,8 +96,14 @@ func main() { flag.IntVar(&port, "port", 2003, "Proxy bind port") flag.BoolVar(&tlsOutput, "tlsOutput", false, "Send metrics via TLS encrypted connection") flag.BoolVar(&tlsInput, "tlsInput", false, "Receive metrics via TLS encrypted connection") + flag.BoolVar(&mtlsOutput, "mtlsOutput", false, "Send metrics via mutual TLS encrypted connection") + flag.BoolVar(&mtlsInput, "mtlsInput", false, "Receive metrics via mutual TLS encrypted connection") + flag.StringVar(&tlsInputCaCert, "tlsInputCaCert", "ca.crt", "TLS CA certificate for receiver") flag.StringVar(&tlsInputCert, "tlsInputCert", "groxy.crt", "TLS certificate for receiver") flag.StringVar(&tlsInputKey, "tlsInputKey", "groxy.key", "TLS key for receiver") + flag.StringVar(&tlsOutputCaCert, "tlsOutputCaCert", "ca.crt", "TLS CA certificate for sender") + flag.StringVar(&tlsOutputCert, "tlsOutputCert", "groxy.crt", "TLS certificate for sender") + flag.StringVar(&tlsOutputKey, "tlsOutputKey", "groxy.key", "TLS key for sender") flag.BoolVar(&ignoreCert, "ignoreCert", false, "Do not verify Graphite server certificate") flag.BoolVar(&jsonLog, "jsonLog", false, "Log in JSON format") flag.BoolVar(&debug, "debug", false, "Log debug messages") @@ -152,9 +166,32 @@ func main() { inputChan := make(chan *Metric, 10000000) outputChan := make(chan *Metric, 10000000) - go runReceiver(address, port, inputChan, tlsInput, tlsInputCert, tlsInputKey, ignoreCert, compressedInput) - go runTransformer(inputChan, outputChan, tenant, prefix, immutablePrefix) - go runSender(graphiteAddress, graphitePort, outputChan, tlsOutput, ignoreCert, limitPerSec, compressedOutput) + go runReceiver( + address, + port, + inputChan, + tlsInput, + mtlsInput, + tlsInputCaCert, + tlsInputCert, + tlsInputKey, + ignoreCert, + compressedInput) + + go runSender( + graphiteAddress, + graphitePort, + outputChan, + tlsOutput, + mtlsOutput, + tlsOutputCaCert, + tlsOutputCert, + tlsOutputKey, + ignoreCert, + limitPerSec, + compressedOutput) + + go runTransformer(inputChan, outputChan, tenant, forceTenant, prefix, immutablePrefix) go runRouter(statsAddress, statsPort) go updateQueue(1) diff --git a/receiver.go b/receiver.go index 410418d..07ce84b 100644 --- a/receiver.go +++ b/receiver.go @@ -4,9 +4,11 @@ import ( "bufio" "compress/zlib" "crypto/tls" + "crypto/x509" "fmt" log "github.com/sirupsen/logrus" "io" + "io/ioutil" "net" "strconv" "strings" @@ -20,6 +22,8 @@ func runReceiver( port int, inputChan chan *Metric, TLS bool, + mutualTLS bool, + tlsCaCert string, tlsCert string, tlsKey string, ignoreCert bool, @@ -28,6 +32,7 @@ func runReceiver( "address": address, "port": port, "tls": TLS, + "mutualTLS": mutualTLS, "ignoreCert": ignoreCert, "compress": compress, "thread": "receiver", @@ -39,19 +44,41 @@ func runReceiver( var listener net.Listener var err error - if TLS { - // pass + if TLS || mutualTLS { certs, err := tls.LoadX509KeyPair(tlsCert, tlsKey) if err != nil { - rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Listener certificates error.") + rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Listener server certificates error.") } - config := &tls.Config{ - Certificates: []tls.Certificate{certs}, - InsecureSkipVerify: ignoreCert, - MinVersion: tls.VersionTLS12, - MaxVersion: tls.VersionTLS12, - DynamicRecordSizingDisabled: false, + var config *tls.Config + + if mutualTLS { + caCert, err := ioutil.ReadFile(tlsCaCert) + if err != nil { + rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Listener CA certificate error.") + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + config = &tls.Config{ + ClientCAs: caCertPool, + ClientAuth: tls.RequireAndVerifyClientCert, + Certificates: []tls.Certificate{certs}, + MinVersion: tls.VersionTLS12, + MaxVersion: tls.VersionTLS12, + DynamicRecordSizingDisabled: false, + PreferServerCipherSuites: true, + } + } else { + config = &tls.Config{ + Certificates: []tls.Certificate{certs}, + InsecureSkipVerify: ignoreCert, + MinVersion: tls.VersionTLS12, + MaxVersion: tls.VersionTLS12, + DynamicRecordSizingDisabled: false, + PreferServerCipherSuites: true, + } } listener, err = tls.Listen("tcp4", netAddress, config) diff --git a/sender.go b/sender.go index f1117e9..3d6c3d1 100644 --- a/sender.go +++ b/sender.go @@ -4,8 +4,10 @@ import ( "bytes" "compress/zlib" "crypto/tls" + "crypto/x509" "fmt" log "github.com/sirupsen/logrus" + "io/ioutil" "net" "sync/atomic" "time" @@ -27,6 +29,10 @@ func runSender( port int, outputChan chan *Metric, TLS bool, + mutualTLS bool, + tlsCaCert string, + tlsCert string, + tlsKey string, ignoreCert bool, limitPerSec int, compress bool) { @@ -34,6 +40,7 @@ func runSender( "host": host, "port": port, "tls": TLS, + "mutualTLS": mutualTLS, "ignoreCert": ignoreCert, "compress": compress, "thread": "sender", @@ -48,7 +55,15 @@ func runSender( slog.WithFields(log.Fields{"limit": curLimit}).Debug("Current limit.") if curLimit > 0 { // Create output connection - connection, err := createConnection(host, port, TLS, ignoreCert) + connection, err := createConnection( + host, + port, + TLS, + mutualTLS, + tlsCaCert, + tlsCert, + tlsKey, + ignoreCert) if err != nil { slog.WithFields(log.Fields{"error": err}).Error("Can't create connection.") atomic.AddInt64(&state.ConnectionError, 1) @@ -85,6 +100,75 @@ func runSender( } } +func createConnection( + host string, + port int, + TLS bool, + mutualTLS bool, + tlsCaCert string, + tlsCert string, + tlsKey string, + ignoreCert bool) (net.Conn, error) { + netAddress := fmt.Sprintf("%s:%d", host, port) + slog.Debug("Connecting to Graphite.") + + timeout, _ := time.ParseDuration("10s") + dialer := &net.Dialer{Timeout: timeout} + + var connection net.Conn + var err error + + if TLS || mutualTLS { + var config *tls.Config + + if mutualTLS { + caCert, err := ioutil.ReadFile(tlsCaCert) + if err != nil { + rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Sender CA certificate error.") + } + + certs, err := tls.LoadX509KeyPair(tlsCert, tlsKey) + if err != nil { + rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Sender client certificates error.") + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + config = &tls.Config{ + RootCAs: caCertPool, + Certificates: []tls.Certificate{certs}, + MinVersion: tls.VersionTLS12, + MaxVersion: tls.VersionTLS12, + DynamicRecordSizingDisabled: false, + } + } else { + config = &tls.Config{ + InsecureSkipVerify: ignoreCert, + MinVersion: tls.VersionTLS12, + MaxVersion: tls.VersionTLS12, + DynamicRecordSizingDisabled: false, + } + } + + connection, err = tls.DialWithDialer(dialer, "tcp4", netAddress, config) + } else { + connection, err = dialer.Dial("tcp4", netAddress) + } + + if err != nil { + slog.WithFields(log.Fields{"error": err}).Error("Dialer error.") + } else { + connection.SetDeadline(time.Now().Add(600 * time.Second)) + + slog.WithFields(log.Fields{ + "remoteAddress": connection.RemoteAddr(), + }).Info("Connection to Graphite established.") + } + + return connection, err +} + func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *Metric, compress bool) { buffered := 0 sent := 0 @@ -107,14 +191,25 @@ func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *M continue } - metricString := fmt.Sprintf( - "%s%s %s %d %s\n", - metrics[i].Prefix, - metrics[i].Path, - metrics[i].Value, - metrics[i].Timestamp, - metrics[i].Tenant, - ) + var metricString string + if metrics[i].Tenant == "" { + metricString = fmt.Sprintf( + "%s%s %s %d\n", + metrics[i].Prefix, + metrics[i].Path, + metrics[i].Value, + metrics[i].Timestamp, + ) + } else { + metricString = fmt.Sprintf( + "%s%s %s %d %s\n", + metrics[i].Prefix, + metrics[i].Path, + metrics[i].Value, + metrics[i].Timestamp, + metrics[i].Tenant, + ) + } if compress == true { dataLength, err = compressedBuf.Write([]byte(metricString)) @@ -174,42 +269,3 @@ func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *M "returned_num": returned, }).Info("Pack is finished.") } - -func createConnection(host string, port int, TLS bool, ignoreCert bool) (net.Conn, error) { - netAddress := fmt.Sprintf("%s:%d", host, port) - slog.Debug("Connecting to Graphite.") - - timeout, _ := time.ParseDuration("10s") - dialer := &net.Dialer{Timeout: timeout} - - var connection net.Conn - var err error - - if TLS { - connection, err = tls.DialWithDialer( - dialer, - "tcp4", - netAddress, - &tls.Config{ - InsecureSkipVerify: ignoreCert, - MinVersion: tls.VersionTLS12, - MaxVersion: tls.VersionTLS12, - DynamicRecordSizingDisabled: false, - }, - ) - } else { - connection, err = dialer.Dial("tcp4", netAddress) - } - - if err != nil { - slog.WithFields(log.Fields{"error": err}).Error("Dialer error.") - } else { - connection.SetDeadline(time.Now().Add(600 * time.Second)) - - slog.WithFields(log.Fields{ - "remoteAddress": connection.RemoteAddr(), - }).Info("Connection to Graphite established.") - } - - return connection, err -} diff --git a/transformer.go b/transformer.go index 010bd68..d2d628b 100644 --- a/transformer.go +++ b/transformer.go @@ -12,6 +12,7 @@ func runTransformer( inputChan chan *Metric, outputChan chan *Metric, tenant string, + forceTenant bool, prefix string, immutablePrefix []string) { @@ -26,7 +27,7 @@ func runTransformer( for { select { case metric := <-inputChan: - go transformMetric(metric, outputChan, tenant, prefix, immutablePrefix) + go transformMetric(metric, outputChan, tenant, forceTenant, prefix, immutablePrefix) default: time.Sleep(100 * time.Millisecond) } @@ -37,12 +38,17 @@ func transformMetric( metric *Metric, outputChan chan *Metric, tenant string, + forceTenant bool, prefix string, immutablePrefix []string) { // Set tenant only if it is empty - if metric.Tenant == "" { + if forceTenant == true { metric.Tenant = tenant + } else { + if metric.Tenant == "" { + metric.Tenant = tenant + } } if metric.Prefix == "" && prefix != "" {