Skip to content

Commit

Permalink
v1.0.0
Browse files Browse the repository at this point in the history
- New argument '-limitPerSec' that limits number of metrics packs send for a minute
- New metric 'PacksOverflewError' exposed as 'packs_overflew_error' to know about packs limit reached
- Huge memory optimization (~6-10 times less)
  • Loading branch information
nixargh committed Feb 17, 2021
1 parent 0db2ba5 commit 8fedfa7
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 42 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [1.0.0] - 2021-02-17
### Added
- **limitPerSec** new argument to limit the number of metric packs sent per second. Default is 10 packs that is equals to 10x1000=10000 metrics per second or 600000 mpm.
- New metric **PacksOverflewError** exported as **packs_overflew_error**.

### Changed
- Send pointers to *Metric* through chanels instead of *Metric* itself. Reduce memory consumption (~6-10 times).

## [0.9.0] - 2021-02-09
### Changed
- Replace **uint64** with **int64** for state counters.
Expand Down
108 changes: 66 additions & 42 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// "github.com/pkg/profile"
)

var version string = "0.9.0"
var version string = "1.0.0"

var clog, slog, rlog, tlog, stlog *log.Entry
var hostname string
Expand Down Expand Up @@ -52,8 +52,10 @@ type State struct {
OutQueue int64 `json:"out_queue"`
Queue int64 `json:"queue"`
NegativeQueueError int64 `json:"negative_queue_error"`
PacksOverflewError int64 `json:"packs_overflew_error"`
}

var emptyMetric *Metric
var state State

func runRouter(address string, port int) {
Expand All @@ -76,7 +78,7 @@ func getState(w http.ResponseWriter, req *http.Request) {
json.NewEncoder(w).Encode(state)
}

func runReceiver(address string, port int, inputChan chan Metric) {
func runReceiver(address string, port int, inputChan chan *Metric) {
rlog = clog.WithFields(log.Fields{
"address": address,
"port": port,
Expand All @@ -102,7 +104,7 @@ func runReceiver(address string, port int, inputChan chan Metric) {
}
}

func readMetric(connection net.Conn, inputChan chan Metric) {
func readMetric(connection net.Conn, inputChan chan *Metric) {
connection.SetReadDeadline(time.Now().Add(600 * time.Second))

scanner := bufio.NewScanner(connection)
Expand Down Expand Up @@ -142,7 +144,7 @@ func readMetric(connection net.Conn, inputChan chan Metric) {
metric.Value = metricSlice[1]
metric.Timestamp = timestamp

inputChan <- metric
inputChan <- &metric
atomic.AddInt64(&state.In, 1)
}
if err := scanner.Err(); err != nil {
Expand All @@ -153,7 +155,17 @@ func readMetric(connection net.Conn, inputChan chan Metric) {
connection.Close()
}

func runSender(host string, port int, outputChan chan Metric, TLS bool, ignoreCert bool) {
func limitRefresher(limitPerSec *int) {
savedLimit := *limitPerSec
slog.WithFields(log.Fields{"limitPerSec": savedLimit}).Info("Starting Packs Limit Refresher.")

for {
time.Sleep(1 * time.Second)
*limitPerSec = savedLimit
}
}

func runSender(host string, port int, outputChan chan *Metric, TLS bool, ignoreCert bool, limitPerSec int) {
slog = clog.WithFields(log.Fields{
"host": host,
"port": port,
Expand All @@ -163,46 +175,56 @@ func runSender(host string, port int, outputChan chan Metric, TLS bool, ignoreCe
})
slog.Info("Starting Sender.")

for {
// Create output connection
connection, err := createConnection(host, port, TLS, ignoreCert)
if err != nil {
slog.WithFields(log.Fields{"error": err}).Fatal("Can't create connection.")
atomic.AddInt64(&state.ConnectionError, 1)
time.Sleep(5 * time.Second)
continue
} else {
atomic.AddInt64(&state.Connection, 1)
atomic.AddInt64(&state.ConnectionAlive, 1)
}
// Start limit refresher thread
go limitRefresher(&limitPerSec)

// collect a pack of metrics
var metrics [1000]Metric
for {
curLimit := limitPerSec
slog.WithFields(log.Fields{"limit": curLimit}).Debug("Current limit.")
if curLimit > 0 {
// Create output connection
connection, err := createConnection(host, port, TLS, ignoreCert)
if err != nil {
slog.WithFields(log.Fields{"error": err}).Fatal("Can't create connection.")
atomic.AddInt64(&state.ConnectionError, 1)
time.Sleep(5 * time.Second)
continue
} else {
atomic.AddInt64(&state.Connection, 1)
atomic.AddInt64(&state.ConnectionAlive, 1)
}

for i := 0; i < len(metrics); i++ {
select {
case metric := <-outputChan:
metrics[i] = metric
default:
metrics[i] = Metric{}
time.Sleep(100 * time.Millisecond)
// collect a pack of metrics
var metrics [1000]*Metric

for i := 0; i < len(metrics); i++ {
select {
case metric := <-outputChan:
metrics[i] = metric
default:
metrics[i] = emptyMetric
time.Sleep(100 * time.Millisecond)
}
}
}

// send the pack
if len(metrics) > 0 {
go sendMetric(&metrics, connection, outputChan)
// send the pack
if len(metrics) > 0 {
go sendMetric(&metrics, connection, outputChan)
limitPerSec--
}
} else {
slog.Warning("Limit of metric packs per second overflew.")
atomic.AddInt64(&state.PacksOverflewError, 1)
time.Sleep(1 * time.Second)
}
}
}

func sendMetric(metrics *[1000]Metric, connection net.Conn, outputChan chan Metric) {
func sendMetric(metrics *[1000]*Metric, connection net.Conn, outputChan chan *Metric) {
sent := 0
returned := 0
connectionAlive := true

emptyMetric := Metric{}

for i := 0; i < len(metrics); i++ {
if metrics[i] == emptyMetric {
continue
Expand Down Expand Up @@ -291,8 +313,8 @@ func createConnection(host string, port int, TLS bool, ignoreCert bool) (net.Con
}

func runTransformer(
inputChan chan Metric,
outputChan chan Metric,
inputChan chan *Metric,
outputChan chan *Metric,
tenant string,
prefix string,
immutablePrefix []string) {
Expand All @@ -316,8 +338,8 @@ func runTransformer(
}

func transformMetric(
metric Metric,
outputChan chan Metric,
metric *Metric,
outputChan chan *Metric,
tenant string,
prefix string,
immutablePrefix []string) {
Expand Down Expand Up @@ -381,7 +403,7 @@ func getHostname() string {
return hostname
}

func sendStateMetrics(instance string, inputChan chan Metric) {
func sendStateMetrics(instance string, inputChan chan *Metric) {
clog.Info("Sending state metrics.")
stateSnapshot := state
timestamp := time.Now().Unix()
Expand Down Expand Up @@ -411,7 +433,7 @@ func sendStateMetrics(instance string, inputChan chan Metric) {
metric.Timestamp = timestamp

// Pass to transformation
inputChan <- metric
inputChan <- &metric
atomic.AddInt64(&state.In, 1)
}
}
Expand All @@ -434,6 +456,7 @@ func main() {
var jsonLog bool
var debug bool
var logCaller bool
var limitPerSec int

flag.StringVar(&instance, "instance", "default", "Groxy instance name (for log and metrics)")
flag.StringVar(&tenant, "tenant", "", "Graphite project name to store metrics in")
Expand All @@ -450,6 +473,7 @@ func main() {
flag.BoolVar(&jsonLog, "jsonLog", false, "Log in JSON format")
flag.BoolVar(&debug, "debug", false, "Log debug messages")
flag.BoolVar(&logCaller, "logCaller", false, "Log message caller (file and line number)")
flag.IntVar(&limitPerSec, "limitPerSec", 10, "Maximum number of metric packs (<=1000 metrics per pack) sent per second")

flag.Parse()

Expand Down Expand Up @@ -487,14 +511,14 @@ func main() {
clog.Fatal("You must set '-graphiteAddress'.")
}

inputChan := make(chan Metric, 10000000)
outputChan := make(chan Metric, 10000000)
inputChan := make(chan *Metric, 10000000)
outputChan := make(chan *Metric, 10000000)

hostname = getHostname()

go runReceiver(address, port, inputChan)
go runTransformer(inputChan, outputChan, tenant, prefix, immutablePrefix)
go runSender(graphiteAddress, graphitePort, outputChan, TLS, ignoreCert)
go runSender(graphiteAddress, graphitePort, outputChan, TLS, ignoreCert, limitPerSec)
go runRouter(statsAddress, statsPort)
go updateQueue(1)

Expand Down

0 comments on commit 8fedfa7

Please sign in to comment.