Skip to content

Commit 1f792a8

Browse files
KAFKA-14978 ExactlyOnceWorkerSourceTask should remove parent metrics (#13690)
Reviewers: Chris Egerton <[email protected]>, Viktor Somogyi-Vass <[email protected]> Co-authored-by: Dániel Urbán <[email protected]>
1 parent 49400b7 commit 1f792a8

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ protected void finalOffsetCommit(boolean failed) {
233233
@Override
234234
public void removeMetrics() {
235235
Utils.closeQuietly(transactionMetrics, "source task transaction metrics tracker");
236+
super.removeMetrics();
236237
}
237238

238239
@Override

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.kafka.clients.producer.Producer;
2020
import org.apache.kafka.clients.producer.RecordMetadata;
2121
import org.apache.kafka.common.KafkaException;
22+
import org.apache.kafka.common.MetricName;
2223
import org.apache.kafka.common.TopicPartition;
2324
import org.apache.kafka.common.errors.InvalidTopicException;
2425
import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -82,7 +83,9 @@
8283
import java.util.concurrent.TimeoutException;
8384
import java.util.concurrent.atomic.AtomicReference;
8485
import java.util.function.Consumer;
86+
import java.util.stream.Collectors;
8587

88+
import static java.util.Collections.emptySet;
8689
import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG;
8790
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
8891
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -291,6 +294,23 @@ private void createWorkerTask(TargetState initialState, Converter keyConverter,
291294
sourceConfig, Runnable::run, preProducerCheck, postProducerCheck);
292295
}
293296

297+
@Test
298+
public void testRemoveMetrics() {
299+
createWorkerTask();
300+
301+
workerTask.removeMetrics();
302+
303+
assertEquals(emptySet(), filterToTaskMetrics(metrics.metrics().metrics().keySet()));
304+
}
305+
306+
private Set<MetricName> filterToTaskMetrics(Set<MetricName> metricNames) {
307+
return metricNames
308+
.stream()
309+
.filter(m -> metrics.registry().taskGroupName().equals(m.group())
310+
|| metrics.registry().sourceTaskGroupName().equals(m.group()))
311+
.collect(Collectors.toSet());
312+
}
313+
294314
@Test
295315
public void testStartPaused() throws Exception {
296316
createWorkerTask(TargetState.PAUSED);

0 commit comments

Comments
 (0)