Skip to content

Commit c252543

Browse files
authored
Merge pull request #1 from queue-interop/amqp-basic-consume-specs
[amqp] Add specs for AMQP's basic consume method.
2 parents 1a03ced + 3868e84 commit c252543

6 files changed

+384
-1
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
<?php
2+
3+
namespace Interop\Queue\Spec\Amqp;
4+
5+
use Interop\Amqp\AmqpConsumer;
6+
use Interop\Amqp\AmqpContext;
7+
use Interop\Amqp\AmqpMessage;
8+
use Interop\Amqp\AmqpQueue;
9+
use PHPUnit\Framework\TestCase;
10+
11+
/**
12+
* @group functional
13+
*/
14+
abstract class BasicConsumeBreakOnFalseSpec extends TestCase
15+
{
16+
/**
17+
* @var AmqpContext
18+
*/
19+
private $context;
20+
21+
public function tearDown()
22+
{
23+
if ($this->context) {
24+
$this->context->close();
25+
}
26+
27+
parent::tearDown();
28+
}
29+
30+
public function test()
31+
{
32+
$this->context = $context = $this->createContext();
33+
$fooQueue = $this->createQueue($context, 'foo_basic_consume_break_on_false_spec');
34+
$barQueue = $this->createQueue($context, 'bar_basic_consume_break_on_false_spec');
35+
36+
$expectedFooBody = __CLASS__.'foo'.time();
37+
$expectedBarBody = __CLASS__.'bar'.time();
38+
39+
$context->createProducer()->send($fooQueue, $context->createMessage($expectedFooBody));
40+
$context->createProducer()->send($barQueue, $context->createMessage($expectedBarBody));
41+
42+
$consumedMessages = 0;
43+
$callback = function(AmqpMessage $message, AmqpConsumer $consumer) use (&$consumedMessages) {
44+
$consumedMessages++;
45+
46+
$consumer->acknowledge($message);
47+
48+
return false;
49+
};
50+
51+
$fooConsumer = $context->createConsumer($fooQueue);
52+
$barConsumer = $context->createConsumer($barQueue);
53+
54+
$context->basicConsumeSubscribe($fooConsumer, $callback);
55+
$context->basicConsumeSubscribe($barConsumer, $callback);
56+
$context->basicConsume(1000);
57+
58+
$this->assertEquals(1, $consumedMessages);
59+
}
60+
61+
/**
62+
* @return AmqpContext
63+
*/
64+
abstract protected function createContext();
65+
66+
/**
67+
* @param AmqpContext $context
68+
* @param string $queueName
69+
*
70+
* @return AmqpQueue
71+
*/
72+
protected function createQueue(AmqpContext $context, $queueName)
73+
{
74+
$queue = $context->createQueue($queueName);
75+
$context->declareQueue($queue);
76+
$context->purgeQueue($queue);
77+
78+
return $queue;
79+
}
80+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
<?php
2+
3+
namespace Interop\Queue\Spec\Amqp;
4+
5+
use Interop\Amqp\AmqpConsumer;
6+
use Interop\Amqp\AmqpContext;
7+
use Interop\Amqp\AmqpMessage;
8+
use Interop\Amqp\AmqpQueue;
9+
use PHPUnit\Framework\TestCase;
10+
11+
/**
12+
* @group functional
13+
*/
14+
abstract class BasicConsumeFromAllSubscribedQueuesSpec extends TestCase
15+
{
16+
/**
17+
* @var AmqpContext
18+
*/
19+
private $context;
20+
21+
public function tearDown()
22+
{
23+
if ($this->context) {
24+
$this->context->close();
25+
}
26+
27+
parent::tearDown();
28+
}
29+
30+
public function test()
31+
{
32+
$this->context = $context = $this->createContext();
33+
$fooQueue = $this->createQueue($context, 'foo_basic_consume_from_all_subscribed_queues_spec');
34+
$barQueue = $this->createQueue($context, 'bar_basic_consume_from_all_subscribed_queues_spec');
35+
36+
$expectedFooBody = 'fooBody';
37+
$expectedBarBody = 'barBody';
38+
39+
$context->createProducer()->send($fooQueue, $context->createMessage($expectedFooBody));
40+
$context->createProducer()->send($barQueue, $context->createMessage($expectedBarBody));
41+
42+
$fooConsumer = $context->createConsumer($fooQueue);
43+
$barConsumer = $context->createConsumer($barQueue);
44+
45+
$actualBodies = [];
46+
$actualQueues = [];
47+
$callback = function(AmqpMessage $message, AmqpConsumer $consumer) use (&$actualBodies, &$actualQueues) {
48+
$actualBodies[] = $message->getBody();
49+
$actualQueues[] = $consumer->getQueue()->getQueueName();
50+
51+
$consumer->acknowledge($message);
52+
53+
return true;
54+
};
55+
56+
$context->basicConsumeSubscribe($fooConsumer, $callback);
57+
$context->basicConsumeSubscribe($barConsumer, $callback);
58+
$context->basicConsume(1000);
59+
60+
$this->assertEquals([$expectedFooBody, $expectedBarBody], $actualBodies);
61+
$this->assertEquals(
62+
[
63+
'foo_basic_consume_from_all_subscribed_queues_spec',
64+
'bar_basic_consume_from_all_subscribed_queues_spec'
65+
],
66+
$actualQueues
67+
);
68+
}
69+
70+
/**
71+
* @return AmqpContext
72+
*/
73+
abstract protected function createContext();
74+
75+
/**
76+
* @param AmqpContext $context
77+
* @param string $queueName
78+
*
79+
* @return AmqpQueue
80+
*/
81+
protected function createQueue(AmqpContext $context, $queueName)
82+
{
83+
$queue = $context->createQueue($queueName);
84+
$context->declareQueue($queue);
85+
$context->purgeQueue($queue);
86+
87+
return $queue;
88+
}
89+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
<?php
2+
3+
namespace Interop\Queue\Spec\Amqp;
4+
5+
use Interop\Amqp\AmqpConsumer;
6+
use Interop\Amqp\AmqpContext;
7+
use Interop\Amqp\AmqpMessage;
8+
use Interop\Amqp\AmqpQueue;
9+
use PHPUnit\Framework\TestCase;
10+
11+
/**
12+
* @group functional
13+
*/
14+
abstract class BasicConsumeShouldAddConsumerTagOnSubscribeSpec extends TestCase
15+
{
16+
/**
17+
* @var AmqpContext
18+
*/
19+
private $context;
20+
21+
public function tearDown()
22+
{
23+
if ($this->context) {
24+
$this->context->close();
25+
}
26+
27+
parent::tearDown();
28+
}
29+
30+
public function test()
31+
{
32+
$this->context = $context = $this->createContext();
33+
$queue = $this->createQueue($context, 'basic_consume_should_add_consumer_tag_on_subscribe_spec');
34+
35+
$consumer = $context->createConsumer($queue);
36+
37+
$context->basicConsumeSubscribe($consumer, function() {});
38+
39+
$this->assertNotEmpty($consumer->getConsumerTag());
40+
}
41+
42+
/**
43+
* @return AmqpContext
44+
*/
45+
abstract protected function createContext();
46+
47+
/**
48+
* @param AmqpContext $context
49+
* @param string $queueName
50+
*
51+
* @return AmqpQueue
52+
*/
53+
protected function createQueue(AmqpContext $context, $queueName)
54+
{
55+
$queue = $context->createQueue($queueName);
56+
$context->declareQueue($queue);
57+
$context->purgeQueue($queue);
58+
59+
return $queue;
60+
}
61+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
<?php
2+
3+
namespace Interop\Queue\Spec\Amqp;
4+
5+
use Interop\Amqp\AmqpConsumer;
6+
use Interop\Amqp\AmqpContext;
7+
use Interop\Amqp\AmqpMessage;
8+
use Interop\Amqp\AmqpQueue;
9+
use PHPUnit\Framework\TestCase;
10+
11+
/**
12+
* @group functional
13+
*/
14+
abstract class BasicConsumeShouldRemoveConsumerTagOnUnsubscribeSpec extends TestCase
15+
{
16+
/**
17+
* @var AmqpContext
18+
*/
19+
private $context;
20+
21+
public function tearDown()
22+
{
23+
if ($this->context) {
24+
$this->context->close();
25+
}
26+
27+
parent::tearDown();
28+
}
29+
30+
public function test()
31+
{
32+
$this->context = $context = $this->createContext();
33+
$queue = $this->createQueue($context, 'basic_consume_should_remove_consumer_tag_on_unsubscribe_spec');
34+
35+
$consumer = $context->createConsumer($queue);
36+
37+
$context->basicConsumeSubscribe($consumer, function() {});
38+
$context->basicConsume(100);
39+
40+
// guard
41+
$this->assertNotEmpty($consumer->getConsumerTag());
42+
43+
$context->basicConsumeUnsubscribe($consumer);
44+
45+
$this->assertEmpty($consumer->getConsumerTag());
46+
}
47+
48+
/**
49+
* @return AmqpContext
50+
*/
51+
abstract protected function createContext();
52+
53+
/**
54+
* @param AmqpContext $context
55+
* @param string $queueName
56+
*
57+
* @return AmqpQueue
58+
*/
59+
protected function createQueue(AmqpContext $context, $queueName)
60+
{
61+
$queue = $context->createQueue($queueName);
62+
$context->declareQueue($queue);
63+
$context->purgeQueue($queue);
64+
65+
return $queue;
66+
}
67+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
<?php
2+
3+
namespace Interop\Queue\Spec\Amqp;
4+
5+
use Interop\Amqp\AmqpConsumer;
6+
use Interop\Amqp\AmqpContext;
7+
use Interop\Amqp\AmqpMessage;
8+
use Interop\Amqp\AmqpQueue;
9+
use PHPUnit\Framework\TestCase;
10+
11+
/**
12+
* @group functional
13+
*/
14+
abstract class BasicConsumeUntilUnsubscribedSpec extends TestCase
15+
{
16+
/**
17+
* @var AmqpContext
18+
*/
19+
private $context;
20+
21+
public function tearDown()
22+
{
23+
if ($this->context) {
24+
$this->context->close();
25+
}
26+
27+
parent::tearDown();
28+
}
29+
30+
public function test()
31+
{
32+
$this->context = $context = $this->createContext();
33+
$fooQueue = $this->createQueue($context, 'foo_basic_consume_until_unsubscribed_spec');
34+
$barQueue = $this->createQueue($context, 'bar_basic_consume_until_unsubscribed_spec');
35+
36+
$context->createProducer()->send($fooQueue, $context->createMessage());
37+
$context->createProducer()->send($barQueue, $context->createMessage());
38+
39+
$fooConsumer = $context->createConsumer($fooQueue);
40+
$barConsumer = $context->createConsumer($barQueue);
41+
42+
$consumedMessages = 0;
43+
$callback = function(AmqpMessage $message, AmqpConsumer $consumer) use (&$consumedMessages) {
44+
$consumedMessages++;
45+
46+
$consumer->acknowledge($message);
47+
48+
return true;
49+
};
50+
51+
$context->basicConsumeSubscribe($fooConsumer, $callback);
52+
$context->basicConsumeSubscribe($barConsumer, $callback);
53+
$context->basicConsume(1000);
54+
55+
$this->assertEquals(2, $consumedMessages);
56+
57+
$context->createProducer()->send($fooQueue, $context->createMessage());
58+
$context->createProducer()->send($barQueue, $context->createMessage());
59+
60+
$consumedMessages = 0;
61+
$context->basicConsumeUnsubscribe($fooConsumer);
62+
$context->basicConsume(1000);
63+
64+
$this->assertEquals(1, $consumedMessages);
65+
}
66+
67+
/**
68+
* @return AmqpContext
69+
*/
70+
abstract protected function createContext();
71+
72+
/**
73+
* @param AmqpContext $context
74+
* @param string $queueName
75+
*
76+
* @return AmqpQueue
77+
*/
78+
protected function createQueue(AmqpContext $context, $queueName)
79+
{
80+
$queue = $context->createQueue($queueName);
81+
$context->declareQueue($queue);
82+
$context->purgeQueue($queue);
83+
84+
return $queue;
85+
}
86+
}

src/SendAndReceiveDelayedMessageFromQueueSpec.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public function test()
3636
$consumer->acknowledge($message);
3737
$this->assertSame($expectedBody, $message->getBody());
3838

39-
$this->assertGreaterThanOrEqual(5, microtime(true) - $sendAt);
39+
$this->assertGreaterThanOrEqual(4, microtime(true) - $sendAt);
4040
}
4141

4242
/**

0 commit comments

Comments
 (0)