Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/ndt7-prometheus-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var (
flagPeriodMax = flag.Duration("period_max", 15 * time.Hour, "maximum period, e.g. 15h, between speed tests, when running in daemon mode")

flagPort = flag.Int("port", 0, "if non-zero, start an HTTP server on this port to export prometheus metrics")
flagMaxHistory = flag.Int("max_history", 10, "Number of results to keep in memory")
)

func init() {
Expand Down Expand Up @@ -216,6 +217,11 @@ func main() {

e = emitter.NewPrometheus(e, dlThroughput, dlLatency, ulThroughput, ulLatency, lastResultGauge)
http.Handle("/metrics", promhttp.Handler())

h := newStatusHandler(e, *flagMaxHistory)
e = h
http.Handle("/", h.handler())

go func() {
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", *flagPort), nil))
}()
Expand Down
165 changes: 165 additions & 0 deletions cmd/ndt7-prometheus-exporter/web.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
package main

import (
"fmt"
"net/http"
"io"
"strings"
"sync"
"time"

"github.com/m-lab/ndt7-client-go/internal/emitter"
"github.com/m-lab/ndt7-client-go/spec"
)

type entry struct {
completion time.Time
summary emitter.Summary
}

type circularQueue struct {
sync.Mutex

// Underlying storage for queue elements
entries []entry

// Index into current start of the queue
start int

// Number of elements in the queue
count int
}

func newCircularQueue(maxSize int) *circularQueue {
return &circularQueue{
entries: make([]entry, maxSize),
start: 0,
count: 0,
}
}

func (q *circularQueue) internalPop() {
if q.count == 0 {
return
}

// Assumes mutex is locked
q.start = (q.start + 1) % len(q.entries)
q.count--
}

func (q *circularQueue) push(s entry) {
q.Lock()
defer q.Unlock()

if q.count >= len(q.entries) {
q.internalPop()
}

i := (q.start + q.count) % len(q.entries)
q.entries[i] = s
q.count++
}

func (q *circularQueue) forEachReversed(f func(entry)) {
var copy []entry
func() {
q.Lock()
defer q.Unlock()

copy = make([]entry, q.count)
for i := 0; i < q.count; i++ {
j := (i + q.start) % len(q.entries)
copy[q.count - i - 1] = q.entries[j]
}
}()

for _, entry := range copy {
f(entry)
}
}

// statusHandler implements both emitter.Emitter and http.Handler interfaces
type statusHandler struct {
// Chained emitter.Emitter
emitter emitter.Emitter

// A cache of recent test results
results *circularQueue
}

func newStatusHandler(e emitter.Emitter, maxSize int) *statusHandler {
return &statusHandler{
emitter: e,
results: newCircularQueue(maxSize),
}
}

// OnStarting emits the starting event
func (h *statusHandler) OnStarting(test spec.TestKind) error {
return h.emitter.OnStarting(test)
}

// OnError emits the error event
func (h *statusHandler) OnError(test spec.TestKind, err error) error {
return h.emitter.OnError(test, err)
}

// OnConnected emits the connected event
func (h *statusHandler) OnConnected(test spec.TestKind, fqdn string) error {
return h.emitter.OnConnected(test, fqdn)
}

// OnDownloadEvent handles an event emitted during the download
func (h *statusHandler) OnDownloadEvent(m *spec.Measurement) error {
return h.emitter.OnDownloadEvent(m)
}

// OnUploadEvent handles an event emitted during the upload
func (h *statusHandler) OnUploadEvent(m *spec.Measurement) error {
return h.emitter.OnUploadEvent(m)
}

// OnComplete is the event signalling the end of the test
func (h *statusHandler) OnComplete(test spec.TestKind) error {
return h.emitter.OnComplete(test)
}

// OnSummary handles the summary event, emitted after the test is over.
func (h *statusHandler) OnSummary(s *emitter.Summary) error {
h.results.push(entry{time.Now(), *s})

return h.emitter.OnSummary(s)
}

// Part of http.Handler interface
func (h *statusHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, `<!DOCTYPE html>
<html>
<head><title>NDT7 Prometheus Exporter</title></head>
<body>
<h1>A non-interactive NDT7 client</h1>
<p><a href="/metrics">Metrics</a></p>
<table border='1'>
`)

h.results.forEachReversed(func(entry entry) {
b := &strings.Builder{}
io.WriteString(b, "<tr>\n")
fmt.Fprintf(b, "<td>%s</td><td><pre>\n", entry.completion.Format(time.RFC3339))
e := emitter.NewHumanReadableWithWriter(b)
e.OnSummary(&entry.summary)
io.WriteString(b, "\n</pre></td>\n")
io.WriteString(b, "</tr>\n")
io.WriteString(w, b.String())
})

io.WriteString(w, `</table>
</body>
</html>
`)
}

func (h *statusHandler) handler() http.Handler {
return h
}
61 changes: 61 additions & 0 deletions cmd/ndt7-prometheus-exporter/web_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"fmt"
"reflect"
"testing"
"time"

"github.com/m-lab/ndt7-client-go/internal/emitter"
)

func TestCircularQueue(t * testing.T) {
fakeSummary := func(fqdn string) emitter.Summary {
return emitter.Summary{
ServerFQDN: fqdn,
}
}

cases := []struct{
size int
input []string
want []string
}{
{
size: 2,
input: []string{},
want: []string{},
},
{
size: 2,
input: []string{"zero"},
want: []string{"0 zero"},
},
{
size: 2,
input: []string{"zero", "one"},
want: []string{"1 one", "0 zero"},
},
{
size: 2,
input: []string{"zero", "one", "two"},
want: []string{"2 two", "1 one"},
},
}

for _, tc := range(cases) {
q := newCircularQueue(tc.size)
for i, s := range(tc.input) {
q.push(entry{time.Unix(int64(i), 0), fakeSummary(s)})
}

got := make([]string, 0)
q.forEachReversed(func(e entry) {
got = append(got, fmt.Sprintf("%d %s", e.completion.Unix(), e.summary.ServerFQDN))
})

if !reflect.DeepEqual(tc.want, got) {
t.Fatalf("want %v; got %v", tc.want, got)
}
}
}