Skip to content

Partial log aggregation #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"strconv"
"math"
"strings"
)

// A mapped version of logger.Message where Line is a string, not a byte array
Expand Down Expand Up @@ -161,12 +162,14 @@ func writeLogsToKafka(lf *logPair, topic string, keyStrategy KeyStrategy, tag st
dec := protoio.NewUint32DelimitedReader(lf.stream, binary.BigEndian, 1e6)
defer dec.Close()
var buf logdriver.LogEntry

var partialBuffer string
for {
// Check if there are any Kafka errors thus far
select {
case kafkaErr := <- lf.producer.Errors():
// In the event of an error, continue to attempt to write messages
logrus.Error("error recieved from Kafka", kafkaErr)
logrus.Error("error received from Kafka", kafkaErr)
default:
//No errors, continue
}
Expand All @@ -177,11 +180,31 @@ func writeLogsToKafka(lf *logPair, topic string, keyStrategy KeyStrategy, tag st
lf.stream.Close()
return
}
logrus.WithField("id", lf.info.ContainerID).WithError(err).Error("error whilst reading from log")
dec = protoio.NewUint32DelimitedReader(lf.stream, binary.BigEndian, 1e6)
}

lineAsString := string(buf.Line)

// In the event that the message is partial, we attempt to aggregate if possible
switch {
case buf.Partial:
logrus.WithField("line", lineAsString).WithField("buffer", partialBuffer).Debug("Received partial message, start reconstruction")
fallthrough
case len(partialBuffer) > 0 && !strings.HasSuffix(lineAsString, "\n"):
logrus.WithField("line", lineAsString).WithField("buffer", partialBuffer).Debug("Received partial message, continue reconstruction")
partialBuffer += lineAsString
continue
case len(partialBuffer) > 0 && strings.HasSuffix(lineAsString, "\n"):
logrus.WithField("line", lineAsString).WithField("buffer", partialBuffer).Debug("Received partial message, finish reconstruction")
// Append the final fragment, then emit the fully reconstructed line and clear the buffer
partialBuffer += lineAsString
lineAsString = partialBuffer
partialBuffer = ""
}

var msg LogMessage
msg.Line = string(buf.Line)
msg.Line = lineAsString
msg.Source = buf.Source
msg.Partial = buf.Partial
msg.Timestamp = time.Unix(0, buf.TimeNano)
Expand Down
25 changes: 25 additions & 0 deletions driver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ func TestConsumesMultipleLogMessagesFromDocker(t *testing.T) {
assertLineMatch(t, "delta", <-producer.Successes())
}

// TestAggregatesPartialLogMessagesFromDocker verifies that a run of partial
// log entries is reassembled into a single message before being sent to Kafka:
// the first entry is flagged Partial, and the run ends at an entry whose line
// terminates with a newline.
func TestAggregatesPartialLogMessagesFromDocker(t *testing.T) {
	producer := NewProducer(t)
	defer producer.Close()

	// Three fragments of one logical log line, as Docker would deliver them.
	entries := []logdriver.LogEntry{
		newPartialLogEntry("alpha"),
		newLogEntry("beta"),
		newLogEntry("charlie\n"),
	}
	stream := createBufferForLogMessages(entries)

	lf := createLogPair(producer, stream)

	producer.ExpectInputAndSucceed()
	writeLogsToKafka(&lf, "topic", KEY_BY_TIMESTAMP, TAG)

	// All three fragments should arrive as one reconstructed line.
	assertLineMatch(t, "alphabetacharlie\n", <-producer.Successes())
}

func TestJsonIncludesContainerInformation(t *testing.T) {
expectedContainerId := "containerid1"
expectedContainerName := "containername1"
Expand Down Expand Up @@ -440,3 +458,10 @@ func newLogEntry(line string) logdriver.LogEntry {
le.TimeNano = time.Now().UnixNano()
return le
}

// newPartialLogEntry builds a log entry for line with the Partial flag set,
// mimicking how Docker delivers a fragment of a long log line.
func newPartialLogEntry(line string) logdriver.LogEntry {
	entry := newLogEntry(line)
	entry.Partial = true
	return entry
}