Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions core/elastic/bulk_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,10 @@ DO:
req.SetURI(clonedURI)
req.SetHost(orignalHost)

if resp == nil {
return false, statsRet, nil, errors.Errorf("received empty response from server, err: %v", err)
}

if err != nil {
if rate.GetRateLimiter(metadata.Config.ID, host+"5xx_on_error", 1, 1, 5*time.Second).Allow() {
log.Error("status:", resp.StatusCode(), ",", host, ",", err, " ", util.SubString(util.UnsafeBytesToString(resp.GetRawBody()), 0, 256))
Expand All @@ -602,13 +606,6 @@ DO:
//
//}

if resp == nil {
if global.Env().IsDebug {
log.Error(err)
}
return false, statsRet, nil, err
}

// Do we need to decompress the response?
var resbody = resp.GetRawBody()

Expand Down
1 change: 1 addition & 0 deletions docs/content.en/docs/release-notes/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Information about release notes of INFINI Framework is provided here.
- chore: update default fuzziness to 3 #215
- chore: avoid using same session name for multi instances #221
- refactor: refactoring pipeline #222
- chore: enhance bulk indexing - stricter offset validation and improved error handling #224

## 1.2.0 (2025-07-25)
### ❌ Breaking changes
Expand Down
190 changes: 107 additions & 83 deletions plugins/elastic/bulk_indexing/bulk_indexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -637,42 +637,46 @@ func (processor *BulkIndexingProcessor) NewSlicedBulkWorker(ctx *pipeline.Contex
//cleanup buffer before exit worker
//log.Info("start final submit:",qConfig.ID,",",esClusterID,",msg count:",mainBuf.GetMessageCount(),", ",committedOffset," vs ",offset )
if mainBuf.GetMessageCount() > 0 {
continueNext, err := processor.submitBulkRequest(ctx, qConfig, tag, esClusterID, meta, host, bulkProcessor, mainBuf)

if global.Env().IsDebug {
log.Debugf("slice worker, worker:[%v], [%v][%v][%v][%v] submit request:%v,continue:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, mainBuf.GetMessageCount(), continueNext, err)
}

mainBuf.ResetData()
if continueNext {
if !offset.Equals(*committedOffset) {
if consumerInstance != nil {
if global.Env().IsDebug {
log.Debugf("queue: %v, consumer: %v, commit offset: %v, init: %v", qConfig.ID, consumerConfig.ID, offset, committedOffset)
}
continueNext := false
if offset != nil && committedOffset != nil && !offset.Equals(*committedOffset) {
continueNext, err = processor.submitBulkRequest(ctx, qConfig, tag, esClusterID, meta, host, bulkProcessor, mainBuf)
if global.Env().IsDebug {
log.Debugf("slice worker, worker:[%v], [%v][%v][%v][%v] submit request:%v,continue:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, mainBuf.GetMessageCount(), continueNext, err)
}

//log.Info("final commit:",qConfig.ID,",",esClusterID,",msg count:",mainBuf.GetMessageCount(),", ",committedOffset," vs ",offset )
log.Debugf("final commit, queue: %v, consumer: %v, commit offset: %v, init: %v", qConfig.ID, consumerConfig.String(), offset, committedOffset)
err := consumerInstance.CommitOffset(*offset)
if err != nil {
mainBuf.ResetData()
if continueNext {
if !offset.Equals(*committedOffset) {
if consumerInstance != nil {
if global.Env().IsDebug {
panic(err)
log.Debugf("queue: %v, consumer: %v, commit offset: %v, init: %v", qConfig.ID, consumerConfig.ID, offset, committedOffset)
}

//log.Info("final commit:",qConfig.ID,",",esClusterID,",msg count:",mainBuf.GetMessageCount(),", ",committedOffset," vs ",offset )
log.Debugf("final commit, queue: %v, consumer: %v, commit offset: %v, init: %v", qConfig.ID, consumerConfig.String(), offset, committedOffset)
err := consumerInstance.CommitOffset(*offset)
if err != nil {
if global.Env().IsDebug {
panic(err)
}
}
log.Debugf("success commit, queue: %v, consumer: %v,offset to: %v, previous init: %v", qConfig.ID, consumerConfig.String(), *offset, committedOffset)
committedOffset = nil
offset = nil
} else {
panic("invalid consumer instance")
}
log.Debugf("success commit, queue: %v, consumer: %v,offset to: %v, previous init: %v", qConfig.ID, consumerConfig.String(), *offset, committedOffset)
committedOffset = nil
offset = nil
} else {
panic("invalid consumer instance")
log.Debugf("continueNext, queue:[%v], slice_id:%v, offset [%v]-[%v], err:%v", qConfig.ID, sliceID, committedOffset, offset, err)
}
log.Debugf("continueNext, queue:[%v], slice_id:%v, offset [%v]-[%v], err:%v", qConfig.ID, sliceID, committedOffset, offset, err)
log.Debugf("continueNext at same offset, queue:[%v], slice_id:%v, offset [%v]-[%v], err:%v", qConfig.ID, sliceID, committedOffset, offset, err)
} else {
if global.Env().IsDebug {
log.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], err:%v", qConfig.ID, sliceID, committedOffset, offset, err)
}
panic(errors.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk can't continue (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err))
}
log.Debugf("continueNext at same offset, queue:[%v], slice_id:%v, offset [%v]-[%v], err:%v", qConfig.ID, sliceID, committedOffset, offset, err)
} else {
if global.Env().IsDebug {
log.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], err:%v", qConfig.ID, sliceID, committedOffset, offset, err)
}
panic(errors.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk can't continue (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err))
log.Error("should not submit this bulk request, but it did,", err)
}
}
log.Debugf("exit worker[%v], message count[%d], queue:[%v], slice_id:%v", workerID, mainBuf.GetMessageCount(), qConfig.ID, sliceID)
Expand Down Expand Up @@ -913,36 +917,50 @@ READ_DOCS:
}

//submit request
continueNext, err := processor.submitBulkRequest(ctx, qConfig, tag, esClusterID, meta, host, bulkProcessor, mainBuf)
if global.Env().IsDebug {
log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] submit request:%v,continue:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, mainBuf.GetMessageCount(), continueNext, err)
}
if !continueNext {
//TODO handle 429 gracefully
if !util.ContainStr(err.Error(), "code 429") {
panic(errors.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host:%v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err))
continueNext := false
err = nil
if offset != nil && committedOffset != nil && !offset.Equals(*committedOffset) {
continueNext, err = processor.submitBulkRequest(ctx, qConfig, tag, esClusterID, meta, host, bulkProcessor, mainBuf)
if !continueNext && err == nil {
panic("unknown error, not to continue without error messages")
}
log.Errorf("error on submit bulk_requests, queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err)
time.Sleep(time.Duration(processor.config.RetryDelayIntervalInMs) * time.Millisecond)
continue
} else {
//reset buffer
mainBuf.ResetData()
if offset != nil && committedOffset != nil && !pop.NextOffset.Equals(*committedOffset) {
err := consumerInstance.CommitOffset(pop.NextOffset)

if global.Env().IsDebug {
log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] submit request:%v,continue:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, mainBuf.GetMessageCount(), continueNext, err)
}
if !continueNext {
if err != nil {
panic(err)
//TODO handle 429 gracefully
if !util.ContainStr(err.Error(), "code 429") {
panic(errors.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host:%v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err))
}
} else {
log.Error("unknown error, but marked as not to continue")
}

if global.Env().IsDebug {
log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] success commit offset:%v,ctx:%v,timeout:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, *offset, committedOffset, ctx1.String(), timeout, err)
log.Errorf("error on submit bulk_requests, queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err)
time.Sleep(time.Duration(processor.config.RetryDelayIntervalInMs) * time.Millisecond)
continue
} else {
//reset buffer
mainBuf.ResetData()
if offset != nil && committedOffset != nil && !pop.NextOffset.Equals(*committedOffset) {
err := consumerInstance.CommitOffset(pop.NextOffset)
if err != nil {
panic(err)
}

if global.Env().IsDebug {
log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] success commit offset:%v,ctx:%v,timeout:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, *offset, committedOffset, ctx1.String(), timeout, err)
}
committedOffset = &pop.NextOffset
}
committedOffset = &pop.NextOffset
offset = &pop.NextOffset
}
offset = &pop.NextOffset
} else {
log.Error("should not submit this bulk request, but it did,", err)
}
}

}

offset = &ctx1.NextOffset
Expand Down Expand Up @@ -974,48 +992,54 @@ CLEAN_BUFFER:
lastCommit = time.Now()
// check bulk result, if ok, then commit offset, or retry non-200 requests, or save failure offset
if mainBuf.GetMessageCount() > 0 {
continueNext, err := processor.submitBulkRequest(ctx, qConfig, tag, esClusterID, meta, host, bulkProcessor, mainBuf)
if global.Env().IsDebug {
log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] submit request:%v,continue:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, mainBuf.GetMessageCount(), continueNext, err)
}
continueNext := false
if offset != nil && committedOffset != nil && !offset.Equals(*committedOffset) {
continueNext, err = processor.submitBulkRequest(ctx, qConfig, tag, esClusterID, meta, host, bulkProcessor, mainBuf)

if !continueNext {
log.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err)
}
if global.Env().IsDebug {
log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] submit request:%v,continue:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, mainBuf.GetMessageCount(), continueNext, err)
}

if global.Env().IsDebug {
log.Debug(qConfig.ID, ",", qConfig.Name, ",", offset, ",", committedOffset, ",", continueNext, ",", err)
}
if !continueNext {
log.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err)
}

if continueNext {
//reset buffer
mainBuf.ResetData()
if offset != nil && committedOffset != nil && !offset.Equals(*committedOffset) {
err := consumerInstance.CommitOffset(*offset)
if err != nil {
panic(err)
if global.Env().IsDebug {
log.Debug(qConfig.ID, ",", qConfig.Name, ",", offset, ",", committedOffset, ",", continueNext, ",", err)
}

if continueNext {
//reset buffer
mainBuf.ResetData()
if offset != nil && committedOffset != nil && !offset.Equals(*committedOffset) {
err := consumerInstance.CommitOffset(*offset)
if err != nil {
panic(err)
}
committedOffset = offset

if global.Env().IsDebug {
log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] commit offset:%v,ctx:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, offset, ctx1.String(), err)
}

}
} else {
//logging failure offset boundry
//TODO handle 429 gracefully
if !util.ContainStr(err.Error(), "429") {
panic(errors.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err))
}
committedOffset = offset

if global.Env().IsDebug {
log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] commit offset:%v,ctx:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, offset, ctx1.String(), err)
log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] skip continue:%v,ctx:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, offset, ctx1.String(), err)
}

log.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err)
time.Sleep(time.Duration(processor.config.RetryDelayIntervalInMs) * time.Millisecond)
goto CLEAN_BUFFER
}
} else {
//logging failure offset boundry
//TODO handle 429 gracefully
if !util.ContainStr(err.Error(), "429") {
panic(errors.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err))
}

if global.Env().IsDebug {
log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] skip continue:%v,ctx:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, offset, ctx1.String(), err)
}

log.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err)
time.Sleep(time.Duration(processor.config.RetryDelayIntervalInMs) * time.Millisecond)
goto CLEAN_BUFFER
log.Error("should not submit this bulk request, but it did,", err)
}
}

Expand Down
Loading