diff --git a/heron/common/src/cpp/network/event_loop_impl.cpp b/heron/common/src/cpp/network/event_loop_impl.cpp index c37fc7ef0ad..24859055958 100644 --- a/heron/common/src/cpp/network/event_loop_impl.cpp +++ b/heron/common/src/cpp/network/event_loop_impl.cpp @@ -37,6 +37,12 @@ void EventLoopImpl::eventLoopImplWriteCallback(sp_int32 fd, sp_int16 event, void el->handleWriteCallback(fd, event); } +// 'C' style callback for libevent on signal events +void EventLoopImpl::eventLoopImplSignalCallback(sp_int32 sig, sp_int16 event, void* arg) { + auto* el = reinterpret_cast(arg); + el->handleSignalCallback(sig, event); +} + // 'C' style callback for libevent on timer events void EventLoopImpl::eventLoopImplTimerCallback(sp_int32, sp_int16 event, void* arg) { // TODO(vikasr): this needs to change to VCallback @@ -83,6 +89,40 @@ void EventLoopImpl::loop() { int EventLoopImpl::loopExit() { return event_base_loopbreak(mDispatcher); } +int EventLoopImpl::registerSignal(int sig, VCallback cb) { + // Create the appropriate structures and init them. + auto* event = new SS_RegisteredEvent(sig, false, std::move(cb), -1); + evsignal_set(event->event(), sig, &EventLoopImpl::eventLoopImplSignalCallback, this); + if (event_base_set(mDispatcher, event->event()) < 0) { + delete event; + throw heron::error::Error_Exception(errno); + } + + // Now add it to the list of signals monitored by the mDispatcher + if (event_add(event->event(), NULL) < 0) { + delete event; + throw heron::error::Error_Exception(errno); + } + mSignalEvents[sig] = event; + return 0; +} + +int EventLoopImpl::unRegisterSignal(int sig) { + if (mSignalEvents.find(sig) == mSignalEvents.end()) { + // This signal wasn't registed. Hence we can't unregister it. + return -1; + } + + // Delete the underlying event in libevent + if (event_del(mSignalEvents[sig]->event()) != 0) { + throw heron::error::Error_Exception(errno); + } + auto event = mSignalEvents[sig]; + mSignalEvents.erase(sig); + delete event; + return 0; +} + int EventLoopImpl::registerForRead(int fd, VCallback cb, bool persistent) { return registerForRead(fd, std::move(cb), persistent, -1); } @@ -136,8 +176,9 @@ int EventLoopImpl::unRegisterForRead(int fd) { // cout << "event_del failed for fd " << fd; throw heron::error::Error_Exception(errno); } - delete mReadEvents[fd]; + auto event = mReadEvents[fd]; mReadEvents.erase(fd); + delete event; return 0; } @@ -195,8 +236,9 @@ int EventLoopImpl::unRegisterForWrite(int fd) { // cout << "event_del failed for fd " << fd; throw heron::error::Error_Exception(errno); } - delete mWriteEvents[fd]; + auto event = mWriteEvents[fd]; mWriteEvents.erase(fd); + delete event; return 0; } @@ -251,8 +293,9 @@ sp_int32 EventLoopImpl::unRegisterTimer(sp_int64 timerId) { // cout << "event_del failed for timer " << timerId; throw heron::error::Error_Exception(errno); } - delete mTimerEvents[timerId]; + auto event = mTimerEvents[timerId]; mTimerEvents.erase(timerId); + delete event; return 0; } @@ -373,6 +416,16 @@ void EventLoopImpl::handleTimerCallback(sp_int16 event, sp_int64 timerId) { } } +void EventLoopImpl::handleSignalCallback(sp_int32 sig, sp_int16 event) { + if (mSignalEvents.find(sig) == mSignalEvents.end()) { + // This is possible when unRegisterSignal has been called before we handle this event + // Just ignore this event. + return; + } + auto* registeredEvent = mSignalEvents[sig]; + registeredEvent->get_callback()(mapStatusCode(event)); +} + EventLoop::Status EventLoopImpl::mapStatusCode(sp_int16 event) { switch (event) { case EV_READ: diff --git a/heron/common/src/cpp/network/event_loop_impl.h b/heron/common/src/cpp/network/event_loop_impl.h index c807ff39ce3..b85406e240c 100644 --- a/heron/common/src/cpp/network/event_loop_impl.h +++ b/heron/common/src/cpp/network/event_loop_impl.h @@ -44,6 +44,8 @@ class EventLoopImpl : public EventLoop { // Methods inherited from EventLoop. virtual void loop(); virtual sp_int32 loopExit(); + virtual sp_int32 registerSignal(sp_int32 sig, VCallback cb); + virtual sp_int32 unRegisterSignal(sp_int32 sig); virtual sp_int32 registerForRead(sp_int32 fd, VCallback cb, bool persistent, sp_int64 timeoutMicroSecs); virtual sp_int32 registerForRead(sp_int32 fd, VCallback cb, bool persistent); @@ -63,6 +65,7 @@ class EventLoopImpl : public EventLoop { // Static member functions to interact with C libevent API static void eventLoopImplReadCallback(sp_int32 fd, sp_int16 event, void* arg); static void eventLoopImplWriteCallback(sp_int32 fd, sp_int16 event, void* arg); + static void eventLoopImplSignalCallback(sp_int32 sig, sp_int16 event, void* arg); static void eventLoopImplTimerCallback(sp_int32, sp_int16 event, void* arg); private: @@ -81,6 +84,9 @@ class EventLoopImpl : public EventLoop { // libevent callback on timer events. void handleTimerCallback(sp_int16 event, sp_int64 timerid); + // libevent callback on signal events. + void handleSignalCallback(sp_int32 sig, sp_int16 event); + // The underlying dispatcher that we wrap around. struct event_base* mDispatcher; @@ -93,6 +99,9 @@ class EventLoopImpl : public EventLoop { // The registered timers. std::unordered_map*> mTimerEvents; + // The registered signals. + std::unordered_map*> mSignalEvents; + // The registered instant callbacks typedef std::list> OrderedCallbackList; OrderedCallbackList mListInstantCallbacks; diff --git a/heron/stmgr/src/cpp/server/stmgr-main.cpp b/heron/stmgr/src/cpp/server/stmgr-main.cpp index 371be9db9cc..c6e36a68887 100644 --- a/heron/stmgr/src/cpp/server/stmgr-main.cpp +++ b/heron/stmgr/src/cpp/server/stmgr-main.cpp @@ -48,6 +48,12 @@ DEFINE_string(ckptmgr_id, "", "The id of the local ckptmgr"); DEFINE_int32(ckptmgr_port, 0, "The port of the local ckptmgr"); DEFINE_string(metricscachemgr_mode, "disabled", "MetricsCacheMgr mode, default `disabled`"); +EventLoopImpl ss; + +void sigtermHandler(int signum) { + ss.loopExit(); +} + int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); @@ -56,8 +62,6 @@ int main(int argc, char* argv[]) { } std::vector instances = StrUtils::split(FLAGS_instance_ids, ","); - EventLoopImpl ss; - // Read heron internals config from local file // Create the heron-internals-config-reader to read the heron internals config heron::config::HeronInternalsConfigReader::Create(&ss, @@ -85,6 +89,7 @@ int main(int argc, char* argv[]) { FLAGS_shell_port, FLAGS_ckptmgr_port, FLAGS_ckptmgr_id, high_watermark, low_watermark, FLAGS_metricscachemgr_mode); mgr.Init(); + ss.registerSignal(SIGTERM, &sigtermHandler); ss.loop(); return 0; }