Skip to content

Commit

Permalink
feat(connect): optimize fetch connect details & status (#587)
Browse files Browse the repository at this point in the history
close #586
  • Loading branch information
tchiotludo authored Feb 3, 2021
1 parent d9557b0 commit 55a0e80
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions src/main/java/org/akhq/repositories/ConnectRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import org.akhq.models.ConnectPlugin;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.UserGroupUtils;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPlugin;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigDefinition;
import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
import org.sourcelab.kafka.connect.apiclient.request.dto.*;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.ConcurrentConfigModificationException;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.InvalidRequestException;
import org.sourcelab.kafka.connect.apiclient.rest.exceptions.ResourceNotFoundException;
Expand Down Expand Up @@ -68,24 +66,24 @@ public ConnectDefinition getDefinition(String clusterId, String connectId, Strin
ResourceNotFoundException.class
}, delay = "3s", attempts = "5")
public List<ConnectDefinition> getDefinitions(String clusterId, String connectId) {
Collection<String> unfiltered = this.kafkaModule
ConnectorsWithExpandedMetadata unfiltered = this.kafkaModule
.getConnectRestClient(clusterId)
.get(connectId)
.getConnectors();

ArrayList<String> filtered = new ArrayList<String>();
for (String item : unfiltered) {
if (isMatchRegex(getConnectFilterRegex(), item)) {
filtered.add(item);
.getConnectorsWithAllExpandedMetadata();

ArrayList<ConnectDefinition> filtered = new ArrayList<>();
for (ConnectorDefinition item : unfiltered.getAllDefinitions()) {
if (isMatchRegex(getConnectFilterRegex(), item.getName())) {
filtered.add(new ConnectDefinition(
item,
unfiltered.getStatusForConnector(item.getName())
));
}
}

return filtered.stream()
.map(s -> getDefinition(clusterId, connectId, s))
.collect(Collectors.toList());
return filtered;
}


public Optional<ConnectPlugin> getPlugin(String clusterId, String connectId, String className) {
return this.kafkaModule
.getConnectRestClient(clusterId)
Expand Down

0 comments on commit 55a0e80

Please sign in to comment.