Skip to content

Commit 3c0fad1

Browse files
hudeqi authored and Cerchie committed
KAFKA-15139: Avoid slow Set.removeAll(List) in MirrorCheckpointConnector (apache#13946)
Reviewed-by: Greg Harris <[email protected]>
1 parent 3fac756 commit 3c0fad1

File tree

2 files changed

+26
-26
lines changed

2 files changed

+26
-26
lines changed

connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
3131

32+
import java.util.ArrayList;
3233
import java.util.Collection;
3334
import java.util.Collections;
3435
import java.util.HashSet;
35-
import java.util.LinkedList;
3636
import java.util.List;
3737
import java.util.Map;
3838
import java.util.Set;
@@ -55,14 +55,14 @@ public class MirrorCheckpointConnector extends SourceConnector {
5555
private Admin sourceAdminClient;
5656
private Admin targetAdminClient;
5757
private SourceAndTarget sourceAndTarget;
58-
private List<String> knownConsumerGroups = Collections.emptyList();
58+
private Set<String> knownConsumerGroups = Collections.emptySet();
5959

6060
public MirrorCheckpointConnector() {
6161
// nop
6262
}
6363

6464
// visible for testing
65-
MirrorCheckpointConnector(List<String> knownConsumerGroups, MirrorCheckpointConfig config) {
65+
MirrorCheckpointConnector(Set<String> knownConsumerGroups, MirrorCheckpointConfig config) {
6666
this.knownConsumerGroups = knownConsumerGroups;
6767
this.config = config;
6868
}
@@ -116,7 +116,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
116116
return Collections.emptyList();
117117
}
118118
int numTasks = Math.min(maxTasks, knownConsumerGroups.size());
119-
List<List<String>> groupsPartitioned = ConnectorUtils.groupPartitions(knownConsumerGroups, numTasks);
119+
List<List<String>> groupsPartitioned = ConnectorUtils.groupPartitions(new ArrayList<>(knownConsumerGroups), numTasks);
120120
return IntStream.range(0, numTasks)
121121
.mapToObj(i -> config.taskConfigForConsumerGroups(groupsPartitioned.get(i), i))
122122
.collect(Collectors.toList());
@@ -134,12 +134,10 @@ public String version() {
134134

135135
private void refreshConsumerGroups()
136136
throws InterruptedException, ExecutionException {
137-
List<String> consumerGroups = findConsumerGroups();
138-
Set<String> newConsumerGroups = new HashSet<>();
139-
newConsumerGroups.addAll(consumerGroups);
137+
Set<String> consumerGroups = findConsumerGroups();
138+
Set<String> newConsumerGroups = new HashSet<>(consumerGroups);
140139
newConsumerGroups.removeAll(knownConsumerGroups);
141-
Set<String> deadConsumerGroups = new HashSet<>();
142-
deadConsumerGroups.addAll(knownConsumerGroups);
140+
Set<String> deadConsumerGroups = new HashSet<>(knownConsumerGroups);
143141
deadConsumerGroups.removeAll(consumerGroups);
144142
if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
145143
log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.",
@@ -156,15 +154,15 @@ private void loadInitialConsumerGroups()
156154
knownConsumerGroups = findConsumerGroups();
157155
}
158156

159-
List<String> findConsumerGroups()
157+
Set<String> findConsumerGroups()
160158
throws InterruptedException, ExecutionException {
161159
List<String> filteredGroups = listConsumerGroups().stream()
162160
.map(ConsumerGroupListing::groupId)
163161
.filter(this::shouldReplicateByGroupFilter)
164162
.collect(Collectors.toList());
165163

166-
List<String> checkpointGroups = new LinkedList<>();
167-
List<String> irrelevantGroups = new LinkedList<>();
164+
Set<String> checkpointGroups = new HashSet<>();
165+
Set<String> irrelevantGroups = new HashSet<>();
168166

169167
for (String group : filteredGroups) {
170168
Set<String> consumedTopics = listConsumerGroupOffsets(group).keySet().stream()

connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.kafka.common.TopicPartition;
2222
import org.junit.jupiter.api.Test;
2323

24-
import java.util.ArrayList;
2524
import java.util.Arrays;
2625
import java.util.Collection;
2726
import java.util.Collections;
@@ -49,7 +48,7 @@ public void testMirrorCheckpointConnectorDisabled() {
4948
MirrorCheckpointConfig config = new MirrorCheckpointConfig(
5049
makeProps("emit.checkpoints.enabled", "false"));
5150

52-
List<String> knownConsumerGroups = new ArrayList<>();
51+
Set<String> knownConsumerGroups = new HashSet<>();
5352
knownConsumerGroups.add(CONSUMER_GROUP);
5453
// MirrorCheckpointConnector as minimum to run taskConfig()
5554
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
@@ -65,7 +64,7 @@ public void testMirrorCheckpointConnectorEnabled() {
6564
MirrorCheckpointConfig config = new MirrorCheckpointConfig(
6665
makeProps("emit.checkpoints.enabled", "true"));
6766

68-
List<String> knownConsumerGroups = new ArrayList<>();
67+
Set<String> knownConsumerGroups = new HashSet<>();
6968
knownConsumerGroups.add(CONSUMER_GROUP);
7069
// MirrorCheckpointConnector as minimum to run taskConfig()
7170
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups,
@@ -81,7 +80,7 @@ public void testMirrorCheckpointConnectorEnabled() {
8180
@Test
8281
public void testNoConsumerGroup() {
8382
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());
84-
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(new ArrayList<>(), config);
83+
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(new HashSet<>(), config);
8584
List<Map<String, String>> output = connector.taskConfigs(1);
8685
// expect no task will be created
8786
assertEquals(0, output.size(), "ConsumerGroup shouldn't exist");
@@ -92,7 +91,7 @@ public void testReplicationDisabled() {
9291
// disable the replication
9392
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps("enabled", "false"));
9493

95-
List<String> knownConsumerGroups = new ArrayList<>();
94+
Set<String> knownConsumerGroups = new HashSet<>();
9695
knownConsumerGroups.add(CONSUMER_GROUP);
9796
// MirrorCheckpointConnector as minimum to run taskConfig()
9897
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config);
@@ -106,7 +105,7 @@ public void testReplicationEnabled() {
106105
// enable the replication
107106
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps("enabled", "true"));
108107

109-
List<String> knownConsumerGroups = new ArrayList<>();
108+
Set<String> knownConsumerGroups = new HashSet<>();
110109
knownConsumerGroups.add(CONSUMER_GROUP);
111110
// MirrorCheckpointConnector as minimum to run taskConfig()
112111
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(knownConsumerGroups, config);
@@ -120,7 +119,7 @@ public void testReplicationEnabled() {
120119
@Test
121120
public void testFindConsumerGroups() throws Exception {
122121
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());
123-
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptyList(), config);
122+
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptySet(), config);
124123
connector = spy(connector);
125124

126125
Collection<ConsumerGroupListing> groups = Arrays.asList(
@@ -132,21 +131,21 @@ public void testFindConsumerGroups() throws Exception {
132131
doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString());
133132
doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString());
134133
doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString());
135-
List<String> groupFound = connector.findConsumerGroups();
134+
Set<String> groupFound = connector.findConsumerGroups();
136135

137136
Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
138-
assertEquals(expectedGroups, new HashSet<>(groupFound),
137+
assertEquals(expectedGroups, groupFound,
139138
"Expected groups are not the same as findConsumerGroups");
140139

141140
doReturn(false).when(connector).shouldReplicateByTopicFilter(anyString());
142-
List<String> topicFilterGroupFound = connector.findConsumerGroups();
143-
assertEquals(Collections.emptyList(), topicFilterGroupFound);
141+
Set<String> topicFilterGroupFound = connector.findConsumerGroups();
142+
assertEquals(Collections.emptySet(), topicFilterGroupFound);
144143
}
145144

146145
@Test
147146
public void testFindConsumerGroupsInCommonScenarios() throws Exception {
148147
MirrorCheckpointConfig config = new MirrorCheckpointConfig(makeProps());
149-
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptyList(), config);
148+
MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptySet(), config);
150149
connector = spy(connector);
151150

152151
Collection<ConsumerGroupListing> groups = Arrays.asList(
@@ -177,8 +176,11 @@ public void testFindConsumerGroupsInCommonScenarios() throws Exception {
177176
doReturn(offsetsForGroup3).when(connector).listConsumerGroupOffsets("g3");
178177
doReturn(offsetsForGroup4).when(connector).listConsumerGroupOffsets("g4");
179178

180-
List<String> groupFound = connector.findConsumerGroups();
181-
assertEquals(groupFound, Arrays.asList("g1", "g2"));
179+
Set<String> groupFound = connector.findConsumerGroups();
180+
Set<String> verifiedSet = new HashSet<>();
181+
verifiedSet.add("g1");
182+
verifiedSet.add("g2");
183+
assertEquals(groupFound, verifiedSet);
182184
}
183185

184186
}

0 commit comments

Comments
 (0)