diff --git a/components/components.go b/components/components.go
index 23c1f1ba..7412fcb0 100644
--- a/components/components.go
+++ b/components/components.go
@@ -108,6 +108,7 @@ import (
 	"github.com/SigNoz/signoz-otel-collector/exporter/clickhouselogsexporter"
 	"github.com/SigNoz/signoz-otel-collector/exporter/clickhousemetricsexporter"
 	"github.com/SigNoz/signoz-otel-collector/exporter/clickhousetracesexporter"
+	_ "github.com/SigNoz/signoz-otel-collector/pkg/parser/grok"
 	"github.com/SigNoz/signoz-otel-collector/processor/signozspanmetricsprocessor"
 )
 
diff --git a/go.mod b/go.mod
index f26efe1d..e03ae1fb 100644
--- a/go.mod
+++ b/go.mod
@@ -37,6 +37,7 @@ require (
 	github.com/open-telemetry/opentelemetry-collector-contrib/extension/pprofextension v0.66.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.66.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry v0.66.0
+	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.66.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.66.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/processor/cumulativetodeltaprocessor v0.66.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatorateprocessor v0.66.0
@@ -111,6 +112,7 @@ require (
 	github.com/sirupsen/logrus v1.9.0
 	github.com/spf13/viper v1.14.0
 	github.com/stretchr/testify v1.8.1
+	github.com/vjeantet/grok v1.0.1
 	go.opencensus.io v0.24.0
 	go.opentelemetry.io/collector v0.66.0
 	go.opentelemetry.io/collector/component v0.66.0
@@ -127,6 +129,7 @@ require (
 	go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.66.0
 	go.opentelemetry.io/collector/receiver/otlpreceiver v0.66.0
 	go.opentelemetry.io/collector/semconv v0.66.0
+	go.uber.org/atomic v1.10.0
 	go.uber.org/multierr v1.8.0
 	go.uber.org/zap v1.23.0
 	google.golang.org/grpc v1.51.0
@@ -320,7 +323,6 @@ require (
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.66.0 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.66.0 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.66.0 // indirect
-	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.66.0 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.66.0 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.66.0 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.66.0 // indirect
@@ -394,7 +396,6 @@ require (
 	go.opentelemetry.io/otel/sdk v1.11.1 // indirect
 	go.opentelemetry.io/otel/sdk/metric v0.33.0 // indirect
 	go.opentelemetry.io/otel/trace v1.11.1 // indirect
-	go.uber.org/atomic v1.10.0 // indirect
 	go.uber.org/goleak v1.2.0 // indirect
 	golang.org/x/crypto v0.1.0 // indirect
 	golang.org/x/exp v0.0.0-20221019170559-20944726eadf // indirect
@@ -431,7 +432,10 @@ require (
 	sigs.k8s.io/yaml v1.3.0 // indirect
 )
 
-replace github.com/golang-migrate/migrate/v4 => github.com/sergey-telpuk/migrate/v4 v4.15.3-0.20220303065225-d5ae59d12ff7
+replace (
+	github.com/golang-migrate/migrate/v4 => github.com/sergey-telpuk/migrate/v4 v4.15.3-0.20220303065225-d5ae59d12ff7
+	github.com/vjeantet/grok => github.com/signoz/grok v1.0.3
+)
 
 // see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/4433
 exclude github.com/StackExchange/wmi v1.2.0
diff --git a/go.sum b/go.sum
index fba0d55b..7d28dac6 100644
--- a/go.sum
+++ b/go.sum
@@ -1685,6 +1685,8 @@ github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR
 github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8=
 github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
 github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
+github.com/signoz/grok v1.0.3 h1:JHPUTwOKf4YbrCwFYVxcaYAkk1PfjCpIG/6RX+juDOQ=
+github.com/signoz/grok v1.0.3/go.mod h1:ax1aAchzC6/QMXMcyzHQGZWaW1l195+uMYIkCWPCNIo=
 github.com/sijms/go-ora/v2 v2.5.8 h1:V0ITqRXzZngDw6+A2R3zR4Yf+T/LSNPZ4f9tc+3dDgk=
 github.com/sijms/go-ora/v2 v2.5.8/go.mod h1:EHxlY6x7y9HAsdfumurRfTd+v8NrEOTR3Xl4FWlH6xk=
 github.com/sirupsen/logrus v1.0.4-0.20170822132746-89742aefa4b2/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
diff --git a/pkg/parser/grok/cache.go b/pkg/parser/grok/cache.go
new file mode 100644
index 00000000..f8d6cd4d
--- /dev/null
+++ b/pkg/parser/grok/cache.go
@@ -0,0 +1,218 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// copied from https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/operator/parser/regex/cache.go
+
+package grok
+
+import (
+	"math"
+	"sync"
+	"time"
+
+	"go.uber.org/atomic"
+)
+
+// cache allows operators to cache a value and look it up later
+type cache interface {
+	get(key string) interface{}
+	add(key string, data interface{}) bool
+	copy() map[string]interface{}
+	maxSize() uint16
+}
+
+// newMemoryCache takes a cache size and a limiter interval and
+// returns a new memory backed cache
+func newMemoryCache(maxSize uint16, interval uint64) *memoryCache {
+	// start throttling when cache turnover is above 100%
+	limit := uint64(maxSize) + 1
+
+	return &memoryCache{
+		cache:   make(map[string]interface{}),
+		keys:    make(chan string, maxSize),
+		limiter: newStartedAtomicLimiter(limit, interval),
+	}
+}
+
+// memoryCache is an in-memory cache of items with a predefined
+// max size. Its underlying storage is a map[string]interface{}
+// and it does not manipulate the data in any way. It is designed
+// to be as fast as possible while remaining thread safe.
+// When the cache is full, adding a new item evicts the
+// oldest item, FIFO-style.
+type memoryCache struct {
+	// Key / Value pairs of cached items
+	cache map[string]interface{}
+
+	// When the cache is full, the oldest entry's key is
+	// read from the channel and used to index into the
+	// cache during cleanup
+	keys chan string
+
+	// All read operations will trigger a read lock while all
+	// write operations will trigger an exclusive lock
+	mutex sync.RWMutex
+
+	// Limiter rate limits the cache
+	limiter limiter
+}
+
+var _ cache = &memoryCache{}
+
+// get returns a cached entry, nil if it does not exist
+func (m *memoryCache) get(key string) interface{} {
+	// Read and unlock as fast as possible
+	m.mutex.RLock()
+	data := m.cache[key]
+	m.mutex.RUnlock()
+
+	return data
+}
+
+// add inserts an item into the cache, if the cache is full, the
+// oldest item is removed
+func (m *memoryCache) add(key string, data interface{}) bool {
+	if m.limiter.throttled() {
+		return false
+	}
+
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	if len(m.keys) == cap(m.keys) {
+		// Pop the oldest key from the channel
+		// and remove it from the cache
+		delete(m.cache, <-m.keys)
+
+		// notify the rate limiter that an entry
+		// was evicted
+		m.limiter.increment()
+	}
+
+	// Write the cached entry and add the key
+	// to the channel
+	m.cache[key] = data
+	m.keys <- key
+	return true
+}
+
+// copy returns a shallow copy of the cache contents
+func (m *memoryCache) copy() map[string]interface{} {
+	cp := make(map[string]interface{}, cap(m.keys))
+
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
+
+	for k, v := range m.cache {
+		cp[k] = v
+	}
+	return cp
+}
+
+// maxSize returns the max size of the cache
+func (m *memoryCache) maxSize() uint16 {
+	return uint16(cap(m.keys))
+}
+
+// limiter provides rate limiting methods for
+// the cache
+type limiter interface {
+	init()
+	increment()
+	currentCount() uint64
+	limit() uint64
+	resetInterval() time.Duration
+	throttled() bool
+}
+
+// newStartedAtomicLimiter returns a started atomicLimiter
+func newStartedAtomicLimiter(max uint64, interval uint64) *atomicLimiter {
+	if interval == 0 {
+		interval = 5
+	}
+
+	a := &atomicLimiter{
+		count:    atomic.NewUint64(0),
+		max:      max,
+		interval: time.Second * time.Duration(interval),
+	}
+
+	a.init()
+	return a
+}
+
+// atomicLimiter enables rate limiting using an atomic
+// counter. When count is >= max, throttled will return
+// true. The count is reset on an interval.
+type atomicLimiter struct {
+	count    *atomic.Uint64
+	max      uint64
+	interval time.Duration
+	start    sync.Once
+}
+
+var _ limiter = &atomicLimiter{count: atomic.NewUint64(0)}
+
+// init initializes the limiter
+func (l *atomicLimiter) init() {
+	// start the reset go routine once
+	l.start.Do(func() {
+		go func() {
+			// During every interval period, reduce the counter by
+			// roughly 10% of the max (at least 1), flooring at zero.
+			// (The upstream copy mixed a negative float into uint64
+			// math here, which never decremented for larger sizes.)
+			x := uint64(math.Max(1, math.Round(0.10*float64(l.max))))
+			for {
+				time.Sleep(l.interval)
+				if c := l.currentCount(); c > 0 {
+					dec := x
+					if dec > c {
+						dec = c
+					}
+					l.count.Sub(dec)
+				}
+			}
+		}()
+	})
+}
+
+// increment increments the atomic counter
+func (l *atomicLimiter) increment() {
+	if l.count.Load() >= l.max {
+		return
+	}
+	l.count.Inc()
+}
+
+// throttled returns true if the cache is currently throttled, meaning a
+// high number of evictions have recently occurred due to the cache being
+// full. When the cache is constantly locked, reads and writes are
+// blocked, causing the parser to be slower than if it was
+// not caching at all.
+func (l *atomicLimiter) throttled() bool {
+	return l.currentCount() >= l.max
+}
+
+func (l *atomicLimiter) currentCount() uint64 {
+	return l.count.Load()
+}
+
+func (l *atomicLimiter) limit() uint64 {
+	return l.max
+}
+
+func (l *atomicLimiter) resetInterval() time.Duration {
+	return l.interval
+}
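For reviewers, here is a minimal sketch of the behaviour the cache above implements — it is not part of the diff, and the test name is hypothetical: once the `keys` channel is full, each `add` evicts the oldest key FIFO-style and counts the eviction against the limiter.

```go
package grok

import "testing"

// TestEvictionSketch is a hypothetical illustration of FIFO eviction.
func TestEvictionSketch(t *testing.T) {
	c := newMemoryCache(2, 0) // capacity 2, default 5s reset interval

	c.add("a", 1)
	c.add("b", 2)
	c.add("c", 3) // cache is full: "a", the oldest key, is evicted

	if c.get("a") != nil {
		t.Fatal(`expected "a" to have been evicted`)
	}
	if c.get("c") != 3 {
		t.Fatal(`expected "c" to be cached`)
	}
}
```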
diff --git a/pkg/parser/grok/cache_test.go b/pkg/parser/grok/cache_test.go
new file mode 100644
index 00000000..a5eb322c
--- /dev/null
+++ b/pkg/parser/grok/cache_test.go
@@ -0,0 +1,275 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// copied from https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/operator/parser/regex/cache_test.go
+
+package grok
+
+import (
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	"go.uber.org/atomic"
+)
+
+func TestNewMemoryCache(t *testing.T) {
+	cases := []struct {
+		name       string
+		maxSize    uint16
+		expect     *memoryCache
+		expectSize int
+	}{
+		{
+			"size-50",
+			50,
+			&memoryCache{
+				cache: make(map[string]interface{}),
+				keys:  make(chan string, 50),
+			},
+			50,
+		},
+	}
+
+	for _, tc := range cases {
+		output := newMemoryCache(tc.maxSize, 0)
+		require.Equal(t, tc.expect.cache, output.cache)
+		require.Len(t, output.cache, 0, "a new cache should be empty")
+		require.Len(t, output.keys, 0, "a new cache should have no queued keys")
+		require.Equal(t, tc.expectSize, cap(output.keys), "keys channel should have cap of expected size")
+	}
+}
+
+func TestMemory(t *testing.T) {
+	cases := []struct {
+		name   string
+		cache  *memoryCache
+		input  map[string]interface{}
+		expect *memoryCache
+	}{
+		{
+			"basic",
+			func() *memoryCache {
+				return newMemoryCache(3, 0)
+			}(),
+			map[string]interface{}{
+				"key": "value",
+				"map-value": map[string]string{
+					"x":   "y",
+					"dev": "stanza",
+				},
+			},
+			&memoryCache{
+				cache: map[string]interface{}{
+					"key": "value",
+					"map-value": map[string]string{
+						"x":   "y",
+						"dev": "stanza",
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			for key, value := range tc.input {
+				tc.cache.add(key, value)
+				out := tc.cache.get(key)
+				require.NotNil(t, out, "expected to get value from cache immediately after adding it")
+				require.Equal(t, value, out, "expected value to equal the value that was added to the cache")
+			}
+
+			require.Equal(t, len(tc.expect.cache), len(tc.cache.cache))
+
+			for expectKey, expectItem := range tc.expect.cache {
+				actual := tc.cache.get(expectKey)
+				require.NotNil(t, actual)
+				require.Equal(t, expectItem, actual)
+			}
+		})
+	}
+}
"3": 3, + "4": 4, + "5": 5, + "6": 6, + "7": 7, + "8": 8, + "9": 9, + "10": 10, // youngest key, will be removed when 20 is added + } + require.Equal(t, expectCache, m.cache) + require.Len(t, m.cache, maxSize) + require.Len(t, m.keys, maxSize) + + // for every additional key, the oldest should be removed + // 1, 2, 3 and so on. + for i := 11; i <= 20; i++ { + str := strconv.Itoa(i) + m.add(str, i) + + removedKey := strconv.Itoa(i - 10) + x := m.get(removedKey) + require.Nil(t, x, "expected key %s to have been removed", removedKey) + require.Len(t, m.cache, maxSize) + } + + // All entries should have been replaced by now + expectCache = map[string]interface{}{ + "11": 11, + "12": 12, + "13": 13, + "14": 14, + "15": 15, + "16": 16, + "17": 17, + "18": 18, + "19": 19, + "20": 20, + } + require.Equal(t, expectCache, m.cache) + require.Len(t, m.cache, maxSize) +} + +func TestNewStartedAtomicLimiter(t *testing.T) { + cases := []struct { + name string + max uint64 + interval uint64 + }{ + { + "default", + 0, + 0, + }, + { + "max", + 30, + 0, + }, + { + "interval", + 0, + 3, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + l := newStartedAtomicLimiter(tc.max, tc.interval) + require.Equal(t, tc.max, l.max) + if tc.interval == 0 { + // default + tc.interval = 5 + } + require.Equal(t, float64(tc.interval), l.interval.Seconds()) + require.Equal(t, uint64(0), l.currentCount()) + }) + } +} + +// Start a limiter with a max of 3 and ensure throttling begins +func TestLimiter(t *testing.T) { + max := uint64(3) + + l := newStartedAtomicLimiter(max, 120) + require.NotNil(t, l) + require.Equal(t, max, l.max) + + require.False(t, l.throttled(), "new limiter should not be throttling") + require.Equal(t, uint64(0), l.currentCount()) + + var i uint64 + for i = 1; i < max; i++ { + l.increment() + require.Equal(t, i, l.currentCount()) + require.False(t, l.throttled()) + } + + l.increment() + require.True(t, l.throttled()) +} + +func TestThrottledLimiter(t *testing.T) { + max := uint64(3) + + // Limiter with a count higher than the max, which will force + // it to be throttled by default. Also note that the init method + // has not been called yet, so the reset go routine is not running + l := atomicLimiter{ + max: max, + count: atomic.NewUint64(max + 1), + interval: 1, + } + + require.True(t, l.throttled()) + + // Test the reset go routine by calling init() and waiting + // for it to reset the counter. The limiter will no longer + // be in a throttled state and the count will be reset. + l.init() + wait := 2 * l.interval + time.Sleep(time.Second * wait) + require.False(t, l.throttled()) + require.Equal(t, uint64(0), l.currentCount()) +} + +func TestThrottledCache(t *testing.T) { + c := newMemoryCache(3, 120) + require.False(t, c.limiter.throttled()) + require.Equal(t, 4, int(c.limiter.limit()), "expected limit be cache size + 1") + require.Equal(t, float64(120), c.limiter.resetInterval().Seconds(), "expected reset interval to be 120 seconds") + + // fill the cache and cause 100% evictions + for i := 1; i <= 6; i++ { + key := strconv.Itoa(i) + value := i + c.add(key, value) + require.False(t, c.limiter.throttled()) + } + + // limiter is incremented after cache is full. a cache of size 3 + // with 6 additions will cause the limiter to be set to 3. 
diff --git a/pkg/parser/grok/config_test.go b/pkg/parser/grok/config_test.go
new file mode 100644
index 00000000..997d1c4d
--- /dev/null
+++ b/pkg/parser/grok/config_test.go
@@ -0,0 +1,144 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//	http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package grok
+
+import (
+	"path/filepath"
+	"testing"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/operatortest"
+)
+
+func TestParserGoldenConfig(t *testing.T) {
+	operatortest.ConfigUnmarshalTests{
+		DefaultConfig: NewConfig(),
+		TestsFile:     filepath.Join(".", "testdata", "config.yaml"),
+		Tests: []operatortest.ConfigUnmarshalTest{
+			{
+				Name:   "default",
+				Expect: NewConfig(),
+			},
+			{
+				Name: "cache",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.Cache.Size = 50
+					return cfg
+				}(),
+			},
+			{
+				Name: "parse_from_simple",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.ParseFrom = entry.NewBodyField("from")
+					return cfg
+				}(),
+			},
+			{
+				Name: "parse_to_simple",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.ParseTo = entry.RootableField{Field: entry.NewBodyField("log")}
+					return cfg
+				}(),
+			},
+			{
+				Name: "on_error_drop",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.OnError = "drop"
+					return cfg
+				}(),
+			},
+			{
+				Name: "timestamp",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					parseField := entry.NewBodyField("timestamp_field")
+					newTime := helper.TimeParser{
+						LayoutType: "strptime",
+						Layout:     "%Y-%m-%d",
+						ParseFrom:  &parseField,
+					}
+					cfg.TimeParser = &newTime
+					return cfg
+				}(),
+			},
+			{
+				Name: "severity",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					parseField := entry.NewBodyField("severity_field")
+					severityParser := helper.NewSeverityConfig()
+					severityParser.ParseFrom = &parseField
+					mapping := map[string]interface{}{
+						"critical": "5xx",
+						"error":    "4xx",
+						"info":     "3xx",
+						"debug":    "2xx",
+					}
+					severityParser.Mapping = mapping
+					cfg.SeverityConfig = &severityParser
+					return cfg
+				}(),
+			},
+			{
+				Name: "pattern",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.Pattern = "a=%{NOTSPACE:data}"
+					return cfg
+				}(),
+			},
+			{
+				Name: "scope_name",
+				Expect: func() *Config {
+					cfg := NewConfig()
+					cfg.Pattern = "a=%{NOTSPACE:data}"
+					parseField := entry.NewBodyField("logger_name_field")
+					loggerNameParser := helper.NewScopeNameParser()
+					loggerNameParser.ParseFrom = parseField
+					cfg.ScopeNameParser = &loggerNameParser
+					return cfg
+				}(),
+			},
+			{
+				Name: "parse_to_attributes",
+				Expect: func() *Config {
+					p := NewConfig()
+					p.ParseTo = entry.RootableField{Field: entry.NewAttributeField()}
+					return p
+				}(),
+			},
+			{
+				Name: "parse_to_body",
+				Expect: func() *Config {
+					p := NewConfig()
+					p.ParseTo = entry.RootableField{Field: entry.NewBodyField()}
+					return p
+				}(),
+			},
+			{
+				Name: "parse_to_resource",
+				Expect: func() *Config {
+					p := NewConfig()
+					p.ParseTo = entry.RootableField{Field: entry.NewResourceField()}
+					return p
+				}(),
+			},
+		},
+	}.Run(t)
+}
diff --git a/pkg/parser/grok/grok.go b/pkg/parser/grok/grok.go
new file mode 100644
index 00000000..31a94bf9
--- /dev/null
+++ b/pkg/parser/grok/grok.go
@@ -0,0 +1,164 @@
+package grok
+
+import (
+	"context"
+	"fmt"
+
+	"go.uber.org/zap"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
+	"github.com/vjeantet/grok"
+)
+
+const operatorType = "grok_parser"
+
+func init() {
+	operator.Register(operatorType, func() operator.Builder { return NewConfig() })
+}
+
+// NewConfig creates a new grok parser config with default values
+func NewConfig() *Config {
+	return NewConfigWithID(operatorType)
+}
+
+// NewConfigWithID creates a new grok parser config with default values
+func NewConfigWithID(operatorID string) *Config {
+	return &Config{
+		ParserConfig: helper.NewParserConfig(operatorID, operatorType),
+	}
+}
+
+// Config is the configuration of a grok parser operator.
+type Config struct {
+	helper.ParserConfig `mapstructure:",squash"`
+
+	// grok pattern
+	Pattern string `mapstructure:"pattern"`
+
+	Cache struct {
+		Size uint16 `mapstructure:"size"`
+	} `mapstructure:"cache"`
+
+	// keys that will be kept in the final values (an allow-list)
+	Include []string `mapstructure:"include"`
+
+	// keys that will be dropped from the final values
+	Exclude []string `mapstructure:"exclude"`
+}
+
+// Build will build a grok parser operator.
+func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) {
+	parserOperator, err := c.ParserConfig.Build(logger)
+	if err != nil {
+		return nil, err
+	}
+
+	if c.Pattern == "" {
+		return nil, fmt.Errorf("missing required field 'pattern'")
+	}
+
+	g, err := grok.NewWithConfig(&grok.Config{NamedCapturesOnly: true})
+	if err != nil {
+		return nil, fmt.Errorf("new grok object: %w", err)
+	}
+
+	include := make(map[string]struct{})
+	exclude := make(map[string]struct{})
+	includeKeysPresent := false
+	excludeKeysPresent := false
+	if len(c.Include) > 0 {
+		includeKeysPresent = true
+		for _, k := range c.Include {
+			include[k] = struct{}{}
+		}
+	}
+	if len(c.Exclude) > 0 {
+		excludeKeysPresent = true
+		for _, k := range c.Exclude {
+			exclude[k] = struct{}{}
+		}
+	}
+
+	op := &Parser{
+		ParserOperator:     parserOperator,
+		grok:               g,
+		pattern:            c.Pattern,
+		includeKeysPresent: includeKeysPresent,
+		include:            include,
+		excludeKeysPresent: excludeKeysPresent,
+		exclude:            exclude,
+	}
+
+	if c.Cache.Size > 0 {
+		op.cache = newMemoryCache(c.Cache.Size, 0)
+		logger.Debugf("configured %s with memory cache of size %d", op.ID(), op.cache.maxSize())
+	}
+
+	return op, nil
+}
+
+// Parser is an operator that parses an entry's field with a grok pattern.
+type Parser struct {
+	helper.ParserOperator
+	grok               *grok.Grok
+	pattern            string
+	cache              cache
+	includeKeysPresent bool
+	include            map[string]struct{}
+	excludeKeysPresent bool
+	exclude            map[string]struct{}
+}
+
+// Process will parse the entry's field with the configured grok pattern.
+func (r *Parser) Process(ctx context.Context, entry *entry.Entry) error {
+	return r.ParserOperator.ProcessWith(ctx, entry, r.parse)
+}
+
+// parse will parse a value with the configured grok pattern.
+func (r *Parser) parse(value interface{}) (interface{}, error) {
+	var raw string
+	switch m := value.(type) {
+	case string:
+		raw = m
+	default:
+		return nil, fmt.Errorf("type '%T' cannot be parsed as grok", value)
+	}
+	return r.match(raw)
+}
+
+func (r *Parser) match(value string) (interface{}, error) {
+	if r.cache != nil {
+		if x := r.cache.get(value); x != nil {
+			return x, nil
+		}
+	}
+
+	values, err := r.grok.ParseTyped(r.pattern, value)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse log: %w", err)
+	}
+
+	parsedValues := map[string]interface{}{}
+	for k, v := range values {
+		if r.excludeKeysPresent {
+			if _, ok := r.exclude[k]; ok {
+				continue
+			}
+		}
+		if r.includeKeysPresent {
+			if _, ok := r.include[k]; ok {
+				parsedValues[k] = v
+			}
+		} else {
+			parsedValues[k] = v
+		}
+	}
+
+	if r.cache != nil {
+		r.cache.add(value, parsedValues)
+	}
+
+	return parsedValues, nil
+}
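A sketch of the filtering precedence implemented in `match` above (not part of the diff; the test name is hypothetical): `exclude` is applied first, then `include`, when present, acts as an allow-list over the remaining capture groups.

```go
package grok

import (
	"testing"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
	"github.com/stretchr/testify/require"
)

// TestFilterSketch is a hypothetical illustration of include/exclude.
func TestFilterSketch(t *testing.T) {
	cfg := NewConfigWithID("sketch")
	cfg.Pattern = "%{WORD:a} %{WORD:b} %{WORD:c}"
	cfg.Include = []string{"a", "b"}
	cfg.Exclude = []string{"b"}

	op, err := cfg.Build(testutil.Logger(t))
	require.NoError(t, err)

	// "b" is excluded even though it is also included; "c" is dropped
	// because an include list is present and does not mention it.
	out, err := op.(*Parser).match("x y z")
	require.NoError(t, err)
	require.Equal(t, map[string]interface{}{"a": "x"}, out)
}
```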
diff --git a/pkg/parser/grok/grok_test.go b/pkg/parser/grok/grok_test.go
new file mode 100644
index 00000000..585024ba
--- /dev/null
+++ b/pkg/parser/grok/grok_test.go
@@ -0,0 +1,351 @@
+// adapted from https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/stanza/operator/parser/regex/regex.go
+
+package grok
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
+)
+
+var (
+	apacheCommonLog = `252.18.112.145 - kshlerin6026 [23/Dec/2022:04:37:05 +0000] "POST /architectures/back-end/relationships/value-added HTTP/2.0" 203 25439`
+)
+
+func newTestParser(t *testing.T, pattern string, cacheSize uint16) *Parser {
+	cfg := NewConfigWithID("test")
+	cfg.Pattern = pattern
+	if cacheSize > 0 {
+		cfg.Cache.Size = cacheSize
+	}
+	op, err := cfg.Build(testutil.Logger(t))
+	require.NoError(t, err)
+	return op.(*Parser)
+}
+
+func TestParserBuildFailure(t *testing.T) {
+	cfg := NewConfigWithID("test")
+	cfg.OnError = "invalid_on_error"
+	_, err := cfg.Build(testutil.Logger(t))
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "invalid `on_error` field")
+}
+
+func TestParserByteFailure(t *testing.T) {
+	parser := newTestParser(t, "%{COMMONAPACHELOG}", 0)
+	_, err := parser.parse([]byte("invalid"))
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "type '[]uint8' cannot be parsed as grok")
+}
+
+func TestParserInvalidType(t *testing.T) {
+	parser := newTestParser(t, "%{COMMONAPACHELOG}", 0)
+	_, err := parser.parse([]int{})
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "type '[]int' cannot be parsed as grok")
+}
+
+func TestParserStringEmpty(t *testing.T) {
+	parser := newTestParser(t, "%{COMMONAPACHELOG}", 0)
+	res, err := parser.parse("invalid")
+	require.NoError(t, err)
+	require.Equal(t, 0, len(res.(map[string]interface{})))
+}
+
+func TestParserCache(t *testing.T) {
+	parser := newTestParser(t, "%{COMMONAPACHELOG}", 100)
+	require.NotNil(t, parser.cache, "expected cache to be configured")
+	require.Equal(t, uint16(100), parser.cache.maxSize())
+}
"%{COMMONAPACHELOG}", 100) + require.NotNil(t, parser.cache, "expected cache to be configured") + require.Equal(t, parser.cache.maxSize(), uint16(100)) +} + +func TestParserGrok(t *testing.T) { + cases := []struct { + name string + configure func(*Config) + input *entry.Entry + expected *entry.Entry + }{ + { + "grok pattern with name specified", + func(p *Config) { + p.Pattern = "a=%{NOTSPACE:data}" + p.Cache.Size = 100 + }, + &entry.Entry{ + Body: "a=b", + }, + &entry.Entry{ + Body: "a=b", + Attributes: map[string]interface{}{ + "data": "b", + }, + }, + }, + { + "use existing grok template", + func(p *Config) { + p.Pattern = `%{COMMONAPACHELOG}` + p.Cache.Size = 100 + }, + &entry.Entry{ + Body: apacheCommonLog, + }, + &entry.Entry{ + Body: apacheCommonLog, + Attributes: map[string]interface{}{ + "ident": "-", + "httpversion": "2.0", + "rawrequest": "", + "bytes": "25439", + "clientip": "252.18.112.145", + "auth": "kshlerin6026", + "timestamp": "23/Dec/2022:04:37:05 +0000", + "verb": "POST", + "request": "/architectures/back-end/relationships/value-added", + "response": "203", + }, + }, + }, + { + "grok pattern with type specified", + func(p *Config) { + p.Pattern = `%{IPV4:ip:string} %{NUMBER:status:int} %{NUMBER:duration:float}` + p.Cache.Size = 100 + }, + &entry.Entry{ + Body: "127.0.0.1 200 0.8", + }, + &entry.Entry{ + Body: "127.0.0.1 200 0.8", + Attributes: map[string]interface{}{ + "ip": "127.0.0.1", + "status": 200, + "duration": 0.8, + }, + }, + }, + { + "parser with include in config", + func(p *Config) { + p.Pattern = `%{COMMONAPACHELOG}` + p.Cache.Size = 100 + p.Include = []string{"timestamp", "auth", "response"} + }, + &entry.Entry{ + Body: apacheCommonLog, + }, + &entry.Entry{ + Body: apacheCommonLog, + Attributes: map[string]interface{}{ + "auth": "kshlerin6026", + "timestamp": "23/Dec/2022:04:37:05 +0000", + "response": "203", + }, + }, + }, + { + "parser with exclude in config", + func(p *Config) { + p.Pattern = `%{COMMONAPACHELOG}` + p.Cache.Size = 100 + p.Exclude = []string{"ident", "httpversion", "rawrequest", "bytes", "clientip", "request"} + }, + &entry.Entry{ + Body: apacheCommonLog, + }, + &entry.Entry{ + Body: apacheCommonLog, + Attributes: map[string]interface{}{ + "auth": "kshlerin6026", + "timestamp": "23/Dec/2022:04:37:05 +0000", + "verb": "POST", + "response": "203", + }, + }, + }, + { + "parser with both include and exclude in config", + func(p *Config) { + p.Pattern = `%{COMMONAPACHELOG}` + p.Cache.Size = 100 + p.Exclude = []string{"httpversion", "rawrequest", "bytes", "clientip", "request", "ident"} + p.Include = []string{"timestamp", "auth", "response", "ident"} + }, + &entry.Entry{ + Body: apacheCommonLog, + }, + &entry.Entry{ + Body: apacheCommonLog, + Attributes: map[string]interface{}{ + "auth": "kshlerin6026", + "timestamp": "23/Dec/2022:04:37:05 +0000", + "response": "203", + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + cfg := NewConfigWithID("test") + cfg.OutputIDs = []string{"fake"} + tc.configure(cfg) + + op, err := cfg.Build(testutil.Logger(t)) + require.NoError(t, err) + + fake := testutil.NewFakeOutput(t) + require.NoError(t, op.SetOutputs([]operator.Operator{fake})) + + ots := time.Now() + tc.input.ObservedTimestamp = ots + tc.expected.ObservedTimestamp = ots + + err = op.Process(context.Background(), tc.input) + require.NoError(t, err) + + fake.ExpectEntry(t, tc.expected) + }) + } +} + +// return 100 unique file names, example: +// 
+
+// benchParseInput returns 100 unique container log file names, for example:
+//	dafplsjfbcxoeff-5644d7b6d9-mzngq_kube-system_coredns-901f7510281180a402936c92f5bc0f3557f5a21ccb5a4591c5bf98f3ddbffdd6.log
+//	rswxpldnjobcsnv-5644d7b6d9-mzngq_kube-system_coredns-901f7510281180a402936c92f5bc0f3557f5a21ccb5a4591c5bf98f3ddbffdd6.log
+//	lgtemapezqleqyh-5644d7b6d9-mzngq_kube-system_coredns-901f7510281180a402936c92f5bc0f3557f5a21ccb5a4591c5bf98f3ddbffdd6.log
+func benchParseInput() (patterns []string) {
+	const letterBytes = "abcdefghijklmnopqrstuvwxyz"
+	for i := 1; i <= 100; i++ {
+		b := make([]byte, 15)
+		for j := range b {
+			b[j] = letterBytes[rand.Intn(len(letterBytes))]
+		}
+		randomStr := string(b)
+		p := fmt.Sprintf("%s-5644d7b6d9-mzngq_kube-system_coredns-901f7510281180a402936c92f5bc0f3557f5a21ccb5a4591c5bf98f3ddbffdd6.log", randomStr)
+		patterns = append(patterns, p)
+	}
+	return patterns
+}
+
+// Grok pattern used to parse the container log file names generated above
+const benchParsePattern = `%{DATA:podname}_%{DATA:namespace}_%{GREEDYDATA:container_name}-%{WORD:container_id}.log`
+
+var benchParsePatterns = benchParseInput()
+
+func newTestBenchParser(t *testing.T, cacheSize uint16) *Parser {
+	cfg := NewConfigWithID("bench")
+	cfg.Pattern = benchParsePattern
+	cfg.Cache.Size = cacheSize
+
+	op, err := cfg.Build(testutil.Logger(t))
+	require.NoError(t, err)
+	return op.(*Parser)
+}
+
+func benchmarkParseThreaded(b *testing.B, parser *Parser, input []string) {
+	wg := sync.WaitGroup{}
+
+	for _, i := range input {
+		wg.Add(1)
+
+		go func(i string) {
+			if _, err := parser.match(i); err != nil {
+				b.Error(err)
+			}
+			wg.Done()
+		}(i)
+	}
+
+	wg.Wait()
+}
+
+func benchmarkParse(b *testing.B, parser *Parser, input []string) {
+	for _, i := range input {
+		if _, err := parser.match(i); err != nil {
+			b.Error(err)
+		}
+	}
+}
+
+// No cache
+func BenchmarkParseNoCache(b *testing.B) {
+	parser := newTestBenchParser(&testing.T{}, 0)
+	for n := 0; n < b.N; n++ {
+		benchmarkParseThreaded(b, parser, benchParsePatterns)
+	}
+}
+
+// Memory cache at capacity
+func BenchmarkParseWithMemoryCache(b *testing.B) {
+	parser := newTestBenchParser(&testing.T{}, 100)
+	for n := 0; n < b.N; n++ {
+		benchmarkParseThreaded(b, parser, benchParsePatterns)
+	}
+}
+
+// Memory cache over capacity by one
+func BenchmarkParseWithMemoryCacheFullByOne(b *testing.B) {
+	parser := newTestBenchParser(&testing.T{}, 99)
+	for n := 0; n < b.N; n++ {
+		benchmarkParseThreaded(b, parser, benchParsePatterns)
+	}
+}
+
+// Memory cache over capacity by 10
+func BenchmarkParseWithMemoryCacheFullBy10(b *testing.B) {
+	parser := newTestBenchParser(&testing.T{}, 90)
+	for n := 0; n < b.N; n++ {
+		benchmarkParseThreaded(b, parser, benchParsePatterns)
+	}
+}
+
+// Memory cache over capacity by 50
+func BenchmarkParseWithMemoryCacheFullBy50(b *testing.B) {
+	parser := newTestBenchParser(&testing.T{}, 50)
+	for n := 0; n < b.N; n++ {
+		benchmarkParseThreaded(b, parser, benchParsePatterns)
+	}
+}
+
+// Memory cache over capacity by 90
+func BenchmarkParseWithMemoryCacheFullBy90(b *testing.B) {
+	parser := newTestBenchParser(&testing.T{}, 10)
+	for n := 0; n < b.N; n++ {
+		benchmarkParseThreaded(b, parser, benchParsePatterns)
+	}
+}
+
+// Memory cache over capacity by 99
+func BenchmarkParseWithMemoryCacheFullBy99(b *testing.B) {
+	parser := newTestBenchParser(&testing.T{}, 1)
+	for n := 0; n < b.N; n++ {
+		benchmarkParseThreaded(b, parser, benchParsePatterns)
+	}
+}
+
+// No cache one file
+func BenchmarkParseNoCacheOneFile(b *testing.B) {
+	parser := newTestBenchParser(&testing.T{}, 0)
+	for n := 0; n < b.N; n++ {
+		pattern := []string{benchParsePatterns[0]}
+		benchmarkParse(b, parser, pattern)
+	}
+}
+
+// Memory cache one file
+func BenchmarkParseWithMemoryCacheOneFile(b *testing.B) {
+	parser := newTestBenchParser(&testing.T{}, 100)
+	for n := 0; n < b.N; n++ {
+		pattern := []string{benchParsePatterns[0]}
+		benchmarkParse(b, parser, pattern)
+	}
+}
diff --git a/pkg/parser/grok/testdata/config.yaml b/pkg/parser/grok/testdata/config.yaml
new file mode 100644
index 00000000..070dbe0c
--- /dev/null
+++ b/pkg/parser/grok/testdata/config.yaml
@@ -0,0 +1,47 @@
+cache:
+  type: grok_parser
+  cache:
+    size: 50
+default:
+  type: grok_parser
+on_error_drop:
+  type: grok_parser
+  on_error: "drop"
+parse_from_simple:
+  type: grok_parser
+  parse_from: "body.from"
+parse_to_attributes:
+  type: grok_parser
+  parse_to: attributes
+parse_to_body:
+  type: grok_parser
+  parse_to: body
+parse_to_resource:
+  type: grok_parser
+  parse_to: resource
+parse_to_simple:
+  type: grok_parser
+  parse_to: "body.log"
+pattern:
+  type: grok_parser
+  pattern: "a=%{NOTSPACE:data}"
+scope_name:
+  type: grok_parser
+  pattern: "a=%{NOTSPACE:data}"
+  scope_name:
+    parse_from: body.logger_name_field
+severity:
+  type: grok_parser
+  severity:
+    parse_from: body.severity_field
+    mapping:
+      critical: 5xx
+      error: 4xx
+      info: 3xx
+      debug: 2xx
+timestamp:
+  type: grok_parser
+  timestamp:
+    parse_from: body.timestamp_field
+    layout_type: strptime
+    layout: "%Y-%m-%d"
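Finally, an end-to-end usage sketch (not part of the diff; the test name and pattern are illustrative): building the operator from a `Config` and processing a single entry, which is what a `grok_parser` stanza in a log pipeline does per record.

```go
package grok

import (
	"context"
	"testing"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"
	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/testutil"
	"github.com/stretchr/testify/require"
)

// TestEndToEndSketch is a hypothetical illustration of the operator.
// The equivalent stanza config would be:
//
//	type: grok_parser
//	pattern: "level=%{LOGLEVEL:level} msg=%{GREEDYDATA:msg}"
//	cache:
//	  size: 16
func TestEndToEndSketch(t *testing.T) {
	cfg := NewConfigWithID("sketch")
	cfg.Pattern = "level=%{LOGLEVEL:level} msg=%{GREEDYDATA:msg}"
	cfg.Cache.Size = 16

	op, err := cfg.Build(testutil.Logger(t))
	require.NoError(t, err)

	e := entry.New()
	e.Body = "level=error msg=disk full"
	require.NoError(t, op.Process(context.Background(), e))

	// With the default parse_to, captures land on entry attributes.
	require.Equal(t, "error", e.Attributes["level"])
	require.Equal(t, "disk full", e.Attributes["msg"])
}
```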