Skip to content

Commit 27a57d0

Browse files
committed
ARTEMIS-5801 Add a subscription name option to the consumer CLI tool
Adds option 'subscriptionName' to the consumer CLI options to allow consuming from an existing durable subscription not created by the CLI
1 parent 568735d commit 27a57d0

3 files changed

Lines changed: 81 additions & 3 deletions

File tree

artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/Consumer.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ public class Consumer extends DestAbstract {
5050
@Option(names = "--data", description = "Serialize the messages to the specified file as they are consumed.")
5151
String file;
5252

53+
@Option(names = "--subscriptionName", description = "The subscription name to use for a durable consumer.")
54+
String subscriptionName;
55+
5356
@Override
5457
public Object execute(ActionContext context) throws Exception {
5558
super.execute(context);
@@ -78,6 +81,15 @@ public Object execute(ActionContext context) throws Exception {
7881
serializer.start();
7982
}
8083

84+
if (subscriptionName != null) {
85+
if (threads != 1) {
86+
context.err.println("Error: Cannot assign a subscription name when multiple threads are also configured.");
87+
return null;
88+
} else {
89+
context.out.println("Consumer:: subscription name = " + subscriptionName);
90+
}
91+
}
92+
8193
ConnectionFactory factory = createConnectionFactory();
8294

8395
try (Connection connection = factory.createConnection()) {
@@ -92,7 +104,11 @@ public Object execute(ActionContext context) throws Exception {
92104
}
93105

94106
Destination dest = getDestination(session);
95-
threadsArray[i] = new ConsumerThread(session, dest, i, context);
107+
if (subscriptionName == null) {
108+
threadsArray[i] = new ConsumerThread(session, dest, i, context);
109+
} else {
110+
threadsArray[i] = new ConsumerThread(session, dest, subscriptionName, context);
111+
}
96112

97113
threadsArray[i]
98114
.setVerbose(verbose)
@@ -191,4 +207,13 @@ public Consumer setFile(String file) {
191207
this.file = file;
192208
return this;
193209
}
210+
211+
public String getSubscriptionName() {
212+
return subscriptionName;
213+
}
214+
215+
public Consumer setSubscriptionName(String subscriptionName) {
216+
this.subscriptionName = subscriptionName;
217+
return this;
218+
}
194219
}

artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/messages/ConsumerThread.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,12 @@ public class ConsumerThread extends Thread {
5757
MessageListener listener;
5858

5959
public ConsumerThread(Session session, Destination destination, int threadNr, ActionContext context) {
60-
super("Consumer " + destination.toString() + ", thread=" + threadNr);
60+
this(session, destination, "Consumer " + destination.toString() + ", thread=" + threadNr, context);
61+
}
62+
63+
public ConsumerThread(Session session, Destination destination, String subscriptionName, ActionContext context) {
64+
super(subscriptionName);
65+
6166
this.destination = destination;
6267
this.session = session;
6368
this.context = context;

artemis-cli/src/test/java/org/apache/activemq/cli/test/CliConsumerTest.java

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@
1717
package org.apache.activemq.cli.test;
1818

1919
import javax.jms.Connection;
20-
20+
import javax.jms.Session;
2121
import org.apache.activemq.artemis.api.core.Pair;
22+
import org.apache.activemq.artemis.api.core.SimpleString;
2223
import org.apache.activemq.artemis.cli.commands.messages.Consumer;
2324
import org.apache.activemq.artemis.cli.commands.messages.Producer;
2425
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -29,6 +30,7 @@
2930
import org.junit.jupiter.api.BeforeEach;
3031
import org.junit.jupiter.api.Test;
3132

33+
import static org.junit.jupiter.api.Assertions.assertNotNull;
3234
import static org.junit.jupiter.api.Assertions.assertTrue;
3335

3436
public class CliConsumerTest extends CliTestBase {
@@ -113,4 +115,50 @@ private void sendAndConsume(long messageCount, int timeout) throws Exception {
113115

114116
Wait.assertEquals(0L, () -> server.locateQueue(address).getMessageCount(), 2000, 50);
115117
}
118+
119+
@Test
120+
public void testConsumeFromExistingDurableSubscription() throws Exception {
121+
final String address = "test-topic";
122+
final String addressPrefix = "topic://";
123+
final String clientID = "test-client";
124+
final String subscriptionName = "test-sub";
125+
final String credentials = "admin";
126+
final TestActionContext context = new TestActionContext();
127+
128+
// Creates the durable subscription to consumer from using the CLI tool.
129+
try (Connection connection = cf.createConnection(credentials, credentials)) {
130+
131+
connection.setClientID(clientID);
132+
connection.start();
133+
134+
final Session session = createSession(connection);
135+
136+
session.createDurableConsumer(session.createTopic(address), subscriptionName);
137+
}
138+
139+
produceMessages(addressPrefix + address, TEST_MESSAGE_COUNT);
140+
141+
server.addressQuery(SimpleString.of(address));
142+
143+
final String subscriptionQueueName = server.bindingQuery(SimpleString.of(address)).getQueueNames().get(0).toString();
144+
assertNotNull(subscriptionQueueName);
145+
final org.apache.activemq.artemis.core.server.Queue subscriptionQueue = server.locateQueue(subscriptionQueueName);
146+
Wait.assertEquals((long) TEST_MESSAGE_COUNT, () -> subscriptionQueue.getMessageCount(), 2000, 50);
147+
148+
// Consume from the durable subscription with messages added.
149+
new Consumer()
150+
.setSubscriptionName(subscriptionName)
151+
.setReceiveTimeout(100)
152+
.setBreakOnNull(true)
153+
.setDurable(true)
154+
.setMessageCount(TEST_MESSAGE_COUNT)
155+
.setDestination(addressPrefix + address)
156+
.setClientID(clientID)
157+
.setUser(credentials)
158+
.setPassword(credentials)
159+
.execute(context);
160+
161+
Wait.assertTrue(() -> context.getStdout().contains("subscription name"), 2000, 100);
162+
Wait.assertEquals(0L, () -> subscriptionQueue.getMessageCount(), 2000, 50);
163+
}
116164
}

0 commit comments

Comments
 (0)