Skip to content

Commit 273c322

Browse files
Janosch MachowinskiJanosch Machowinski
authored andcommitted
feat: Added callback group events executor
This commit adds the callback group events executor. It features: - multithreading support - correct handling of sim time - usage of the events subsystem
1 parent b6e9b4c commit 273c322

File tree

11 files changed

+2664
-0
lines changed

11 files changed

+2664
-0
lines changed

rclcpp/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@ set(${PROJECT_NAME}_SRCS
7272
src/rclcpp/executors/single_threaded_executor.cpp
7373
src/rclcpp/expand_topic_or_service_name.cpp
7474
src/rclcpp/experimental/executors/events_executor/events_executor.cpp
75+
src/rclcpp/experimental/executors/events_cbg_executor/events_cbg_executor.cpp
76+
src/rclcpp/experimental/executors/events_cbg_executor/first_in_first_out_scheduler.cpp
77+
src/rclcpp/experimental/executors/events_cbg_executor/global_event_id_provider.cpp
7578
src/rclcpp/experimental/timers_manager.cpp
7679
src/rclcpp/future_return_code.cpp
7780
src/rclcpp/generic_client.cpp
Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
// Copyright 2024 Cellumation GmbH.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include <chrono>
18+
#include <memory>
19+
#include <mutex>
20+
#include <thread>
21+
#include <vector>
22+
23+
#include "rclcpp/executor.hpp"
24+
#include "rclcpp/macros.hpp"
25+
#include "rclcpp/visibility_control.hpp"
26+
27+
namespace rclcpp
28+
{
29+
namespace executors
30+
{
31+
32+
class TimerManager;
33+
struct RegisteredEntityCache;
34+
class CBGScheduler;
35+
struct GlobalWeakExecutableCache;
36+
37+
class EventsCBGExecutor : public rclcpp::Executor
38+
{
39+
public:
40+
RCLCPP_SMART_PTR_DEFINITIONS(EventsCBGExecutor)
41+
42+
/**
43+
* For the yield_before_execute option, when true std::this_thread::yield()
44+
* will be called after acquiring work (as an AnyExecutable) and
45+
* releasing the spinning lock, but before executing the work.
46+
* This is useful for reproducing some bugs related to taking work more than
47+
* once.
48+
*
49+
* \param options common options for all executors
50+
* \param number_of_threads number of threads to have in the thread pool,
51+
* the default 0 will use the number of cpu cores found (minimum of 2)
52+
* \param yield_before_execute if true std::this_thread::yield() is called
53+
* \param timeout maximum time to wait
54+
*/
55+
RCLCPP_PUBLIC
56+
explicit EventsCBGExecutor(
57+
const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions(),
58+
size_t number_of_threads = 0,
59+
std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
60+
61+
RCLCPP_PUBLIC
62+
virtual ~EventsCBGExecutor();
63+
64+
RCLCPP_PUBLIC
65+
void
66+
add_callback_group(
67+
const rclcpp::CallbackGroup::SharedPtr & group_ptr,
68+
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node_ptr,
69+
bool notify = true) override;
70+
71+
RCLCPP_PUBLIC
72+
std::vector<rclcpp::CallbackGroup::WeakPtr>
73+
get_all_callback_groups() override;
74+
75+
RCLCPP_PUBLIC
76+
void
77+
remove_callback_group(
78+
const rclcpp::CallbackGroup::SharedPtr & group_ptr,
79+
bool notify = true) override;
80+
81+
RCLCPP_PUBLIC
82+
void
83+
add_node(
84+
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node_ptr,
85+
bool notify = true) override;
86+
87+
/// Convenience function which takes Node and forwards NodeBaseInterface.
88+
/**
89+
* \see rclcpp::Executor::add_node
90+
*/
91+
RCLCPP_PUBLIC
92+
void
93+
add_node(const std::shared_ptr<rclcpp::Node> & node_ptr, bool notify = true) override;
94+
95+
RCLCPP_PUBLIC
96+
void
97+
remove_node(
98+
const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr & node_ptr,
99+
bool notify = true) override;
100+
101+
/// Convenience function which takes Node and forwards NodeBaseInterface.
102+
/**
103+
* \see rclcpp::Executor::remove_node
104+
*/
105+
RCLCPP_PUBLIC
106+
void
107+
remove_node(const std::shared_ptr<rclcpp::Node> & node_ptr, bool notify = true) override;
108+
109+
110+
// add a callback group to the executor, not bound to any node
111+
void add_callback_group_only(const rclcpp::CallbackGroup::SharedPtr & group_ptr);
112+
113+
/**
114+
* \sa rclcpp::Executor:spin() for more details
115+
* \throws std::runtime_error when spin() called while already spinning
116+
*/
117+
RCLCPP_PUBLIC
118+
void
119+
spin() override;
120+
121+
/**
122+
* \sa rclcpp::Executor:spin() for more details
123+
* \throws std::runtime_error when spin() called while already spinning
124+
* @param exception_handler will be called for every exception in the processing threads
125+
*
126+
* The exception_handler can be called from multiple threads at the same time.
127+
* The exception_handler shall rethrow the exception it if wants to terminate the program.
128+
*/
129+
RCLCPP_PUBLIC
130+
void
131+
spin(const std::function<void(const std::exception &)> & exception_handler);
132+
133+
void
134+
spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1)) override;
135+
136+
RCLCPP_PUBLIC
137+
void
138+
spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0)) override;
139+
140+
/**
141+
* @return true if work was available and executed
142+
*/
143+
bool collect_and_execute_ready_events(
144+
std::chrono::nanoseconds max_duration,
145+
bool recollect_if_no_work_available);
146+
147+
void
148+
spin_all(std::chrono::nanoseconds max_duration) override;
149+
150+
151+
/// Cancel any running spin* function, causing it to return.
152+
/**
153+
* This function can be called asynchonously from any thread.
154+
* \throws std::runtime_error if there is an issue triggering the guard condition
155+
*/
156+
RCLCPP_PUBLIC
157+
void
158+
cancel() override;
159+
160+
RCLCPP_PUBLIC
161+
size_t
162+
get_number_of_threads() const;
163+
164+
bool
165+
is_spinning()
166+
{
167+
return spinning;
168+
}
169+
170+
template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
171+
FutureReturnCode
172+
spin_until_future_complete(
173+
const FutureT & future,
174+
std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
175+
{
176+
// TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
177+
// inside a callback executed by an executor.
178+
179+
// Check the future before entering the while loop.
180+
// If the future is already complete, don't try to spin.
181+
std::future_status status = future.wait_for(std::chrono::seconds(0));
182+
if (status == std::future_status::ready) {
183+
return FutureReturnCode::SUCCESS;
184+
}
185+
186+
auto end_time = std::chrono::steady_clock::now();
187+
std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
188+
timeout);
189+
if (timeout_ns > std::chrono::nanoseconds::zero()) {
190+
end_time += timeout_ns;
191+
}
192+
std::chrono::nanoseconds timeout_left = timeout_ns;
193+
194+
if (spinning.exchange(true)) {
195+
throw std::runtime_error("spin_until_future_complete() called while already spinning");
196+
}
197+
RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); );
198+
while (rclcpp::ok(this->context_) && spinning.load()) {
199+
// Do one item of work.
200+
spin_once_internal(timeout_left);
201+
202+
// Check if the future is set, return SUCCESS if it is.
203+
status = future.wait_for(std::chrono::seconds(0));
204+
if (status == std::future_status::ready) {
205+
return FutureReturnCode::SUCCESS;
206+
}
207+
// If the original timeout is < 0, then this is blocking, never TIMEOUT.
208+
if (timeout_ns < std::chrono::nanoseconds::zero()) {
209+
continue;
210+
}
211+
// Otherwise check if we still have time to wait, return TIMEOUT if not.
212+
auto now = std::chrono::steady_clock::now();
213+
if (now >= end_time) {
214+
return FutureReturnCode::TIMEOUT;
215+
}
216+
// Subtract the elapsed time from the original timeout.
217+
timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
218+
}
219+
220+
// The future did not complete before ok() returned false, return INTERRUPTED.
221+
return FutureReturnCode::INTERRUPTED;
222+
}
223+
224+
/// We need these fuction to be public, as we use them in the callback_group_scheduler
225+
using rclcpp::Executor::execute_subscription;
226+
using rclcpp::Executor::execute_timer;
227+
using rclcpp::Executor::execute_service;
228+
using rclcpp::Executor::execute_client;
229+
230+
protected:
231+
RCLCPP_PUBLIC
232+
void
233+
run(size_t this_thread_number, bool blockInitially);
234+
235+
void
236+
run(
237+
size_t this_thread_number,
238+
const std::function<void(const std::exception &)> & exception_handler);
239+
240+
/**
241+
* Te be called in termination case. E.g. destructor of shutdown callback.
242+
* Stops the scheduler and cleans up the internal data structures.
243+
*/
244+
void shutdown();
245+
246+
std::unique_ptr<CBGScheduler> scheduler;
247+
248+
std::thread rcl_polling_thread;
249+
250+
struct CallbackGroupData
251+
{
252+
CallbackGroup::WeakPtr callback_group;
253+
254+
std::unique_ptr<RegisteredEntityCache> registered_entities;
255+
};
256+
257+
void set_callbacks(CallbackGroupData & cgd);
258+
259+
/**
260+
* This function will execute all available executables,
261+
* that were ready, before this function was called.
262+
*/
263+
bool execute_previous_ready_executables_until(
264+
const std::chrono::time_point<std::chrono::steady_clock> & stop_time);
265+
266+
void unregister_event_callbacks(const rclcpp::CallbackGroup::SharedPtr & cbg) const;
267+
268+
private:
269+
void remove_all_nodes_and_callback_groups();
270+
271+
void sync_callback_groups();
272+
273+
void spin_once_internal(std::chrono::nanoseconds timeout);
274+
275+
RCLCPP_DISABLE_COPY(EventsCBGExecutor)
276+
277+
std::mutex added_callback_groups_mutex_;
278+
std::vector<rclcpp::CallbackGroup::WeakPtr> added_callback_groups;
279+
280+
std::mutex added_nodes_mutex_;
281+
std::vector<node_interfaces::NodeBaseInterface::WeakPtr> added_nodes;
282+
283+
std::mutex callback_groups_mutex;
284+
285+
std::vector<CallbackGroupData> callback_groups;
286+
287+
// std::mutex wait_mutex_;
288+
size_t number_of_threads_;
289+
290+
std::chrono::nanoseconds next_exec_timeout_;
291+
292+
std::atomic_bool needs_callback_group_resync = false;
293+
294+
/// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
295+
std::atomic_bool spinning;
296+
297+
/// Guard condition for signaling the rmw layer to wake up for special events.
298+
std::shared_ptr<rclcpp::GuardCondition> interrupt_guard_condition_;
299+
300+
/// Guard condition for signaling the rmw layer to wake up for system shutdown.
301+
std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;
302+
303+
/// shutdown callback handle registered to Context
304+
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_;
305+
306+
/// The context associated with this executor.
307+
std::shared_ptr<rclcpp::Context> context_;
308+
309+
std::unique_ptr<TimerManager> timer_manager;
310+
311+
/// Stores the executables for the internal guard conditions
312+
/// e.g. interrupt_guard_condition_ and shutdown_guard_condition_
313+
std::unique_ptr<GlobalWeakExecutableCache> global_executable_cache;
314+
315+
/// Stores the executables for guard conditions of the nodes
316+
std::unique_ptr<GlobalWeakExecutableCache> nodes_executable_cache;
317+
};
318+
319+
} // namespace executors
320+
} // namespace rclcpp

0 commit comments

Comments
 (0)