Skip to content

Commit

Permalink
channel: Make the blocking parameters of the object queue explicit
Browse files Browse the repository at this point in the history
  • Loading branch information
rgacogne committed Aug 18, 2023
1 parent b833e2c commit c1d7652
Show file tree
Hide file tree
Showing 8 changed files with 25 additions and 15 deletions.
20 changes: 15 additions & 5 deletions pdns/channel.hh
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ namespace pdns
{
namespace channel
{
enum class SenderBlockingMode
{
SenderNonBlocking,
SenderBlocking
};
enum class ReceiverBlockingMode
{
ReceiverNonBlocking,
ReceiverBlocking
};

/**
* The sender's end of a channel used to pass objects between threads.
*
Expand Down Expand Up @@ -136,7 +147,7 @@ namespace channel
* \throw runtime_error if the channel creation failed.
*/
template <typename T, typename D = std::default_delete<T>>
std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking = true, bool receiveNonBlocking = true, size_t pipeBufferSize = 0, bool throwOnEOF = true);
std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode = SenderBlockingMode::SenderNonBlocking, ReceiverBlockingMode receiverBlockingMode = ReceiverBlockingMode::ReceiverNonBlocking, size_t pipeBufferSize = 0, bool throwOnEOF = true);

/**
* The notifier's end of a channel used to communicate between threads.
Expand Down Expand Up @@ -307,7 +318,7 @@ namespace channel
}

template <typename T, typename D>
std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(bool sendNonBlocking, bool receiveNonBlocking, size_t pipeBufferSize, bool throwOnEOF)
std::pair<Sender<T, D>, Receiver<T, D>> createObjectQueue(SenderBlockingMode senderBlockingMode, ReceiverBlockingMode receiverBlockingMode, size_t pipeBufferSize, bool throwOnEOF)
{
int fds[2] = {-1, -1};
if (pipe(fds) < 0) {
Expand All @@ -316,13 +327,12 @@ namespace channel

FDWrapper sender(fds[1]);
FDWrapper receiver(fds[0]);

if (receiveNonBlocking && !setNonBlocking(receiver.getHandle())) {
if (receiverBlockingMode == ReceiverBlockingMode::ReceiverNonBlocking && !setNonBlocking(receiver.getHandle())) {
int err = errno;
throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
}

if (sendNonBlocking && !setNonBlocking(sender.getHandle())) {
if (senderBlockingMode == SenderBlockingMode::SenderNonBlocking && !setNonBlocking(sender.getHandle())) {
int err = errno;
throw std::runtime_error("Error making channel pipe non-blocking: " + stringerror(err));
}
Expand Down
2 changes: 1 addition & 1 deletion pdns/delaypipe.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
template<class T>
ObjectPipe<T>::ObjectPipe()
{
auto [sender, receiver] = pdns::channel::createObjectQueue<T>(false, true, 0, false);
auto [sender, receiver] = pdns::channel::createObjectQueue<T>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, 0, false);
d_sender = std::move(sender);
d_receiver = std::move(receiver);
}
Expand Down
2 changes: 1 addition & 1 deletion pdns/distributor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ template<class Answer, class Question, class Backend>MultiThreadDistributor<Answ
}

for (int distributorIdx = 0; distributorIdx < numberOfThreads; distributorIdx++) {
auto [sender, receiver] = pdns::channel::createObjectQueue<QuestionData>(false, false);
auto [sender, receiver] = pdns::channel::createObjectQueue<QuestionData>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking);
d_senders.push_back(std::move(sender));
d_receivers.push_back(std::move(receiver));
}
Expand Down
6 changes: 3 additions & 3 deletions pdns/dnsdist-tcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ TCPClientCollection::TCPClientCollection(size_t maxThreads, std::vector<ClientSt
void TCPClientCollection::addTCPClientThread(std::vector<ClientState*>& tcpAcceptStates)
{
try {
auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue<ConnectionInfo>(true, true, g_tcpInternalPipeBufferSize);
auto [queryChannelSender, queryChannelReceiver] = pdns::channel::createObjectQueue<ConnectionInfo>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);

auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(true, true, g_tcpInternalPipeBufferSize);
auto [crossProtocolQueryChannelSender, crossProtocolQueryChannelReceiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);

auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue<TCPCrossProtocolResponse>(true, true, g_tcpInternalPipeBufferSize);
auto [crossProtocolResponseChannelSender, crossProtocolResponseChannelReceiver] = pdns::channel::createObjectQueue<TCPCrossProtocolResponse>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);

vinfolog("Adding TCP Client thread");

Expand Down
2 changes: 1 addition & 1 deletion pdns/dnsdistdist/dnsdist-nghttp2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1054,7 +1054,7 @@ void DoHClientCollection::addThread()
{
#ifdef HAVE_NGHTTP2
try {
auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(true, true, g_tcpInternalPipeBufferSize);
auto [sender, receiver] = pdns::channel::createObjectQueue<CrossProtocolQuery>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, g_tcpInternalPipeBufferSize);

vinfolog("Adding DoH Client thread");
std::lock_guard<std::mutex> lock(d_mutex);
Expand Down
4 changes: 2 additions & 2 deletions pdns/dnsdistdist/doh.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,14 @@ struct DOHServerConfig
{
#ifndef USE_SINGLE_ACCEPTOR_THREAD
{
auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(true, false, internalPipeBufferSize);
auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverBlocking, internalPipeBufferSize);
d_querySender = std::move(sender);
d_queryReceiver = std::move(receiver);
}
#endif /* USE_SINGLE_ACCEPTOR_THREAD */

{
auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(true, true, internalPipeBufferSize);
auto [sender, receiver] = pdns::channel::createObjectQueue<DOHUnit, void(*)(DOHUnit*)>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, internalPipeBufferSize);
d_responseSender = std::move(sender);
d_responseReceiver = std::move(receiver);
}
Expand Down
2 changes: 1 addition & 1 deletion pdns/snmp-agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ SNMPAgent::SNMPAgent([[maybe_unused]] const std::string& name, [[maybe_unused]]

init_snmp(name.c_str());

auto [sender, receiver] = pdns::channel::createObjectQueue<netsnmp_variable_list, void(*)(netsnmp_variable_list*)>(true, true);
auto [sender, receiver] = pdns::channel::createObjectQueue<netsnmp_variable_list, void(*)(netsnmp_variable_list*)>();
d_sender = std::move(sender);
d_receiver = std::move(receiver);
#endif /* HAVE_NET_SNMP */
Expand Down
2 changes: 1 addition & 1 deletion pdns/test-channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ BOOST_AUTO_TEST_CASE(test_object_queue_throw_on_eof)

BOOST_AUTO_TEST_CASE(test_object_queue_do_not_throw_on_eof)
{
auto [sender, receiver] = pdns::channel::createObjectQueue<MyObject>(true, true, 0U, false);
auto [sender, receiver] = pdns::channel::createObjectQueue<MyObject>(pdns::channel::SenderBlockingMode::SenderNonBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, 0U, false);
sender.close();
auto got = receiver.receive();
BOOST_CHECK(got == std::nullopt);
Expand Down

0 comments on commit c1d7652

Please sign in to comment.