diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTConnectionManager.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTConnectionManager.java index dfe71ea62..efd7b3c53 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTConnectionManager.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/MQTTConnectionManager.java @@ -21,14 +21,16 @@ import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.ConnectEvent; import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.EventListener; import io.streamnative.pulsar.handlers.mqtt.common.systemtopic.MqttEvent; -import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.jetcd.shaded.io.vertx.core.impl.ConcurrentHashSet; /** * Proxy connection manager. @@ -38,7 +40,7 @@ public class MQTTConnectionManager { private final ConcurrentMap localConnections; - private final ConcurrentMap eventConnections; + private final ConcurrentHashSet eventClientIds; @Getter private static final HashedWheelTimer sessionExpireInterval = @@ -56,7 +58,7 @@ public class MQTTConnectionManager { public MQTTConnectionManager(String advertisedAddress) { this.advertisedAddress = advertisedAddress; this.localConnections = new ConcurrentHashMap<>(2048); - this.eventConnections = new ConcurrentHashMap<>(2048); + this.eventClientIds = new ConcurrentHashSet<>(2048); this.connectListener = new ConnectEventListener(); this.disconnectListener = new DisconnectEventListener(); } @@ -102,11 +104,11 @@ public Collection getLocalConnections() { return this.localConnections.values(); } - public Collection getAllConnections() { - Collection connections = new ArrayList<>(this.localConnections.values().size() - + this.eventConnections.values().size()); - connections.addAll(this.localConnections.values()); - connections.addAll(eventConnections.values()); + public Collection getAllConnectionsId() { + Set connections = new LinkedHashSet<>(this.localConnections.keySet().size() + + this.eventClientIds.size()); + connections.addAll(this.localConnections.keySet()); + connections.addAll(eventClientIds); return connections; } @@ -126,7 +128,7 @@ public void onChange(MqttEvent event) { log.warn("[ConnectEvent] close existing connection : {}", connection); connection.disconnect(); } else { - eventConnections.put(connectEvent.getClientId(), connection); + eventClientIds.add(connectEvent.getClientId()); } } } @@ -141,7 +143,7 @@ public void onChange(MqttEvent event) { if (event.getEventType() == DISCONNECT) { ConnectEvent connectEvent = (ConnectEvent) event.getSourceEvent(); if (!connectEvent.getAddress().equals(advertisedAddress)) { - eventConnections.remove(connectEvent.getClientId()); + eventClientIds.remove(connectEvent.getClientId()); } } } diff --git a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java index 0a836aece..f68b203ae 100644 --- a/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java +++ b/mqtt-common/src/main/java/io/streamnative/pulsar/handlers/mqtt/common/systemtopic/SystemTopicBasedSystemEventService.java @@ -237,7 +237,13 @@ private void refreshCache(Message msg) { default: break; } - listeners.forEach(listener -> listener.onChange(value)); + listeners.forEach(listener -> { + try { + listener.onChange(value); + } catch (Throwable e) { + log.error("Failed to process event : {}", value.getKey(), e); + } + }); } catch (Throwable ex) { log.error("refresh cache error", ex); } diff --git a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java index a1fb9f5ca..b1ef0bf73 100644 --- a/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java +++ b/mqtt-proxy/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/web/admin/Devices.java @@ -13,13 +13,11 @@ */ package io.streamnative.pulsar.handlers.mqtt.proxy.web.admin; -import io.streamnative.pulsar.handlers.mqtt.common.Connection; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; import java.util.Collection; -import java.util.stream.Collectors; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; @@ -45,9 +43,8 @@ public class Devices extends WebResource { @ApiResponse(code = 500, message = "Internal server error")}) public void getList(@Suspended final AsyncResponse asyncResponse) { try { - final Collection allConnections = service().getConnectionManager().getAllConnections(); - asyncResponse.resume(allConnections.stream().map(e -> - e.getClientId()).collect(Collectors.toList())); + final Collection allConnections = service().getConnectionManager().getAllConnectionsId(); + asyncResponse.resume(allConnections); } catch (Exception e) { log.error("[{}] Failed to list devices {}", clientAppId(), e); asyncResponse.resume(new RestException(e));