-
Notifications
You must be signed in to change notification settings - Fork 11
Custom executor support #534
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: devel
Are you sure you want to change the base?
Conversation
86da4d0
to
112949e
Compare
91288d4
to
18d77af
Compare
AlicaTime beginTime = _clock.now(); | ||
|
||
while (_running) { | ||
AlicaTime beginTime = _clock.now(); | ||
|
||
if (_stepEngine) { | ||
if (_stepEngine) { | ||
#ifdef ALICA_DEBUG_ENABLED | ||
Logging::logDebug(LOGNAME) << "===CUR TREE==="; | ||
if (_rootNode == nullptr) { | ||
Logging::logDebug(LOGNAME) << "NULL"; | ||
} else { | ||
_rootNode->printRecursive(); | ||
} | ||
Logging::logDebug(LOGNAME) << "===END CUR TREE==="; | ||
Logging::logDebug(LOGNAME) << "===CUR TREE==="; | ||
if (_rootNode == nullptr) { | ||
Logging::logDebug(LOGNAME) << "NULL"; | ||
} else { | ||
_rootNode->printRecursive(); | ||
} | ||
Logging::logDebug(LOGNAME) << "===END CUR TREE==="; | ||
#endif | ||
{ | ||
std::unique_lock<std::mutex> lckStep(_stepMutex); | ||
_isWaiting = true; | ||
_stepModeCV.wait(lckStep, [&] { return _stepCalled.load(); }); | ||
_stepCalled = false; | ||
_isWaiting = false; | ||
if (!_running) { | ||
return; | ||
} | ||
{ | ||
std::unique_lock<std::mutex> lckStep(_stepMutex); | ||
_isWaiting = true; | ||
_stepModeCV.wait(lckStep, [&] { return _stepCalled.load(); }); | ||
_stepCalled = false; | ||
_isWaiting = false; | ||
if (!_running) { | ||
return; | ||
} | ||
beginTime = _clock.now(); | ||
} | ||
beginTime = _clock.now(); | ||
} | ||
|
||
// Send tick to other modules | ||
//_ae->getCommunicator().tick(); // not implemented as ros works asynchronous | ||
_teamObserver.tick(_rootNode); | ||
_roleAssignment.tick(); | ||
_syncModule.tick(); | ||
_authorityManager.tick(_rootNode); | ||
_teamManager.tick(); | ||
// Send tick to other modules | ||
//_ae->getCommunicator().tick(); // not implemented as ros works asynchronous | ||
_teamObserver.tick(_rootNode); | ||
_roleAssignment.tick(); | ||
_syncModule.tick(); | ||
_authorityManager.tick(_rootNode); | ||
_teamManager.tick(); | ||
|
||
if (_rootNode == nullptr) { | ||
_rootNode = _ruleBook.initialisationRule(masterPlan); | ||
} | ||
_rootNode->preTick(); | ||
if (_rootNode->tick(&_ruleBook) == PlanChange::FailChange) { | ||
Logging::logInfo(LOGNAME) << "MasterPlan Failed"; | ||
} | ||
// clear deepest node pointer before deleting plans: | ||
if (_deepestNode && _deepestNode->isRetired()) { | ||
_deepestNode = nullptr; | ||
} | ||
// remove deletable plans: | ||
// this should be done just before clearing fpEvents, to make sure no spurious pointers remain | ||
if (_rootNode == nullptr) { | ||
_rootNode = _ruleBook.initialisationRule(masterPlan); | ||
} | ||
_rootNode->preTick(); | ||
if (_rootNode->tick(&_ruleBook) == PlanChange::FailChange) { | ||
Logging::logInfo(LOGNAME) << "MasterPlan Failed"; | ||
} | ||
// clear deepest node pointer before deleting plans: | ||
if (_deepestNode && _deepestNode->isRetired()) { | ||
_deepestNode = nullptr; | ||
} | ||
// remove deletable plans: | ||
// this should be done just before clearing fpEvents, to make sure no spurious pointers remain | ||
#ifdef ALICA_DEBUG_ENABLED | ||
int retiredCount = 0; | ||
int inActiveCount = 0; | ||
int deleteCount = 0; | ||
int totalCount = static_cast<int>(_runningPlans.size()); | ||
int retiredCount = 0; | ||
int inActiveCount = 0; | ||
int deleteCount = 0; | ||
int totalCount = static_cast<int>(_runningPlans.size()); | ||
#endif | ||
for (int i = static_cast<int>(_runningPlans.size()) - 1; i >= 0; --i) { | ||
for (int i = static_cast<int>(_runningPlans.size()) - 1; i >= 0; --i) { | ||
#ifdef ALICA_DEBUG_ENABLED | ||
if (_runningPlans[i]->isRetired()) { | ||
++retiredCount; | ||
} else if (!_runningPlans[i]->isActive()) { | ||
++inActiveCount; | ||
} | ||
if (_runningPlans[i]->isRetired()) { | ||
++retiredCount; | ||
} else if (!_runningPlans[i]->isActive()) { | ||
++inActiveCount; | ||
} | ||
#endif | ||
if (_runningPlans[i]->isDeleteable()) { | ||
assert(_runningPlans[i].use_count() == 1); | ||
_runningPlans.erase(_runningPlans.begin() + i); | ||
if (_runningPlans[i]->isDeleteable()) { | ||
assert(_runningPlans[i].use_count() == 1); | ||
_runningPlans.erase(_runningPlans.begin() + i); | ||
#ifdef ALICA_DEBUG_ENABLED | ||
++deleteCount; | ||
++deleteCount; | ||
#endif | ||
} | ||
} | ||
} | ||
#ifdef ALICA_DEBUG_ENABLED | ||
Logging::logDebug(LOGNAME) << (totalCount - inActiveCount - retiredCount) << " active " << retiredCount << " retired " << inActiveCount | ||
<< " inactive deleted: " << deleteCount; | ||
Logging::logDebug(LOGNAME) << (totalCount - inActiveCount - retiredCount) << " active " << retiredCount << " retired " << inActiveCount | ||
<< " inactive deleted: " << deleteCount; | ||
#endif | ||
// lock for fpEvents | ||
{ | ||
std::lock_guard<std::mutex> lock(_lomutex); | ||
_fpEvents = std::queue<RunningPlan*>(); | ||
} | ||
// lock for fpEvents | ||
{ | ||
std::lock_guard<std::mutex> lock(_lomutex); | ||
_fpEvents = std::queue<RunningPlan*>(); | ||
} | ||
|
||
AlicaTime now = _clock.now(); | ||
AlicaTime now = _clock.now(); | ||
|
||
if (now < _lastSendTime) { | ||
Logging::logWarn(LOGNAME) << "lastSendTime is in the future of the current system time, did the system time change?"; | ||
_lastSendTime = now; | ||
} | ||
if (now < _lastSendTime) { | ||
Logging::logWarn(LOGNAME) << "lastSendTime is in the future of the current system time, did the system time change?"; | ||
_lastSendTime = now; | ||
} | ||
|
||
if ((_ruleBook.hasChangeOccurred() && _lastSendTime + _minSendInterval < now) || _lastSendTime + _maxSendInterval < now) { | ||
IdGrp msg; | ||
_deepestNode = _rootNode; | ||
_treeDepth = 0; | ||
_rootNode->toMessage(msg, _deepestNode, _treeDepth, 0); | ||
_teamObserver.doBroadCast(msg); | ||
_lastSendTime = now; | ||
_ruleBook.resetChangeOccurred(); | ||
} | ||
if ((_ruleBook.hasChangeOccurred() && _lastSendTime + _minSendInterval < now) || _lastSendTime + _maxSendInterval < now) { | ||
IdGrp msg; | ||
_deepestNode = _rootNode; | ||
_treeDepth = 0; | ||
_rootNode->toMessage(msg, _deepestNode, _treeDepth, 0); | ||
_teamObserver.doBroadCast(msg); | ||
_lastSendTime = now; | ||
_ruleBook.resetChangeOccurred(); | ||
} | ||
|
||
if (_sendStatusMessages && _lastSentStatusTime + _sendStatusInterval < _clock.now()) { | ||
if (_deepestNode != nullptr) { | ||
_statusMessage->robotIDsWithMe.clear(); | ||
_statusMessage->currentPlan = _deepestNode->getActivePlan()->getName(); | ||
if (_deepestNode->getActiveEntryPoint() != nullptr) { | ||
_statusMessage->currentTask = _deepestNode->getActiveEntryPoint()->getTask()->getName(); | ||
} else { | ||
_statusMessage->currentTask = "IDLE"; | ||
} | ||
if (_deepestNode->getActiveState() != nullptr) { | ||
_statusMessage->currentState = _deepestNode->getActiveState()->getName(); | ||
_deepestNode->getAssignment().getAgentsInState(_deepestNode->getActiveState(), _statusMessage->robotIDsWithMe); | ||
} else { | ||
_statusMessage->currentState = "NONE"; | ||
} | ||
auto tmpRole = _roleAssignment.getOwnRole(); | ||
if (tmpRole) { | ||
_statusMessage->currentRole = _roleAssignment.getOwnRole()->getName(); | ||
} else { | ||
_statusMessage->currentRole = "No Role"; | ||
} | ||
_communicator.sendAlicaEngineInfo(*_statusMessage); | ||
_lastSentStatusTime = _clock.now(); | ||
if (_sendStatusMessages && _lastSentStatusTime + _sendStatusInterval < _clock.now()) { | ||
if (_deepestNode != nullptr) { | ||
_statusMessage->robotIDsWithMe.clear(); | ||
_statusMessage->currentPlan = _deepestNode->getActivePlan()->getName(); | ||
if (_deepestNode->getActiveEntryPoint() != nullptr) { | ||
_statusMessage->currentTask = _deepestNode->getActiveEntryPoint()->getTask()->getName(); | ||
} else { | ||
_statusMessage->currentTask = "IDLE"; | ||
} | ||
if (_deepestNode->getActiveState() != nullptr) { | ||
_statusMessage->currentState = _deepestNode->getActiveState()->getName(); | ||
_deepestNode->getAssignment().getAgentsInState(_deepestNode->getActiveState(), _statusMessage->robotIDsWithMe); | ||
} else { | ||
_statusMessage->currentState = "NONE"; | ||
} | ||
auto tmpRole = _roleAssignment.getOwnRole(); | ||
if (tmpRole) { | ||
_statusMessage->currentRole = _roleAssignment.getOwnRole()->getName(); | ||
} else { | ||
_statusMessage->currentRole = "No Role"; | ||
} | ||
_communicator.sendAlicaEngineInfo(*_statusMessage); | ||
_lastSentStatusTime = _clock.now(); | ||
} | ||
} | ||
|
||
//_ae->iterationComplete(); TODO modify when AlicaEngine::iterationComplete will be written | ||
|
||
//_ae->iterationComplete(); TODO modify when AlicaEngine::iterationComplete will be written | ||
now = _clock.now(); | ||
|
||
now = _clock.now(); | ||
AlicaTime availTime = _loopTime - (now - beginTime); | ||
bool checkFp = false; | ||
if (availTime > AlicaTime::milliseconds(1)) { | ||
std::unique_lock<std::mutex> lock(_lomutex); | ||
checkFp = std::cv_status::no_timeout == _fpEventWait.wait_for(lock, std::chrono::nanoseconds(availTime.inNanoseconds())); | ||
} | ||
|
||
AlicaTime availTime = _loopTime - (now - beginTime); | ||
bool checkFp = false; | ||
if (availTime > AlicaTime::milliseconds(1)) { | ||
std::unique_lock<std::mutex> lock(_lomutex); | ||
checkFp = std::cv_status::no_timeout == _fpEventWait.wait_for(lock, std::chrono::nanoseconds(availTime.inNanoseconds())); | ||
if (checkFp) { | ||
std::queue<RunningPlan*> nextFpEvents; | ||
{ | ||
// move fast path events to a local variable. Prevents calling visit() on a RunningPlan which can add a fast path event double locking | ||
// _loMutex | ||
std::lock_guard<std::mutex> lock(_lomutex); | ||
nextFpEvents.swap(_fpEvents); | ||
} | ||
|
||
if (checkFp) { | ||
std::queue<RunningPlan*> nextFpEvents; | ||
{ | ||
// move fast path events to a local variable. Prevents calling visit() on a RunningPlan which can add a fast path event double locking | ||
// _loMutex | ||
std::lock_guard<std::mutex> lock(_lomutex); | ||
nextFpEvents.swap(_fpEvents); | ||
} | ||
while (_running && availTime > AlicaTime::milliseconds(1) && !nextFpEvents.empty()) { | ||
RunningPlan* rp = nextFpEvents.front(); | ||
nextFpEvents.pop(); | ||
|
||
while (_running && availTime > AlicaTime::milliseconds(1) && !nextFpEvents.empty()) { | ||
RunningPlan* rp = nextFpEvents.front(); | ||
nextFpEvents.pop(); | ||
|
||
if (rp->isActive()) { | ||
bool first = true; | ||
while (rp != nullptr) { | ||
PlanChange change = _ruleBook.visit(*rp); | ||
if (!first && change == PlanChange::NoChange) { | ||
break; | ||
} | ||
rp = rp->getParent(); | ||
first = false; | ||
if (rp->isActive()) { | ||
bool first = true; | ||
while (rp != nullptr) { | ||
PlanChange change = _ruleBook.visit(*rp); | ||
if (!first && change == PlanChange::NoChange) { | ||
break; | ||
} | ||
rp = rp->getParent(); | ||
first = false; | ||
} | ||
now = _clock.now(); | ||
availTime = _loopTime - (now - beginTime); | ||
} | ||
now = _clock.now(); | ||
availTime = _loopTime - (now - beginTime); | ||
} | ||
|
||
{ | ||
// if all fast path events could not be processed, prepend them back to _fpEvents | ||
if (!nextFpEvents.empty()) { | ||
std::lock_guard<std::mutex> lock(_lomutex); | ||
_fpEvents.swap(nextFpEvents); | ||
while (!nextFpEvents.empty()) { | ||
_fpEvents.push(nextFpEvents.front()); | ||
nextFpEvents.pop(); | ||
} | ||
{ | ||
// if all fast path events could not be processed, prepend them back to _fpEvents | ||
if (!nextFpEvents.empty()) { | ||
std::lock_guard<std::mutex> lock(_lomutex); | ||
_fpEvents.swap(nextFpEvents); | ||
while (!nextFpEvents.empty()) { | ||
_fpEvents.push(nextFpEvents.front()); | ||
nextFpEvents.pop(); | ||
} | ||
} | ||
} | ||
} | ||
|
||
now = _clock.now(); | ||
availTime = _loopTime - (now - beginTime); | ||
now = _clock.now(); | ||
availTime = _loopTime - (now - beginTime); | ||
|
||
if (_running && availTime > AlicaTime::microseconds(100) && !_stepEngine) { | ||
_clock.sleep(availTime); | ||
} | ||
if (_running && availTime > AlicaTime::microseconds(100) && !_stepEngine) { | ||
_clock.sleep(availTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the outer while loop to support externally driven executors; nothing else is changed here.
4431c23
to
047e905
Compare
* @see AlicaCreators | ||
*/ | ||
int init(AlicaCreators&& creatorCtx, bool delayStart = false); | ||
int init(AlicaCreators&& creatorCtx, bool delayStart = false, const std::shared_ptr<IExecutor>& executor = nullptr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not use a setter for it like with the other injected dependencies?
virtual ~IExecutor() = default; | ||
virtual void start() = 0; | ||
virtual void stop() = 0; | ||
virtual void run() = 0; // only for driven executor |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
F: It's also used by AsyncExecutor (but it's not supposed to be used externally, so maybe you should move the current run implementation of AsyncExecutor to another private method?)
ExpressionHandler _expressionHandler; | ||
AuthorityManager _auth; | ||
DefaultTransitionConditionCreator _defaultTransitionConditionCreator; | ||
std::shared_ptr<IExecutor> _executor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
S: Can be removed; it should be enough to store it in AlicaContext like the other injected dependencies
_ruleBook.init(_globalBlackboard); | ||
if (!_running) { | ||
_running = true; | ||
_executor->start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this start calling the PlanBase run method while we are still accessing the status msg (which is being accessed in run())? The executor should probably be started at the end of the if block.
{ | ||
assert(_run_cb); | ||
while (_running) { | ||
_run_cb(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this call PlanBase::run() without respecting any configured frequencies and cause 100% CPU load?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it basically just moved the while
loop here. Unless the previous implementation was also suffering the same issue, I don't think the loop here will cause a 100% CPU load?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ahh yes, PlanBase run still has a sleep to enforce frequency. But this also causes the sleep to keep a max frequency on the DrivenExecutor, which might be an undesired side effect. It seems only using stepEngine will get us around the sleep.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these are more of a configuration issue? Maybe we shouldn't set stepEngine=true
when we have configured the alica context to use DrivenExecutor
.
@marcus65001 @bjoernschroeder can't we do whatever there is in this PR using the timer factory? For async, you basically use the alicaSystemTimer & for driven you use the alicaRosTimer. With this we only need to decide if we use the same or a different factory for the main thread (imo, we should be able to use the same factory). |
So instead of an executor you are suggesting the alica timer be the executor? My original suggestion to introduce the executor was because there is a thread right now — events on transition conditions should be prioritized. Through an executor it is straightforward to control job prioritization and make condition evaluation the highest priority even with a single thread. I guess any executor can be implemented as a timer to achieve this; even if we want to tick the timer manually or with the highest priority, it can be done in the timer implementation. Maybe it will make more sense and be a much cleaner solution to utilize timers instead, with separate factories for condition evaluation and the run method |
yes, my suggestion indeed was to use alica timer as the executor since it is essentially what we want here i.e. calling run() at a certain frequency & the timer code is already implemented, tested & being used without problems in production for quite a few years now. |
The thread was kinda "guaranteeing" execution of the engine tick if the node is stuck processing a long subscription queue/waiting for a blocking call |
yes, that's true. That's why I was wondering if we should use a different timer factory for it |
AB#83116