Skip to content

Commit

Permalink
merge multisend branch into main
Browse files Browse the repository at this point in the history
  • Loading branch information
nixargh committed Oct 4, 2022
1 parent 0a7fb1b commit 31b9cf5
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 114 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [3.1.0] - 2022-10-04
### Fixed
- `main.go` Exit code after showing a version.

### Changed
- `main.go` Catch both *SIGINT* and *SIGTERM* and exit with **0** code.

## [3.0.0] - 2022-09-30
### Changed
- **-graphiteAddress** takes `hostname:port` pair.
- Multiple senders allowed by repeating **-graphiteAddress** flag.

### Removed
- **-graphitePort** flag.

## [2.2.0] - 2022-06-07
### Changed
- Init as go module.
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Graphite Proxy in Golang

## Features
- Send metrics to multiple destinations (only host and port may differ but not TLS options).
- TLS for input & output connections.
- Add configurable tenant.
- Add configurable prefix.
Expand All @@ -22,7 +23,7 @@ Get help

Run with some options

`./groxy -port 2004 -graphiteAddress localhost -tenant techops -prefix "groxy" -immutablePrefix "test." -TLS -ignoreCert true`
`./groxy -port 2004 -graphiteAddress localhost:2003 -tenant techops -prefix "groxy" -immutablePrefix "test." -TLS -ignoreCert true`

## Test
Send metric like this
Expand Down
133 changes: 96 additions & 37 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,25 @@ import (
"flag"
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"

log "github.com/sirupsen/logrus"
// "github.com/pkg/profile"
)

var version string = "2.2.0"
var version string = "3.1.0"

var clog, slog, rlog, tlog, stlog *log.Entry
var clog, rlog, tlog, stlog *log.Entry

var instance string
var hostname string
var systemTenant string
var systemPrefix string

type Metric struct {
Prefix string `json:"prefix,omitempty"`
Expand Down Expand Up @@ -53,13 +61,11 @@ func getHostname() string {
func main() {
// defer profile.Start().Stop()

var instance string
var tenant string
var forceTenant bool
var prefix string
var immutablePrefix arrayFlags
var graphiteAddress string
var graphitePort int
var graphiteAddress arrayFlags
var statsAddress string
var statsPort int
var address string
Expand All @@ -79,9 +85,6 @@ func main() {
var debug bool
var logCaller bool
var limitPerSec int
var systemTenant string
var systemPrefix string
var hostname string
var compressedOutput bool
var compressedInput bool
var showVersion bool
Expand All @@ -91,8 +94,7 @@ func main() {
flag.BoolVar(&forceTenant, "forceTenant", false, "Overwrite metrics tenant even if it is already set")
flag.StringVar(&prefix, "prefix", "", "Prefix to add to any metric")
flag.Var(&immutablePrefix, "immutablePrefix", "Do not add prefix to metrics start with. Could be set many times")
flag.StringVar(&graphiteAddress, "graphiteAddress", "", "Graphite server DNS name")
flag.IntVar(&graphitePort, "graphitePort", 2003, "Graphite server TCP port")
flag.Var(&graphiteAddress, "graphiteAddress", "Graphite server DNS name : Graphite server TCP port")
flag.StringVar(&statsAddress, "statsAddress", "127.0.0.1", "Proxy stats bind address")
flag.IntVar(&statsPort, "statsPort", 3003, "Proxy stats port")
flag.StringVar(&address, "address", "127.0.0.1", "Proxy bind address")
Expand Down Expand Up @@ -126,7 +128,7 @@ func main() {

if showVersion {
fmt.Println(version)
os.Exit(1)
os.Exit(0)
}

if jsonLog == true {
Expand Down Expand Up @@ -156,7 +158,7 @@ func main() {
clog.Info("Groxy rocks!")

// Validate variables
if graphiteAddress == "" {
if graphiteAddress == nil {
clog.Fatal("You must set '-graphiteAddress'.")
}

Expand All @@ -173,7 +175,6 @@ func main() {
}

inputChan := make(chan *Metric, 10000000)
outputChan := make(chan *Metric, 10000000)

go runReceiver(
address,
Expand All @@ -187,42 +188,100 @@ func main() {
ignoreCert,
compressedInput)

go runSender(
graphiteAddress,
graphitePort,
outputChan,
tlsOutput,
mtlsOutput,
tlsOutputCaCert,
tlsOutputCert,
tlsOutputKey,
ignoreCert,
limitPerSec,
compressedOutput)
var outputChans []chan *Metric
for id := range graphiteAddress {
outputChan := make(chan *Metric, 10000000)
outputChans = append(outputChans, outputChan)

ga := strings.Split(graphiteAddress[id], ":")
host := ga[0]
port, err := strconv.Atoi(ga[1])
if err != nil {
clog.Fatal("Can't get integer port from '-graphiteAddress'.")
}

// Init sender counters that are slices
state.Destination = append(state.Destination, graphiteAddress[id])
state.SendError = append(state.SendError, 0)
state.Out = append(state.Out, 0)
state.OutBytes = append(state.OutBytes, 0)
state.OutMpm = append(state.OutMpm, 0)
state.OutBpm = append(state.OutBpm, 0)
state.Returned = append(state.Returned, 0)
state.OutQueue = append(state.OutQueue, 0)
state.Queue = append(state.Queue, 0)
state.NegativeQueueError = append(state.NegativeQueueError, 0)
state.ConnectionError = append(state.ConnectionError, 0)
state.Connection = append(state.Connection, 0)
state.ConnectionAlive = append(state.ConnectionAlive, 0)
state.PacksOverflewError = append(state.PacksOverflewError, 0)

go runSender(
id,
host,
port,
outputChan,
tlsOutput,
mtlsOutput,
tlsOutputCaCert,
tlsOutputCert,
tlsOutputKey,
ignoreCert,
limitPerSec,
compressedOutput)

}

go runTransformer(inputChan, outputChan, tenant, forceTenant, prefix, immutablePrefix)
go runTransformer(inputChan, outputChans, tenant, forceTenant, prefix, immutablePrefix)
go runRouter(statsAddress, statsPort)
go updateQueue(1)
go waitForDeath()

sleepSeconds := 60
clog.WithFields(log.Fields{"sleepSeconds": sleepSeconds}).Info("Starting a waiting loop.")
for {
// Get initial values
in := atomic.LoadInt64(&state.In)
out := atomic.LoadInt64(&state.Out)
transformed := atomic.LoadInt64(&state.Transformed)
bad := atomic.LoadInt64(&state.Bad)
out_bytes := atomic.LoadInt64(&state.OutBytes)
transformed := atomic.LoadInt64(&state.Transformed)
var out, out_bytes []int64

// For multiple senders
for id := range graphiteAddress {
out = append(out, atomic.LoadInt64(&state.Out[id]))
out_bytes = append(out_bytes, atomic.LoadInt64(&state.OutBytes[id]))
}

// Sleep for a minute
time.Sleep(time.Duration(sleepSeconds) * time.Second)

stlog.WithFields(log.Fields{"graphiteAddress": graphiteAddress}).Info("Updating per-minute metrics.")
// Calculate MPMs
state.InMpm = atomic.LoadInt64(&state.In) - in
state.OutMpm = atomic.LoadInt64(&state.Out) - out
state.TransformedMpm = atomic.LoadInt64(&state.Transformed) - transformed
state.BadMpm = atomic.LoadInt64(&state.Bad) - bad
state.OutBpm = atomic.LoadInt64(&state.OutBytes) - out_bytes

clog.WithFields(log.Fields{"state": state}).Info("Dumping state.")
sendStateMetrics(instance, hostname, systemTenant, systemPrefix, inputChan)
atomic.StoreInt64(&state.InMpm, atomic.LoadInt64(&state.In)-in)
atomic.StoreInt64(&state.BadMpm, atomic.LoadInt64(&state.Bad)-bad)
atomic.StoreInt64(&state.TransformedMpm, atomic.LoadInt64(&state.Transformed)-transformed)

// For multiple senders
for id := range graphiteAddress {
atomic.StoreInt64(&state.OutMpm[id], atomic.LoadInt64(&state.Out[id])-out[id])
atomic.StoreInt64(&state.OutBpm[id], atomic.LoadInt64(&state.OutBytes[id])-out_bytes[id])
}

// Send State metrics
sendStateMetrics(inputChan)
}
}

func waitForDeath() {
clog.Info("Starting Wait For Death loop.")
cancelChan := make(chan os.Signal, 1)
signal.Notify(cancelChan, syscall.SIGTERM, syscall.SIGINT)

for {
time.Sleep(time.Duration(1) * time.Second)

sig := <-cancelChan
clog.WithFields(log.Fields{"signal": sig}).Info("Caught signal. Terminating.")
os.Exit(0)
}
}
41 changes: 25 additions & 16 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
log "github.com/sirupsen/logrus"
"io/ioutil"
"net"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
// "github.com/pkg/profile"
)

func limitRefresher(limitPerSec *int) {
func limitRefresher(limitPerSec *int, slog *log.Entry) {
savedLimit := *limitPerSec
slog.WithFields(log.Fields{"limitPerSec": savedLimit}).Info("Starting Packs Limit Refresher.")

Expand All @@ -25,6 +26,7 @@ func limitRefresher(limitPerSec *int) {
}

func runSender(
id int,
host string,
port int,
outputChan chan *Metric,
Expand All @@ -36,6 +38,8 @@ func runSender(
ignoreCert bool,
limitPerSec int,
compress bool) {

var slog *log.Entry
slog = clog.WithFields(log.Fields{
"host": host,
"port": port,
Expand All @@ -44,11 +48,12 @@ func runSender(
"ignoreCert": ignoreCert,
"compress": compress,
"thread": "sender",
"id": id,
})
slog.Info("Starting Sender.")

// Start limit refresher thread
go limitRefresher(&limitPerSec)
go limitRefresher(&limitPerSec, slog)

for {
curLimit := limitPerSec
Expand All @@ -63,15 +68,16 @@ func runSender(
tlsCaCert,
tlsCert,
tlsKey,
ignoreCert)
ignoreCert,
slog)
if err != nil {
slog.WithFields(log.Fields{"error": err}).Error("Can't create connection.")
atomic.AddInt64(&state.ConnectionError, 1)
atomic.AddInt64(&state.ConnectionError[id], 1)
time.Sleep(5 * time.Second)
continue
} else {
atomic.AddInt64(&state.Connection, 1)
atomic.AddInt64(&state.ConnectionAlive, 1)
atomic.AddInt64(&state.Connection[id], 1)
atomic.AddInt64(&state.ConnectionAlive[id], 1)
}

// collect a pack of metrics
Expand All @@ -89,12 +95,12 @@ func runSender(

// send the pack
if len(metrics) > 0 {
go sendMetric(&metrics, connection, outputChan, compress)
go sendMetric(&metrics, connection, outputChan, compress, slog, id)
limitPerSec--
}
} else {
slog.Warning("Limit of metric packs per second overflew.")
atomic.AddInt64(&state.PacksOverflewError, 1)
atomic.AddInt64(&state.PacksOverflewError[id], 1)
time.Sleep(1 * time.Second)
}
}
Expand All @@ -108,7 +114,8 @@ func createConnection(
tlsCaCert string,
tlsCert string,
tlsKey string,
ignoreCert bool) (net.Conn, error) {
ignoreCert bool,
slog *log.Entry) (net.Conn, error) {
netAddress := fmt.Sprintf("%s:%d", host, port)
slog.Debug("Connecting to Graphite.")

Expand Down Expand Up @@ -169,7 +176,7 @@ func createConnection(
return connection, err
}

func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *Metric, compress bool) {
func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *Metric, compress bool, slog *log.Entry, id int) {
buffered := 0
sent := 0
returned := 0
Expand Down Expand Up @@ -236,7 +243,7 @@ func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *M

if err != nil {
slog.WithFields(log.Fields{"error": err}).Error("Connection write error.")
atomic.AddInt64(&state.SendError, 1)
atomic.AddInt64(&state.SendError[id], 1)

connectionAlive = false

Expand All @@ -256,16 +263,18 @@ func sendMetric(metrics *[10000]*Metric, connection net.Conn, outputChan chan *M
}).Debug("Metrics pack sent.")

sent += buffered

atomic.AddInt64(&state.OutBytes, int64(packDataLength))
atomic.AddInt64(&state.Out, int64(buffered))
}

connection.Close()
atomic.AddInt64(&state.ConnectionAlive, -1)
atomic.AddInt64(&state.ConnectionAlive[id], -1)
slog.WithFields(log.Fields{
"sent_bytes": packDataLength,
"sent_num": sent,
"returned_num": returned,
}).Info("Pack is finished.")

// Increment per sender counters
atomic.AddInt64(&state.OutBytes[id], int64(packDataLength))
atomic.AddInt64(&state.Out[id], int64(sent))
atomic.AddInt64(&state.Returned[id], int64(returned))
}
Loading

0 comments on commit 31b9cf5

Please sign in to comment.