diff --git a/folly/io/async/EventBase.cpp b/folly/io/async/EventBase.cpp index 4fb54715042..3d75c042b2c 100644 --- a/folly/io/async/EventBase.cpp +++ b/folly/io/async/EventBase.cpp @@ -851,6 +851,21 @@ void EventBase::terminateLoopSoon() { queue_->putMessage([] {}); } +void EventBase::addLoopCallback(LoopCallback& callback, bool thisIteration) { + if (runOnceCallbacks_ != nullptr && thisIteration) { + runOnceCallbacks_->push_back(callback); + } else { + loopCallbacks_.push_back(callback); + // If we're adding to loopCallbacks_ while the loop is running (but not + // inside runLoopCallbacks), we need to break out of eb_event_base_loop() + // so the callback can be processed promptly. Otherwise the loop may be + // blocked in epoll_wait and won't process the callback until it times out. + if (isRunning()) { + evb_->eb_event_base_loopbreak(); + } + } +} + void EventBase::runInLoop( LoopCallback* callback, bool thisIteration, @@ -858,22 +873,14 @@ void EventBase::runInLoop( dcheckIsInEventBaseThread(); callback->cancelLoopCallback(); callback->context_ = std::move(rctx); - if (runOnceCallbacks_ != nullptr && thisIteration) { - runOnceCallbacks_->push_back(*callback); - } else { - loopCallbacks_.push_back(*callback); - } + addLoopCallback(*callback, thisIteration); } void EventBase::runInLoop(Func cob, bool thisIteration) { dcheckIsInEventBaseThread(); auto wrapper = new FunctionLoopCallback(std::move(cob)); wrapper->context_ = RequestContext::saveContext(); - if (runOnceCallbacks_ != nullptr && thisIteration) { - runOnceCallbacks_->push_back(*wrapper); - } else { - loopCallbacks_.push_back(*wrapper); - } + addLoopCallback(*wrapper, thisIteration); } void EventBase::runOnDestruction(OnDestructionCallback& callback) { diff --git a/folly/io/async/EventBase.h b/folly/io/async/EventBase.h index 3e77ae951a3..e9be04c0df9 100644 --- a/folly/io/async/EventBase.h +++ b/folly/io/async/EventBase.h @@ -1047,6 +1047,10 @@ class EventBase LoopCallbackList& currentCallbacks, const LoopCallbacksDeadline& deadline); + // Helper to add a callback to the appropriate list (runOnceCallbacks_ or + // loopCallbacks_) and break out of epoll_wait if needed. + void addLoopCallback(LoopCallback& callback, bool thisIteration); + // executes any callbacks queued by runInLoop(); returns false if none found bool runLoopCallbacks();