From b1ad39a98a9e4cc253c629a4573560115c29750f Mon Sep 17 00:00:00 2001 From: nixargh Date: Fri, 30 Sep 2022 15:11:53 +0300 Subject: [PATCH] fixes and improvements --- main.go | 32 +++++++++++++++++-- state.go | 95 ++++++++++++++++++++------------------------------------ 2 files changed, 63 insertions(+), 64 deletions(-) diff --git a/main.go b/main.go index 8f97b9a..d3420ef 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "os" "strconv" "strings" + "sync/atomic" "time" log "github.com/sirupsen/logrus" @@ -198,6 +199,7 @@ func main() { } // Init sender counters that are slices + state.Destination = append(state.Destination, graphiteAddress[id]) state.SendError = append(state.SendError, 0) state.Out = append(state.Out, 0) state.OutBytes = append(state.OutBytes, 0) @@ -231,12 +233,38 @@ func main() { go runTransformer(inputChan, outputChans, tenant, forceTenant, prefix, immutablePrefix) go runRouter(statsAddress, statsPort) go updateQueue(1) - go updatePerMinuteCounters(graphiteAddress, inputChan) sleepSeconds := 60 clog.WithFields(log.Fields{"sleepSeconds": sleepSeconds}).Info("Starting a waiting loop.") for { - sendStateMetrics(inputChan) + // Get initial values + in := atomic.LoadInt64(&state.In) + bad := atomic.LoadInt64(&state.Bad) + transformed := atomic.LoadInt64(&state.Transformed) + var out, out_bytes []int64 + + // For multiple senders + for id := range graphiteAddress { + out = append(out, atomic.LoadInt64(&state.Out[id])) + out_bytes = append(out_bytes, atomic.LoadInt64(&state.OutBytes[id])) + } + + // Sleep for a minute time.Sleep(time.Duration(sleepSeconds) * time.Second) + + stlog.WithFields(log.Fields{"graphiteAddress": graphiteAddress}).Info("Updating per-minute metrics.") + // Calculate MPMs + atomic.StoreInt64(&state.InMpm, atomic.LoadInt64(&state.In)-in) + atomic.StoreInt64(&state.BadMpm, atomic.LoadInt64(&state.Bad)-bad) + atomic.StoreInt64(&state.TransformedMpm, atomic.LoadInt64(&state.Transformed)-transformed) + + // For multiple senders + for id := range graphiteAddress { + atomic.StoreInt64(&state.OutMpm[id], atomic.LoadInt64(&state.Out[id])-out[id]) + atomic.StoreInt64(&state.OutBpm[id], atomic.LoadInt64(&state.OutBytes[id])-out_bytes[id]) + } + + // Send State metrics + sendStateMetrics(inputChan) } } diff --git a/state.go b/state.go index 2992b5c..5d0b626 100644 --- a/state.go +++ b/state.go @@ -28,19 +28,20 @@ type State struct { TransformedMpm int64 `json:"transformed_mpm" type:"int"` TransformQueue int64 `json:"transform_queue" type:"int"` // Sender metrics - Connection []int64 `json:"connection" type:"slice"` - ConnectionAlive []int64 `json:"connection_alive" type:"slice"` - ConnectionError []int64 `json:"connection_error" type:"slice"` - Out []int64 `json:"out" type:"slice"` - OutBytes []int64 `json:"out_bytes" type:"slice"` - Returned []int64 `json:"returned" type:"slice"` - SendError []int64 `json:"send_error" type:"slice"` - OutQueue []int64 `json:"out_queue" type:"slice"` - Queue []int64 `json:"queue" type:"slice"` - NegativeQueueError []int64 `json:"negative_queue_error" type:"slice"` - PacksOverflewError []int64 `json:"packs_overflew_error" type:"slice"` - OutMpm []int64 `json:"out_mpm" type:"slice"` - OutBpm []int64 `json:"out_bpm" type:"slice"` + Destination []string `json:"destination" type:"slice"` + Connection []int64 `json:"connection" type:"slice"` + ConnectionAlive []int64 `json:"connection_alive" type:"slice"` + ConnectionError []int64 `json:"connection_error" type:"slice"` + Out []int64 `json:"out" type:"slice"` + OutBytes []int64 `json:"out_bytes" type:"slice"` + Returned []int64 `json:"returned" type:"slice"` + SendError []int64 `json:"send_error" type:"slice"` + OutQueue []int64 `json:"out_queue" type:"slice"` + Queue []int64 `json:"queue" type:"slice"` + NegativeQueueError []int64 `json:"negative_queue_error" type:"slice"` + PacksOverflewError []int64 `json:"packs_overflew_error" type:"slice"` + OutMpm []int64 `json:"out_mpm" type:"slice"` + OutBpm []int64 `json:"out_bpm" type:"slice"` } func runRouter(address string, port int) { @@ -96,23 +97,29 @@ func sendStateMetrics(inputChan chan *Metric) { metricName := structfield.Tag.Get("json") valueField := values.Field(i) - if name == "Version" { + switch name { + case "Version": value := strings.Replace(valueField.String(), ".", "", 2) doSend(name, metricName, value, timestamp, inputChan) - } else { - switch vtype { - case "int": - value := fmt.Sprintf("%d", valueField.Int()) + continue + case "Destination": + value := fmt.Sprintf("%d", valueField.Len()) + doSend(name, metricName, value, timestamp, inputChan) + continue + } + + switch vtype { + case "int": + value := fmt.Sprintf("%d", valueField.Int()) + doSend(name, metricName, value, timestamp, inputChan) + case "slice": + for id := 0; id < valueField.Len(); id++ { + metricName = fmt.Sprintf("%s_%d", structfield.Tag.Get("json"), id) + value := fmt.Sprintf("%d", valueField.Index(id).Int()) doSend(name, metricName, value, timestamp, inputChan) - case "slice": - for id := 0; id < valueField.Len(); id++ { - metricName = fmt.Sprintf("%s_%d", structfield.Tag.Get("json"), id) - value := fmt.Sprintf("%d", valueField.Index(id).Int()) - doSend(name, metricName, value, timestamp, inputChan) - } - default: - stlog.WithFields(log.Fields{"vtype": vtype}).Fatal("Unknown state value type.") } + default: + stlog.WithFields(log.Fields{"vtype": vtype}).Fatal("Unknown state value type.") } } } @@ -150,39 +157,3 @@ func doSend( inputChan <- &metric atomic.AddInt64(&state.In, 1) } - -func updatePerMinuteCounters(graphiteAddress []string, inputChan chan *Metric) { - sleepSeconds := 60 - stlog.WithFields(log.Fields{ - "graphiteAddress": graphiteAddress, - "sleepSeconds": sleepSeconds, - }).Info("Starting PerMinuteCounters Updater loop.") - - for { - in := atomic.LoadInt64(&state.In) - bad := atomic.LoadInt64(&state.Bad) - transformed := atomic.LoadInt64(&state.Transformed) - var out, out_bytes []int64 - - // For multiple senders - for id := range graphiteAddress { - out = append(out, atomic.LoadInt64(&state.Out[id])) - out_bytes = append(out_bytes, atomic.LoadInt64(&state.OutBytes[id])) - } - - // Sleep for a minute - time.Sleep(time.Duration(sleepSeconds) * time.Second) - - stlog.WithFields(log.Fields{"graphiteAddress": graphiteAddress}).Info("Updating per-minute metrics.") - // Calculate MPMs - atomic.StoreInt64(&state.InMpm, atomic.LoadInt64(&state.In)-in) - atomic.StoreInt64(&state.BadMpm, atomic.LoadInt64(&state.Bad)-bad) - atomic.StoreInt64(&state.TransformedMpm, atomic.LoadInt64(&state.Transformed)-transformed) - - // For multiple senders - for id := range graphiteAddress { - atomic.StoreInt64(&state.OutMpm[id], atomic.LoadInt64(&state.Out[id])-out[id]) - atomic.StoreInt64(&state.OutBpm[id], atomic.LoadInt64(&state.OutBytes[id])-out_bytes[id]) - } - } -}