From e6fc0ca16aad119f0556fd4eedc9fd63bafb9713 Mon Sep 17 00:00:00 2001 From: iliya Date: Mon, 8 Dec 2025 19:15:41 +0300 Subject: [PATCH] ARTEMIS-5800 Fix AMQP session leak When a connection is disconnected, it should be destroyed, otherwise the AMQP session will not be closed on the local close event, which can cause a session leak. --- .../ActiveMQProtonRemotingConnection.java | 6 ++++ .../integration/amqp/AmqpSessionTest.java | 33 +++++++++++++++---- 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java index 42ce9804312..ad514e1d1e8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ActiveMQProtonRemotingConnection.java @@ -144,6 +144,12 @@ public void destroy() { @Override public void disconnect(boolean criticalError) { + if (destroyed) { + return; + } + + destroyed = true; + ErrorCondition errorCondition = new ErrorCondition(); errorCondition.setCondition(AmqpSupport.CONNECTION_FORCED); amqpConnection.close(errorCondition); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java index 54b17db7643..09a7b83579a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSessionTest.java @@ -16,15 +16,10 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -import java.lang.invoke.MethodHandles; -import java.util.concurrent.TimeUnit; - import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; import org.apache.activemq.transport.amqp.client.AmqpReceiver; @@ -39,6 +34,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + public class AmqpSessionTest extends AmqpClientTestSupport { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -113,6 +115,23 @@ public void testCreateSessionProducerConsumerDoesNotLeakClosable() throws Except connection.close(); } + @Test + public void testServerSessionCloseOnRemotingConnectionDisconnect() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + assertNotNull(session); + + for (RemotingConnection remoteConnection : server.getRemotingService().getConnections()) { + remoteConnection.disconnect(true); + } + + Wait.assertTrue(connection::isClosed); + + assertEquals(0, server.getSessions().size()); + } + @Test public void testSessionClosedOnServerEndsClientSession() throws Exception { doTestSessionClosedOnServerEndsClientSession(false, false);