@@ -100,12 +100,13 @@ type EventDispatcherManager struct {
100
100
cancel context.CancelFunc
101
101
wg sync.WaitGroup
102
102
103
- tableEventDispatcherCount prometheus.Gauge
104
- metricCreateDispatcherDuration prometheus.Observer
105
- metricCheckpointTs prometheus.Gauge
106
- metricCheckpointTsLag prometheus.Gauge
107
- metricResolvedTs prometheus.Gauge
108
- metricResolvedTsLag prometheus.Gauge
103
+ metricTableTriggerEventDispatcherCount prometheus.Gauge
104
+ metricEventDispatcherCount prometheus.Gauge
105
+ metricCreateDispatcherDuration prometheus.Observer
106
+ metricCheckpointTs prometheus.Gauge
107
+ metricCheckpointTsLag prometheus.Gauge
108
+ metricResolvedTs prometheus.Gauge
109
+ metricResolvedTsLag prometheus.Gauge
109
110
}
110
111
111
112
// return actual startTs of the table trigger event dispatcher
@@ -118,23 +119,24 @@ func NewEventDispatcherManager(
118
119
maintainerID node.ID ) (* EventDispatcherManager , uint64 , error ) {
119
120
ctx , cancel := context .WithCancel (context .Background ())
120
121
manager := & EventDispatcherManager {
121
- dispatcherMap : newDispatcherMap (),
122
- changefeedID : changefeedID ,
123
- maintainerID : maintainerID ,
124
- statusesChan : make (chan TableSpanStatusWithSeq , 8192 ),
125
- blockStatusesChan : make (chan * heartbeatpb.TableSpanBlockStatus , 1024 * 1024 ),
126
- errCh : make (chan error , 1 ),
127
- cancel : cancel ,
128
- config : cfConfig ,
129
- filterConfig : toFilterConfigPB (cfConfig .Filter ),
130
- schemaIDToDispatchers : dispatcher .NewSchemaIDToDispatchers (),
131
- latestWatermark : NewWatermark (startTs ),
132
- tableEventDispatcherCount : metrics .TableEventDispatcherGauge .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
133
- metricCreateDispatcherDuration : metrics .CreateDispatcherDuration .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
134
- metricCheckpointTs : metrics .EventDispatcherManagerCheckpointTsGauge .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
135
- metricCheckpointTsLag : metrics .EventDispatcherManagerCheckpointTsLagGauge .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
136
- metricResolvedTs : metrics .EventDispatcherManagerResolvedTsGauge .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
137
- metricResolvedTsLag : metrics .EventDispatcherManagerResolvedTsLagGauge .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
122
+ dispatcherMap : newDispatcherMap (),
123
+ changefeedID : changefeedID ,
124
+ maintainerID : maintainerID ,
125
+ statusesChan : make (chan TableSpanStatusWithSeq , 8192 ),
126
+ blockStatusesChan : make (chan * heartbeatpb.TableSpanBlockStatus , 1024 * 1024 ),
127
+ errCh : make (chan error , 1 ),
128
+ cancel : cancel ,
129
+ config : cfConfig ,
130
+ filterConfig : toFilterConfigPB (cfConfig .Filter ),
131
+ schemaIDToDispatchers : dispatcher .NewSchemaIDToDispatchers (),
132
+ latestWatermark : NewWatermark (startTs ),
133
+ metricTableTriggerEventDispatcherCount : metrics .TableTriggerEventDispatcherGauge .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
134
+ metricEventDispatcherCount : metrics .EventDispatcherGauge .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
135
+ metricCreateDispatcherDuration : metrics .CreateDispatcherDuration .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
136
+ metricCheckpointTs : metrics .EventDispatcherManagerCheckpointTsGauge .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
137
+ metricCheckpointTsLag : metrics .EventDispatcherManagerCheckpointTsLagGauge .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
138
+ metricResolvedTs : metrics .EventDispatcherManagerResolvedTsGauge .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
139
+ metricResolvedTsLag : metrics .EventDispatcherManagerResolvedTsLagGauge .WithLabelValues (changefeedID .Namespace (), changefeedID .Name ()),
138
140
}
139
141
140
142
// Set Sync Point Config
@@ -256,7 +258,8 @@ func (e *EventDispatcherManager) close(remove bool) {
256
258
e .cancel ()
257
259
e .wg .Wait ()
258
260
259
- metrics .TableEventDispatcherGauge .DeleteLabelValues (e .changefeedID .Namespace (), e .changefeedID .Name ())
261
+ metrics .TableTriggerEventDispatcherGauge .DeleteLabelValues (e .changefeedID .Namespace (), e .changefeedID .Name ())
262
+ metrics .EventDispatcherGauge .DeleteLabelValues (e .changefeedID .Namespace (), e .changefeedID .Name ())
260
263
metrics .CreateDispatcherDuration .DeleteLabelValues (e .changefeedID .Namespace (), e .changefeedID .Name ())
261
264
metrics .EventDispatcherManagerCheckpointTsGauge .DeleteLabelValues (e .changefeedID .Namespace (), e .changefeedID .Name ())
262
265
metrics .EventDispatcherManagerResolvedTsGauge .DeleteLabelValues (e .changefeedID .Namespace (), e .changefeedID .Name ())
@@ -385,7 +388,11 @@ func (e *EventDispatcherManager) newDispatchers(infos []dispatcherCreateInfo) er
385
388
Seq : seq ,
386
389
}
387
390
388
- e .tableEventDispatcherCount .Inc ()
391
+ if d .IsTableTriggerEventDispatcher () {
392
+ e .metricTableTriggerEventDispatcherCount .Inc ()
393
+ } else {
394
+ e .metricEventDispatcherCount .Inc ()
395
+ }
389
396
390
397
log .Info ("new dispatcher created" ,
391
398
zap .String ("ID" , id .String ()),
@@ -637,7 +644,11 @@ func (e *EventDispatcherManager) cleanDispatcher(id common.DispatcherID, schemaI
637
644
if e .tableTriggerEventDispatcher != nil && e .tableTriggerEventDispatcher .GetId () == id {
638
645
e .tableTriggerEventDispatcher = nil
639
646
}
640
- e .tableEventDispatcherCount .Dec ()
647
+ if id == e .tableTriggerEventDispatcher .GetId () {
648
+ e .metricTableTriggerEventDispatcherCount .Dec ()
649
+ } else {
650
+ e .metricEventDispatcherCount .Dec ()
651
+ }
641
652
log .Info ("table event dispatcher completely stopped, and delete it from event dispatcher manager" , zap .Any ("dispatcher id" , id ))
642
653
}
643
654
0 commit comments