Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread Pool Fix High CPU Load When Paused #1039

Merged
merged 8 commits into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ Checks: '*,
-modernize-return-braced-init-list,
-cppcoreguidelines-avoid-magic-numbers,
-readability-magic-numbers,
-cppcoreguidelines-avoid-do-while
-cppcoreguidelines-avoid-do-while,
-llvmlibc-inline-function-decl,
-altera-struct-pack-align
'
WarningsAsErrors: '*'
HeaderFilterRegex: 'src/*.hpp'
Expand Down
15 changes: 12 additions & 3 deletions cpr/threadpool.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "cpr/threadpool.h"
#include <chrono>
#include <cstddef>
#include <ctime>
#include <memory>
#include <mutex>
Expand All @@ -8,7 +9,7 @@

namespace cpr {

ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms), status(STOP), cur_thread_num(0), idle_thread_num(0) {}
ThreadPool::ThreadPool(size_t min_threads, size_t max_threads, std::chrono::milliseconds max_idle_ms) : min_thread_num(min_threads), max_thread_num(max_threads), max_idle_time(max_idle_ms) {}

ThreadPool::~ThreadPool() {
Stop();
Expand All @@ -32,16 +33,21 @@ int ThreadPool::Start(size_t start_threads) {
}

int ThreadPool::Stop() {
const std::unique_lock status_lock(status_wait_mutex);
if (status == STOP) {
return -1;
}

status = STOP;
status_wait_cond.notify_all();
task_cond.notify_all();

for (auto& i : threads) {
if (i.thread->joinable()) {
i.thread->join();
}
}

threads.clear();
cur_thread_num = 0;
idle_thread_num = 0;
Expand All @@ -56,8 +62,10 @@ int ThreadPool::Pause() {
}

int ThreadPool::Resume() {
const std::unique_lock status_lock(status_wait_mutex);
if (status == PAUSE) {
status = RUNNING;
status_wait_cond.notify_all();
}
return 0;
}
Expand All @@ -79,8 +87,9 @@ bool ThreadPool::CreateThread() {
std::thread* thread = new std::thread([this] {
bool initialRun = true;
while (status != STOP) {
while (status == PAUSE) {
std::this_thread::yield();
{
std::unique_lock status_lock(status_wait_mutex);
status_wait_cond.wait(status_lock, [this]() { return status != Status::PAUSE; });
}

Task task;
Expand Down
34 changes: 25 additions & 9 deletions include/cpr/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#define CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM std::thread::hardware_concurrency()

constexpr size_t CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM = 1;
constexpr std::chrono::milliseconds CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME{60000};
constexpr std::chrono::milliseconds CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME{250};

namespace cpr {

Expand All @@ -25,27 +25,38 @@ class ThreadPool {
using Task = std::function<void()>;

explicit ThreadPool(size_t min_threads = CPR_DEFAULT_THREAD_POOL_MIN_THREAD_NUM, size_t max_threads = CPR_DEFAULT_THREAD_POOL_MAX_THREAD_NUM, std::chrono::milliseconds max_idle_ms = CPR_DEFAULT_THREAD_POOL_MAX_IDLE_TIME);
ThreadPool(const ThreadPool& other) = delete;
ThreadPool(ThreadPool&& old) = delete;

virtual ~ThreadPool();

ThreadPool& operator=(const ThreadPool& other) = delete;
ThreadPool& operator=(ThreadPool&& old) = delete;

void SetMinThreadNum(size_t min_threads) {
min_thread_num = min_threads;
}

void SetMaxThreadNum(size_t max_threads) {
max_thread_num = max_threads;
}

void SetMaxIdleTime(std::chrono::milliseconds ms) {
max_idle_time = ms;
}

size_t GetCurrentThreadNum() {
return cur_thread_num;
}

size_t GetIdleThreadNum() {
return idle_thread_num;
}

bool IsStarted() {
return status != STOP;
}

bool IsStopped() {
return status == STOP;
}
Expand Down Expand Up @@ -107,14 +118,19 @@ class ThreadPool {
time_t stop_time;
};

std::atomic<Status> status;
std::atomic<size_t> cur_thread_num;
std::atomic<size_t> idle_thread_num;
std::list<ThreadData> threads;
std::mutex thread_mutex;
std::queue<Task> tasks;
std::mutex task_mutex;
std::condition_variable task_cond;
std::atomic<Status> status{Status::STOP};
std::condition_variable status_wait_cond{};
std::mutex status_wait_mutex{};

std::atomic<size_t> cur_thread_num{0};
std::atomic<size_t> idle_thread_num{0};

std::list<ThreadData> threads{};
std::mutex thread_mutex{};

std::queue<Task> tasks{};
std::mutex task_mutex{};
std::condition_variable task_cond{};
};

} // namespace cpr
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ add_cpr_test(interceptor_multi)
add_cpr_test(multiperform)
add_cpr_test(resolve)
add_cpr_test(multiasync)
add_cpr_test(threadpool)

if (ENABLE_SSL_TESTS)
add_cpr_test(ssl)
Expand Down
106 changes: 106 additions & 0 deletions test/threadpool_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
#include <atomic>
#include <cstddef>
#include <gtest/gtest.h>


#include "cpr/threadpool.h"

TEST(ThreadPoolTests, DISABLED_BasicWorkOneThread) {
std::atomic_uint32_t invCount{0};
uint32_t invCountExpected{100};

{
cpr::ThreadPool tp;
tp.SetMinThreadNum(1);
tp.SetMaxThreadNum(1);
tp.Start(0);

for (size_t i = 0; i < invCountExpected; ++i) {
tp.Submit([&invCount]() -> void { invCount++; });
}

// Wait for the thread pool to finish its work
tp.Wait();
}

EXPECT_EQ(invCount, invCountExpected);
}

TEST(ThreadPoolTests, DISABLED_BasicWorkMultipleThreads) {
std::atomic_uint32_t invCount{0};
uint32_t invCountExpected{100};

{
cpr::ThreadPool tp;
tp.SetMinThreadNum(1);
tp.SetMaxThreadNum(10);
tp.Start(0);

for (size_t i = 0; i < invCountExpected; ++i) {
tp.Submit([&invCount]() -> void { invCount++; });
}

// Wait for the thread pool to finish its work
tp.Wait();
}

EXPECT_EQ(invCount, invCountExpected);
}

TEST(ThreadPoolTests, DISABLED_PauseResumeSingleThread) {
std::atomic_uint32_t invCount{0};

uint32_t repCount{100};
uint32_t invBunchSize{20};

cpr::ThreadPool tp;
tp.SetMinThreadNum(1);
tp.SetMaxThreadNum(10);
tp.Start(0);

for (size_t i = 0; i < repCount; ++i) {
tp.Pause();
EXPECT_EQ(invCount, i * invBunchSize);

for (size_t e = 0; e < invBunchSize; ++e) {
tp.Submit([&invCount]() -> void { invCount++; });
}
tp.Resume();
// Wait for the thread pool to finish its work
tp.Wait();

EXPECT_EQ(invCount, (i + 1) * invBunchSize);
}
}

TEST(ThreadPoolTests, DISABLED_PauseResumeMultipleThreads) {
std::atomic_uint32_t invCount{0};

uint32_t repCount{100};
uint32_t invBunchSize{20};

cpr::ThreadPool tp;
tp.SetMinThreadNum(1);
tp.SetMaxThreadNum(10);
tp.Start(0);

for (size_t i = 0; i < repCount; ++i) {
tp.Pause();
EXPECT_EQ(invCount, i * invBunchSize);

for (size_t e = 0; e < invBunchSize; ++e) {
tp.Submit([&invCount]() -> void { invCount++; });
}
tp.Resume();
// Wait for the thread pool to finish its work
tp.Wait();

EXPECT_EQ(invCount, (i + 1) * invBunchSize);
}
}


int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
Loading