Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions src/agnocast_components/src/agnocast_component_container_cie.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class ComponentManagerCallbackIsolated : public rclcpp_components::ComponentMana
struct ExecutorWrapper
{
explicit ExecutorWrapper(std::shared_ptr<rclcpp::Executor> executor)
: executor_(std::move(executor)), thread_initialized_(false)
: executor_(std::move(executor))
{
}

Expand All @@ -37,7 +37,6 @@ class ComponentManagerCallbackIsolated : public rclcpp_components::ComponentMana

std::shared_ptr<rclcpp::Executor> executor_;
std::thread thread_;
std::atomic_bool thread_initialized_;
};

public:
Expand All @@ -56,6 +55,7 @@ class ComponentManagerCallbackIsolated : public rclcpp_components::ComponentMana

monitor_callback_group_ =
this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
// NOLINTNEXTLINE(cppcoreguidelines-prefer-member-initializer)
monitor_timer_ = this->create_wall_timer(
std::chrono::milliseconds(monitor_polling_interval_ms_),
[this]() { check_for_new_callback_groups(); }, monitor_callback_group_);
Expand Down Expand Up @@ -182,7 +182,6 @@ void ComponentManagerCallbackIsolated::start_executor_for_callback_group(
agnocast::publish_callback_group_info(this->client_publisher_, tid, group_id);
}

executor_wrapper.thread_initialized_ = true;
executor_wrapper.executor_->spin();
});
}
Expand Down Expand Up @@ -217,10 +216,11 @@ void ComponentManagerCallbackIsolated::check_for_new_callback_groups()
{
std::lock_guard<std::mutex> lock{executor_wrappers_mutex_};
for (auto & [node_id, node_wrapper] : node_wrappers_) {
const auto nid = node_id;
auto node = node_wrapper.get_node_base_interface();

node->for_each_callback_group(
[node_id, &node, this](const rclcpp::CallbackGroup::SharedPtr & callback_group) {
[nid, &node, this](const rclcpp::CallbackGroup::SharedPtr & callback_group) {
if (!callback_group->automatically_add_to_executor_with_node()) {
return;
}
Expand All @@ -234,7 +234,7 @@ void ComponentManagerCallbackIsolated::check_for_new_callback_groups()
return;
}

start_executor_for_callback_group(node_id, callback_group, node);
start_executor_for_callback_group(nid, callback_group, node);
});
}
}
Expand All @@ -256,12 +256,18 @@ void ComponentManagerCallbackIsolated::remove_node_from_executor(uint64_t node_i

void ComponentManagerCallbackIsolated::cancel_executor(ExecutorWrapper & executor_wrapper)
{
if (!executor_wrapper.thread_initialized_) {
auto context = this->get_node_base_interface()->get_context();
if (!executor_wrapper.thread_.joinable()) {
return;
}

while (!executor_wrapper.executor_->is_spinning() && rclcpp::ok(context)) {
rclcpp::sleep_for(std::chrono::milliseconds(1));
}
// Always wait for is_spinning() before cancel(). Do not use a separate
// "thread_initialized" flag as a fast-path: such a flag is set before spin()
// sets spinning=true, creating a window where cancel() has no effect and the
// subsequent spin() runs indefinitely, blocking join() forever.
auto context = this->get_node_base_interface()->get_context();

while (!executor_wrapper.executor_->is_spinning() && rclcpp::ok(context)) {
rclcpp::sleep_for(std::chrono::milliseconds(1));
}

executor_wrapper.executor_->cancel();
Expand Down
Loading