Skip to content

Commit 1c66a03

Browse files
cheniujhbrother-jin
authored andcommitted
fix: changed the calculating logic of epoll timeout provided by TimerTaskManager (#2794)
* changed the returned timeout of TimerTaskManager --------- Co-authored-by: cheniujh <[email protected]>
1 parent aa59b40 commit 1c66a03

File tree

3 files changed

+40
-57
lines changed

3 files changed

+40
-57
lines changed

src/net/src/dispatch_thread.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ int DispatchThread::StartThread() {
6666
// Adding timer tasks and run timertaskThread
6767
timer_task_thread_.AddTimerTask("blrpop_blocking_info_scan", 250, true,
6868
[this] { this->ScanExpiredBlockedConnsOfBlrpop(); });
69-
timer_task_thread_.set_thread_name("TimerTaskThread");
69+
timer_task_thread_.set_thread_name("DispacherTimerTaskThread");
7070
timer_task_thread_.StartThread();
7171
return ServerThread::StartThread();
7272
}

src/net/src/net_util.cc

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,39 +27,39 @@ int Setnonblocking(int sockfd) {
2727
return flags;
2828
}
2929

30-
uint32_t TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec,
30+
TimerTaskID TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec,
3131
const std::function<void()>& task) {
3232
TimedTask new_task = {last_task_id_++, task_name, interval_ms, repeat_exec, task};
3333
id_to_task_[new_task.task_id] = new_task;
3434

3535
int64_t next_expired_time = NowInMs() + interval_ms;
3636
exec_queue_.insert({next_expired_time, new_task.task_id});
3737

38-
if (min_interval_ms_ > interval_ms || min_interval_ms_ == -1) {
39-
min_interval_ms_ = interval_ms;
40-
}
4138
// return the id of this task
4239
return new_task.task_id;
4340
}
41+
4442
int64_t TimerTaskManager::NowInMs() {
4543
auto now = std::chrono::system_clock::now();
4644
return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
4745
}
48-
int TimerTaskManager::ExecTimerTask() {
46+
47+
int64_t TimerTaskManager::ExecTimerTask() {
4948
std::vector<ExecTsWithId> fired_tasks_;
5049
int64_t now_in_ms = NowInMs();
51-
// traverse in ascending order
52-
for (auto pair = exec_queue_.begin(); pair != exec_queue_.end(); pair++) {
53-
if (pair->exec_ts <= now_in_ms) {
54-
auto it = id_to_task_.find(pair->id);
50+
// traverse in ascending order, and exec expired tasks
51+
for (const auto& task : exec_queue_) {
52+
if (task.exec_ts <= now_in_ms) {
53+
auto it = id_to_task_.find(task.id);
5554
assert(it != id_to_task_.end());
5655
it->second.fun();
57-
fired_tasks_.push_back({pair->exec_ts, pair->id});
56+
fired_tasks_.push_back({task.exec_ts, task.id});
5857
now_in_ms = NowInMs();
5958
} else {
6059
break;
6160
}
6261
}
62+
6363
for (auto task : fired_tasks_) {
6464
exec_queue_.erase(task);
6565
auto it = id_to_task_.find(task.id);
@@ -69,16 +69,21 @@ int TimerTaskManager::ExecTimerTask() {
6969
exec_queue_.insert({now_in_ms + it->second.interval_ms, task.id});
7070
} else {
7171
// this task only need to be exec once, completely remove this task
72-
int interval_del = it->second.interval_ms;
7372
id_to_task_.erase(task.id);
74-
if (interval_del == min_interval_ms_) {
75-
RenewMinIntervalMs();
76-
}
7773
}
7874
}
79-
return min_interval_ms_;
75+
76+
if (exec_queue_.empty()) {
77+
//to avoid wasting of cpu resources, epoll use 5000ms as timeout value when no task to exec
78+
return 5000;
79+
}
80+
81+
int64_t gap_between_now_and_next_task = exec_queue_.begin()->exec_ts - NowInMs();
82+
gap_between_now_and_next_task = gap_between_now_and_next_task < 0 ? 0 : gap_between_now_and_next_task;
83+
return gap_between_now_and_next_task;
8084
}
81-
bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
85+
86+
bool TimerTaskManager::DelTimerTaskByTaskId(TimerTaskID task_id) {
8287
// remove the task
8388
auto task_to_del = id_to_task_.find(task_id);
8489
if (task_to_del == id_to_task_.end()) {
@@ -87,11 +92,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
8792
int interval_del = task_to_del->second.interval_ms;
8893
id_to_task_.erase(task_to_del);
8994

90-
// renew the min_interval_ms_
91-
if (interval_del == min_interval_ms_) {
92-
RenewMinIntervalMs();
93-
}
94-
9595
// remove from exec queue
9696
ExecTsWithId target_key = {-1, 0};
9797
for (auto pair : exec_queue_) {
@@ -106,15 +106,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
106106
return true;
107107
}
108108

109-
void TimerTaskManager::RenewMinIntervalMs() {
110-
min_interval_ms_ = -1;
111-
for (auto pair : id_to_task_) {
112-
if (pair.second.interval_ms < min_interval_ms_ || min_interval_ms_ == -1) {
113-
min_interval_ms_ = pair.second.interval_ms;
114-
}
115-
}
116-
}
117-
118109
TimerTaskThread::~TimerTaskThread() {
119110
if (!timer_task_manager_.Empty()) {
120111
LOG(INFO) << "TimerTaskThread exit !!!";
@@ -140,9 +131,9 @@ int TimerTaskThread::StopThread() {
140131
}
141132

142133
void* TimerTaskThread::ThreadMain() {
143-
int timeout;
134+
int32_t timeout;
144135
while (!should_stop()) {
145-
timeout = timer_task_manager_.ExecTimerTask();
136+
timeout = static_cast<int32_t>(timer_task_manager_.ExecTimerTask());
146137
net_multiplexer_->NetPoll(timeout);
147138
}
148139
return nullptr;

src/net/src/net_util.h

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
namespace net {
2222

2323
int Setnonblocking(int sockfd);
24-
24+
using TimerTaskID = int64_t;
2525
struct TimedTask{
26-
uint32_t task_id;
26+
TimerTaskID task_id;
2727
std::string task_name;
2828
int interval_ms;
2929
bool repeat_exec;
@@ -34,7 +34,7 @@ struct ExecTsWithId {
3434
//the next exec time of the task, unit in ms
3535
int64_t exec_ts;
3636
//id of the task to be exec
37-
uint32_t id;
37+
TimerTaskID id;
3838

3939
bool operator<(const ExecTsWithId& other) const{
4040
if(exec_ts == other.exec_ts){
@@ -47,36 +47,28 @@ struct ExecTsWithId {
4747
}
4848
};
4949

50-
/*
51-
* For simplicity, current version of TimerTaskThread has no lock inside and all task should be registered before TimerTaskThread started,
52-
* but if you have the needs of dynamically add/remove timer task after TimerTaskThread started, you can simply add a mutex to protect
53-
* the timer_task_manager_ and also a pipe to wake up the maybe being endless-wait epoll(if all task consumed, epoll will sink into
54-
* endless wait) to implement the feature.
55-
*/
5650
class TimerTaskManager {
5751
public:
5852
TimerTaskManager() = default;
5953
~TimerTaskManager() = default;
60-
61-
uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task);
62-
//return the newest min_minterval_ms
63-
int ExecTimerTask();
64-
bool DelTimerTaskByTaskId(uint32_t task_id);
65-
int GetMinIntervalMs() const { return min_interval_ms_; }
54+
TimerTaskID AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task);
55+
//return the time gap between now and next task-expired time, which can be used as the timeout value of epoll
56+
int64_t ExecTimerTask();
57+
bool DelTimerTaskByTaskId(TimerTaskID task_id);
6658
int64_t NowInMs();
67-
void RenewMinIntervalMs();
68-
bool Empty(){ return 0 == last_task_id_; }
69-
59+
bool Empty() const { return exec_queue_.empty(); }
7060
private:
7161
//items stored in std::set are ascending ordered, we regard it as an auto sorted queue
7262
std::set<ExecTsWithId> exec_queue_;
73-
std::unordered_map<uint32_t, TimedTask> id_to_task_;
74-
uint32_t last_task_id_{0};
75-
int min_interval_ms_{-1};
63+
std::unordered_map<TimerTaskID, TimedTask> id_to_task_;
64+
TimerTaskID last_task_id_{0};
7665
};
7766

7867

79-
68+
/*
69+
* For simplicity, current version of TimerTaskThread has no lock inside and all task should be registered before TimerTaskThread started,
70+
* but if you have the needs of dynamically add/remove timer task after TimerTaskThread started, you can simply add a mutex to protect the timer_task_manager_
71+
*/
8072
class TimerTaskThread : public Thread {
8173
public:
8274
TimerTaskThread(){
@@ -88,11 +80,11 @@ class TimerTaskThread : public Thread {
8880
int StopThread() override;
8981
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }
9082

91-
uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task){
83+
TimerTaskID AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task){
9284
return timer_task_manager_.AddTimerTask(task_name, interval_ms, repeat_exec, task);
9385
};
9486

95-
bool DelTimerTaskByTaskId(uint32_t task_id){
87+
bool DelTimerTaskByTaskId(TimerTaskID task_id){
9688
return timer_task_manager_.DelTimerTaskByTaskId(task_id);
9789
};
9890

0 commit comments

Comments
 (0)