Skip to content

Commit 82a8982

Browse files
garyrussellartembilan
authored andcommitted
INT-4418: Micrometer docs and polishing
JIRA: https://jira.spring.io/browse/INT-4418 Add docs for the rework and optimize `Meter` creation. Polishing * Polishing according PR comments * Fix typo in `dsl.adoc`
1 parent cb0d43d commit 82a8982

File tree

11 files changed

+186
-98
lines changed

11 files changed

+186
-98
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractMessageChannel.java

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ public abstract class AbstractMessageChannel extends IntegrationObjectSupport
9292

9393
private MeterRegistry meterRegistry;
9494

95+
private Timer successTimer;
96+
97+
private Timer failureTimer;
98+
9599
public AbstractMessageChannel() {
96100
this.interceptors = new ChannelInterceptorList(logger);
97101
}
@@ -445,7 +449,7 @@ public boolean send(Message<?> message, long timeout) {
445449
logger.debug("preSend on channel '" + this + "', message: " + message);
446450
}
447451
if (interceptors.getSize() > 0) {
448-
interceptorStack = new ArrayDeque<ChannelInterceptor>();
452+
interceptorStack = new ArrayDeque<>();
449453
message = interceptors.preSend(message, this, interceptorStack);
450454
if (message == null) {
451455
return false;
@@ -458,13 +462,7 @@ public boolean send(Message<?> message, long timeout) {
458462
}
459463
sent = doSend(message, timeout);
460464
if (sample != null) {
461-
sample.stop(Timer.builder(SEND_TIMER_NAME)
462-
.tag("type", "channel")
463-
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
464-
.tag("result", sent ? "success" : "failure")
465-
.tag("exception", "none")
466-
.description("Subflow process time")
467-
.register(this.meterRegistry));
465+
sample.stop(sendTimer(sent));
468466
}
469467
channelMetrics.afterSend(metrics, sent);
470468
metricsProcessed = true;
@@ -485,13 +483,7 @@ public boolean send(Message<?> message, long timeout) {
485483
catch (Exception e) {
486484
if (countsEnabled && !metricsProcessed) {
487485
if (sample != null) {
488-
sample.stop(Timer.builder(SEND_TIMER_NAME)
489-
.tag("type", "channel")
490-
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
491-
.tag("result", "failure")
492-
.tag("exception", e.getClass().getSimpleName())
493-
.description("Subflow process time")
494-
.register(this.meterRegistry));
486+
sample.stop(buildSendTimer(false, e.getClass().getSimpleName()));
495487
}
496488
channelMetrics.afterSend(metrics, false);
497489
}
@@ -506,6 +498,31 @@ public boolean send(Message<?> message, long timeout) {
506498
}
507499
}
508500

501+
private Timer sendTimer(boolean sent) {
502+
if (sent) {
503+
if (this.successTimer == null) {
504+
this.successTimer = buildSendTimer(true, "none");
505+
}
506+
return this.successTimer;
507+
}
508+
else {
509+
if (this.failureTimer == null) {
510+
this.failureTimer = buildSendTimer(false, "none");
511+
}
512+
return this.failureTimer;
513+
}
514+
}
515+
516+
private Timer buildSendTimer(boolean success, String exception) {
517+
return Timer.builder(SEND_TIMER_NAME)
518+
.tag("type", "channel")
519+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
520+
.tag("result", success ? "success" : "failure")
521+
.tag("exception", exception)
522+
.description("Send processing time")
523+
.register(this.meterRegistry);
524+
}
525+
509526
private Message<?> convertPayloadIfNecessary(Message<?> message) {
510527
// first pass checks if the payload type already matches any of the datatypes
511528
for (Class<?> datatype : this.datatypes) {

spring-integration-core/src/main/java/org/springframework/integration/channel/AbstractPollableChannel.java

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public abstract class AbstractPollableChannel extends AbstractMessageChannel
4242

4343
private volatile int executorInterceptorsSize;
4444

45+
private Counter receiveCounter;
46+
4547
@Override
4648
public int getReceiveCount() {
4749
return getMetrics().getReceiveCount();
@@ -107,13 +109,7 @@ public Message<?> receive(long timeout) {
107109
Message<?> message = this.doReceive(timeout);
108110
if (countsEnabled && message != null) {
109111
if (getMeterRegistry() != null) {
110-
Counter.builder(RECEIVE_COUNTER_NAME)
111-
.tag("name", getComponentName())
112-
.tag("type", "channel")
113-
.tag("result", "success")
114-
.tag("exception", "none")
115-
.description("Messages received")
116-
.register(getMeterRegistry()).increment();
112+
incrementReceiveCounter();
117113
}
118114
getMetrics().afterReceive();
119115
counted = true;
@@ -134,12 +130,13 @@ else if (logger.isTraceEnabled()) {
134130
if (countsEnabled && !counted) {
135131
if (getMeterRegistry() != null) {
136132
Counter.builder(RECEIVE_COUNTER_NAME)
137-
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
138-
.tag("type", "channel")
139-
.tag("result", "failure")
140-
.tag("exception", e.getClass().getSimpleName())
141-
.description("Messages received")
142-
.register(getMeterRegistry()).increment();
133+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
134+
.tag("type", "channel")
135+
.tag("result", "failure")
136+
.tag("exception", e.getClass().getSimpleName())
137+
.description("Messages received")
138+
.register(getMeterRegistry())
139+
.increment();
143140
}
144141
getMetrics().afterError();
145142
}
@@ -150,6 +147,19 @@ else if (logger.isTraceEnabled()) {
150147
}
151148
}
152149

150+
private void incrementReceiveCounter() {
151+
if (this.receiveCounter == null) {
152+
this.receiveCounter = Counter.builder(RECEIVE_COUNTER_NAME)
153+
.tag("name", getComponentName())
154+
.tag("type", "channel")
155+
.tag("result", "success")
156+
.tag("exception", "none")
157+
.description("Messages received")
158+
.register(getMeterRegistry());
159+
}
160+
this.receiveCounter.increment();
161+
}
162+
153163
@Override
154164
public void setInterceptors(List<ChannelInterceptor> interceptors) {
155165
super.setInterceptors(interceptors);

spring-integration-core/src/main/java/org/springframework/integration/channel/NullChannel.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -45,6 +45,7 @@
4545
*
4646
* @author Mark Fisher
4747
* @author Gary Russell
48+
* @author Artyem Bilan
4849
*/
4950
@IntegrationManagedResource
5051
public class NullChannel implements PollableChannel, MessageChannelMetrics,
@@ -66,6 +67,8 @@ public class NullChannel implements PollableChannel, MessageChannelMetrics,
6667

6768
private MeterRegistry meterRegistry;
6869

70+
private Timer successTimer;
71+
6972
@Override
7073
public void setBeanName(String beanName) {
7174
this.beanName = beanName;
@@ -221,29 +224,36 @@ public ManagementOverrides getOverrides() {
221224
return this.managementOverrides;
222225
}
223226

227+
@Override
228+
public boolean send(Message<?> message, long timeout) {
229+
return send(message);
230+
}
231+
224232
@Override
225233
public boolean send(Message<?> message) {
226234
if (this.loggingEnabled && this.logger.isDebugEnabled()) {
227235
this.logger.debug("message sent to null channel: " + message);
228236
}
229237
if (this.countsEnabled) {
230238
if (this.meterRegistry != null) {
231-
Timer.builder(SEND_TIMER_NAME)
232-
.tag("type", "channel")
233-
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
234-
.tag("result", "success")
235-
.tag("exception", "none")
236-
.description("Subflow process time")
237-
.register(this.meterRegistry).record(0, TimeUnit.MILLISECONDS);
239+
sendTimer().record(0, TimeUnit.MILLISECONDS);
238240
}
239241
this.channelMetrics.afterSend(this.channelMetrics.beforeSend(), true);
240242
}
241243
return true;
242244
}
243245

244-
@Override
245-
public boolean send(Message<?> message, long timeout) {
246-
return this.send(message);
246+
private Timer sendTimer() {
247+
if (this.successTimer == null) {
248+
this.successTimer = Timer.builder(SEND_TIMER_NAME)
249+
.tag("type", "channel")
250+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
251+
.tag("result", "success")
252+
.tag("exception", "none")
253+
.description("Subflow process time")
254+
.register(this.meterRegistry);
255+
}
256+
return this.successTimer;
247257
}
248258

249259
@Override

spring-integration-core/src/main/java/org/springframework/integration/endpoint/AbstractMessageSource.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -60,14 +60,14 @@ public abstract class AbstractMessageSource<T> extends AbstractExpressionEvaluat
6060

6161
private String managedName;
6262

63-
private Counter counter;
64-
6563
private volatile boolean countsEnabled;
6664

6765
private volatile boolean loggingEnabled = true;
6866

6967
private MeterRegistry meterRegistry;
7068

69+
private Counter receiveCounter;
70+
7171
public void setHeaderExpressions(Map<String, Expression> headerExpressions) {
7272
this.headerExpressions = (headerExpressions != null)
7373
? headerExpressions : Collections.emptyMap();
@@ -130,11 +130,6 @@ public void setLoggingEnabled(boolean loggingEnabled) {
130130
this.managementOverrides.loggingConfigured = true;
131131
}
132132

133-
@Override
134-
public void setCounter(Counter counter) {
135-
this.counter = counter;
136-
}
137-
138133
@Override
139134
public void reset() {
140135
this.messageCount.set(0);
@@ -200,19 +195,26 @@ else if (result != null) {
200195
}
201196
if (this.countsEnabled && message != null) {
202197
if (this.meterRegistry != null) {
203-
Counter.builder(RECEIVE_COUNTER_NAME)
204-
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
205-
.tag("type", "source")
206-
.tag("result", "success")
207-
.tag("exception", "none")
208-
.description("Messages received")
209-
.register(this.meterRegistry).increment();
198+
incrementReceiveCounter();
210199
}
211200
this.messageCount.incrementAndGet();
212201
}
213202
return message;
214203
}
215204

205+
private void incrementReceiveCounter() {
206+
if (this.receiveCounter == null) {
207+
this.receiveCounter = Counter.builder(RECEIVE_COUNTER_NAME)
208+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
209+
.tag("type", "source")
210+
.tag("result", "success")
211+
.tag("exception", "none")
212+
.description("Messages received")
213+
.register(this.meterRegistry);
214+
}
215+
this.receiveCounter.increment();
216+
}
217+
216218
private Map<String, Object> evaluateHeaders() {
217219
Map<String, Object> results = new HashMap<>();
218220
for (Map.Entry<String, Expression> entry : this.headerExpressions.entrySet()) {

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageHandler.java

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,12 @@
5050
* @author Mark Fisher
5151
* @author Oleg Zhurakousky
5252
* @author Gary Russell
53+
* @author Artem Bilan
5354
*/
5455
@IntegrationManagedResource
55-
public abstract class AbstractMessageHandler extends IntegrationObjectSupport implements MessageHandler,
56-
MessageHandlerMetrics, ConfigurableMetricsAware<AbstractMessageHandlerMetrics>, TrackableComponent, Orderable,
57-
CoreSubscriber<Message<?>> {
56+
public abstract class AbstractMessageHandler extends IntegrationObjectSupport
57+
implements MessageHandler, MessageHandlerMetrics, ConfigurableMetricsAware<AbstractMessageHandlerMetrics>,
58+
TrackableComponent, Orderable, CoreSubscriber<Message<?>> {
5859

5960
private final ManagementOverrides managementOverrides = new ManagementOverrides();
6061

@@ -76,6 +77,8 @@ public abstract class AbstractMessageHandler extends IntegrationObjectSupport im
7677

7778
private MeterRegistry meterRegistry;
7879

80+
private Timer successTimer;
81+
7982
@Override
8083
public boolean isLoggingEnabled() {
8184
return this.loggingEnabled;
@@ -147,19 +150,13 @@ public void handleMessage(Message<?> message) {
147150
}
148151
try {
149152
if (this.shouldTrack) {
150-
message = MessageHistory.write(message, this, this.getMessageBuilderFactory());
153+
message = MessageHistory.write(message, this, getMessageBuilderFactory());
151154
}
152155
if (countsEnabled) {
153156
start = handlerMetrics.beforeHandle();
154157
handleMessageInternal(message);
155158
if (this.meterRegistry != null) {
156-
sample.stop(Timer.builder(SEND_TIMER_NAME)
157-
.tag("type", "handler")
158-
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
159-
.tag("result", "success")
160-
.tag("exception", "none")
161-
.description("Subflow process time")
162-
.register(this.meterRegistry));
159+
sample.stop(sendTimer());
163160
}
164161
handlerMetrics.afterHandle(start, true);
165162
}
@@ -169,13 +166,7 @@ public void handleMessage(Message<?> message) {
169166
}
170167
catch (Exception e) {
171168
if (sample != null) {
172-
sample.stop(Timer.builder(SEND_TIMER_NAME)
173-
.tag("type", "handler")
174-
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
175-
.tag("result", "failure")
176-
.tag("exception", e.getClass().getSimpleName())
177-
.description("Subflow process time")
178-
.register(this.meterRegistry));
169+
sample.stop(buildSendTimer(false, e.getClass().getSimpleName()));
179170
}
180171
if (countsEnabled) {
181172
handlerMetrics.afterHandle(start, false);
@@ -187,6 +178,23 @@ public void handleMessage(Message<?> message) {
187178
}
188179
}
189180

181+
private Timer sendTimer() {
182+
if (this.successTimer == null) {
183+
this.successTimer = buildSendTimer(true, "none");
184+
}
185+
return this.successTimer;
186+
}
187+
188+
private Timer buildSendTimer(boolean success, String exception) {
189+
return Timer.builder(SEND_TIMER_NAME)
190+
.tag("type", "handler")
191+
.tag("name", getComponentName() == null ? "unknown" : getComponentName())
192+
.tag("result", success ? "success" : "failure")
193+
.tag("exception", exception)
194+
.description("Send processing time")
195+
.register(this.meterRegistry);
196+
}
197+
190198
@Override
191199
public void onSubscribe(Subscription subscription) {
192200
Assert.notNull(subscription, "'subscription' must not be null");

0 commit comments

Comments
 (0)