Skip to content

Commit

Permalink
Add grok parser (#54)
Browse files Browse the repository at this point in the history
* feat: grok parser added

* go.mod and go.sum updated

* fix: using same data for benchmark as regex

* fix: map[string]struct used for membership testing

* fix: grok package updated to signoz fork

* fix: type support added for grok

Co-authored-by: Srikanth Chekuri <[email protected]>
  • Loading branch information
nityanandagohain and srikanthccv authored Jan 11, 2023
1 parent 7c2113f commit b6e35ee
Show file tree
Hide file tree
Showing 9 changed files with 1,203 additions and 3 deletions.
1 change: 1 addition & 0 deletions components/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ import (
"github.com/SigNoz/signoz-otel-collector/exporter/clickhouselogsexporter"
"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter"
"github.com/SigNoz/signoz-otel-collector/exporter/clickhousetracesexporter"
_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
"github.com/SigNoz/signoz-otel-collector/processor/signozspanmetricsprocessor"
)

Expand Down
10 changes: 7 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.66.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.66.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.66.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.66.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.66.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.66.0
github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor v0.66.0
Expand Down Expand Up @@ -111,6 +112,7 @@ require (
github.com/sirupsen/logrus v1.9.0
github.com/spf13/viper v1.14.0
github.com/stretchr/testify v1.8.1
github.com/vjeantet/grok v1.0.1
go.opencensus.io v0.24.0
go.opentelemetry.io/collector v0.66.0
go.opentelemetry.io/collector/component v0.66.0
Expand All @@ -127,6 +129,7 @@ require (
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.66.0
go.opentelemetry.io/collector/receiver/otlpreceiver v0.66.0
go.opentelemetry.io/collector/semconv v0.66.0
go.uber.org/atomic v1.10.0
go.uber.org/multierr v1.8.0
go.uber.org/zap v1.23.0
google.golang.org/grpc v1.51.0
Expand Down Expand Up @@ -320,7 +323,6 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.66.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.66.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.66.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.66.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.66.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.66.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.66.0 // indirect
Expand Down Expand Up @@ -394,7 +396,6 @@ require (
go.opentelemetry.io/otel/sdk v1.11.1 // indirect
go.opentelemetry.io/otel/sdk/metric v0.33.0 // indirect
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/goleak v1.2.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/exp v0.0.0-20221019170559-20944726eadf // indirect
Expand Down Expand Up @@ -431,7 +432,10 @@ require (
sigs.k8s.io/yaml v1.3.0 // indirect
)

replace github.com/golang-migrate/migrate/v4 => github.com/sergey-telpuk/migrate/v4 v4.15.3-0.20220303065225-d5ae59d12ff7
replace (
github.com/golang-migrate/migrate/v4 => github.com/sergey-telpuk/migrate/v4 v4.15.3-0.20220303065225-d5ae59d12ff7
github.com/vjeantet/grok => github.com/signoz/grok v1.0.3
)

// see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/4433
exclude github.com/StackExchange/wmi v1.2.0
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1685,6 +1685,8 @@ github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR
github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/signoz/grok v1.0.3 h1:JHPUTwOKf4YbrCwFYVxcaYAkk1PfjCpIG/6RX+juDOQ=
github.com/signoz/grok v1.0.3/go.mod h1:ax1aAchzC6/QMXMcyzHQGZWaW1l195+uMYIkCWPCNIo=
github.com/sijms/go-ora/v2 v2.5.8 h1:V0ITqRXzZngDw6+A2R3zR4Yf+T/LSNPZ4f9tc+3dDgk=
github.com/sijms/go-ora/v2 v2.5.8/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk=
github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
Expand Down
212 changes: 212 additions & 0 deletions pkg/parser/grok/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// copied from https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/operator/parser/regex/cache.go

package grok

import (
"math"
"sync"
"time"

"go.uber.org/atomic"
)

// cache is the storage contract operators use to remember a value
// under a string key and look it up again later.
type cache interface {
	// add stores data under key and reports the outcome as a bool.
	add(key string, data interface{}) bool

	// get returns whatever is stored under key.
	get(key string) interface{}

	// maxSize reports the capacity of the cache.
	maxSize() uint16

	// copy returns the cached entries as a map.
	copy() map[string]interface{}
}

// newMemoryCache builds a memory backed cache whose key queue holds at
// most maxSize entries. interval is handed to the rate limiter, which
// interprets it as a number of seconds.
func newMemoryCache(maxSize uint16, interval uint64) *memoryCache {
	// Throttling should begin once cache turnover goes above 100%, so
	// the limiter threshold sits one past the cache capacity.
	threshold := uint64(maxSize) + 1

	mc := new(memoryCache)
	mc.cache = make(map[string]interface{})
	mc.keys = make(chan string, maxSize)
	mc.limiter = newStartedAtomicLimiter(threshold, interval)
	return mc
}

// memoryCache is an in-memory, fixed-capacity cache. Entries live in a
// map[string]interface{} and are stored as given, without any
// manipulation of the data. Access is guarded by a read/write mutex.
// Once the cache is full, adding a new entry evicts the oldest one in
// FIFO order.
type memoryCache struct {
	// mutex guards cache: lookups take the read lock, inserts take
	// the write lock.
	mutex sync.RWMutex

	// cache holds the key / value pairs of the cached items.
	cache map[string]interface{}

	// keys is a buffered channel used as a FIFO queue of inserted
	// keys. When the cache is full, the oldest key is received from
	// it and used to delete the matching entry from cache.
	keys chan string

	// limiter rate limits the cache based on how many evictions
	// have occurred.
	limiter limiter
}

// Compile-time check that *memoryCache implements cache.
var _ cache = (*memoryCache)(nil)

// get looks up key and returns the cached value, or nil when the key
// is not present.
func (m *memoryCache) get(key string) interface{} {
	// A lookup only needs the read lock, so concurrent readers do
	// not block each other.
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	return m.cache[key]
}

// add inserts data under key. When the cache is full, the oldest entry
// is evicted first and the eviction is reported to the rate limiter.
//
// It returns false, without storing anything, when the limiter is
// currently throttling the cache or when the cache has no capacity.
// It returns true once the value has been stored.
//
// Adding a key that is already cached replaces its value in place and
// keeps the key's original position in the eviction queue.
func (m *memoryCache) add(key string, data interface{}) bool {
	// A cache without capacity can never hold an entry. Bail out
	// before touching the queue: receiving from an empty (or nil)
	// channel would block forever while holding the write lock.
	if cap(m.keys) == 0 {
		return false
	}

	if m.limiter.throttled() {
		return false
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// The key is already queued for eviction. Enqueueing it a second
	// time would later evict the entry early and count an eviction
	// that frees nothing, so only the value is refreshed.
	if _, exists := m.cache[key]; exists {
		m.cache[key] = data
		return true
	}

	if len(m.keys) == cap(m.keys) {
		// Pop the oldest key from the queue and remove its entry
		// from the cache.
		delete(m.cache, <-m.keys)

		// Notify the rate limiter that an entry was evicted.
		m.limiter.increment()
	}

	// Store the entry and append its key to the eviction queue.
	m.cache[key] = data
	m.keys <- key
	return true
}

// copy returns a snapshot of the cache as a new map. The map itself is
// independent of the cache, but the values are copied as-is (a shallow
// copy): values that are pointers, maps or slices are still shared
// with the cache.
func (m *memoryCache) copy() map[string]interface{} {
	// Taking a snapshot only reads the cache, so the read lock is
	// enough and concurrent get calls are not blocked.
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	snapshot := make(map[string]interface{}, len(m.cache))
	for k, v := range m.cache {
		snapshot[k] = v
	}
	return snapshot
}

// maxSize reports the maximum number of entries the cache can hold,
// which is the capacity of the key queue.
func (m *memoryCache) maxSize() uint16 {
	capacity := cap(m.keys)
	return uint16(capacity)
}

// limiter is the rate limiting contract used by the cache.
type limiter interface {
	// init prepares the limiter for use.
	init()

	// increment records one more event against the limit.
	increment()

	// throttled reports whether the limiter is currently throttling.
	throttled() bool

	// currentCount returns the current value of the counter.
	currentCount() uint64

	// limit returns the configured threshold.
	limit() uint64

	// resetInterval returns the limiter's configured interval.
	resetInterval() time.Duration
}

// newStartedAtomicLimiter creates an atomicLimiter with the given max
// and starts it before returning. interval is a number of seconds; a
// value of 0 selects the default of 5 seconds.
func newStartedAtomicLimiter(max uint64, interval uint64) *atomicLimiter {
	seconds := interval
	if seconds == 0 {
		seconds = 5
	}

	lim := &atomicLimiter{
		count:    atomic.NewUint64(0),
		max:      max,
		interval: time.Duration(seconds) * time.Second,
	}
	lim.init()

	return lim
}

// atomicLimiter implements rate limiting on top of an atomic counter.
// throttled reports true while count is >= max. A background routine
// started by init lowers the count once per interval.
type atomicLimiter struct {
	// count is the atomic counter compared against max.
	count *atomic.Uint64

	// max is the threshold at which the limiter starts throttling.
	max uint64

	// interval is how long the background routine sleeps between
	// reductions of count.
	interval time.Duration

	// start ensures the background routine is launched only once.
	start sync.Once
}

// Compile-time check that *atomicLimiter implements limiter.
var _ limiter = (*atomicLimiter)(nil)

// init starts the background routine that decays the counter. It is
// safe to call more than once; the routine is only started the first
// time.
//
// Once per interval the routine lowers the counter by 10% of max
// (at least 1), never going below zero. The routine has no stop
// signal and runs until the process exits.
func (l *atomicLimiter) init() {
	l.start.Do(func() {
		// Size of one decay step: 10% of max, computed as a
		// non-negative value so the float to uint64 conversion is
		// well defined. A step of at least 1 keeps small limits
		// decaying too.
		step := uint64(math.Round(0.10 * float64(l.max)))
		if step == 0 {
			step = 1
		}

		go func() {
			for {
				time.Sleep(l.interval)

				current := l.count.Load()
				if current == 0 {
					continue
				}

				// Clamp the step so the unsigned counter cannot
				// wrap around below zero. This routine is the only
				// place the counter is lowered (increment only
				// raises it), so the counter is still >= current
				// when Sub runs.
				decrement := step
				if decrement > current {
					decrement = current
				}
				l.count.Sub(decrement)
			}
		}()
	})
}

// increment raises the counter by one, saturating at max: once the
// counter has reached max, further calls leave it unchanged.
func (l *atomicLimiter) increment() {
	// Compare with >= rather than ==. The load and the increment are
	// two separate atomic operations, so the counter can end up above
	// max; an equality check would then stop capping it.
	if l.count.Load() >= l.max {
		return
	}
	l.count.Inc()
}

// throttled reports whether the counter has reached max, meaning a
// high number of evictions have recently occurred because the cache
// is full. A cache that is constantly locked for eviction blocks
// reads and writes, which makes the parser slower than it would be
// without caching at all.
func (l *atomicLimiter) throttled() bool {
	current := l.count.Load()
	return current >= l.max
}

// currentCount returns the present value of the atomic counter.
func (l *atomicLimiter) currentCount() uint64 {
	current := l.count.Load()
	return current
}

// limit returns the threshold at which the limiter starts throttling.
func (l *atomicLimiter) limit() uint64 {
	threshold := l.max
	return threshold
}

// resetInterval returns how long the limiter waits between reductions
// of the counter.
func (l *atomicLimiter) resetInterval() time.Duration {
	period := l.interval
	return period
}
Loading

0 comments on commit b6e35ee

Please sign in to comment.