Skip to content

Commit b5cc10c

Browse files
author
ffffwh
committed
add metrics
- send_by_timeout/size_full - dest_queue2_size
1 parent fc23fe3 commit b5cc10c

File tree

4 files changed

+18
-7
lines changed

4 files changed

+18
-7
lines changed

drivers/mysql/common/models.go

+1
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type ThroughputStat struct {
4646
type BufferStat struct {
4747
BinlogEventQueueSize int
4848
ExtractorTxQueueSize int
49+
ApplierMsgQueueSize int
4950
ApplierTxQueueSize int
5051
SendByTimeout int
5152
SendBySizeFull int

drivers/mysql/handle.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ func (h *taskHandle) emitStats(ru *common.TaskStatistics) {
134134
metrics.SetGaugeWithLabels([]string{"memory.full_kb_est"}, float32(ru.MemoryStat.Full) * srcFullFactor / 1024, labels)
135135
metrics.SetGaugeWithLabels([]string{"memory.incr_kb_est"}, float32(ru.MemoryStat.Incr) * srcIncrFactor / 1024, labels)
136136
case common.TaskTypeDest:
137-
metrics.SetGaugeWithLabels([]string{"buffer", "dest_queue_size"}, float32(ru.BufferStat.ApplierTxQueueSize), labels)
137+
metrics.SetGaugeWithLabels([]string{"buffer", "dest_queue_size"}, float32(ru.BufferStat.ApplierMsgQueueSize), labels)
138+
metrics.SetGaugeWithLabels([]string{"buffer", "dest_queue2_size"}, float32(ru.BufferStat.ApplierTxQueueSize), labels)
138139

139140
metrics.SetGaugeWithLabels([]string{"memory.full_kb_est"}, float32(ru.MemoryStat.Full) * dstFullFactor / 1024, labels)
140141
metrics.SetGaugeWithLabels([]string{"memory.incr_kb_est"}, float32(ru.MemoryStat.Incr) * dstIncrFactor / 1024, labels)

drivers/mysql/mysql/applier.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -791,13 +791,17 @@ func (a *Applier) ApplyEventQueries(db *gosql.DB, entry *common.DumpEntry) error
791791
func (a *Applier) Stats() (*common.TaskStatistics, error) {
792792
a.logger.Debug("Stats")
793793
var totalDeltaCopied int64
794-
var lenApplyDataEntryQueue int
795-
var capApplyDataEntryQueue int
794+
var lenApplierMsgQueue int
795+
var capApplierMsgQueue int
796+
var lenApplierTxQueue int
797+
var capApplierTxQueue int
796798
var delay int64
797799
if a.ai != nil {
798800
totalDeltaCopied = a.ai.TotalDeltaCopied
799-
lenApplyDataEntryQueue = len(a.ai.incrBytesQueue)
800-
capApplyDataEntryQueue = cap(a.ai.incrBytesQueue)
801+
lenApplierMsgQueue = len(a.ai.incrBytesQueue)
802+
capApplierMsgQueue = cap(a.ai.incrBytesQueue)
803+
lenApplierTxQueue = len(a.ai.binlogEntryQueue)
804+
capApplierTxQueue = cap(a.ai.binlogEntryQueue)
801805
delay = a.ai.timestampCtx.GetDelay()
802806
}
803807
totalRowsReplay := a.TotalRowsReplayed
@@ -813,7 +817,8 @@ func (a *Applier) Stats() (*common.TaskStatistics, error) {
813817
if a.mysqlContext.Gtid != "" {
814818
// Done copying rows. The totalRowsCopied value is the de-facto number of rows,
815819
// and there is no further need to keep updating the value.
816-
backlog = fmt.Sprintf("%d/%d", lenApplyDataEntryQueue, capApplyDataEntryQueue)
820+
backlog = fmt.Sprintf("%d/%d", lenApplierMsgQueue + lenApplierTxQueue,
821+
capApplierMsgQueue + capApplierTxQueue)
817822
} else {
818823
backlog = fmt.Sprintf("%d/%d", len(a.fullBytesQueue), cap(a.fullBytesQueue))
819824
}
@@ -861,7 +866,8 @@ func (a *Applier) Stats() (*common.TaskStatistics, error) {
861866
RetrievedGtidSet: "",
862867
},
863868
BufferStat: common.BufferStat{
864-
ApplierTxQueueSize: lenApplyDataEntryQueue,
869+
ApplierMsgQueueSize: lenApplierMsgQueue,
870+
ApplierTxQueueSize: lenApplierTxQueue,
865871
},
866872
Timestamp: time.Now().UTC().UnixNano(),
867873
DelayCount: &common.DelayCount{

drivers/mysql/mysql/extractor.go

+3
Original file line numberDiff line numberDiff line change
@@ -923,6 +923,8 @@ func (e *Extractor) StreamEvents() error {
923923
"groupMaxSize", e.mysqlContext.GroupMaxSize,
924924
"Entries.len", len(entries.Entries))
925925

926+
e.sendBySizeFullCounter += 1
927+
926928
err := sendEntriesAndClear()
927929
if err != nil {
928930
e.onError(TaskStateDead, err)
@@ -939,6 +941,7 @@ func (e *Extractor) StreamEvents() error {
939941
if nEntries > 0 {
940942
e.logger.Debug("incr. send by timeout.", "entriesSize", entriesSize,
941943
"timeout", e.mysqlContext.GroupTimeout)
944+
e.sendByTimeoutCounter += 1
942945
err := sendEntriesAndClear()
943946
if err != nil {
944947
e.onError(TaskStateDead, err)

0 commit comments

Comments
 (0)