Skip to content

Commit

Permalink
v0.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
nixargh committed Dec 29, 2020
1 parent 3a15923 commit ae458e3
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 68 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [0.5.0] - 2020-12-28
### Changed
- Use a single out connection and pass packs as pointers to array.

## [0.4.1] - 2020-12-21
### Fixed
- Wrong **TransformQueue--** action.
Expand Down
127 changes: 59 additions & 68 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
// "github.com/pkg/profile"
)

var version string = "0.4.1"
var version string = "0.5.0"

type Metric struct {
Prefix string `json:"prefix,omitempty"`
Expand Down Expand Up @@ -133,92 +133,83 @@ func runSender(host string, port int, outputChan chan Metric, TLS bool, ignoreCe
log.Printf("Starting Sender to '%s:%d'.\n", host, port)

for {
var connections [4]net.Conn

// Create connections
for n := 0; n < len(connections); {
// Create a new one
connection, err := createConnection(host, port, TLS, ignoreCert)
if err != nil {
log.Printf("Can't create connection: '%s'.", err)
state.ConnectionError++
} else {
connections[n] = connection
n++
state.Connection++
state.ConnectionAlive++
}
// Create output connection
connection, err := createConnection(host, port, TLS, ignoreCert)
if err != nil {
log.Printf("Can't create connection: '%s'.", err)
state.ConnectionError++
time.Sleep(5 * time.Second)
continue
} else {
state.Connection++
state.ConnectionAlive++
}

// Send some packs of metrics
for pack := 0; pack < 100; pack++ {
metrics := make([]Metric, 0)

n := pack % len(connections)
// collect a pack of metrics
var metrics [1000]Metric

// collect a pack of metrics
for i := 0; i < 100; i++ {
select {
case metric := <-outputChan:
metrics = append(metrics, metric)
default:
time.Sleep(100 * time.Millisecond)
}
}

// send the pack
if len(metrics) > 0 {
go sendMetric(metrics, connections[n], outputChan)
for i := 0; i < len(metrics); i++ {
select {
case metric := <-outputChan:
metrics[i] = metric
default:
metrics[i] = Metric{}
time.Sleep(100 * time.Millisecond)
}
}

// Close connections
for n := 0; n < len(connections); n++ {
// Close the old one
connections[n].Close()
state.ConnectionAlive--
// send the pack
if len(metrics) > 0 {
go sendMetric(&metrics, connection, outputChan)
}
}
}

func sendMetric(metrics []Metric, connection net.Conn, outputChan chan Metric) {
func sendMetric(metrics *[1000]Metric, connection net.Conn, outputChan chan Metric) {
sent := 0
returned := 0
connectionAlive := true

for i := 0; i < len(metrics); i++ {
metricString := fmt.Sprintf(
"%s%s %s %d %s",
metrics[i].Prefix,
metrics[i].Path,
metrics[i].Value,
metrics[i].Timestamp,
metrics[i].Tenant,
)
emptyMetric := Metric{}

// If connection is dead - just return metrics to outputChan
if connectionAlive {
dataLength, err := connection.Write([]byte(metricString + "\n"))
if err != nil {
log.Printf("Connection write error: '%s'.", err)
state.SendError++

connectionAlive = false

// Here we must return metric to ResendQueue
for i := 0; i < len(metrics); i++ {
if metrics[i] != emptyMetric {
metricString := fmt.Sprintf(
"%s%s %s %d %s",
metrics[i].Prefix,
metrics[i].Path,
metrics[i].Value,
metrics[i].Timestamp,
metrics[i].Tenant,
)

// If connection is dead - just return metrics to outputChan
if connectionAlive {
dataLength, err := connection.Write([]byte(metricString + "\n"))
if err != nil {
log.Printf("Connection write error: '%s'.", err)
state.SendError++

connectionAlive = false

// Here we must return metric to ResendQueue
outputChan <- metrics[i]
returned++
} else {
log.Printf("[%d] Out (%d bytes): '%s'.\n", i, dataLength, metricString)
sent++
state.Out++
state.OutQueue--
}
} else {
outputChan <- metrics[i]
returned++
} else {
log.Printf("[%d] Out (%d bytes): '%s'.\n", i, dataLength, metricString)
sent++
state.Out++
state.OutQueue--
}
} else {
outputChan <- metrics[i]
returned++
}
}

connection.Close()
state.ConnectionAlive--
log.Printf("The pack is finished: sent=%d; returned=%d.\n", sent, returned)
}

Expand Down Expand Up @@ -251,7 +242,7 @@ func createConnection(address string, port int, TLS bool, ignoreCert bool) (net.
log.Printf("Dialer error: '%s'.", err)
return connection, err
}
connection.SetDeadline(time.Now().Add(1200 * time.Second))
connection.SetDeadline(time.Now().Add(600 * time.Second))

log.Printf("Connection remote address: '%s'.\n", connection.RemoteAddr())
return connection, err
Expand Down

0 comments on commit ae458e3

Please sign in to comment.