Skip to content

Commit 929ba52

Browse files
committed
fixed bug in getjob where it could return null pointer
1 parent f224965 commit 929ba52

File tree

2 files changed

+22
-17
lines changed

2 files changed

+22
-17
lines changed

src/DataModelBase/WorkerPoolManager.cpp

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ PoolManager_args::PoolManager_args() : Thread_args() {}
1818

1919
PoolManager_args::~PoolManager_args() {}
2020

21-
WorkerPoolManager::WorkerPoolManager(JobQueue& job_queue, unsigned int* thread_cap, unsigned int* global_thread_cap, unsigned int* global_thread_num, JobDeque* job_out_deque, bool self_serving, bool threaded, unsigned int thread_sleep_us, unsigned int thread_management_period_us, unsigned int job_assignment_period_us){
21+
WorkerPoolManager::WorkerPoolManager(JobQueue& job_queue, unsigned int* thread_cap, unsigned int* global_thread_cap, std::atomic<unsigned int>* global_thread_num, JobDeque* job_out_deque, bool self_serving, bool threaded, unsigned int thread_sleep_us, unsigned int thread_management_period_us, unsigned int job_assignment_period_us){
2222

2323
m_util = new Utilities();
2424

@@ -69,7 +69,7 @@ void WorkerPoolManager::CreateManagerThread() {
6969
}
7070

7171

72-
void WorkerPoolManager::CreateWorkerThread(std::vector<PoolWorker_args*>& in_args, bool &in_self_serving, unsigned int &in_thread_sleep_us, JobQueue* in_job_queue, JobDeque* in_job_out_deque,unsigned long &thread_num, Utilities* in_util, std::map<std::string,PoolManagerStats>* in_stats, std::mutex* in_stats_mtx, unsigned int* global_thread_num) {
72+
void WorkerPoolManager::CreateWorkerThread(std::vector<PoolWorker_args*>& in_args, bool &in_self_serving, unsigned int &in_thread_sleep_us, JobQueue* in_job_queue, JobDeque* in_job_out_deque,unsigned long &thread_num, Utilities* in_util, std::map<std::string,PoolManagerStats>* in_stats, std::mutex* in_stats_mtx, std::atomic<unsigned int>* global_thread_num) {
7373
PoolWorker_args* tmparg = new PoolWorker_args();
7474
tmparg->busy = false;
7575
tmparg->thread_sleep_us = in_thread_sleep_us;
@@ -88,12 +88,12 @@ void WorkerPoolManager::CreateWorkerThread(std::vector<PoolWorker_args*>& in_arg
8888
if(global_thread_num) (*global_thread_num)++;
8989
}
9090

91-
void WorkerPoolManager::DeleteWorkerThread(unsigned int pos, Utilities* in_util, std::vector<PoolWorker_args*> &in_args, unsigned int* global_thread_num) {
91+
void WorkerPoolManager::DeleteWorkerThread(unsigned int pos, Utilities* in_util, std::vector<PoolWorker_args*> &in_args, std::atomic<unsigned int>* global_thread_num) {
9292
in_util->KillThread(in_args.at(pos));
9393
delete in_args.at(pos);
9494
in_args.at(pos) = 0;
9595
in_args.erase(in_args.begin() + pos );
96-
if(global_thread_num) global_thread_num--;
96+
if(global_thread_num) (*global_thread_num)--;
9797
}
9898

9999
void WorkerPoolManager::WorkerThread(Thread_args* arg) {
@@ -103,6 +103,11 @@ void WorkerPoolManager::WorkerThread(Thread_args* arg) {
103103
else {
104104
if(args->self_serving){
105105
args->job=args->job_queue->GetJob();
106+
if(!args->job){
107+
usleep(args->thread_sleep_us);
108+
return;
109+
}
110+
106111
args->stats_mtx->lock();
107112
(*args->stats)[args->job->m_id].processing++;
108113
args->stats_mtx->unlock();
@@ -154,7 +159,7 @@ void WorkerPoolManager::WorkerThread(Thread_args* arg) {
154159
if (args->job_out_deque || args->job->out_deque) {
155160
if(args->job->out_deque) args->job->out_deque->push_back(args->job);
156161
else args->job_out_deque->push_back(args->job);
157-
args->job->m_in_progress=false;
162+
args->job->m_in_progress=false;
158163
}
159164
else if(args->job->out_pool){
160165
args->job->out_pool->Add(args->job);
@@ -172,11 +177,11 @@ void WorkerPoolManager::WorkerThread(Thread_args* arg) {
172177
void WorkerPoolManager::ManagerThread(Thread_args* arg) {
173178

174179
PoolManager_args* args = reinterpret_cast<PoolManager_args*>(arg);
175-
180+
176181
args->now = std::chrono::high_resolution_clock::now();
177182
args->manage = std::chrono::duration<double, std::micro>(args->now - args->managing_timer).count() > args->thread_management_period_us;
178183
args->sleep = !args->manage;
179-
184+
180185
if(!args->self_serving){
181186
args->serve = std::chrono::duration<double, std::micro>(args->now - args->serving_timer).count() > args->job_assignment_period_us;
182187
args->sleep = !args->serve && !args->manage;
@@ -188,11 +193,11 @@ void WorkerPoolManager::ManagerThread(Thread_args* arg) {
188193
}
189194

190195
if(args->serve){
191-
192196
if (args->job_queue->size() > 0) {
193197
for (unsigned int i = 0; i < args->args.size(); i++) {
194198
if (!args->args.at(i)->busy && args->job_queue->size() > 0) {
195-
args->args.at(i)->job = args->job_queue->GetJob();
199+
args->args.at(i)->job = args->job_queue->GetJob();
200+
if(args->args.at(i)->job == 0) continue;
196201
args->stats_mtx.lock();
197202
args->stats[args->args.at(i)->job->m_id].processing++;
198203
args->stats_mtx.unlock();
@@ -206,7 +211,6 @@ void WorkerPoolManager::ManagerThread(Thread_args* arg) {
206211
}
207212

208213
if(args->manage){
209-
210214
args->free_threads = 0;
211215
unsigned int last_free = 0;
212216
for (unsigned int i = 0; i < args->args.size(); i++) {
@@ -216,8 +220,8 @@ void WorkerPoolManager::ManagerThread(Thread_args* arg) {
216220
}
217221
}
218222

219-
if (args->free_threads < 1 && args->args.size()<(*(args->thread_cap)) && ( !args->global_thread_cap || (*(args->global_thread_num))<(*(args->global_thread_cap)) ) ) CreateWorkerThread(args->args, args->self_serving, args->thread_sleep_us, args->job_queue, args->job_out_deque, args->thread_num, args->util, &args->stats, &args->stats_mtx, args->global_thread_num);
220-
223+
if (args->free_threads < 1 && args->args.size()<(*(args->thread_cap)) && ( !args->global_thread_cap || (*(args->global_thread_num))<(*(args->global_thread_cap)) ) ) CreateWorkerThread(args->args, args->self_serving, args->thread_sleep_us, args->job_queue, args->job_out_deque, args->thread_num, args->util, &args->stats, &args->stats_mtx, args->global_thread_num);
224+
221225
if (args->free_threads > 1) DeleteWorkerThread(last_free, args->util, args->args, args->global_thread_num);
222226

223227
args->managing_timer = std::chrono::high_resolution_clock::now();
@@ -243,7 +247,7 @@ std::string WorkerPoolManager::GetStats(){
243247

244248
std::string ret="";
245249

246-
ret="Queued Jobs Total = " + std::to_string(m_job_queue->size()) + " \n";
250+
ret="Queued Jobs Total = " + std::to_string(m_job_queue->size()) + " : Total Workers = " + std::to_string(NumThreads()) + " \n";
247251
m_job_queue->m_lock.lock();
248252
m_manager_args.stats_mtx.lock();
249253

src/DataModelBase/WorkerPoolManager.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <Utilities.h>
1010
#include <mutex>
1111
#include <chrono>
12+
#include <atomic>
1213

1314
namespace ToolFramework{
1415

@@ -72,7 +73,7 @@ namespace ToolFramework{
7273
JobQueue* job_queue;
7374
unsigned int* thread_cap; ///< Variable to cap the number of thread workers
7475
unsigned int* global_thread_cap;
75-
unsigned int* global_thread_num;
76+
std::atomic<unsigned int>* global_thread_num;
7677
JobDeque* job_out_deque;
7778
bool self_serving;
7879
unsigned int thread_sleep_us;
@@ -120,7 +121,7 @@ namespace ToolFramework{
120121
* @param thrad_management_period_us how long between evaluating the number of worker threads to avoid rapid killing and recreating
121122
* @param job_assignment_period how long the managers sleeps between checking if there are free workers to assign them new jobs
122123
*/
123-
WorkerPoolManager(JobQueue& job_queue, unsigned int* thread_cap=0, unsigned int* global_thread_cap=0, unsigned int* global_thread_num=0, JobDeque* job_out_deque=0, bool self_serving=false, bool threaded=true, unsigned int thread_sleep_us=100, unsigned int thread_management_period_us=10000, unsigned int job_assignment_period_us=1000);
124+
WorkerPoolManager(JobQueue& job_queue, unsigned int* thread_cap=0, unsigned int* global_thread_cap=0, std::atomic<unsigned int>* global_thread_num=0, JobDeque* job_out_deque=0, bool self_serving=false, bool threaded=true, unsigned int thread_sleep_us=100, unsigned int thread_management_period_us=10000, unsigned int job_assignment_period_us=1000);
124125
~WorkerPoolManager(); ///< Simple Destructor
125126

126127
void ManageWorkers(); ///< Function to manage workers and distribute jobs to be run when unthreaded if you choose to not have the managment run on a thread.
@@ -132,8 +133,8 @@ namespace ToolFramework{
132133
private:
133134

134135
void CreateManagerThread(); ///< Function to Create Manager Thread
135-
static void CreateWorkerThread(std::vector<PoolWorker_args*> &in_args, bool &in_self_serving, unsigned int &in_thread_sleep_us, JobQueue* in_job_queue, JobDeque* in_job_out_deque,unsigned long &thread_num, Utilities* in_util, std::map<std::string,PoolManagerStats>* in_stats, std::mutex* in_stats_mtx, unsigned int* global_thread_num=0 ); ///< Function to Create Worker Thread
136-
static void DeleteWorkerThread(unsigned int pos, Utilities* in_util, std::vector<PoolWorker_args*> &in_args, unsigned int* global_thread_num=0); ///< Function to delete thread @param pos is the position in the args vector below
136+
static void CreateWorkerThread(std::vector<PoolWorker_args*> &in_args, bool &in_self_serving, unsigned int &in_thread_sleep_us, JobQueue* in_job_queue, JobDeque* in_job_out_deque,unsigned long &thread_num, Utilities* in_util, std::map<std::string,PoolManagerStats>* in_stats, std::mutex* in_stats_mtx, std::atomic<unsigned int>* global_thread_num=0 ); ///< Function to Create Worker Thread
137+
static void DeleteWorkerThread(unsigned int pos, Utilities* in_util, std::vector<PoolWorker_args*> &in_args, std::atomic<unsigned int>* global_thread_num=0); ///< Function to delete thread @param pos is the position in the args vector below
137138

138139
static void WorkerThread(Thread_args* arg); ///< Function to be run by the thread in a loop. Make sure not to block in it
139140
static void ManagerThread(Thread_args* arg); ///< Function to be run by the thread manager. Make sure not to block in it

0 commit comments

Comments
 (0)