Skip to content

Commit

Permalink
0.4.0 re-send metrics after failed send
Browse files Browse the repository at this point in the history
  • Loading branch information
nixargh committed Dec 20, 2020
1 parent f37064b commit 3145047
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 41 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]
## [0.4.0]
### Changed
- Create multiple connection for Sender and send a number of packs to each before close.
- Create multiple connections for Sender and send a number of packs to each before close.

### Added
- Stats server that returns current state by HTTP.
- Re-send of metrics after send failure.

### Fixed
- Sending to closed connections.

## [0.3.0] - 2020-12-16
### Fixed
Expand Down
100 changes: 61 additions & 39 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
// "github.com/pkg/profile"
)

var version string = "0.3.0"
var version string = "0.4.0"

type Metric struct {
Prefix string `json:"prefix,omitempty"`
Expand All @@ -38,7 +38,7 @@ type State struct {
BadMpm int64 `json:"bad_mpm"`
TransformedMpm int64 `json:"transformed_mpm"`
Connection int64 `json:"connection"`
ConnectionLive int64 `json:"connection_live"`
ConnectionAlive int64 `json:"connection_alive"`
ConnectionError int64 `json:"connection_error"`
OutQueue int64 `json:"out_queue"`
TransformQueue int64 `json:"transform_queue"`
Expand Down Expand Up @@ -102,13 +102,16 @@ func readMetric(connection net.Conn, inputChan chan Metric) {
default:
log.Printf("Bad metric: '%s'.", metricString)
state.Bad++
state.TransformQueue--
connection.Close()
return
}

timestamp, err := strconv.ParseInt(metricSlice[2], 10, 64)
if err != nil {
log.Printf("Timestamp error: '%s'.", err)
state.Bad++
state.TransformQueue--
return
}

Expand All @@ -133,6 +136,8 @@ func runSender(host string, port int, outputChan chan Metric, TLS bool, ignoreCe

for {
var connections [4]net.Conn

// Create connections
for n := 0; n < len(connections); {
// Create a new one
connection, err := createConnection(host, port, TLS, ignoreCert)
Expand All @@ -143,43 +148,82 @@ func runSender(host string, port int, outputChan chan Metric, TLS bool, ignoreCe
connections[n] = connection
n++
state.Connection++
state.ConnectionLive++
state.ConnectionAlive++
}
}

// Send some packs of metrics
for pack := 0; pack < 100; pack++ {
var metrics [100]string
metrics := make([]Metric, 0)

n := pack % len(connections)

// collect a pack of metrics
for i := 0; i < len(metrics); i++ {
for i := 0; i < 100; i++ {
select {
case metric := <-outputChan:
metrics[i] = fmt.Sprintf(
"%s%s %s %d %s",
metric.Prefix,
metric.Path,
metric.Value,
metric.Timestamp,
metric.Tenant,
)
metrics = append(metrics, metric)
default:
time.Sleep(100 * time.Millisecond)
}
}

// send the pack
go sendMetric(metrics, connections[n])
if len(metrics) > 0 {
go sendMetric(metrics, connections[n], outputChan)
}
}

// Close connections
for n := 0; n < len(connections); n++ {
// Close the old one
connections[n].Close()
state.ConnectionLive--
defer connections[n].Close()
state.ConnectionAlive--
}
}
}

func sendMetric(metrics []Metric, connection net.Conn, outputChan chan Metric) {
sent := 0
returned := 0
connectionAlive := true

for i := 0; i < len(metrics); i++ {
metricString := fmt.Sprintf(
"%s%s %s %d %s",
metrics[i].Prefix,
metrics[i].Path,
metrics[i].Value,
metrics[i].Timestamp,
metrics[i].Tenant,
)

// If connection is dead - just return metrics to outputChan
if connectionAlive {
dataLength, err := connection.Write([]byte(metricString + "\n"))
if err != nil {
log.Printf("Connection write error: '%s'.", err)
state.SendError++

connectionAlive = false

// Here we must return metric to ResendQueue
outputChan <- metrics[i]
returned++
} else {
log.Printf("[%d] Out (%d bytes): '%s'.\n", i, dataLength, metricString)
sent++
state.Out++
state.OutQueue--
}
} else {
outputChan <- metrics[i]
returned++
}
}
log.Printf("The pack is finished: sent=%d; returned=%d.\n", sent, returned)
}

func createConnection(address string, port int, TLS bool, ignoreCert bool) (net.Conn, error) {
netAddress := fmt.Sprintf("%s:%d", address, port)
log.Printf("Connecting to: '%s'.\n", netAddress)
Expand Down Expand Up @@ -209,34 +253,12 @@ func createConnection(address string, port int, TLS bool, ignoreCert bool) (net.
log.Printf("Dialer error: '%s'.", err)
return connection, err
}
connection.SetDeadline(time.Now().Add(600 * time.Second))
connection.SetDeadline(time.Now().Add(1200 * time.Second))

log.Printf("Connection remote address: '%s'.\n", connection.RemoteAddr())
return connection, err
}

func sendMetric(metrics [100]string, connection net.Conn) {
errors := 0

for i := 0; i < len(metrics); i++ {
if metrics[i] != "" {
dataLength, err := connection.Write([]byte(metrics[i] + "\n"))
if err != nil {
log.Printf("Connection write error: '%s'.", err)
errors++
state.SendError++

// Here we must return metric to OutQueue
} else {
log.Printf("[%d] Out (%d bytes): '%s'.\n", i, dataLength, metrics[i])
state.Out++
state.OutQueue--
}
}
}
log.Printf("The pack is sent with %d error(s).\n", errors)
}

func runTransformer(
inputChan chan Metric,
outputChan chan Metric,
Expand Down

0 comments on commit 3145047

Please sign in to comment.