Skip to content

Commit

Permalink
[pinpoint-apm#2702] Fix ActiveMQ integration tests
Browse files Browse the repository at this point in the history
Protocol is now collected when tracing end points and remote address
  • Loading branch information
Xylus committed Apr 19, 2017
1 parent 666fb9e commit a496f89
Showing 1 changed file with 28 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public void testQueuePull() throws Exception {

// Wait till all traces are recorded (consumer traces are recorded from another thread)
awaitAndVerifyTraceCount(5, 5000L);
verifyProducerSendEvent(testQueue); // trace count : 1
verifyConsumerPullEvent(testQueue, consumer, expectedTextMessage); // trace count : 4
verifyProducerSendEvent(testQueue, producerSession); // trace count : 1
verifyConsumerPullEvent(testQueue, consumerSession, consumer, expectedTextMessage); // trace count : 4
}

@Test
Expand Down Expand Up @@ -116,9 +116,9 @@ public void testTopicPull() throws Exception {

// Wait till all traces are recorded (consumer traces are recorded from another thread)
awaitAndVerifyTraceCount(9, 5000L);
verifyProducerSendEvent(testTopic); // trace count : 1
verifyConsumerPullEvent(testTopic, consumer1, expectedTextMessage); // trace count : 4
verifyConsumerPullEvent(testTopic, consumer2, expectedTextMessage); // trace count : 4
verifyProducerSendEvent(testTopic, producerSession); // trace count : 1
verifyConsumerPullEvent(testTopic, consumer1Session, consumer1, expectedTextMessage); // trace count : 4
verifyConsumerPullEvent(testTopic, consumer2Session, consumer2, expectedTextMessage); // trace count : 4
}

@Test
Expand Down Expand Up @@ -146,8 +146,8 @@ public void testQueuePush() throws Exception {
assertNoConsumerError(consumerThrowables);
// Wait till all traces are recorded (consumer traces are recorded from another thread)
awaitAndVerifyTraceCount(2, 5000L);
verifyProducerSendEvent(testQueue); // trace count : 1
verifyConsumerPushEvent(testQueue); // trace count : 1
verifyProducerSendEvent(testQueue, producerSession); // trace count : 1
verifyConsumerPushEvent(testQueue, consumerSession); // trace count : 1
}

@Test
Expand Down Expand Up @@ -183,9 +183,9 @@ public void testTopicPush() throws Exception {
// Then
// Wait till all traces are recorded (consumer traces are recorded from another thread)
awaitAndVerifyTraceCount(3, 1000L);
verifyProducerSendEvent(testTopic); // trace count : 1
verifyConsumerPushEvent(testTopic); // trace count : 1
verifyConsumerPushEvent(testTopic); // trace count : 1
verifyProducerSendEvent(testTopic, producerSession); // trace count : 1
verifyConsumerPushEvent(testTopic, consumer1Session); // trace count : 1
verifyConsumerPushEvent(testTopic, consumer2Session); // trace count : 1
}

/**
Expand All @@ -195,19 +195,23 @@ public void testTopicPush() throws Exception {
* @param destination the destination to which the producer is sending the message
* @throws Exception
*/
private void verifyProducerSendEvent(ActiveMQDestination destination) throws Exception {
private void verifyProducerSendEvent(ActiveMQDestination destination, ActiveMQSession session) throws Exception {
PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
verifier.printCache();
Class<?> messageProducerClass = Class.forName("org.apache.activemq.ActiveMQMessageProducer");
Method send = messageProducerClass.getDeclaredMethod("send", Destination.class, Message.class, int.class, int.class, long.class);
URI producerBrokerUri = new URI(getProducerBrokerUrl());
// URI producerBrokerUri = new URI(getProducerBrokerUrl());
// String expectedEndPoint = getProducerBrokerUri.getHost() + ":" + producerBrokerUri.getPort();
// String expectedEndPoint = producerBrokerUri.toString();
String expectedEndPoint = session.getConnection().getTransport().getRemoteAddress();
verifier.verifyDiscreteTrace(event(
ACTIVEMQ_CLIENT, // serviceType
send, // method
null, // rpc
producerBrokerUri.getHost() + ":" + producerBrokerUri.getPort(), // endPoint
expectedEndPoint, // endPoint
destination.getPhysicalName(), // destinationId
annotation("message.queue.url", destination.getQualifiedName()),
annotation("activemq.broker.address", producerBrokerUri.getHost() + ":" + producerBrokerUri.getPort())
annotation("activemq.broker.address", expectedEndPoint)
));
}

Expand All @@ -221,18 +225,20 @@ private void verifyProducerSendEvent(ActiveMQDestination destination) throws Exc
* @param expectedMessage the message the consumer is expected to receive
* @throws Exception
*/
private void verifyConsumerPullEvent(ActiveMQDestination destination, MessageConsumer consumer, Message expectedMessage) throws Exception {
private void verifyConsumerPullEvent(ActiveMQDestination destination, ActiveMQSession session, MessageConsumer consumer, Message expectedMessage) throws Exception {
PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
verifier.printCache();
Class<?> messageConsumerClass = Class.forName("org.apache.activemq.ActiveMQMessageConsumer");
Method receiveWithTimeout = messageConsumerClass.getDeclaredMethod("receive", long.class);
URI consumerBrokerUri = new URI(getConsumerBrokerUrl());
// URI consumerBrokerUri = new URI(getConsumerBrokerUrl());
// String expectedEndPoint = consumerBrokerUri.toString();
String expectedEndPoint = session.getConnection().getTransport().getRemoteAddress();

ExpectedTrace consumerDispatchTrace = root(ACTIVEMQ_CLIENT, // serviceType
"ActiveMQ Consumer Invocation", // method
destination.getQualifiedName(), // rpc
null, // endPoint (collected but there's no easy way to retrieve local address)
consumerBrokerUri.getHost() + ":" + consumerBrokerUri.getPort());
expectedEndPoint);
ExpectedTrace consumerReceiveTrace = event(ACTIVEMQ_CLIENT_INTERNAL, // serviceType
receiveWithTimeout, // method
annotation("activemq.message", getMessageAsString(expectedMessage)));
Expand Down Expand Up @@ -260,15 +266,17 @@ private void verifyConsumerPullEvent(ActiveMQDestination destination, MessageCon
* @param destination the destination from which the consumer is receiving the message
* @throws Exception
*/
private void verifyConsumerPushEvent(ActiveMQDestination destination) throws Exception {
private void verifyConsumerPushEvent(ActiveMQDestination destination, ActiveMQSession session) throws Exception {
PluginTestVerifier verifier = PluginTestVerifierHolder.getInstance();
URI consumerBrokerUri = new URI(getConsumerBrokerUrl());
// URI consumerBrokerUri = new URI(getConsumerBrokerUrl());
// String expectedRemoteAddress = consumerBrokerUri.toString();
String expectedRemoteAddress = session.getConnection().getTransport().getRemoteAddress();
verifier.verifyDiscreteTrace(root(
ACTIVEMQ_CLIENT, // serviceType
"ActiveMQ Consumer Invocation", // method
destination.getQualifiedName(), // rpc
null, // endPoint (collected but there's no easy way to retrieve local address so skip check)
consumerBrokerUri.getHost() + ":" + consumerBrokerUri.getPort() // remoteAddress
expectedRemoteAddress // remoteAddress
));
}

Expand Down

0 comments on commit a496f89

Please sign in to comment.