Skip to content

Commit f683943

Browse files
Martin ValaMartin Vala
Martin Vala
authored and
Martin Vala
committed
Added TThread::Lock when new task is assinging
1 parent fcdf251 commit f683943

File tree

6 files changed

+121
-141
lines changed

6 files changed

+121
-141
lines changed

ParallelTasks/TTaskManager.cxx

+45-33
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ ClassImp(TTaskManager)
2525

2626
//_________________________________________________________________________________________________
2727
TTaskManager::TTaskManager(const char *name, const char *title) :
28-
TTaskParallel(name, title),
29-
fTaskThreadPools(0),
30-
fIsAllAssigned(kFALSE),
31-
fDepCondition(),
32-
fUseMonitoring(kFALSE),
33-
fMonThreadPool(0),
34-
fTaskMon("monServ", "Task Monitor Serv")
28+
TTaskParallel(name, title),
29+
fTaskThreadPools(0),
30+
fIsAllAssigned(kFALSE),
31+
fDepCondition(),
32+
fUseMonitoring(kFALSE),
33+
fMonThreadPool(0),
34+
fTaskMon("monServ", "Task Monitor Serv")
3535
{
3636
// Std constructor
3737

@@ -54,12 +54,13 @@ TTaskManager::~TTaskManager() {
5454
void TTaskManager::Exec(Option_t *option) {
5555
// Exec of manager task
5656

57-
// Printf("TTaskManager::Exec START ...");
57+
// Printf("TTaskManager::Exec START ...");
5858

5959
if (!fParent && !fTaskThreadPools) {
60+
// TLockGuard lock(&fMutex);
6061

6162
Printf("TTaskManager::SHouldBeInit ...");
62-
TLockGuard lock(&fMutex);
63+
6364
TTaskPoolManager *tpm;
6465
fTaskThreadPools = new TObjArray();
6566

@@ -73,50 +74,61 @@ void TTaskManager::Exec(Option_t *option) {
7374
for (i = 0; i < kAllTypes; i++) {
7475
tpm = new TTaskPoolManager(fNumOfThreads[i]);
7576
fTaskThreadPools->Add(tpm);
76-
// tpm->Print();
77+
// tpm->Print();
7778
}
78-
7979
}
8080

8181
// Loops until not all tasks are assigned
82-
// Int_t counter = 0;
82+
// Int_t counter = 0;
8383
while (1) {
84-
TLockGuard lock(&fMutex);
85-
// Printf("Manager loop %d", counter++);
86-
SetAllAssigned();
84+
85+
TThread::Lock();
86+
// TLockGuard lock(&fMutex);
87+
// Printf("Manager loop %d", counter++);
88+
// {
89+
// TLockGuard lock(&fMutex);
90+
fIsAllAssigned = kTRUE;
91+
// }
8792
RunTask(option);
88-
if (fIsAllAssigned) break;
93+
if (fIsAllAssigned) {TThread::UnLock();break;}
94+
TThread::UnLock();
8995
fDepCondition.Wait();
9096
}
9197

92-
TIter next(fTaskThreadPools);
93-
TTaskPoolManager *pool;
94-
while ((pool = (TTaskPoolManager *)next())) {
95-
pool->Stop(kTRUE,kTRUE);
98+
{
99+
TLockGuard lock(&fMutex);
100+
101+
TIter next(fTaskThreadPools);
102+
TTaskPoolManager *pool;
103+
while ((pool = (TTaskPoolManager *)next())) {
104+
pool->Stop(kTRUE,kTRUE);
105+
}
96106
}
97107
//
98108
////// TODO Serving tasks
99109
//// FinishServingTasks();
100110
////
101111
// wait untill serving task are are finished
102-
// next.Reset();
103-
// while ((pool = (TTaskPoolManager *)next())) {
104-
// pool->Stop(kTRUE);
105-
// }
112+
// next.Reset();
113+
// while ((pool = (TTaskPoolManager *)next())) {
114+
// pool->Stop(kTRUE);
115+
// }
106116
//
107117
// // sync monitoring
108118
// if (fUseMonitoring) fMonThreadPool->Stop(kTRUE);
109119
//
110120
if (!fParent) RestoreManager();
111121

112-
// Printf("Done");
122+
// Printf("Done");
113123
}
114124

115125
//_________________________________________________________________________________________________
116126
void TTaskManager::PushTask(TTaskParallel *t) {
117127

118-
TTaskPoolManager *taskPoolMgr = (TTaskPoolManager *)fTaskThreadPools->At(t->GetType());
119-
taskPoolMgr->PushTask(t);
128+
{
129+
TTaskPoolManager *taskPoolMgr = (TTaskPoolManager *)fTaskThreadPools->At(t->GetType());
130+
taskPoolMgr->PushTask(t);
131+
}
120132
}
121133

122134
//_________________________________________________________________________________________________
@@ -162,12 +174,12 @@ void TTaskManager::RestoreManager() {
162174
TThread::Lock();
163175
SetStatusType(TTaskParallel::kWaiting, kTRUE);
164176
TThread::UnLock();
165-
// TIter next(fTaskThreadPools);
166-
// TTaskPoolManager *pool;
167-
// Printf("Restore Manager Pools %lld %s", fTaskThreadPools->GetEntries(), GetName());
168-
// while ((pool = (TTaskPoolManager *)next())) {
169-
// pool->SetStop(kFALSE);
170-
// }
177+
// TIter next(fTaskThreadPools);
178+
// TTaskPoolManager *pool;
179+
// Printf("Restore Manager Pools %lld %s", fTaskThreadPools->GetEntries(), GetName());
180+
// while ((pool = (TTaskPoolManager *)next())) {
181+
// pool->SetStop(kFALSE);
182+
// }
171183

172184

173185
// Printf("Main Task Manager sleeping ");

ParallelTasks/TTaskManager.h

-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ class TTaskManager : public TTaskParallel {
4444
private:
4545

4646
static TTaskManager *fgTaskManager; // global manager
47-
TMutex fMutex; // mutex for manager
4847
TObjArray *fTaskThreadPools; // list of thread pools
4948
Int_t fNumOfThreads[kAllTypes]; // number of threads reserved for different type
5049
Bool_t fIsAllAssigned; // flag if all tasks are done

ParallelTasks/TTaskParallel.cxx

+51-39
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@ ClassImp(TTaskParallel)
1818

1919
//_________________________________________________________________________________________________
2020
TTaskParallel::TTaskParallel(const char *name, const char *title) :
21-
TTask(name, title),
22-
fTaskStatusType(kWaiting),
23-
fTaskType(kCpu),
24-
fParent(0),
25-
fListDeps(0)
21+
TTask(name, title),
22+
fTaskStatusType(kWaiting),
23+
fTaskType(kCpu),
24+
fParent(0),
25+
fListDeps(0),
26+
fMutex()
2627
{
2728
// Std constructor
2829

@@ -37,11 +38,11 @@ TTaskParallel::~TTaskParallel() {
3738

3839
//_________________________________________________________________________________________________
3940
TTaskParallel::TTaskParallel(const TTaskParallel &obj) :
40-
TTask(obj),
41-
fTaskStatusType(obj.fTaskStatusType),
42-
fTaskType(obj.fTaskType),
43-
fParent(obj.fParent),
44-
fListDeps(obj.fListDeps)
41+
TTask(obj),
42+
fTaskStatusType(obj.fTaskStatusType),
43+
fTaskType(obj.fTaskType),
44+
fParent(obj.fParent),
45+
fListDeps(obj.fListDeps)
4546

4647
{
4748
//
@@ -109,24 +110,31 @@ void TTaskParallel::RunTask(Option_t *option, TTaskParallel::ETaskType /*type*/)
109110
if (!fTasks) return;
110111

111112
TIter next(fTasks);
112-
TTask *task;
113+
// TTask *task;
113114
TTaskParallel *t;
114115
TTaskManager *taskMgr = TTaskManager::GetTaskManager();
115-
while ((task = (TTask *) next())) {
116-
if (!task->IsActive()) continue;
117-
t = (TTaskParallel *) task;
118-
// Printf("Testing %s [%s] %p", t->GetName(), t->GetStatusTypeName(), t->GetParent());
116+
while ((t = (TTaskParallel *) next())) {
117+
if (!t->IsActive()) continue;
118+
// t = (TTaskParallel *) task;
119+
// Printf("Testing %s [%s] %p", t->GetName(), t->GetStatusTypeName(), t->GetParent());
119120
if (t->GetStatusType() == TTaskParallel::kWaiting) {
120121
if (t->HasDependency()) {
121122
// task was not assigned
122-
taskMgr->SetAllAssigned(kFALSE);
123+
{
124+
// TLockGuard lock(taskMgr->GetMutex());
125+
taskMgr->SetAllAssigned(kFALSE);
126+
}
123127
} else {
124-
t->SetStatusType(TTaskParallel::kAssigned);
125-
// Printf("Pushing task %s [%s]", t->GetName(), t->GetStatusTypeName());
128+
{
129+
// TLockGuard lock2(taskMgr->GetMutex());
130+
// TLockGuard lock(t->GetMutex());
131+
t->SetStatusType(TTaskParallel::kAssigned);
132+
}
133+
// Printf("Pushing task %s [%s]", t->GetName(), t->GetStatusTypeName());
126134
taskMgr->PushTask(t);
127135
}
128136
}
129-
// gSystem->Sleep(100);
137+
// gSystem->Sleep(100);
130138
t->RunTask(option);
131139
}
132140

@@ -145,11 +153,15 @@ Bool_t TTaskParallel::HasDependency() {
145153

146154
//_________________________________________________________________________________________________
147155
void TTaskParallel::SetStatusType(ETaskStatusType t, Bool_t recursivly) {
156+
157+
// TLockGuard lock2(GetMutex());
158+
148159
fTaskStatusType = t;
149160
if (recursivly) {
150161
TIter next(fTasks);
151162
TTaskParallel *task;
152163
while ((task = (TTaskParallel *)next())) {
164+
// TLockGuard lock(task->GetMutex());
153165
task->SetStatusType(t, recursivly);
154166
}
155167
}
@@ -159,18 +171,18 @@ void TTaskParallel::SetStatusType(ETaskStatusType t, Bool_t recursivly) {
159171
const char *TTaskParallel::GetStatusTypeName(ETaskStatusType t) {
160172

161173
switch (t) {
162-
case kWaiting:
163-
return "W";
164-
case kAssigned:
165-
return "A";
166-
case kRunning:
167-
return "R";
168-
case kDoneServing:
169-
return "DS";
170-
case kDone:
171-
return "D";
172-
default:
173-
return "";
174+
case kWaiting:
175+
return "W";
176+
case kAssigned:
177+
return "A";
178+
case kRunning:
179+
return "R";
180+
case kDoneServing:
181+
return "DS";
182+
case kDone:
183+
return "D";
184+
default:
185+
return "";
174186
}
175187
return "";
176188
}
@@ -179,14 +191,14 @@ const char *TTaskParallel::GetStatusTypeName(ETaskStatusType t) {
179191
const char *TTaskParallel::GetTypeName(ETaskType t) {
180192

181193
switch (t) {
182-
case kCpu:
183-
return "CPU";
184-
case kIO:
185-
return "IO";
186-
case kFake:
187-
return "FAKE";
188-
default:
189-
return "";
194+
case kCpu:
195+
return "CPU";
196+
case kIO:
197+
return "IO";
198+
case kFake:
199+
return "FAKE";
200+
default:
201+
return "";
190202
}
191203
return "";
192204
}

ParallelTasks/TTaskParallel.h

+7-4
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,16 @@ class TTaskParallel : public TTask {
4848
void RunTask(Option_t *option, TTaskParallel::ETaskType type = TTaskParallel::kCpu);
4949

5050
Bool_t HasDependency();
51+
TMutex *GetMutex() { return &fMutex;}
52+
5153

5254
protected:
5355

54-
ETaskStatusType fTaskStatusType;// Status tyoe
55-
ETaskType fTaskType; // task type
56-
TTask *fParent; // partent taks
57-
TList *fListDeps; // dependency list
56+
ETaskStatusType fTaskStatusType; // Status tyoe
57+
ETaskType fTaskType; // task type
58+
TTask *fParent; // partent taks
59+
TList *fListDeps; // dependency list
60+
TMutex fMutex; // mutex for manager
5861

5962
ClassDef(TTaskParallel, 1)
6063

ParallelTasks/TTaskStress.cxx

+11-4
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ void TTaskStress::Exec(Option_t *option) {
6363
// Exec of manager task
6464
//
6565

66+
// Printf("Running task %s",GetName());
67+
// gSystem->Sleep(1);
68+
return;
69+
6670
TStopwatch timer;
6771
timer.Start();
6872

@@ -83,12 +87,15 @@ void TTaskStress::Exec(Option_t *option) {
8387

8488
if (fType == kSleep) {
8589
// TThread::Lock();
86-
UInt_t time = (UInt_t) r.Gaus(5000, 1000);
87-
time = 1;
90+
91+
Int_t multi=1000;
92+
multi = 10;
93+
UInt_t time = (UInt_t) r.Gaus(5*multi, 1*multi);
94+
// time = 1;
8895
// TThread::UnLock();
89-
// Printf("[%s]%s%s S (%ld)", option, prefix.Data(), GetName(), time);
96+
// Printf("[%s]%s%s S (%lld)", option, prefix.Data(), GetName(), time);
9097
gSystem->Sleep(time);
91-
// Printf("[%s]%s%s D (%ld)", option, prefix.Data(), GetName(), time);
98+
// Printf("[%s]%s%s D (%lld)", option, prefix.Data(), GetName(), time);
9299
} else if (fType == kCpu) {
93100
TH1D h(TString::Format("myTaskStress_%s", GetName()).Data(), "My Stress task hist", 100, -10, 10);
94101
h.SetDirectory(0);

0 commit comments

Comments
 (0)