diff --git a/src/RabbitMQClient.cpp b/src/RabbitMQClient.cpp index d1cdc2d..b7fd5ba 100644 --- a/src/RabbitMQClient.cpp +++ b/src/RabbitMQClient.cpp @@ -1,5 +1,6 @@ #include "RabbitMQClient.h" #include "Utils.h" +#include #include #if defined(__linux__) #include @@ -275,7 +276,7 @@ void RabbitMQClient::basicConsumeImpl(Biterp::CallContext& ctx) { result = tag; LOGI("Consumer created " + tag); { - std::unique_lock lock(_mutex); + std::lock_guard lock(_mutex); consumers.push_back(tag); consumerError.clear(); } @@ -302,19 +303,19 @@ void RabbitMQClient::basicConsumeImpl(Biterp::CallContext& ctx) { msgOb.headers = message.headers(); { LOGI("Consume push message"); - std::unique_lock lock(_mutex); + std::lock_guard lock(_mutex); messageQueue.push(msgOb); cvDataArrived.notify_all(); } }) .onCancelled([this](const std::string &consumer){ LOGI("Consumer cancelled " + consumer); - std::unique_lock lock(_mutex); + std::lock_guard lock(_mutex); consumers.erase(std::remove_if(consumers.begin(), consumers.end(), [&consumer](std::string& s){return s == consumer;})); }) .onError([this, &result](const char* message) { - std::unique_lock lock(_mutex); + std::lock_guard lock(_mutex); consumerError = message; LOGE("Consumer error: " + consumerError); if (result.empty()){ @@ -328,8 +329,11 @@ void RabbitMQClient::basicConsumeImpl(Biterp::CallContext& ctx) { void RabbitMQClient::basicConsumeMessageImpl(Biterp::CallContext& ctx) { - if (consumers.empty()) { - throw Biterp::Error("No active consumers"); + { + std::lock_guard lock(_mutex); + if (consumers.empty()) { + throw Biterp::Error("No active consumers"); + } } ctx.skipParam(); tVariant* outdata = ctx.skipParam(); @@ -339,17 +343,17 @@ void RabbitMQClient::basicConsumeMessageImpl(Biterp::CallContext& ctx) { ctx.setIntResult(0, outMessageTag); { std::unique_lock lock(_mutex); - if (!consumerError.empty()){ - throw Biterp::Error(consumerError); - } - if (!cvDataArrived.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return !messageQueue.empty(); })) { - ctx.setBoolResult(false); - ctx.setStringResult(u"", outdata); - ctx.setIntResult(0, outMessageTag); - return; - } - if (messageQueue.empty()) { - throw Biterp::Error("Empty consume message"); + if (messageQueue.empty()){ + if (!consumerError.empty()){ + throw Biterp::Error(consumerError); + } + if (!cvDataArrived.wait_for(lock, std::chrono::milliseconds(timeout), [&] { return !messageQueue.empty(); })) { + ctx.setBoolResult(false); + return; + } + if (messageQueue.empty()) { + throw Biterp::Error("Empty consume message"); + } } lastMessage = messageQueue.front(); messageQueue.pop(); @@ -374,7 +378,7 @@ void RabbitMQClient::clear() { connection->loop(); } consumers.clear(); - std::unique_lock lock(_mutex); + std::lock_guard lock(_mutex); std::queue empty; messageQueue.swap(empty); cvDataArrived.notify_all(); diff --git a/test/test_amqp.py b/test/test_amqp.py index 8485767..8736288 100644 --- a/test/test_amqp.py +++ b/test/test_amqp.py @@ -84,6 +84,6 @@ def test_consume_nomsg(com): if not ret: break assert res - assert msg[0] == '' + assert msg[0] == None assert mtag[0] == 0