Skip to content

Commit

Permalink
refactor: refine loop exit behaviour
Browse files Browse the repository at this point in the history
1. clear loop-exit behviour
2. remove c99 struct init
  • Loading branch information
gonghuan committed Feb 18, 2024
1 parent 3d36ed9 commit cf1f62d
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 160 deletions.
17 changes: 14 additions & 3 deletions base/message_loop/fd_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <memory>

#include "base/lt_macro.h"
#include "base/queue/double_linked_list.h"
#include "event.h"

namespace base {
Expand Down Expand Up @@ -53,6 +52,18 @@ class FdEvent final {
virtual void HandleEvent(FdEvent* fdev, LtEv::Event ev) = 0;
} Handler;

using HandlerFunc = std::function<void(FdEvent* fdev, LtEv::Event ev)>;
class FuncHandler : public Handler {
public:
FuncHandler(HandlerFunc fn) : func_(fn){};

void HandleEvent(FdEvent* fdev, LtEv::Event ev) override {
return func_(fdev, ev);
};
private:
std::function<void(FdEvent* fdev, LtEv::Event ev)> func_;
};

static RefFdEvent Create(int fd, LtEv::Event ev);

static RefFdEvent Create(Handler* h, int fd, LtEv::Event ev);
Expand All @@ -67,9 +78,9 @@ class FdEvent final {

inline void SetHandler(Handler* h) { handler_ = h; }

inline Handler* GetHandler() const { return handler_;}
inline Handler* GetHandler() const { return handler_; }

inline void SetFdWatcher(Watcher* w) {watcher_ = w;}
inline void SetFdWatcher(Watcher* w) { watcher_ = w; }

inline Watcher* EventWatcher() { return watcher_; }

Expand Down
157 changes: 69 additions & 88 deletions base/message_loop/message_loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@
#include <sys/types.h>
#include <unistd.h>

#include <chrono>
#include <csignal>
#include <functional>
#include <iostream>
#include <sstream>
#include <string>

#include "file_util_linux.h"
Expand Down Expand Up @@ -81,38 +79,45 @@ uint64_t MessageLoop::GenLoopID() {
MessageLoop::MessageLoop() : MessageLoop(generate_loop_name()) {}

MessageLoop::MessageLoop(const std::string& name)
: start_flag_(0),
loop_name_(name),
: loop_name_(name),
wakeup_pipe_in_(0) {
start_flag_.clear();

notify_flag_.clear();

int fds[2];
CHECK(CreateLocalNoneBlockingPipe(fds));

wakeup_pipe_in_ = fds[1];
wakeup_event_ = FdEvent::Create(this, fds[0], LtEv::READ);
ctrl_ev_handler_.reset(
new base::FdEvent::FuncHandler(std::bind(&MessageLoop::HandleCtrlEvent,
this,
std::placeholders::_1,
std::placeholders::_2)));
wakeup_event_ = FdEvent::Create(ctrl_ev_handler_.get(), fds[0], LtEv::READ);

int ev_fd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
task_event_ = FdEvent::Create(this, ev_fd, LtEv::READ);

task_ev_handler_.reset(
new base::FdEvent::FuncHandler(std::bind(&MessageLoop::HandleTaskEvent,
this,
std::placeholders::_1,
std::placeholders::_2)));
task_event_ = FdEvent::Create(task_ev_handler_.get(), ev_fd, LtEv::READ);

pump_.SetLoopId(GenLoopID());
}

MessageLoop::~MessageLoop() {
CHECK(!(running_ && IsInLoopThread()));
CHECK(!IsInLoopThread());

QuitLoop();
Notify(wakeup_pipe_in_, &kQuit, sizeof(kQuit));
SyncStop();

// first join, others wait util loop end
if (thread_ptr_ && thread_ptr_->joinable()) {
VLOG(VINFO) << LOOP_LOG_DETAIL << "join here";
thread_ptr_->join();
}

WaitLoopEnd();

::close(wakeup_pipe_in_);
VLOG(VINFO) << LOOP_LOG_DETAIL << "Gone";
}
Expand All @@ -127,18 +132,26 @@ void MessageLoop::WakeUpIfNeeded() {
}
}

void MessageLoop::HandleEvent(FdEvent* fdev, LtEv::Event ev) {
if (LtEv::has_read(ev)) {
return HandleRead(fdev);
}
LOG(ERROR) << LOOP_LOG_DETAIL << "event:" << fdev->EventInfo();
void MessageLoop::HandleTaskEvent(FdEvent* fdev, LtEv::Event ev) {
uint64_t count = 0;
int ret = ::read(task_event_->GetFd(), &count, sizeof(count));
LOG_IF(ERROR, ret < 0) << " error:" << StrError()
<< " fd:" << task_event_->GetFd();

// must clear after read, minimum system call times
notify_flag_.clear();

RunScheduledTask();
}

void MessageLoop::HandleRead(FdEvent* fd_event) {
if (fd_event == task_event_.get()) {
RunCommandTask(ScheduledTaskType::TaskTypeDefault);
} else if (fd_event == wakeup_event_.get()) {
RunCommandTask(ScheduledTaskType::TaskTypeCtrl);
void MessageLoop::HandleCtrlEvent(FdEvent* fdev, LtEv::Event ev) {
char buf = 0x7F;
int ret = ::read(wakeup_event_->GetFd(), &buf, sizeof(buf));
LOG_IF(ERROR, ret < 0) << " error:" << StrError(errno)
<< " fd:" << wakeup_event_->GetFd();

if (buf == kQuit) {
running_ = false;
} else {
LOG(ERROR) << LOOP_LOG_DETAIL << "should not reached";
}
Expand All @@ -152,19 +165,6 @@ bool MessageLoop::IsInLoopThread() const {
return pump_.IsInLoop();
}

void MessageLoop::Start() {
if (start_flag_.test_and_set(std::memory_order_acquire)) {
LOG(INFO) << LOOP_LOG_DETAIL << "aready start runing...";
return;
}

thread_ptr_.reset(new std::thread(std::bind(&MessageLoop::ThreadMain, this)));
{
std::unique_lock<std::mutex> lk(start_stop_lock_);
cv_.wait(lk, [this]() { return running_; });
}
}

PersistRunner* MessageLoop::DelegateRunner() {
return delegate_runner_;
}
Expand All @@ -175,6 +175,29 @@ void MessageLoop::InstallPersistRunner(PersistRunner* runner) {
delegate_runner_ = runner;
}

void MessageLoop::Start() {
auto once_fn = [&]() {
auto loop_main = std::bind(&MessageLoop::ThreadMain, this);
thread_ptr_.reset(new std::thread(loop_main));

std::unique_lock<std::mutex> lk(start_stop_lock_);
cv_.wait(lk, [this]() { return running_; });
};
std::call_once(start_onece_, once_fn);
}

void MessageLoop::SyncStop() {
CHECK(!IsInLoopThread());

Notify(wakeup_pipe_in_, &kQuit, sizeof(kQuit));

WaitLoopEnd();
}

void MessageLoop::QuitLoop() {
Notify(wakeup_pipe_in_, &kQuit, sizeof(kQuit));
}

void MessageLoop::WaitLoopEnd(int32_t ms) {
if (!running_) {
return;
Expand All @@ -189,15 +212,14 @@ void MessageLoop::WaitLoopEnd(int32_t ms) {
}
}

uint64_t MessageLoop::PumpTimeout() {
uint64_t MessageLoop::CalcMaxPumpTimeout() {
if (in_loop_tasks_.size() > 0 || scheduled_tasks_.size_approx() > 0) {
return 0;
}
if (delegate_runner_ && delegate_runner_->HasPeedingTask()) {
return 0;
}
return PendingTasksCount() > 0 ? 0 : 50;
}

size_t MessageLoop::PendingTasksCount() const {
return in_loop_tasks_.size() + scheduled_tasks_.size_approx();
return 50;
}

void MessageLoop::ThreadMain() {
Expand All @@ -214,25 +236,25 @@ void MessageLoop::ThreadMain() {
pump_.InstallFdEvent(task_event_.get());
pump_.InstallFdEvent(wakeup_event_.get());

cv_.notify_all();
cv_.notify_all(); // notify loop start; see ::Start
while (running_) {
// pump io/timer event
pump_.Pump(PumpTimeout());
// pump io/timer event; trigger RunScheduledTask
pump_.Pump(CalcMaxPumpTimeout());

RunNestedTask();
}

RunNestedTask();

RunScheduledTask();

RunNestedTask();

pump_.RemoveFdEvent(task_event_.get());

pump_.RemoveFdEvent(wakeup_event_.get());

threadlocal_current_ = NULL;
VLOG(VINFO) << LOOP_LOG_DETAIL << "Thread End";
cv_.notify_all();
cv_.notify_all(); // notify loop stop; see ::WaitLoopEnd
}

bool MessageLoop::PostDelayTask(TaskBasePtr task, uint32_t ms) {
Expand Down Expand Up @@ -283,47 +305,6 @@ void MessageLoop::RunNestedTask() {
}
}

void MessageLoop::RunCommandTask(ScheduledTaskType type) {
switch (type) {
case ScheduledTaskType::TaskTypeDefault: {
uint64_t count = 0;
int ret = ::read(task_event_->GetFd(), &count, sizeof(count));
LOG_IF(ERROR, ret < 0)
<< " error:" << StrError() << " fd:" << task_event_->GetFd();

// clear must clear after read
notify_flag_.clear();

RunScheduledTask();

} break;
case ScheduledTaskType::TaskTypeCtrl: {
char buf = 0x7F;
int ret = ::read(wakeup_event_->GetFd(), &buf, sizeof(buf));
LOG_IF(ERROR, ret < 0)
<< " error:" << StrError(errno) << " fd:" << wakeup_event_->GetFd();

switch (buf) {
case kQuit: {
QuitLoop();
} break;
default: {
LOG(ERROR) << LOOP_LOG_DETAIL << "should not reached";
DCHECK(false);
} break;
}
} break;
default:
DCHECK(false);
LOG(ERROR) << LOOP_LOG_DETAIL << "should not reached";
break;
}
}

void MessageLoop::QuitLoop() {
running_ = false;
}

int MessageLoop::Notify(int fd, const void* data, size_t count) {
return ::write(fd, data, count);
}
Expand Down
46 changes: 21 additions & 25 deletions base/message_loop/message_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,10 @@ class PersistRunner {
LtClosure notifier_;
};

class MessageLoop : public FdEvent::Handler {
class MessageLoop {
public:
static MessageLoop* Current();

typedef enum {
TaskTypeDefault = 0,
TaskTypeReply = 1,
TaskTypeCtrl = 2
} ScheduledTaskType;

MessageLoop();

MessageLoop(const std::string& name);
Expand Down Expand Up @@ -111,6 +105,10 @@ class MessageLoop : public FdEvent::Handler {

void Start();

void SyncStop();

void QuitLoop();

bool IsInLoopThread() const;

void WakeUpIfNeeded();
Expand All @@ -126,58 +124,56 @@ class MessageLoop : public FdEvent::Handler {

const std::string& LoopName() const { return loop_name_; }

void QuitLoop();

// not preciese running status
bool Running() const { return running_; }

EventPump* Pump() { return &pump_; }

private:
void ThreadMain();
void SetThreadNativeName();

void RunCommandTask(ScheduledTaskType t);

uint64_t PumpTimeout();
void SetThreadNativeName();

size_t PendingTasksCount() const;
uint64_t CalcMaxPumpTimeout();

// nested task: post another task in current loop
// override from pump for nested task;
// nested task: task scheduled in current loop
void RunNestedTask();

void RunScheduledTask();

int Notify(int fd, const void* data, size_t count);

void HandleEvent(FdEvent* fdev, LtEv::Event ev) override;

void HandleRead(FdEvent* fd_event);

private:
using ThreadPtr = std::unique_ptr<std::thread>;
using EvFuncHandlerPtr = std::unique_ptr<base::FdEvent::FuncHandler>;

bool running_ = false;
std::atomic_flag start_flag_;
bool running_ = false; // only assign in loop thread
std::once_flag start_onece_;

std::mutex start_stop_lock_;
std::condition_variable cv_;

std::string loop_name_;
ThreadPtr thread_ptr_;

RefFdEvent task_event_;
std::atomic_flag notify_flag_;

// task relative
TaskQueue scheduled_tasks_;

std::vector<TaskBasePtr> in_loop_tasks_;

PersistRunner* delegate_runner_ = nullptr;

RefFdEvent task_event_;
std::atomic_flag notify_flag_;
EvFuncHandlerPtr task_ev_handler_;
void HandleTaskEvent(FdEvent* fdev, LtEv::Event ev);


// pipe just use for loop control
int wakeup_pipe_in_ = -1;
RefFdEvent wakeup_event_;
EvFuncHandlerPtr ctrl_ev_handler_;
void HandleCtrlEvent(FdEvent* fdev, LtEv::Event ev);

EventPump pump_;
DISALLOW_COPY_AND_ASSIGN(MessageLoop);
Expand Down
Loading

0 comments on commit cf1f62d

Please sign in to comment.