feat: support for json logs (#205)
* feat: support for json logs

* fix: comments addressed

* feat: updated parser to convert top level fields to attributes

* feat: tests updated
nityanandagohain authored Oct 25, 2023
1 parent 6dd675c commit 4215d9a
Show file tree
Hide file tree
Showing 12 changed files with 442 additions and 47 deletions.
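
For orientation before the per-file diffs: the new JSON body parser accepts a JSON array of log objects. The recognized top-level keys (taken from the JSONLog struct in receiver/httpreceiver/bodyparser/json.go below, with "message" accepted as an alias for "body") map onto OTLP log record fields, and any other top-level key is lifted into the record's attributes. A minimal payload sketch in Go form; the concrete values are made up for illustration:

// Sketch of a payload the JSON body parser accepts. Field names come from
// the JSONLog struct added in this commit; the values are illustrative only.
const examplePayload = `[
  {
    "timestamp": 1698224748000000000,
    "trace_id": "000000000000000018c51935df0b93b9",
    "span_id": "18c51935df0b93b9",
    "trace_flags": 0,
    "severity_text": "info",
    "severity_number": 9,
    "attributes": {"method": "GET"},
    "resources": {"service.name": "myservice"},
    "body": "Setting up own telemetry...",
    "hostname": "myhost"
  }
]`
// "hostname" is not one of the recognized keys, so the parser stores it as a
// record attribute alongside "method".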
6 changes: 3 additions & 3 deletions receiver/httpreceiver/bodyparser/default.go
@@ -11,13 +11,13 @@ import (
type Default struct {
}

-func (l *Default) Parse(body []byte) (plog.Logs, int) {
+func (l *Default) Parse(body []byte) (plog.Logs, int, error) {
// split by newline and return
// TODO: add configuration for multiline
ld := plog.NewLogs()
data := string(body)
if data == "" {
-return ld, 0
+return ld, 0, nil
}
rl := ld.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
@@ -27,5 +27,5 @@ func (l *Default) Parse(body []byte) (plog.Logs, int) {
l.Body().SetStr(log)
l.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now().UTC()))
}
-return ld, len(loglines)
+return ld, len(loglines), nil
}
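
The signature change above is applied to every body parser in this commit: Parse now returns (plog.Logs, int, error) instead of (plog.Logs, int). A minimal sketch of a call site under that contract; the shared interface and the receiver's actual handler are not part of the hunks shown on this page, so the names below are assumptions:

package bodyparser

import (
	"fmt"

	"go.opentelemetry.io/collector/pdata/plog"
)

// parser mirrors the method shape shared by Default, GCloud, Heroku and JSON
// after this change (hypothetical name; the real interface lives outside the
// hunks shown here).
type parser interface {
	Parse(body []byte) (plog.Logs, int, error)
}

// handleBody sketches how a caller consumes the new error return instead of
// silently accepting whatever the parser produced.
func handleBody(p parser, body []byte) (plog.Logs, error) {
	logs, count, err := p.Parse(body)
	if err != nil {
		return plog.Logs{}, fmt.Errorf("failed to parse request body: %w", err)
	}
	fmt.Printf("parsed %d log record(s)\n", count)
	return logs, nil
}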
2 changes: 1 addition & 1 deletion receiver/httpreceiver/bodyparser/default_test.go
@@ -52,7 +52,7 @@ func TestDefaultParse(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
-res, _ := d.Parse([]byte(tt.PayLoad))
+res, _, _ := d.Parse([]byte(tt.PayLoad))
logs := tt.Logs()
assert.NoError(t, plogtest.CompareLogs(logs, res, plogtest.IgnoreObservedTimestamp()))
})
4 changes: 2 additions & 2 deletions receiver/httpreceiver/bodyparser/google.go
@@ -5,6 +5,6 @@ import "go.opentelemetry.io/collector/pdata/plog"
type GCloud struct {
}

-func (l *GCloud) Parse(body []byte) (plog.Logs, int) {
-return plog.Logs{}, 0
+func (l *GCloud) Parse(body []byte) (plog.Logs, int, error) {
+return plog.Logs{}, 0, nil
}
6 changes: 3 additions & 3 deletions receiver/httpreceiver/bodyparser/heroku.go
@@ -14,7 +14,7 @@ type Heroku struct {
names []string
}

-func NewHeroku() *Heroku {
+func NewHerokuBodyParser() *Heroku {
regex, err := regexp.Compile(`^<(?P<priority>\d|\d{2}|1[1-8]\d|19[01])>(?P<version>\d{1,2})\s(?P<timestamp>-|[^\s]+)\s(?P<hostname>[\S]{1,255})\s(?P<appname>[\S]{1,48})\s(?P<procid>[\S]{1,128})\s(?P<msgid>[\S]{1,32})(?:\s(?P<msg>.+))?$`)
if err != nil {
panic(err)
@@ -41,7 +41,7 @@ type log struct {
body string
}

-func (l *Heroku) Parse(body []byte) (plog.Logs, int) {
+func (l *Heroku) Parse(body []byte) (plog.Logs, int, error) {
data := string(body)

loglines := octetCountingSplitter(data)
@@ -99,7 +99,7 @@ func (l *Heroku) Parse(body []byte) (plog.Logs, int) {
}
}
}
-return ld, len(loglines)
+return ld, len(loglines), nil

}

68 changes: 34 additions & 34 deletions receiver/httpreceiver/bodyparser/heroku_test.go
@@ -12,66 +12,66 @@ func TestOctetCountingSplitter(t *testing.T) {
t.Parallel()

tests := []struct {
-name string
-PayLoad string
-LogLines []string
-isError bool
+name string
+payLoad string
+expectedLogLines []string
+isError bool
}{
{
name: "Test 1",
-PayLoad: `9 <1>1 - -`,
-LogLines: []string{
+payLoad: `9 <1>1 - -`,
+expectedLogLines: []string{
`<1>1 - -`,
},
},
{
name: "Test 2",
-PayLoad: `9 <1>1 - -9 <2>2 - -`,
-LogLines: []string{
+payLoad: `9 <1>1 - -9 <2>2 - -`,
+expectedLogLines: []string{
`<1>1 - -`,
`<2>2 - -`,
},
},
{
name: "Test 3 with newline",
-PayLoad: `9 <1>1 - -
+payLoad: `9 <1>1 - -
11 <2>2 - - s`,
-LogLines: []string{
+expectedLogLines: []string{
`<1>1 - -`,
`<2>2 - - s`,
},
},
{
name: "Test 4 with newline and tabs",
-PayLoad: `9 <1>1 - -
+payLoad: `9 <1>1 - -
9 <2>1 - -
9 <3>1 - -`,
-LogLines: []string{
+expectedLogLines: []string{
`<1>1 - -`,
`<2>1 - -`,
`<3>1 - -`,
},
},
{
name: "Test 5",
-PayLoad: `250 <190>1 2023-10-13T10:48:11.04591+00:00 host app web.1 - 10.1.23.40 - - [13/Oct/2023:10:48:11 +0000] "GET / HTTP/1.1" 200 7450 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"`,
-LogLines: []string{
+payLoad: `250 <190>1 2023-10-13T10:48:11.04591+00:00 host app web.1 - 10.1.23.40 - - [13/Oct/2023:10:48:11 +0000] "GET / HTTP/1.1" 200 7450 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"`,
+expectedLogLines: []string{
`<190>1 2023-10-13T10:48:11.04591+00:00 host app web.1 - 10.1.23.40 - - [13/Oct/2023:10:48:11 +0000] "GET / HTTP/1.1" 200 7450 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"`},
},
{
name: "Test 6 - extra newline",
-PayLoad: `9 <1>1 - -
+payLoad: `9 <1>1 - -
`,
-LogLines: []string{
+expectedLogLines: []string{
`<1>1 - -`,
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
-res := octetCountingSplitter(tt.PayLoad)
-assert.Equal(t, tt.LogLines, res)
+res := octetCountingSplitter(tt.payLoad)
+assert.Equal(t, tt.expectedLogLines, res)
})
}
}
@@ -88,18 +88,18 @@ func AddDefaultResources(rl plog.ResourceLogs) {

func TestHerokuParse(t *testing.T) {
t.Parallel()
-d := NewHeroku()
+d := NewHerokuBodyParser()
tests := []struct {
-name string
-PayLoad string
-Logs func() plog.Logs
-count int
-isError bool
+name string
+payLoad string
+expectedLogs func() plog.Logs
+count int
+isError bool
}{
{
name: "Test 1",
-PayLoad: `151 <190>1 2023-10-12T07:25:48.393741+00:00 host app otel-collector.1 - 2023-10-12T07:25:48.393Z info service/telemetry.go:104 Setting up own telemetry...`,
-Logs: func() plog.Logs {
+payLoad: `151 <190>1 2023-10-12T07:25:48.393741+00:00 host app otel-collector.1 - 2023-10-12T07:25:48.393Z info service/telemetry.go:104 Setting up own telemetry...`,
+expectedLogs: func() plog.Logs {
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
AddDefaultResources(rl)
@@ -113,9 +113,9 @@ func TestHerokuParse(t *testing.T) {
},
{
name: "Test 2 - multiline",
-PayLoad: `151 <190>1 2023-10-12T07:25:48.393741+00:00 host app otel-collector.1 - 2023-10-12T07:25:48.393Z info service/telemetry.go:104 Setting up own telemetry...
+payLoad: `151 <190>1 2023-10-12T07:25:48.393741+00:00 host app otel-collector.1 - 2023-10-12T07:25:48.393Z info service/telemetry.go:104 Setting up own telemetry...
189 <190>1 2023-10-12T07:25:48.393855+00:00 host app otel-collector.1 - 2023-10-12T07:25:48.393Z info service/telemetry.go:127 Serving Prometheus metrics {"address": ":8888", "level": "Basic"}`,
-Logs: func() plog.Logs {
+expectedLogs: func() plog.Logs {
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
AddDefaultResources(rl)
@@ -133,16 +133,16 @@
},
{
name: "Test 3 - empty",
-PayLoad: ``,
-Logs: func() plog.Logs {
+payLoad: ``,
+expectedLogs: func() plog.Logs {
ld := plog.NewLogs()
return ld
},
},
{
name: "Test 4 - wrong pattern",
-PayLoad: `28 Setting up own telemetry...`,
-Logs: func() plog.Logs {
+payLoad: `28 Setting up own telemetry...`,
+expectedLogs: func() plog.Logs {
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
@@ -155,8 +155,8 @@

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
-res, _ := d.Parse([]byte(tt.PayLoad))
-logs := tt.Logs()
+res, _, _ := d.Parse([]byte(tt.payLoad))
+logs := tt.expectedLogs()
assert.NoError(t, plogtest.CompareLogs(logs, res, plogtest.IgnoreObservedTimestamp()))
})
}
172 changes: 172 additions & 0 deletions receiver/httpreceiver/bodyparser/json.go
@@ -0,0 +1,172 @@
package bodyparser

import (
"encoding/hex"
"encoding/json"
"fmt"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
)

type JSON struct {
}

type JSONLog struct {
Timestamp int64 `json:"timestamp"`
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
TraceFlags int `json:"trace_flags"`
SeverityText string `json:"severity_text"`
SeverityNumber int `json:"severity_number"`
Attributes map[string]interface{} `json:"attributes"`
Resources map[string]interface{} `json:"resources"`
Body string `json:"body"`
}

func NewJsonBodyParser() *JSON {
return &JSON{}
}

func (l *JSON) Parse(body []byte) (plog.Logs, int, error) {

data := []map[string]interface{}{}
err := json.Unmarshal(body, &data)
if err != nil {
return plog.NewLogs(), 0, fmt.Errorf("error unmarshalling data:%w", err)
}

jsonLogArray := []JSONLog{}
for _, log := range data {
jsonLog := JSONLog{}
for key, val := range log {
switch key {
case "timestamp":
// nanosecond epoch
data, ok := val.(float64)
if !ok {
return plog.NewLogs(), 0, fmt.Errorf("timestamp must be a uint64 nanoseconds since Unix epoch")
}
jsonLog.Timestamp = int64(data)
case "trace_id":
data, ok := val.(string)
if !ok {
return plog.NewLogs(), 0, fmt.Errorf("trace_id must be a hex string")
}
jsonLog.TraceID = data
case "span_id":
data, ok := val.(string)
if !ok {
return plog.NewLogs(), 0, fmt.Errorf("span_id must be a hex string")
}
jsonLog.SpanID = data
case "trace_flags":
data, ok := val.(float64)
if !ok {
return plog.NewLogs(), 0, fmt.Errorf("trace_flags must be a number")
}
jsonLog.TraceFlags = int(data)
case "severity_text":
data, ok := val.(string)
if !ok {
return plog.NewLogs(), 0, fmt.Errorf("severity_text must be a string")
}
jsonLog.SeverityText = data
case "severity_number":
data, ok := val.(float64)
if !ok {
return plog.NewLogs(), 0, fmt.Errorf("severity_number must be a number")
}
jsonLog.SeverityNumber = int(data)
case "attributes":
data, ok := val.(map[string]interface{})
if !ok {
return plog.NewLogs(), 0, fmt.Errorf("attributes must be a map")
}
jsonLog.Attributes = data
case "resources":
data, ok := val.(map[string]interface{})
if !ok {
return plog.NewLogs(), 0, fmt.Errorf("resources must be a map")
}
jsonLog.Resources = data
case "message", "body":
data, ok := val.(string)
if !ok {
return plog.NewLogs(), 0, fmt.Errorf("%s must be a string", key)
}
jsonLog.Body = data
default:
// if there is any other key present convert it to an attribute
if jsonLog.Attributes == nil {
jsonLog.Attributes = map[string]interface{}{}
}
jsonLog.Attributes[key] = val
}
}
jsonLogArray = append(jsonLogArray, jsonLog)
}

ld := plog.NewLogs()
for _, log := range jsonLogArray {
rl := ld.ResourceLogs().AppendEmpty()
rAttrLen := len(log.Resources)
rl.Resource().Attributes().EnsureCapacity(rAttrLen)
for k, v := range log.Resources {
l.AddAttribute(rl.Resource().Attributes(), k, v)
}
sl := rl.ScopeLogs().AppendEmpty()
rec := sl.LogRecords().AppendEmpty()
attrLen := len(log.Attributes)
rec.Attributes().EnsureCapacity(attrLen)
for k, v := range log.Attributes {
l.AddAttribute(rec.Attributes(), k, v)
}
rec.Body().SetStr(log.Body)
if log.TraceID != "" {
traceIdByte, err := hex.DecodeString(log.TraceID)
if err != nil {
return plog.Logs{}, 0, fmt.Errorf("error decoding trace_id:%w", err)
}
var traceID [16]byte
copy(traceID[:], traceIdByte)
rec.SetTraceID(pcommon.TraceID(pcommon.TraceID(traceID)))
}
if log.SpanID != "" {
spanIdByte, err := hex.DecodeString(log.SpanID)
if err != nil {
return plog.Logs{}, 0, fmt.Errorf("error decoding span_id:%w", err)
}
var spanID [8]byte
copy(spanID[:], spanIdByte)
rec.SetSpanID(pcommon.SpanID(spanID))
}
rec.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(0, log.Timestamp)))
rec.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now().UTC()))
rec.SetSeverityText(log.SeverityText)
rec.SetSeverityNumber(plog.SeverityNumber(log.SeverityNumber))
rec.SetFlags(plog.LogRecordFlags(log.TraceFlags))
}
return ld, len(data), nil
}

func (l *JSON) AddAttribute(attrs pcommon.Map, key string, value interface{}) {
switch value.(type) {
case string:
attrs.PutStr(key, value.(string))
case int, int8, int16, int32, int64:
attrs.PutInt(key, value.(int64))
case uint, uint8, uint16, uint32, uint64:
attrs.PutInt(key, int64(value.(uint64)))
case float32, float64:
attrs.PutDouble(key, value.(float64))
case bool:
attrs.PutBool(key, value.(bool))
default:
// ignoring the error for now
bytes, _ := json.Marshal(value)
attrs.PutStr(key, string(bytes))
}

}
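
To round out the new file, a short end-to-end usage sketch of the JSON parser. NewJsonBodyParser and Parse come from the file above; the module import path and the payload values are assumptions:

package main

import (
	"fmt"

	// Assumed module path for this repository; adjust to the actual go.mod name.
	"github.com/SigNoz/signoz-otel-collector/receiver/httpreceiver/bodyparser"
)

func main() {
	payload := []byte(`[
		{
			"timestamp": 1698224748000000000,
			"severity_text": "info",
			"body": "user logged in",
			"user_id": "u-123",
			"resources": {"service.name": "auth"}
		}
	]`)

	p := bodyparser.NewJsonBodyParser()
	logs, count, err := p.Parse(payload)
	if err != nil {
		panic(err)
	}
	// "user_id" is not a recognized top-level key, so Parse moves it into the
	// record's attributes; "service.name" becomes a resource attribute.
	fmt.Println("records parsed:", count, "resource logs:", logs.ResourceLogs().Len())
}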