Skip to content

Commit

Permalink
Byte size based batching
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Dec 19, 2024
1 parent 50104db commit b89ad4e
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 16 deletions.
15 changes: 15 additions & 0 deletions exporter/exporterbatcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type MinSizeConfig struct {
// sent regardless of the timeout. There is no guarantee that the batch size always greater than this value.
// This option requires the Request to implement RequestItemsCounter interface. Otherwise, it will be ignored.
MinSizeItems int `mapstructure:"min_size_items"`
MinSizeBytes int `mapstructure:"min_size_bytes"`
}

// MaxSizeConfig defines the configuration for the maximum number of items in a batch.
Expand All @@ -41,18 +42,32 @@ type MaxSizeConfig struct {
// If the batch size exceeds this value, it will be broken up into smaller batches if possible.
// Setting this value to zero disables the maximum size limit.
MaxSizeItems int `mapstructure:"max_size_items"`
MaxSizeBytes int `mapstructure:"max_size_bytes"`
}

func (c Config) Validate() error {
if c.MinSizeBytes != 0 && c.MinSizeItems != 0 || c.MinSizeBytes != 0 && c.MaxSizeItems != 0 || c.MinSizeItems != 0 && c.MaxSizeBytes != 0 {
return errors.New("size limit and bytes limit cannot be specified at the same time")
}

if c.MinSizeItems < 0 {
return errors.New("min_size_items must be greater than or equal to zero")
}
if c.MinSizeBytes < 0 {
return errors.New("min_size_bytes must be greater than or equal to zero")
}
if c.MaxSizeItems < 0 {
return errors.New("max_size_items must be greater than or equal to zero")
}
if c.MaxSizeBytes < 0 {
return errors.New("max_size_bytes must be greater than or equal to zero")
}
if c.MaxSizeItems != 0 && c.MaxSizeItems < c.MinSizeItems {
return errors.New("max_size_items must be greater than or equal to min_size_items")
}
if c.MaxSizeBytes != 0 && c.MaxSizeBytes < c.MinSizeBytes {
return errors.New("max_size_bytes must be greater than or equal to min_size_bytes")
}
if c.FlushTimeout <= 0 {
return errors.New("timeout must be greater than zero")
}
Expand Down
4 changes: 4 additions & 0 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func (req *logsRequest) Export(ctx context.Context) error {
return req.pusher(ctx, req.ld)
}

func (req *logsRequest) BytesSize() int {
return req.ld.GetOrig().Size()
}

func (req *logsRequest) ItemsCount() int {
return req.ld.LogRecordCount()
}
Expand Down
15 changes: 8 additions & 7 deletions exporter/exporterhelper/logs_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ func (req *logsRequest) Merge(_ context.Context, r2 Request) (Request, error) {
// conforming with the MaxSizeConfig.
func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 Request) ([]Request, error) {
var (
res []Request
destReq *logsRequest
capacityLeft = cfg.MaxSizeItems
res []Request
destReq *logsRequest
// capacityLeft = cfg.MaxSizeItems
capacityLeft = cfg.MaxSizeBytes
)
for _, req := range []Request{req, r2} {
if req == nil {
Expand All @@ -37,22 +38,22 @@ func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSiz
if !ok {
return nil, errors.New("invalid input type")
}
if srcReq.ld.LogRecordCount() <= capacityLeft {
if srcReq.ld.GetOrig().Size() <= capacityLeft {
if destReq == nil {
destReq = srcReq
} else {
srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs())
}
capacityLeft -= destReq.ld.LogRecordCount()
capacityLeft -= destReq.ld.GetOrig().Size()
continue
}

for {
extractedLogs := extractLogs(srcReq.ld, capacityLeft)
if extractedLogs.LogRecordCount() == 0 {
if extractedLogs.GetOrig().Size() == 0 {
break
}
capacityLeft -= extractedLogs.LogRecordCount()
capacityLeft -= extractedLogs.GetOrig().Size()
if destReq == nil {
destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher}
} else {
Expand Down
20 changes: 17 additions & 3 deletions exporter/internal/queue/default_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ func (qb *DefaultBatcher) resetTimer() {
}
}

func (qb *DefaultBatcher) maxSizeLimitExists() bool {
return qb.batchCfg.MaxSizeItems > 0 || qb.batchCfg.MaxSizeBytes > 0
}

func (qb *DefaultBatcher) reachedMinSizeThreadhold(req internal.Request) bool {
if qb.batchCfg.MinSizeItems > 0 {
return req.ItemsCount() >= qb.batchCfg.MinSizeItems
} else if qb.batchCfg.MinSizeBytes > 0 {
return req.BytesSize() >= qb.batchCfg.MinSizeBytes
} else {
return true
}
}

// startReadingFlushingGoroutine starts a goroutine that reads and then flushes.
func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
qb.stopWG.Add(1)
Expand All @@ -43,7 +57,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {

qb.currentBatchMu.Lock()

if qb.batchCfg.MaxSizeItems > 0 {
if qb.maxSizeLimitExists() {
var reqList []internal.Request
var mergeSplitErr error
if qb.currentBatch == nil || qb.currentBatch.req == nil {
Expand All @@ -60,7 +74,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
}

// If there was a split, we flush everything immediately.
if reqList[0].ItemsCount() >= qb.batchCfg.MinSizeItems || len(reqList) > 1 {
if qb.reachedMinSizeThreadhold(reqList[0]) || len(reqList) > 1 {
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
for i := 0; i < len(reqList); i++ {
Expand Down Expand Up @@ -102,7 +116,7 @@ func (qb *DefaultBatcher) startReadingFlushingGoroutine() {
}
}

if qb.currentBatch.req.ItemsCount() >= qb.batchCfg.MinSizeItems {
if qb.reachedMinSizeThreadhold(qb.currentBatch.req) {
batchToFlush := *qb.currentBatch
qb.currentBatch = nil
qb.currentBatchMu.Unlock()
Expand Down
2 changes: 2 additions & 0 deletions exporter/internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type Request interface {
// sent. For example, for OTLP exporter, this value represents the number of spans,
// metric data points or log records.
ItemsCount() int
// BytesSize returns the size of serialized request.
BytesSize() int
// Merge is a function that merges this request with another one into a single request.
// Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is
// marked as not mutable.
Expand Down
2 changes: 1 addition & 1 deletion pdata/plog/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (*JSONUnmarshaler) UnmarshalLogs(buf []byte) (Logs, error) {
if iter.Error != nil {
return Logs{}, iter.Error
}
otlp.MigrateLogs(ld.getOrig().ResourceLogs)
otlp.MigrateLogs(ld.GetOrig().ResourceLogs)
return ld, nil
}

Expand Down
4 changes: 2 additions & 2 deletions pdata/plog/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func newLogs(orig *otlpcollectorlog.ExportLogsServiceRequest) Logs {
return Logs(internal.NewLogs(orig, &state))
}

func (ms Logs) getOrig() *otlpcollectorlog.ExportLogsServiceRequest {
func (ms Logs) GetOrig() *otlpcollectorlog.ExportLogsServiceRequest {
return internal.GetOrigLogs(internal.Logs(ms))
}

Expand Down Expand Up @@ -57,7 +57,7 @@ func (ms Logs) LogRecordCount() int {

// ResourceLogs returns the ResourceLogsSlice associated with this Logs.
func (ms Logs) ResourceLogs() ResourceLogsSlice {
return newResourceLogsSlice(&ms.getOrig().ResourceLogs, internal.GetLogsState(internal.Logs(ms)))
return newResourceLogsSlice(&ms.GetOrig().ResourceLogs, internal.GetLogsState(internal.Logs(ms)))
}

// MarkReadOnly marks the Logs as shared so that no further modifications can be done on it.
Expand Down
6 changes: 3 additions & 3 deletions pdata/plog/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestToFromLogOtlp(t *testing.T) {
otlp := &otlpcollectorlog.ExportLogsServiceRequest{}
logs := newLogs(otlp)
assert.EqualValues(t, NewLogs(), logs)
assert.EqualValues(t, otlp, logs.getOrig())
assert.EqualValues(t, otlp, logs.GetOrig())
}

func TestResourceLogsWireCompatibility(t *testing.T) {
Expand All @@ -84,7 +84,7 @@ func TestResourceLogsWireCompatibility(t *testing.T) {
fillTestResourceLogsSlice(logs.ResourceLogs())

// Marshal its underlying ProtoBuf to wire.
wire1, err := gogoproto.Marshal(logs.getOrig())
wire1, err := gogoproto.Marshal(logs.GetOrig())
require.NoError(t, err)
assert.NotNil(t, wire1)

Expand All @@ -105,7 +105,7 @@ func TestResourceLogsWireCompatibility(t *testing.T) {

// Now compare that the original and final ProtoBuf messages are the same.
// This proves that goproto and gogoproto marshaling/unmarshaling are wire compatible.
assert.EqualValues(t, logs.getOrig(), &gogoprotoRS2)
assert.EqualValues(t, logs.GetOrig(), &gogoprotoRS2)
}

func TestLogsCopyTo(t *testing.T) {
Expand Down

0 comments on commit b89ad4e

Please sign in to comment.