
Commit 2cf2d88

chore: panic first when bulk_indexing on error (#230)
1 parent c848085 · commit 2cf2d88
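In plain terms: when a bulk submit fails and the worker cannot safely continue, the worker now panics immediately with full context (queue, slice, offsets, host, error) instead of sleeping and retrying via continue/goto CLEAN_BUFFER. Below is a minimal, self-contained sketch of that fail-fast pattern; every name in it (submitBulk, sliceWorker) is illustrative and not the plugin's actual API.

// Sketch only: submitBulk and sliceWorker are made-up stand-ins for the
// plugin's real bulk submit path; here the submit always fails.
package main

import (
	"fmt"
	"log"
)

func submitBulk(payload []byte) (continueNext bool, err error) {
	return false, fmt.Errorf("connection refused")
}

func sliceWorker(queueID string, sliceID int, payload []byte) {
	continueNext, err := submitBulk(payload)
	if err != nil || !continueNext {
		// Old flow: log, sleep, then continue/goto and retry in place.
		// New flow: panic with full context so the failure surfaces at once.
		panic(fmt.Errorf("queue:[%v], slice_id:%v, bulk can't continue, err: %v",
			queueID, sliceID, err))
	}
}

func main() {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("worker stopped: %v", r)
		}
	}()
	sliceWorker("logs-queue", 0, []byte("{}\n"))
}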

File tree

1 file changed: 16 additions, 15 deletions

plugins/elastic/bulk_indexing/bulk_indexing.go

Lines changed: 16 additions & 15 deletions
@@ -676,7 +676,7 @@ func (processor *BulkIndexingProcessor) NewSlicedBulkWorker(ctx *pipeline.Contex
 				panic(errors.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk can't continue (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err))
 			}
 		} else {
-			log.Error("should not submit this bulk request, but it did,", err)
+			log.Errorf("should not submit this bulk request, worker[%v], queue:[%v], slice:[%v], offset:[%v]->[%v],%v, msg:%v", workerID, qConfig.ID, sliceID, committedOffset, offset, err, mainBuf.GetMessageCount())
 		}
 	}
 	log.Debugf("exit worker[%v], message count[%d], queue:[%v], slice_id:%v", workerID, mainBuf.GetMessageCount(), qConfig.ID, sliceID)
@@ -911,7 +911,7 @@ READ_DOCS:
 			msgSize := mainBuf.GetMessageSize()
 			msgCount := mainBuf.GetMessageCount()
 
-			if (bulkSizeInByte > 0 && msgSize > (bulkSizeInByte)) || (processor.config.BulkConfig.BulkMaxDocsCount > 0 && msgCount > processor.config.BulkConfig.BulkMaxDocsCount) {
+			if (bulkSizeInByte > 0 && msgSize > (bulkSizeInByte)) || (msgCount > 0 && processor.config.BulkConfig.BulkMaxDocsCount > 0 && msgCount > processor.config.BulkConfig.BulkMaxDocsCount) {
 				if global.Env().IsDebug {
 					log.Debugf("slice worker, worker:[%v], consuming [%v], slice_id:%v, hit buffer limit, size:%v, count:%v, submit now", workerID, qConfig.Name, sliceID, msgSize, msgCount)
 				}
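The only change in the hunk above is the added msgCount > 0 guard, so an empty buffer can never trip the max-docs limit; thresholds of zero or below remain disabled. A standalone sketch of that flush decision, with illustrative parameter names:

// shouldFlush is a sketch of the buffer-limit check; a threshold <= 0 means
// that limit is disabled.
func shouldFlush(msgSize, msgCount, bulkSizeInByte, bulkMaxDocsCount int) bool {
	overSize := bulkSizeInByte > 0 && msgSize > bulkSizeInByte
	// msgCount > 0 keeps an empty buffer from triggering the doc-count limit.
	overCount := msgCount > 0 && bulkMaxDocsCount > 0 && msgCount > bulkMaxDocsCount
	return overSize || overCount
}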
@@ -921,6 +921,8 @@ READ_DOCS:
 				err = nil
 				if offset != nil && committedOffset != nil && !offset.Equals(*committedOffset) {
 					continueNext, err = processor.submitBulkRequest(ctx, qConfig, tag, esClusterID, meta, host, bulkProcessor, mainBuf)
+					mainBuf.ResetData()
+
 					if !continueNext && err == nil {
 						panic("unknown error, not to continue without error messages")
 					}
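Note the placement of the added mainBuf.ResetData() in the hunk above: the buffer is now cleared right after submitBulkRequest returns, whether or not the submit succeeded, so a later iteration cannot re-submit the same payload. A rough sketch of that ordering; the buffer type and submit callback here are stand-ins, not the plugin's real types:

// bulkBuffer is an illustrative stand-in for the plugin's request buffer.
type bulkBuffer struct{ docs [][]byte }

func (b *bulkBuffer) ResetData() { b.docs = b.docs[:0] }

// submitAndClear submits whatever is buffered and clears the buffer in the
// same step, so a failed batch cannot be pushed twice by a later retry.
func submitAndClear(buf *bulkBuffer, submit func([][]byte) (bool, error)) (continueNext bool, err error) {
	continueNext, err = submit(buf.docs)
	buf.ResetData()
	return continueNext, err
}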
@@ -940,25 +942,26 @@ READ_DOCS:
 
 						log.Errorf("error on submit bulk_requests, queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err)
 						time.Sleep(time.Duration(processor.config.RetryDelayIntervalInMs) * time.Millisecond)
-						continue
+						//continue //TODO, should panic? clear mainbuffer
+						panic(errors.Errorf("error on submit bulk_requests, queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err))
 					} else {
-						//reset buffer
-						mainBuf.ResetData()
-						if offset != nil && committedOffset != nil && !pop.NextOffset.Equals(*committedOffset) {
-							err := consumerInstance.CommitOffset(pop.NextOffset)
+						if offset != nil && committedOffset != nil && !offset.Equals(*committedOffset) {
+							err := consumerInstance.CommitOffset(*offset)
 							if err != nil {
 								panic(err)
 							}
 
 							if global.Env().IsDebug {
 								log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] success commit offset:%v,ctx:%v,timeout:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, *offset, committedOffset, ctx1.String(), timeout, err)
 							}
-							committedOffset = &pop.NextOffset
+							committedOffset = offset
+						} else {
+							log.Error("offset not committed:", offset, ",moved to:", &pop.NextOffset)
 						}
 						offset = &pop.NextOffset
 					}
 				} else {
-					log.Error("should not submit this bulk request, but it did,", err)
+					log.Errorf("should not submit this bulk request, worker[%v], queue:[%v], slice:[%v], offset:[%v]->[%v],%v, msg:%v", workerID, qConfig.ID, sliceID, committedOffset, offset, err, msgCount)
 				}
 			}
 		}
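The success path above now commits the worker's tracked offset (*offset) instead of pop.NextOffset, and only when it has actually moved past committedOffset; otherwise the skipped commit is logged. A rough sketch of that commit-if-advanced bookkeeping; the Offset type and the commit callback are stand-ins for the consumer API:

// Offset is an illustrative stand-in for the queue offset type.
type Offset struct{ Segment, Position int64 }

func (o Offset) Equals(other Offset) bool { return o == other }

// commitIfAdvanced commits offset only when it differs from the last
// committed offset and returns the new committed pointer; commit stands in
// for consumerInstance.CommitOffset.
func commitIfAdvanced(offset, committedOffset *Offset, commit func(Offset) error) *Offset {
	if offset == nil || committedOffset == nil || offset.Equals(*committedOffset) {
		return committedOffset // nothing new to commit
	}
	if err := commit(*offset); err != nil {
		panic(err) // same fail-fast policy as the worker loop
	}
	return offset
}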
@@ -995,6 +998,8 @@ CLEAN_BUFFER:
 		continueNext := false
 		if offset != nil && committedOffset != nil && !offset.Equals(*committedOffset) {
 			continueNext, err = processor.submitBulkRequest(ctx, qConfig, tag, esClusterID, meta, host, bulkProcessor, mainBuf)
+			//reset buffer
+			mainBuf.ResetData()
 
 			if global.Env().IsDebug {
 				log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] submit request:%v,continue:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, mainBuf.GetMessageCount(), continueNext, err)
@@ -1009,19 +1014,15 @@ CLEAN_BUFFER:
 			}
 
 		if continueNext {
-			//reset buffer
-			mainBuf.ResetData()
 			if offset != nil && committedOffset != nil && !offset.Equals(*committedOffset) {
 				err := consumerInstance.CommitOffset(*offset)
 				if err != nil {
 					panic(err)
 				}
 				committedOffset = offset
-
 				if global.Env().IsDebug {
 					log.Tracef("slice worker, worker:[%v], [%v][%v][%v][%v] commit offset:%v,ctx:%v,err:%v", workerID, qConfig.Name, consumerConfig.Group, consumerConfig.Name, sliceID, offset, ctx1.String(), err)
 				}
-
 			}
 		} else {
 			//logging failure offset boundry
@@ -1036,10 +1037,10 @@ CLEAN_BUFFER:
 
 			log.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk failed (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err)
 			time.Sleep(time.Duration(processor.config.RetryDelayIntervalInMs) * time.Millisecond)
-			goto CLEAN_BUFFER
+			panic(errors.Errorf("queue:[%v], slice_id:%v, offset [%v]-[%v], bulk can't continue (host: %v, err: %v)", qConfig.ID, sliceID, committedOffset, offset, host, err))
 		}
 	} else {
-		log.Error("should not submit this bulk request, but it did,", err)
+		log.Errorf("should not submit this bulk request, worker[%v], queue:[%v], slice:[%v], offset:[%v]->[%v],%v, msg:%v", workerID, qConfig.ID, sliceID, committedOffset, offset, err, mainBuf.GetMessageCount())
 	}
 }