Skip to content
This repository has been archived by the owner on Feb 15, 2023. It is now read-only.

Refactor SNMP enricher #2

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 28 additions & 51 deletions enrichers/snmp.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package enrichers

import (
"context"
"fmt"
"github.com/alouca/gosnmp"
flow "github.com/bwNetFlow/protobuf/go"
cache "github.com/patrickmn/go-cache"
"golang.org/x/sync/semaphore"
"log"
"net"
"regexp"
"strings"
"time"
)

Expand All @@ -20,7 +21,9 @@ var (
oidBase = ".1.3.6.1.2.1.31.1.1.1.%d.%d"
oidExts = map[string]uint8{"name": 1, "speed": 15, "desc": 18}

snmpSemaphore = make(chan struct{}, 64)
snmpSemaphore = semaphore.NewWeighted(64)

endsWithDesc = regexp.MustCompile(".*desc$")
)

type cacheEntry struct {
Expand All @@ -32,6 +35,7 @@ type cacheEntry struct {
// Refresh OID cache periodically. Needs no syncing or waiting, it will die
// when main() exits. Tries to refresh all entries, ignores errors. If an entry
// errors multiple times in a row, it will be expunged eventually.
// Entries aren't queried in parallel.
func refreshLoop() {
log.Println("SNMP refresh: Started.")
for {
Expand All @@ -46,82 +50,49 @@ func refreshLoop() {
log.Println("SNMP refresh: Found blank key, initial Get pending.")
continue
}
s, err := gosnmp.NewGoSNMP(content.router, snmpCommunity, gosnmp.Version2c, 1)
if err != nil {
log.Println("SNMP refresh: Connection Error:", err)
continue
}
resp, err := s.Get(content.oid)
if err != nil {
log.Printf("SNMP refresh: Get failed for '%s' from %s.", content.oid, content.router)
log.Printf(" Error Message: %s\n", err)
continue
}

// parse and cache
if len(resp.Variables) == 1 {
new_answer := resp.Variables[0].Value
// this is somewhat ugly, maybe place this elsewhere?
if strings.Contains(key, "desc") {
actual := ifdescRegex.FindStringSubmatch(new_answer.(string))
if actual != nil && len(actual) > 1 {
new_answer = actual[1]
}
}
if new_answer != content.answer {
// .Replace actually, but Set does not
// error if the key expired meanwhile
snmpCache.Set(
key,
cacheEntry{content.router, content.oid, new_answer},
cache.DefaultExpiration)
log.Printf("SNMP refresh: Updated '%s' from %s.\n", content.oid, content.router)
} else {
// reset expire
snmpCache.Set(key, content, cache.DefaultExpiration)
}
} else {
log.Printf("SNMP refresh: Bad response: %v", resp.Variables)
}
querySNMP(key, content.oid, content.router)
}
log.Println("SNMP refresh: Finished hourly run.")
}
}

// Query a single SNMP datapoint and cache the result. Supposedly a short-lived
// goroutine.
func querySNMP(router string, iface uint32, datapoint string) {
func queryNewSNMPEntry(router string, iface uint32, datapoint string) {
cache_key := fmt.Sprintf("%s %d %s", router, iface, datapoint)
oid := fmt.Sprintf(oidBase, oidExts[datapoint], iface)
snmpSemaphore <- struct{}{} // acquire

querySNMP(cache_key, oid, router)
}
func querySNMP(cache_key string, oid string, router string) {
snmpSemaphore.Acquire(context.Background(), 1)
// release semaphore, UDP sockets are closed
defer func() { snmpSemaphore.Release(1) }()

s, err := gosnmp.NewGoSNMP(router, snmpCommunity, gosnmp.Version2c, 1)
if err != nil {
log.Println("SNMP: Connection Error:", err)
<-snmpSemaphore // release
return
}
resp, err := s.Get(oid)
if err != nil {
log.Printf("SNMP: Get failed for '%s' from %s.", oid, router)
log.Printf(" Error Message: %s\n", err)
<-snmpSemaphore // release
return
}

// release semaphore, UDP sockets are closed
<-snmpSemaphore

// parse and cache
if len(resp.Variables) == 1 {
value := resp.Variables[0].Value // TODO: check data types maybe
// this is somewhat ugly, maybe place this elsewhere?
if datapoint == "desc" {
// Check if datapoint == desc
if endsWithDesc.Match([]byte(cache_key)) {
actual := ifdescRegex.FindStringSubmatch(value.(string))
if actual != nil && len(actual) > 1 {
value = actual[1]
}
}
snmpCache.Set(fmt.Sprintf("%s %d %s", router, iface, datapoint), cacheEntry{router, oid, value}, cache.DefaultExpiration)
snmpCache.Set(cache_key, cacheEntry{router, oid, value}, cache.DefaultExpiration)
// log.Printf("SNMP: Retrieved '%s' from %s.\n", oid, router)

} else {
Expand All @@ -133,7 +104,12 @@ func querySNMP(router string, iface uint32, datapoint string) {
// up hourly refreshs of the cached data.
func InitSnmp(regex string, community string) {
snmpCache = cache.New(3*time.Hour, 1*time.Hour)
ifdescRegex, _ = regexp.Compile(regex)

var err error
if ifdescRegex, err = regexp.Compile(regex); err != nil {
log.Fatal("Enricher: Failed to compile regular expression", err)
}

snmpCommunity = community

go refreshLoop()
Expand All @@ -145,7 +121,7 @@ func InitSnmp(regex string, community string) {
func getCachedOrQuery(router string, iface uint32) (string, string, uint32) {
var name, desc string
var speed uint32
for datapoint, _ := range oidExts {
for datapoint := range oidExts {
if value, found := snmpCache.Get(fmt.Sprintf("%s %d %s", router, iface, datapoint)); found {
entry := value.(cacheEntry)
if entry.answer == nil {
Expand All @@ -162,8 +138,9 @@ func getCachedOrQuery(router string, iface uint32) (string, string, uint32) {
speed = uint32(entry.answer.(uint64))
}
} else {
// Add a placeholder entry, that will be eventually replaced by the queried metadata.
snmpCache.Set(fmt.Sprintf("%s %d %s", router, iface, datapoint), cacheEntry{}, cache.DefaultExpiration)
go querySNMP(router, iface, datapoint)
go queryNewSNMPEntry(router, iface, datapoint)
}
}
return name, desc, speed
Expand Down
10 changes: 1 addition & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,19 @@ module github.com/bwNetFlow/processor_enricher
go 1.14

require (
github.com/DataDog/zstd v1.4.5 // indirect
github.com/Shopify/sarama v1.27.2 // indirect
github.com/alouca/gologger v0.0.0-20120904114645-7d4b7291de9c // indirect
github.com/alouca/gosnmp v0.0.0-20170620005048-04d83944c9ab
github.com/bsm/sarama-cluster v2.1.15+incompatible // indirect
github.com/bwNetFlow/ip_prefix_trie v0.0.0-20190717112653-3fe50ea5b638
github.com/bwNetFlow/kafkaconnector v0.0.0-20201026123259-b5b86cfb0ae8
github.com/bwNetFlow/protobuf/go v0.0.0-20201022144742-933b7710ed05
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/protobuf v1.4.3 // indirect
github.com/golang/snappy v0.0.2 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/klauspost/compress v1.11.1 // indirect
github.com/onsi/ginkgo v1.12.1 // indirect
github.com/onsi/gomega v1.10.0 // indirect
github.com/oschwald/maxminddb-golang v1.7.0
github.com/patrickmn/go-cache v2.1.0+incompatible
golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect
golang.org/x/net v0.0.0-20201026091529-146b70c837a4 // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sys v0.0.0-20201024232916-9f70ab9862d5 // indirect
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
)
Loading