Skip to content

Commit

Permalink
v0.9.0
Browse files Browse the repository at this point in the history
- Replace uint64 with int64 for state counters
- Fix #1 by atomic counters
- Small refactoring
  • Loading branch information
nixargh committed Feb 9, 2021
1 parent dc2201f commit 0db2ba5
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 74 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [0.8.1]
## [0.9.0] - 2021-02-09
### Changed
- Replace **uint64** with **int64** for state counters.
- Refactor **sendMetric**.

### Fixed
- [#1](https://github.com/nixargh/groxy/issues/1) use **sync/atomic** for counters.

## [0.8.1] - 2021-01-28
### Changed
- Replace **int64** with **uint64** for state counters.

Expand Down
157 changes: 84 additions & 73 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import (
"reflect"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/gorilla/mux"
// "github.com/pkg/profile"
)

var version string = "0.8.1"
var version string = "0.9.0"

var clog, slog, rlog, tlog, stlog *log.Entry
var hostname string
Expand All @@ -33,22 +34,24 @@ type Metric struct {
}

type State struct {
Version string `json:"version"`
In uint64 `json:"in"`
Out uint64 `json:"out"`
Transformed uint64 `json:"transformed"`
Bad uint64 `json:"bad"`
SendError uint64 `json:"send_error"`
InMpm uint64 `json:"in_mpm"`
OutMpm uint64 `json:"out_mpm"`
BadMpm uint64 `json:"bad_mpm"`
TransformedMpm uint64 `json:"transformed_mpm"`
Connection uint64 `json:"connection"`
ConnectionAlive uint64 `json:"connection_alive"`
ConnectionError uint64 `json:"connection_error"`
Queue uint64 `json:"queue"`
OutQueue uint64 `json:"out_queue"`
TransformQueue uint64 `json:"transform_queue"`
Version string `json:"version"`
In int64 `json:"in"`
Bad int64 `json:"bad"`
Transformed int64 `json:"transformed"`
Out int64 `json:"out"`
ReadError int64 `json:"read_error"`
SendError int64 `json:"send_error"`
InMpm int64 `json:"in_mpm"`
BadMpm int64 `json:"bad_mpm"`
TransformedMpm int64 `json:"transformed_mpm"`
OutMpm int64 `json:"out_mpm"`
Connection int64 `json:"connection"`
ConnectionAlive int64 `json:"connection_alive"`
ConnectionError int64 `json:"connection_error"`
TransformQueue int64 `json:"transform_queue"`
OutQueue int64 `json:"out_queue"`
Queue int64 `json:"queue"`
NegativeQueueError int64 `json:"negative_queue_error"`
}

var state State
Expand Down Expand Up @@ -118,7 +121,7 @@ func readMetric(connection net.Conn, inputChan chan Metric) {
metric.Tenant = metricSlice[3]
default:
rlog.WithFields(log.Fields{"metric": metricString}).Error("Bad metric.")
state.Bad++
atomic.AddInt64(&state.Bad, 1)
connection.Close()
return
}
Expand All @@ -130,7 +133,7 @@ func readMetric(connection net.Conn, inputChan chan Metric) {
"timestampString": metricSlice[2],
"error": err,
}).Error("Timestamp error.")
state.Bad++
atomic.AddInt64(&state.Bad, 1)
return
}

Expand All @@ -140,10 +143,11 @@ func readMetric(connection net.Conn, inputChan chan Metric) {
metric.Timestamp = timestamp

inputChan <- metric
state.In++
atomic.AddInt64(&state.In, 1)
}
if err := scanner.Err(); err != nil {
rlog.WithFields(log.Fields{"error": err}).Error("Error reading input.")
atomic.AddInt64(&state.ReadError, 1)
}

connection.Close()
Expand All @@ -164,12 +168,12 @@ func runSender(host string, port int, outputChan chan Metric, TLS bool, ignoreCe
connection, err := createConnection(host, port, TLS, ignoreCert)
if err != nil {
slog.WithFields(log.Fields{"error": err}).Fatal("Can't create connection.")
state.ConnectionError++
atomic.AddInt64(&state.ConnectionError, 1)
time.Sleep(5 * time.Second)
continue
} else {
state.Connection++
state.ConnectionAlive++
atomic.AddInt64(&state.Connection, 1)
atomic.AddInt64(&state.ConnectionAlive, 1)
}

// collect a pack of metrics
Expand Down Expand Up @@ -200,47 +204,49 @@ func sendMetric(metrics *[1000]Metric, connection net.Conn, outputChan chan Metr
emptyMetric := Metric{}

for i := 0; i < len(metrics); i++ {
if metrics[i] != emptyMetric {
// If connection is dead - just return metrics to outputChan
if connectionAlive == false {
outputChan <- metrics[i]
returned++
continue
}
if metrics[i] == emptyMetric {
continue
}

metricString := fmt.Sprintf(
"%s%s %s %d %s",
metrics[i].Prefix,
metrics[i].Path,
metrics[i].Value,
metrics[i].Timestamp,
metrics[i].Tenant,
)

dataLength, err := connection.Write([]byte(metricString + "\n"))
if err != nil {
slog.WithFields(log.Fields{"error": err}).Error("Connection write error.")
state.SendError++

connectionAlive = false

// Here we must return metric to ResendQueue
outputChan <- metrics[i]
returned++
} else {
slog.WithFields(log.Fields{
"bytes": dataLength,
"metric": metricString,
"number": i,
}).Debug("Metric sent.")
sent++
state.Out++
}
// If connection is dead - just return metrics to outputChan
if connectionAlive == false {
outputChan <- metrics[i]
returned++
continue
}

metricString := fmt.Sprintf(
"%s%s %s %d %s",
metrics[i].Prefix,
metrics[i].Path,
metrics[i].Value,
metrics[i].Timestamp,
metrics[i].Tenant,
)

dataLength, err := connection.Write([]byte(metricString + "\n"))
if err != nil {
slog.WithFields(log.Fields{"error": err}).Error("Connection write error.")
atomic.AddInt64(&state.SendError, 1)

connectionAlive = false

// Here we must return metric to out outputChan
outputChan <- metrics[i]
returned++
} else {
slog.WithFields(log.Fields{
"bytes": dataLength,
"metric": metricString,
"number": i,
}).Debug("Metric sent.")
sent++
atomic.AddInt64(&state.Out, 1)
}
}

connection.Close()
state.ConnectionAlive--
atomic.AddInt64(&state.ConnectionAlive, -1)
slog.WithFields(log.Fields{
"sent": sent,
"returned": returned,
Expand Down Expand Up @@ -333,7 +339,7 @@ func transformMetric(
outputChan <- metric

// Update state
state.Transformed++
atomic.AddInt64(&state.Transformed, 1)
}

type arrayFlags []string
Expand All @@ -352,9 +358,14 @@ func updateQueue(sleepSeconds int) {
for {
time.Sleep(time.Duration(sleepSeconds) * time.Second)

state.TransformQueue = state.In - state.Transformed
state.OutQueue = state.Transformed - state.Out
state.Queue = state.In - state.Out
state.TransformQueue = atomic.LoadInt64(&state.In) - atomic.LoadInt64(&state.Transformed)
state.OutQueue = atomic.LoadInt64(&state.Transformed) - atomic.LoadInt64(&state.Out)
state.Queue = atomic.LoadInt64(&state.In) - atomic.LoadInt64(&state.Out)

if state.Queue < 0 {
clog.WithFields(log.Fields{"queue": state.Queue}).Error("Queue value is negative.")
atomic.AddInt64(&state.NegativeQueueError, 1)
}
}
}

Expand Down Expand Up @@ -387,7 +398,7 @@ func sendStateMetrics(instance string, inputChan chan Metric) {
if name == "Version" {
metric.Value = strings.Replace(value.String(), ".", "", 2)
} else {
metric.Value = fmt.Sprintf("%d", value.Uint())
metric.Value = fmt.Sprintf("%d", value.Int())
}

clog.WithFields(log.Fields{
Expand All @@ -401,7 +412,7 @@ func sendStateMetrics(instance string, inputChan chan Metric) {

// Pass to transformation
inputChan <- metric
state.In++
atomic.AddInt64(&state.In, 1)
}
}

Expand Down Expand Up @@ -490,18 +501,18 @@ func main() {
sleepSeconds := 60
clog.WithFields(log.Fields{"sleepSeconds": sleepSeconds}).Info("Starting a waiting loop.")
for {
in := state.In
out := state.Out
transformed := state.Transformed
bad := state.Bad
in := atomic.LoadInt64(&state.In)
out := atomic.LoadInt64(&state.Out)
transformed := atomic.LoadInt64(&state.Transformed)
bad := atomic.LoadInt64(&state.Bad)

time.Sleep(time.Duration(sleepSeconds) * time.Second)

// Calculate MPMs
state.InMpm = state.In - in
state.OutMpm = state.Out - out
state.TransformedMpm = state.Transformed - transformed
state.BadMpm = state.Bad - bad
state.InMpm = atomic.LoadInt64(&state.In) - in
state.OutMpm = atomic.LoadInt64(&state.Out) - out
state.TransformedMpm = atomic.LoadInt64(&state.Transformed) - transformed
state.BadMpm = atomic.LoadInt64(&state.Bad) - bad

clog.WithFields(log.Fields{"state": state}).Info("Dumping state.")

Expand Down

0 comments on commit 0db2ba5

Please sign in to comment.