libs/capy/include/boost/capy/when_all.hpp

81.5% Lines (75/92) 7.5% Functions (24/319) 68.2% Branches (15/22)
libs/capy/include/boost/capy/when_all.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_WHEN_ALL_HPP
11 #define BOOST_CAPY_WHEN_ALL_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/concept/executor.hpp>
15 #include <boost/capy/concept/io_launchable_task.hpp>
16 #include <boost/capy/coro.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <boost/capy/ex/frame_allocator.hpp>
19 #include <boost/capy/task.hpp>
20
21 #include <array>
22 #include <atomic>
23 #include <exception>
24 #include <optional>
25 #include <stop_token>
26 #include <tuple>
27 #include <type_traits>
28 #include <utility>
29
30 namespace boost {
31 namespace capy {
32
33 namespace detail {
34
/** Map one task result type to a tuple fragment.

    Selects `std::tuple<>` when `T` is void (as reported by
    `std::is_void_v`) and `std::tuple<T>` otherwise, so that the
    fragments of several tasks can be concatenated.
*/
template<typename T, bool IsVoid = std::is_void_v<T>>
struct wrap_non_void
{
    using type = std::tuple<T>;
};

template<typename T>
struct wrap_non_void<T, true>
{
    using type = std::tuple<>;
};

template<typename T>
using wrap_non_void_t = typename wrap_non_void<T>::type;

/** Tuple of the given types with every void removed.

    Void-returning tasks do not contribute a value to the result
    tuple; this alias computes the filtered tuple type by
    concatenating the per-type fragments.

    Example: filter_void_tuple_t<int, void, string> = tuple<int, string>
*/
template<typename... Ts>
using filter_void_tuple_t =
    decltype(std::tuple_cat(std::declval<wrap_non_void_t<Ts>>()...));
47
/** Holds the result of a single task within when_all.

    The holder starts empty. `set()` stores the task's result and
    `get()` moves it back out.

    @tparam T The non-void result type of the task.
*/
template<typename T>
struct result_holder
{
    std::optional<T> value_;

    /** Store the task's result.

        The value is constructed in place, so `T` only needs to be
        move-constructible; it does not have to be assignable.

        @param v The result to store.
    */
    void set(T v)
    {
        value_.emplace(std::move(v));
    }

    /** Move the stored result out of the holder.

        Precondition: `set()` has been called. Calling this on an
        empty holder dereferences a disengaged optional, which is
        undefined behavior.

        @return The stored value, moved out.
    */
    T get() &&
    {
        return std::move(*value_);
    }
};

/** Specialization for void tasks - no value storage needed.
*/
template<>
struct result_holder<void>
{
};
72
73 /** Shared state for when_all operation.
74
75 @tparam Ts The result types of the tasks.
76 */
77 template<typename... Ts>
78 struct when_all_state
79 {
80 static constexpr std::size_t task_count = sizeof...(Ts);
81
82 // Completion tracking - when_all waits for all children
83 std::atomic<std::size_t> remaining_count_;
84
85 // Result storage in input order
86 std::tuple<result_holder<Ts>...> results_;
87
88 // Runner handles - destroyed in await_resume while allocator is valid
89 std::array<coro, task_count> runner_handles_{};
90
91 // Exception storage - first error wins, others discarded
92 std::atomic<bool> has_exception_{false};
93 std::exception_ptr first_exception_;
94
95 // Stop propagation - on error, request stop for siblings
96 std::stop_source stop_source_;
97
98 // Connects parent's stop_token to our stop_source
99 struct stop_callback_fn
100 {
101 std::stop_source* source_;
102 void operator()() const { source_->request_stop(); }
103 };
104 using stop_callback_t = std::stop_callback<stop_callback_fn>;
105 std::optional<stop_callback_t> parent_stop_callback_;
106
107 // Parent resumption
108 coro continuation_;
109 executor_ref caller_ex_;
110
111 1 when_all_state()
112
1/1
✓ Branch 5 taken 1 time.
1 : remaining_count_(task_count)
113 {
114 1 }
115
116 1 ~when_all_state()
117 {
118
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2 for(auto h : runner_handles_)
119
1/2
✓ Branch 1 taken 1 time.
✗ Branch 2 not taken.
1 if(h)
120 1 h.destroy();
121 1 }
122
123 /** Capture an exception (first one wins).
124 */
125 void capture_exception(std::exception_ptr ep)
126 {
127 bool expected = false;
128 if(has_exception_.compare_exchange_strong(
129 expected, true, std::memory_order_relaxed))
130 first_exception_ = ep;
131 }
132
133 /** Signal that a task has completed.
134
135 The last child to complete triggers resumption of the parent.
136 */
137 1 coro signal_completion()
138 {
139 1 auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel);
140
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(remaining == 1)
141 1 return caller_ex_.dispatch(continuation_);
142 return std::noop_coroutine();
143 }
144
145 };
146
147 /** Wrapper coroutine that intercepts task completion.
148
149 This runner awaits its assigned task and stores the result in
150 the shared state, or captures the exception and requests stop.
151 */
152 template<typename T, typename... Ts>
153 struct when_all_runner
154 {
155 struct promise_type // : frame_allocating_base // DISABLED FOR TESTING
156 {
157 when_all_state<Ts...>* state_ = nullptr;
158 executor_ref ex_;
159 std::stop_token stop_token_;
160
161 1 when_all_runner get_return_object()
162 {
163 1 return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this));
164 }
165
166 1 std::suspend_always initial_suspend() noexcept
167 {
168 1 return {};
169 }
170
171 1 auto final_suspend() noexcept
172 {
173 struct awaiter
174 {
175 promise_type* p_;
176
177 bool await_ready() const noexcept
178 {
179 return false;
180 }
181
182 coro await_suspend(coro) noexcept
183 {
184 // Signal completion; last task resumes parent
185 return p_->state_->signal_completion();
186 }
187
188 void await_resume() const noexcept
189 {
190 }
191 };
192 1 return awaiter{this};
193 }
194
195 1 void return_void()
196 {
197 1 }
198
199 void unhandled_exception()
200 {
201 state_->capture_exception(std::current_exception());
202 // Request stop for sibling tasks
203 state_->stop_source_.request_stop();
204 }
205
206 template<class Awaitable>
207 struct transform_awaiter
208 {
209 std::decay_t<Awaitable> a_;
210 promise_type* p_;
211
212 1 bool await_ready()
213 {
214 1 return a_.await_ready();
215 }
216
217 1 decltype(auto) await_resume()
218 {
219 1 return a_.await_resume();
220 }
221
222 template<class Promise>
223 1 auto await_suspend(std::coroutine_handle<Promise> h)
224 {
225
1/1
✓ Branch 3 taken 1 time.
1 return a_.await_suspend(h, p_->ex_, p_->stop_token_);
226 }
227 };
228
229 template<class Awaitable>
230 1 auto await_transform(Awaitable&& a)
231 {
232 using A = std::decay_t<Awaitable>;
233 if constexpr (IoAwaitable<A>)
234 {
235 return transform_awaiter<Awaitable>{
236 2 std::forward<Awaitable>(a), this};
237 }
238 else
239 {
240 static_assert(sizeof(A) == 0, "requires IoAwaitable");
241 }
242 1 }
243 };
244
245 std::coroutine_handle<promise_type> h_;
246
247 1 explicit when_all_runner(std::coroutine_handle<promise_type> h)
248 1 : h_(h)
249 {
250 1 }
251
252 // Enable move for all clang versions - some versions need it
253 when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
254
255 // Non-copyable
256 when_all_runner(when_all_runner const&) = delete;
257 when_all_runner& operator=(when_all_runner const&) = delete;
258 when_all_runner& operator=(when_all_runner&&) = delete;
259
260 1 auto release() noexcept
261 {
262 1 return std::exchange(h_, nullptr);
263 }
264 };
265
266 /** Create a runner coroutine for a single task.
267
268 Task is passed directly to ensure proper coroutine frame storage.
269 */
270 template<std::size_t Index, typename T, typename... Ts>
271 when_all_runner<T, Ts...>
272
1/1
✓ Branch 1 taken 1 time.
1 make_when_all_runner(task<T> inner, when_all_state<Ts...>* state)
273 {
274 if constexpr (std::is_void_v<T>)
275 {
276 co_await std::move(inner);
277 }
278 else
279 {
280 std::get<Index>(state->results_).set(co_await std::move(inner));
281 }
282 2 }
283
284 /** Internal awaitable that launches all runner coroutines and waits.
285
286 This awaitable is used inside the when_all coroutine to handle
287 the concurrent execution of child tasks.
288 */
289 template<typename... Ts>
290 class when_all_launcher
291 {
292 std::tuple<task<Ts>...>* tasks_;
293 when_all_state<Ts...>* state_;
294
295 public:
296 1 when_all_launcher(
297 std::tuple<task<Ts>...>* tasks,
298 when_all_state<Ts...>* state)
299 1 : tasks_(tasks)
300 1 , state_(state)
301 {
302 1 }
303
304 1 bool await_ready() const noexcept
305 {
306 1 return sizeof...(Ts) == 0;
307 }
308
309 1 coro await_suspend(coro continuation, executor_ref caller_ex, std::stop_token parent_token = {})
310 {
311 1 state_->continuation_ = continuation;
312 1 state_->caller_ex_ = caller_ex;
313
314 // Forward parent's stop requests to children
315
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if(parent_token.stop_possible())
316 {
317 state_->parent_stop_callback_.emplace(
318 parent_token,
319 typename when_all_state<Ts...>::stop_callback_fn{&state_->stop_source_});
320
321 if(parent_token.stop_requested())
322 state_->stop_source_.request_stop();
323 }
324
325 // Launch all tasks concurrently
326 1 auto token = state_->stop_source_.get_token();
327 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
328 (..., launch_one<Is>(caller_ex, token));
329
1/1
✓ Branch 1 taken 1 time.
1 }(std::index_sequence_for<Ts...>{});
330
331 // Let signal_completion() handle resumption
332 2 return std::noop_coroutine();
333 1 }
334
335 1 void await_resume() const noexcept
336 {
337 // Results are extracted by the when_all coroutine from state
338 1 }
339
340 private:
341 template<std::size_t I>
342 1 void launch_one(executor_ref caller_ex, std::stop_token token)
343 {
344
1/1
✓ Branch 2 taken 1 time.
1 auto runner = make_when_all_runner<I>(
345 1 std::move(std::get<I>(*tasks_)), state_);
346
347 1 auto h = runner.release();
348 1 h.promise().state_ = state_;
349 1 h.promise().ex_ = caller_ex;
350 1 h.promise().stop_token_ = token;
351
352 1 coro ch{h};
353 1 state_->runner_handles_[I] = ch;
354
2/2
✓ Branch 1 taken 1 time.
✓ Branch 4 taken 1 time.
1 state_->caller_ex_.dispatch(ch).resume();
355 1 }
356 };
357
358 /** Compute the result type for when_all.
359
360 Returns void when all tasks are void (P2300 aligned),
361 otherwise returns a tuple with void types filtered out.
362 */
363 template<typename... Ts>
364 using when_all_result_t = std::conditional_t<
365 std::is_same_v<filter_void_tuple_t<Ts...>, std::tuple<>>,
366 void,
367 filter_void_tuple_t<Ts...>>;
368
369 /** Helper to extract a single result, returning empty tuple for void.
370 This is a separate function to work around a GCC-11 ICE that occurs
371 when using nested immediately-invoked lambdas with pack expansion.
372 */
373 template<std::size_t I, typename... Ts>
374 1 auto extract_single_result(when_all_state<Ts...>& state)
375 {
376 using T = std::tuple_element_t<I, std::tuple<Ts...>>;
377 if constexpr (std::is_void_v<T>)
378 return std::tuple<>();
379 else
380
1/1
✓ Branch 4 taken 1 time.
1 return std::make_tuple(std::move(std::get<I>(state.results_)).get());
381 }
382
383 /** Extract results from state, filtering void types.
384 */
385 template<typename... Ts>
386 1 auto extract_results(when_all_state<Ts...>& state)
387 {
388 1 return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
389 return std::tuple_cat(extract_single_result<Is>(state)...);
390
1/1
✓ Branch 1 taken 1 time.
2 }(std::index_sequence_for<Ts...>{});
391 }
392
393 } // namespace detail
394
395 /** Execute multiple tasks concurrently and collect their results.
396
397 Launches all tasks simultaneously and waits for all to complete
398 before returning. Results are collected in input order. If any
399 task throws, cancellation is requested for siblings and the first
400 exception is rethrown after all tasks complete.
401
402 @li All child tasks run concurrently on the caller's executor
403 @li Results are returned as a tuple in input order
404 @li Void-returning tasks do not contribute to the result tuple
405 @li If all tasks return void, `when_all` returns `task<void>`
406 @li First exception wins; subsequent exceptions are discarded
407 @li Stop is requested for siblings on first error
408 @li Completes only after all children have finished
409
410 @par Thread Safety
411 The returned task must be awaited from a single execution context.
412 Child tasks execute concurrently but complete through the caller's
413 executor.
414
415 @param tasks The tasks to execute concurrently. Each task is
416 consumed (moved-from) when `when_all` is awaited.
417
418 @return A task yielding a tuple of non-void results. Returns
419 `task<void>` when all input tasks return void.
420
421 @par Example
422
423 @code
424 task<> example()
425 {
426 // Concurrent fetch, results collected in order
427 auto [user, posts] = co_await when_all(
428 fetch_user( id ), // task<User>
429 fetch_posts( id ) // task<std::vector<Post>>
430 );
431
432 // Void tasks don't contribute to result
433 co_await when_all(
434 log_event( "start" ), // task<void>
435 notify_user( id ) // task<void>
436 );
437 // Returns task<void>, no result tuple
438 }
439 @endcode
440
441 @see task
442 */
443 template<typename... Ts>
444 [[nodiscard]] task<detail::when_all_result_t<Ts...>>
445
1/1
✓ Branch 1 taken 1 time.
1 when_all(task<Ts>... tasks)
446 {
447 using result_type = detail::when_all_result_t<Ts...>;
448
449 // State is stored in the coroutine frame, using the frame allocator
450 detail::when_all_state<Ts...> state;
451
452 // Store tasks in the frame
453 std::tuple<task<Ts>...> task_tuple(std::move(tasks)...);
454
455 // Launch all tasks and wait for completion
456 co_await detail::when_all_launcher<Ts...>(&task_tuple, &state);
457
458 // Propagate first exception if any.
459 // Safe without explicit acquire: capture_exception() is sequenced-before
460 // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
461 // last task's decrement that resumes this coroutine.
462 if(state.first_exception_)
463 std::rethrow_exception(state.first_exception_);
464
465 // Extract and return results
466 if constexpr (std::is_void_v<result_type>)
467 co_return;
468 else
469 co_return detail::extract_results(state);
470 2 }
471
472 /// Compute the result type of `when_all` for the given task types.
///
/// Public alias for `detail::when_all_result_t<Ts...>`.
///
/// @tparam Ts The result types of the tasks passed to `when_all`.
473 template<typename... Ts>
474 using when_all_result_type = detail::when_all_result_t<Ts...>;
475
476 } // namespace capy
477 } // namespace boost
478
479 #endif
480