Skip to content

Commit

Permalink
v2.1.0 Mutual TLS support
Browse files Browse the repository at this point in the history
  • Loading branch information
nixargh committed Jan 30, 2022
1 parent 5ff1a35 commit 53e250f
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 64 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [2.0.1]
## [2.1.0] - 2022-01-30
### Added
- Mutual TLS support.
- New parameter **-forceTenant** that allows to revrite metric tenant if it is already set.

## [2.0.1] - 2022-01-08
### Fixed
- `state` do not add dot to empty prefix at own metrics.

Expand Down
45 changes: 41 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
// "github.com/pkg/profile"
)

var version string = "2.0.1"
var version string = "2.1.0"

var clog, slog, rlog, tlog, stlog *log.Entry

Expand Down Expand Up @@ -53,6 +53,7 @@ func main() {

var instance string
var tenant string
var forceTenant bool
var prefix string
var immutablePrefix arrayFlags
var graphiteAddress string
Expand All @@ -63,8 +64,14 @@ func main() {
var port int
var tlsOutput bool
var tlsInput bool
var mtlsOutput bool
var mtlsInput bool
var tlsInputCaCert string
var tlsInputCert string
var tlsInputKey string
var tlsOutputCaCert string
var tlsOutputCert string
var tlsOutputKey string
var ignoreCert bool
var jsonLog bool
var debug bool
Expand All @@ -78,6 +85,7 @@ func main() {

flag.StringVar(&instance, "instance", "default", "Groxy instance name (for log and metrics)")
flag.StringVar(&tenant, "tenant", "", "Graphite project name to store metrics in")
flag.BoolVar(&forceTenant, "forceTenant", false, "Overwrite metrics tenant even if it is already set")
flag.StringVar(&prefix, "prefix", "", "Prefix to add to any metric")
flag.Var(&immutablePrefix, "immutablePrefix", "Do not add prefix to metrics start with. Could be set many times")
flag.StringVar(&graphiteAddress, "graphiteAddress", "", "Graphite server DNS name")
Expand All @@ -88,8 +96,14 @@ func main() {
flag.IntVar(&port, "port", 2003, "Proxy bind port")
flag.BoolVar(&tlsOutput, "tlsOutput", false, "Send metrics via TLS encrypted connection")
flag.BoolVar(&tlsInput, "tlsInput", false, "Receive metrics via TLS encrypted connection")
flag.BoolVar(&mtlsOutput, "mtlsOutput", false, "Send metrics via mutual TLS encrypted connection")
flag.BoolVar(&mtlsInput, "mtlsInput", false, "Receive metrics via mutual TLS encrypted connection")
flag.StringVar(&tlsInputCaCert, "tlsInputCaCert", "ca.crt", "TLS CA certificate for receiver")
flag.StringVar(&tlsInputCert, "tlsInputCert", "groxy.crt", "TLS certificate for receiver")
flag.StringVar(&tlsInputKey, "tlsInputKey", "groxy.key", "TLS key for receiver")
flag.StringVar(&tlsOutputCaCert, "tlsOutputCaCert", "ca.crt", "TLS CA certificate for sender")
flag.StringVar(&tlsOutputCert, "tlsOutputCert", "groxy.crt", "TLS certificate for sender")
flag.StringVar(&tlsOutputKey, "tlsOutputKey", "groxy.key", "TLS key for sender")
flag.BoolVar(&ignoreCert, "ignoreCert", false, "Do not verify Graphite server certificate")
flag.BoolVar(&jsonLog, "jsonLog", false, "Log in JSON format")
flag.BoolVar(&debug, "debug", false, "Log debug messages")
Expand Down Expand Up @@ -152,9 +166,32 @@ func main() {
inputChan := make(chan *Metric, 10000000)
outputChan := make(chan *Metric, 10000000)

go runReceiver(address, port, inputChan, tlsInput, tlsInputCert, tlsInputKey, ignoreCert, compressedInput)
go runTransformer(inputChan, outputChan, tenant, prefix, immutablePrefix)
go runSender(graphiteAddress, graphitePort, outputChan, tlsOutput, ignoreCert, limitPerSec, compressedOutput)
go runReceiver(
address,
port,
inputChan,
tlsInput,
mtlsInput,
tlsInputCaCert,
tlsInputCert,
tlsInputKey,
ignoreCert,
compressedInput)

go runSender(
graphiteAddress,
graphitePort,
outputChan,
tlsOutput,
mtlsOutput,
tlsOutputCaCert,
tlsOutputCert,
tlsOutputKey,
ignoreCert,
limitPerSec,
compressedOutput)

go runTransformer(inputChan, outputChan, tenant, forceTenant, prefix, immutablePrefix)
go runRouter(statsAddress, statsPort)
go updateQueue(1)

Expand Down
45 changes: 36 additions & 9 deletions receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"bufio"
"compress/zlib"
"crypto/tls"
"crypto/x509"
"fmt"
log "github.com/sirupsen/logrus"
"io"
"io/ioutil"
"net"
"strconv"
"strings"
Expand All @@ -20,6 +22,8 @@ func runReceiver(
port int,
inputChan chan *Metric,
TLS bool,
mutualTLS bool,
tlsCaCert string,
tlsCert string,
tlsKey string,
ignoreCert bool,
Expand All @@ -28,6 +32,7 @@ func runReceiver(
"address": address,
"port": port,
"tls": TLS,
"mutualTLS": mutualTLS,
"ignoreCert": ignoreCert,
"compress": compress,
"thread": "receiver",
Expand All @@ -39,19 +44,41 @@ func runReceiver(
var listener net.Listener
var err error

if TLS {
// pass
if TLS || mutualTLS {
certs, err := tls.LoadX509KeyPair(tlsCert, tlsKey)
if err != nil {
rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Listener certificates error.")
rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Listener server certificates error.")
}

config := &tls.Config{
Certificates: []tls.Certificate{certs},
InsecureSkipVerify: ignoreCert,
MinVersion: tls.VersionTLS12,
MaxVersion: tls.VersionTLS12,
DynamicRecordSizingDisabled: false,
var config *tls.Config

if mutualTLS {
caCert, err := ioutil.ReadFile(tlsCaCert)
if err != nil {
rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Listener CA certificate error.")
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

config = &tls.Config{
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
Certificates: []tls.Certificate{certs},
MinVersion: tls.VersionTLS12,
MaxVersion: tls.VersionTLS12,
DynamicRecordSizingDisabled: false,
PreferServerCipherSuites: true,
}
} else {
config = &tls.Config{
Certificates: []tls.Certificate{certs},
InsecureSkipVerify: ignoreCert,
MinVersion: tls.VersionTLS12,
MaxVersion: tls.VersionTLS12,
DynamicRecordSizingDisabled: false,
PreferServerCipherSuites: true,
}
}

listener, err = tls.Listen("tcp4", netAddress, config)
Expand Down
152 changes: 104 additions & 48 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"bytes"
"compress/zlib"
"crypto/tls"
"crypto/x509"
"fmt"
log "github.com/sirupsen/logrus"
"io/ioutil"
"net"
"sync/atomic"
"time"
Expand All @@ -27,13 +29,18 @@ func runSender(
port int,
outputChan chan *Metric,
TLS bool,
mutualTLS bool,
tlsCaCert string,
tlsCert string,
tlsKey string,
ignoreCert bool,
limitPerSec int,
compress bool) {
slog = clog.WithFields(log.Fields{
"host": host,
"port": port,
"tls": TLS,
"mutualTLS": mutualTLS,
"ignoreCert": ignoreCert,
"compress": compress,
"thread": "sender",
Expand All @@ -48,7 +55,15 @@ func runSender(
slog.WithFields(log.Fields{"limit": curLimit}).Debug("Current limit.")
if curLimit > 0 {
// Create output connection
connection, err := createConnection(host, port, TLS, ignoreCert)
connection, err := createConnection(
host,
port,
TLS,
mutualTLS,
tlsCaCert,
tlsCert,
tlsKey,
ignoreCert)
if err != nil {
slog.WithFields(log.Fields{"error": err}).Error("Can't create connection.")
atomic.AddInt64(&state.ConnectionError, 1)
Expand Down Expand Up @@ -85,6 +100,75 @@ func runSender(
}
}

func createConnection(
host string,
port int,
TLS bool,
mutualTLS bool,
tlsCaCert string,
tlsCert string,
tlsKey string,
ignoreCert bool) (net.Conn, error) {
netAddress := fmt.Sprintf("%s:%d", host, port)
slog.Debug("Connecting to Graphite.")

timeout, _ := time.ParseDuration("10s")
dialer := &net.Dialer{Timeout: timeout}

var connection net.Conn
var err error

if TLS || mutualTLS {
var config *tls.Config

if mutualTLS {
caCert, err := ioutil.ReadFile(tlsCaCert)
if err != nil {
rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Sender CA certificate error.")
}

certs, err := tls.LoadX509KeyPair(tlsCert, tlsKey)
if err != nil {
rlog.WithFields(log.Fields{"error": err}).Fatal("TLS Sender client certificates error.")
}

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)

config = &tls.Config{
RootCAs: caCertPool,
Certificates: []tls.Certificate{certs},
MinVersion: tls.VersionTLS12,
MaxVersion: tls.VersionTLS12,
DynamicRecordSizingDisabled: false,
}
} else {
config = &tls.Config{
InsecureSkipVerify: ignoreCert,
MinVersion: tls.VersionTLS12,
MaxVersion: tls.VersionTLS12,
DynamicRecordSizingDisabled: false,
}
}

connection, err = tls.DialWithDialer(dialer, "tcp4", netAddress, config)
} else {
connection, err = dialer.Dial("tcp4", netAddress)
}

if err != nil {
slog.WithFields(log.Fields{"error": err}).Error("Dialer error.")
} else {
connection.SetDeadline(time.Now().Add(600 * time.Second))

slog.WithFields(log.Fields{
"remoteAddress": connection.RemoteAddr(),
}).Info("Connection to Graphite established.")
}

return connection, err
}

func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *Metric, compress bool) {
buffered := 0
sent := 0
Expand All @@ -107,14 +191,25 @@ func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *M
continue
}

metricString := fmt.Sprintf(
"%s%s %s %d %s\n",
metrics[i].Prefix,
metrics[i].Path,
metrics[i].Value,
metrics[i].Timestamp,
metrics[i].Tenant,
)
var metricString string
if metrics[i].Tenant == "" {
metricString = fmt.Sprintf(
"%s%s %s %d\n",
metrics[i].Prefix,
metrics[i].Path,
metrics[i].Value,
metrics[i].Timestamp,
)
} else {
metricString = fmt.Sprintf(
"%s%s %s %d %s\n",
metrics[i].Prefix,
metrics[i].Path,
metrics[i].Value,
metrics[i].Timestamp,
metrics[i].Tenant,
)
}

if compress == true {
dataLength, err = compressedBuf.Write([]byte(metricString))
Expand Down Expand Up @@ -174,42 +269,3 @@ func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *M
"returned_num": returned,
}).Info("Pack is finished.")
}

func createConnection(host string, port int, TLS bool, ignoreCert bool) (net.Conn, error) {
netAddress := fmt.Sprintf("%s:%d", host, port)
slog.Debug("Connecting to Graphite.")

timeout, _ := time.ParseDuration("10s")
dialer := &net.Dialer{Timeout: timeout}

var connection net.Conn
var err error

if TLS {
connection, err = tls.DialWithDialer(
dialer,
"tcp4",
netAddress,
&tls.Config{
InsecureSkipVerify: ignoreCert,
MinVersion: tls.VersionTLS12,
MaxVersion: tls.VersionTLS12,
DynamicRecordSizingDisabled: false,
},
)
} else {
connection, err = dialer.Dial("tcp4", netAddress)
}

if err != nil {
slog.WithFields(log.Fields{"error": err}).Error("Dialer error.")
} else {
connection.SetDeadline(time.Now().Add(600 * time.Second))

slog.WithFields(log.Fields{
"remoteAddress": connection.RemoteAddr(),
}).Info("Connection to Graphite established.")
}

return connection, err
}
Loading

0 comments on commit 53e250f

Please sign in to comment.