Skip to content

Commit 9e37bb8

Browse files
committed
Added sample module with queue arguments
1 parent c1f2b1f commit 9e37bb8

11 files changed

+268
-0
lines changed

Console/Command/PriorityMessage.php

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
namespace Rmsundar\RabbitMqPriority\Console\Command;
3+
4+
use Magento\Framework\MessageQueue\PublisherInterface;
5+
use Symfony\Component\Console\Command\Command;
6+
use Symfony\Component\Console\Input\InputInterface;
7+
use Symfony\Component\Console\Input\InputOption;
8+
use Symfony\Component\Console\Output\OutputInterface;
9+
10+
class PriorityMessage extends Command
11+
{
12+
/**
13+
* Message priority
14+
*/
15+
private const MESSAGE_PRIORITY = 'priority';
16+
private const TOPIC_NAME = 'sample.priority.topic';
17+
18+
/**
19+
* @var PublisherInterface
20+
*/
21+
private $publisher;
22+
23+
/**
24+
* PriorityMessage constructor.
25+
* @param PublisherInterface $publisher
26+
* @param string|null $name
27+
*/
28+
public function __construct(PublisherInterface $publisher, string $name = null)
29+
{
30+
$this->publisher = $publisher;
31+
parent::__construct($name);
32+
}
33+
34+
/**
35+
* @inheritDoc
36+
*/
37+
protected function configure()
38+
{
39+
$this->setName('rabbitmq:priority:sample');
40+
$this->setDescription('To test priority message');
41+
$this->addOption(self::MESSAGE_PRIORITY, '-p', InputOption::VALUE_OPTIONAL, 'Message priority', 1);
42+
parent::configure();
43+
}
44+
45+
/**
46+
* CLI command description
47+
*
48+
* @param InputInterface $input
49+
* @param OutputInterface $output
50+
*
51+
* @return void
52+
*/
53+
protected function execute(InputInterface $input, OutputInterface $output): void
54+
{
55+
$priority = $input->getOption(self::MESSAGE_PRIORITY);
56+
$message = 'Message Priority: ' . $priority;
57+
$this->publisher->publish(
58+
self::TOPIC_NAME,
59+
[
60+
'body' => $message,
61+
'properties' => ['priority' => $priority]
62+
]
63+
);
64+
$output->writeln('<info>Message Published with priority ' . $priority . '</info>');
65+
}
66+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?php
2+
3+
namespace Rmsundar\RabbitMqPriority\Framework;
4+
5+
use Magento\Framework\MessageQueue\EnvelopeFactory;
6+
use Magento\Framework\MessageQueue\ExchangeRepository;
7+
use Magento\Framework\MessageQueue\MessageEncoder;
8+
use Magento\Framework\MessageQueue\MessageValidator;
9+
use Magento\Framework\MessageQueue\Publisher\ConfigInterface as PublisherConfig;
10+
use Magento\Framework\MessageQueue\PublisherInterface;
11+
12+
class PublisherWithMessageProperties implements PublisherInterface
13+
{
14+
/**
15+
* @var ExchangeRepository
16+
*/
17+
private $exchangeRepository;
18+
19+
/**
20+
* @var EnvelopeFactory
21+
*/
22+
private $envelopeFactory;
23+
24+
/**
25+
* @var MessageEncoder
26+
*/
27+
private $messageEncoder;
28+
29+
/**
30+
* @var MessageValidator
31+
*/
32+
private $messageValidator;
33+
34+
/**
35+
* @var PublisherConfig
36+
*/
37+
private $publisherConfig;
38+
39+
/**
40+
* PublisherWithMessageProperties constructor.
41+
*
42+
* @param ExchangeRepository $exchangeRepository
43+
* @param EnvelopeFactory $envelopeFactory
44+
* @param MessageEncoder $messageEncoder
45+
* @param MessageValidator $messageValidator
46+
* @param PublisherConfig $publisherConfig
47+
*/
48+
public function __construct(
49+
ExchangeRepository $exchangeRepository,
50+
EnvelopeFactory $envelopeFactory,
51+
MessageEncoder $messageEncoder,
52+
MessageValidator $messageValidator,
53+
PublisherConfig $publisherConfig
54+
) {
55+
$this->exchangeRepository = $exchangeRepository;
56+
$this->envelopeFactory = $envelopeFactory;
57+
$this->messageEncoder = $messageEncoder;
58+
$this->messageValidator = $messageValidator;
59+
$this->publisherConfig = $publisherConfig;
60+
}
61+
62+
/**
63+
* {@inheritdoc}
64+
*/
65+
public function publish($topicName, $data)
66+
{
67+
$this->messageValidator->validate($topicName, $data['body']);
68+
$properties = $data['properties'] ?? [];
69+
$data = $this->messageEncoder->encode($topicName, $data['body']);
70+
$envelope = $this->envelopeFactory->create(
71+
[
72+
'body' => $data,
73+
'properties' => array_merge(
74+
[
75+
'delivery_mode' => 2,
76+
'message_id' => md5(uniqid($topicName))
77+
],
78+
$properties
79+
)
80+
]
81+
);
82+
$connectionName = $this->publisherConfig->getPublisher($topicName)->getConnection()->getName();
83+
$exchange = $this->exchangeRepository->getByConnectionName($connectionName);
84+
$exchange->enqueue($topicName, $envelope);
85+
return null;
86+
}
87+
}

Model/PriorityConsumer.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
namespace Rmsundar\RabbitMqPriority\Model;
3+
4+
use Psr\Log\LoggerInterface;
5+
6+
class PriorityConsumer
7+
{
8+
/**
9+
* @var \Psr\Log\LoggerInterface
10+
*/
11+
private $logger;
12+
13+
/**
14+
* PriorityConsumer constructor.
15+
*
16+
* @param \Psr\Log\LoggerInterface $logger
17+
*/
18+
public function __construct(LoggerInterface $logger)
19+
{
20+
$this->logger = $logger;
21+
}
22+
23+
/**
24+
* @param string $message
25+
*/
26+
public function consume($message)
27+
{
28+
$this->logger->info($message);
29+
}
30+
}

composer.json

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"name": "rmsundar/rabbit-mq-priority",
3+
"version": "1.0.0",
4+
"description": "Sample module for rabbitmq priority queue",
5+
"type": "magento2-module",
6+
"require": {
7+
"magento/framework": "103.0.*",
8+
"magento/framework-message-queue": "~100.4.2"
9+
},
10+
"license": [
11+
"Open Software License (OSL)"
12+
],
13+
"autoload": {
14+
"files": [
15+
"registration.php"
16+
],
17+
"psr-4": {
18+
"Rmsundar\\RabbitMqPriority\\": ""
19+
}
20+
}
21+
}

etc/communication.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
<?xml version="1.0"?>
2+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
4+
<topic name="sample.priority.topic" request="string"/>
5+
<topic name="dead.queue" request="string"/>
6+
</config>

etc/di.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<?xml version="1.0"?>
2+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:ObjectManager/etc/config.xsd">
3+
<type name="Magento\Framework\Console\CommandListInterface">
4+
<arguments>
5+
<argument name="commands" xsi:type="array">
6+
<item name="rmsundar_rabbitmqpriority_priority_message" xsi:type="object">Rmsundar\RabbitMqPriority\Console\Command\PriorityMessage</item>
7+
</argument>
8+
</arguments>
9+
</type>
10+
11+
<type name="Rmsundar\RabbitMqPriority\Console\Command\PriorityMessage">
12+
<arguments>
13+
<argument name="publisher" xsi:type="object">Rmsundar\RabbitMqPriority\Framework\PublisherWithMessageProperties</argument>
14+
</arguments>
15+
</type>
16+
</config>

etc/module.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
<?xml version="1.0"?>
2+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Module/etc/module.xsd">
3+
<module name="Rmsundar_RabbitMqPriority"/>
4+
</config>

etc/queue_consumer.xml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?xml version="1.0"?>
2+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
4+
<consumer name="sample.priority.consumer" queue="sample.priority" connection="amqp"
5+
handler="Rmsundar\RabbitMqPriority\Model\PriorityConsumer::consume"/>
6+
<consumer name="sample.priority.single.consumer" queue="sample.priority.single.consumer" connection="amqp"
7+
handler="Rmsundar\RabbitMqPriority\Model\PriorityConsumer::consume"/>
8+
<consumer name="dead.queue" queue="dead.queue" connection="amqp" />
9+
</config>

etc/queue_publisher.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
<?xml version="1.0"?>
2+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
3+
<publisher topic="sample.priority.topic">
4+
<connection name="amqp" exchange="magento" disabled="false" />
5+
</publisher>
6+
</config>

etc/queue_topology.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?xml version="1.0"?>
2+
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
3+
<exchange name="magento" type="topic" connection="amqp">
4+
<binding id="sample.priority" topic="sample.priority.topic" destinationType="queue" destination="sample.priority">
5+
<arguments>
6+
<argument name="x-max-priority" xsi:type="number">10</argument>
7+
</arguments>
8+
</binding>
9+
<binding id="sample.priority.single.consumer" topic="sample.priority.topic" destinationType="queue" destination="sample.priority.single.consumer">
10+
<arguments>
11+
<argument name="x-message-ttl" xsi:type="number">10000</argument>
12+
<argument name="x-dead-letter-routing-key" xsi:type="string">dead.queue</argument>
13+
<argument name="x-dead-letter-exchange" xsi:type="string">magento</argument>
14+
</arguments>
15+
</binding>
16+
<binding id="dead.rounting.queue" topic="dead.queue" destinationType="queue" destination="dead.queue"/>
17+
</exchange>
18+
</config>

0 commit comments

Comments
 (0)