libs/capy/include/boost/capy/io/any_read_source.hpp

82.7% Lines (62/75) 89.5% Functions (17/19) 60.0% Branches (9/15)
libs/capy/include/boost/capy/io/any_read_source.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_param.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/read_source.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22 #include <boost/capy/task.hpp>
23
24 #include <concepts>
25 #include <coroutine>
26 #include <cstddef>
27 #include <new>
28 #include <span>
29 #include <stop_token>
30 #include <system_error>
31 #include <utility>
32
33 namespace boost {
34 namespace capy {
35
36 /** Type-erased wrapper for any ReadSource.
37
38 This class provides type erasure for any type satisfying the
39 @ref ReadSource concept, enabling runtime polymorphism for
40 source read operations. It uses cached awaitable storage to achieve
41 zero steady-state allocation after construction.
42
43 The wrapper supports two construction modes:
44 - **Owning**: Pass by value to transfer ownership. The wrapper
45 allocates storage and owns the source.
46 - **Reference**: Pass a pointer to wrap without ownership. The
47 pointed-to source must outlive this wrapper.
48
49 @par Awaitable Preallocation
50 The constructor preallocates storage for the type-erased awaitable.
51 This reserves all virtual address space at server startup
52 so memory usage can be measured up front, rather than
53 allocating piecemeal as traffic arrives.
54
55 @par Thread Safety
56 Not thread-safe. Concurrent operations on the same wrapper
57 are undefined behavior.
58
59 @par Example
60 @code
61 // Owning - takes ownership of the source
62 any_read_source rs(some_source{args...});
63
64 // Reference - wraps without ownership
65 some_source source;
66 any_read_source rs(&source);
67
68 mutable_buffer buf(data, size);
69 auto [ec, n] = co_await rs.read(std::span(&buf, 1));
70 @endcode
71
72 @see any_read_stream, ReadSource
73 */
74 class any_read_source
75 {
76 struct vtable;
77 struct awaitable_ops;
78
79 template<ReadSource S>
80 struct vtable_for_impl;
81
82 void* source_ = nullptr;
83 vtable const* vt_ = nullptr;
84 void* cached_awaitable_ = nullptr;
85 void* storage_ = nullptr;
86 awaitable_ops const* active_ops_ = nullptr;
87
88 public:
89 /** Destructor.
90
91 Destroys the owned source (if any) and releases the cached
92 awaitable storage.
93 */
94 ~any_read_source();
95
96 /** Default constructor.
97
98 Constructs an empty wrapper. Operations on a default-constructed
99 wrapper result in undefined behavior.
100 */
101 any_read_source() = default;
102
103 /** Non-copyable.
104
105 The awaitable cache is per-instance and cannot be shared.
106 */
107 any_read_source(any_read_source const&) = delete;
108 any_read_source& operator=(any_read_source const&) = delete;
109
110 /** Move constructor.
111
112 Transfers ownership of the wrapped source (if owned) and
113 cached awaitable storage from `other`. After the move, `other` is
114 in a default-constructed state.
115
116 @param other The wrapper to move from.
117 */
118 1 any_read_source(any_read_source&& other) noexcept
119 1 : source_(std::exchange(other.source_, nullptr))
120 1 , vt_(std::exchange(other.vt_, nullptr))
121 1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
122 1 , storage_(std::exchange(other.storage_, nullptr))
123 1 , active_ops_(std::exchange(other.active_ops_, nullptr))
124 {
125 1 }
126
127 /** Move assignment operator.
128
129 Destroys any owned source and releases existing resources,
130 then transfers ownership from `other`.
131
132 @param other The wrapper to move from.
133 @return Reference to this wrapper.
134 */
135 any_read_source&
136 operator=(any_read_source&& other) noexcept;
137
138 /** Construct by taking ownership of a ReadSource.
139
140 Allocates storage and moves the source into this wrapper.
141 The wrapper owns the source and will destroy it.
142
143 @param s The source to take ownership of.
144 */
145 template<ReadSource S>
146 requires (!std::same_as<std::decay_t<S>, any_read_source>)
147 any_read_source(S s);
148
149 /** Construct by wrapping a ReadSource without ownership.
150
151 Wraps the given source by pointer. The source must remain
152 valid for the lifetime of this wrapper.
153
154 @param s Pointer to the source to wrap.
155 */
156 template<ReadSource S>
157 any_read_source(S* s);
158
159 /** Check if the wrapper contains a valid source.
160
161 @return `true` if wrapping a source, `false` if default-constructed
162 or moved-from.
163 */
164 bool
165 9 has_value() const noexcept
166 {
167 9 return source_ != nullptr;
168 }
169
170 /** Check if the wrapper contains a valid source.
171
172 @return `true` if wrapping a source, `false` if default-constructed
173 or moved-from.
174 */
175 explicit
176 2 operator bool() const noexcept
177 {
178 2 return has_value();
179 }
180
181 /** Initiate an asynchronous read operation.
182
183 Reads data into the provided buffer sequence. The operation
184 completes when the entire buffer sequence is filled, end-of-file
185 is reached, or an error occurs.
186
187 @param buffers The buffer sequence to read into. Passed by
188 value to ensure the sequence lives in the coroutine frame
189 across suspension points.
190
191 @return An awaitable yielding `(error_code,std::size_t)`.
192
193 @par Postconditions
194 Exactly one of the following is true on return:
195 @li **Success**: `!ec` and `n == buffer_size(buffers)`.
196 The entire buffer was filled.
197 @li **End-of-stream or Error**: `ec` and `n` indicates
198 the number of bytes transferred before the failure.
199
200 @par Preconditions
201 The wrapper must contain a valid source (`has_value() == true`).
202 */
203 template<MutableBufferSequence MB>
204 task<io_result<std::size_t>>
205 read(MB buffers);
206
207 protected:
208 /** Rebind to a new source after move.
209
210 Updates the internal pointer to reference a new source object.
211 Used by owning wrappers after move assignment when the owned
212 object has moved to a new location.
213
214 @param new_source The new source to bind to. Must be the same
215 type as the original source.
216
217 @note Terminates if called with a source of different type
218 than the original.
219 */
220 template<ReadSource S>
221 void
222 rebind(S& new_source) noexcept
223 {
224 if(vt_ != &vtable_for_impl<S>::value)
225 std::terminate();
226 source_ = &new_source;
227 }
228
229 private:
230 auto
231 read_some_(std::span<mutable_buffer const> buffers);
232 };
233
234 //----------------------------------------------------------
235
236 struct any_read_source::awaitable_ops
237 {
238 bool (*await_ready)(void*);
239 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
240 io_result<std::size_t> (*await_resume)(void*);
241 void (*destroy)(void*) noexcept;
242 };
243
244 struct any_read_source::vtable
245 {
246 void (*destroy)(void*) noexcept;
247 std::size_t awaitable_size;
248 std::size_t awaitable_align;
249 awaitable_ops const* (*construct_awaitable)(
250 void* source,
251 void* storage,
252 std::span<mutable_buffer const> buffers);
253 };
254
255 template<ReadSource S>
256 struct any_read_source::vtable_for_impl
257 {
258 using Awaitable = decltype(std::declval<S&>().read(
259 std::span<mutable_buffer const>{}));
260
261 static void
262 do_destroy_impl(void* source) noexcept
263 {
264 static_cast<S*>(source)->~S();
265 }
266
267 static awaitable_ops const*
268 130 construct_awaitable_impl(
269 void* source,
270 void* storage,
271 std::span<mutable_buffer const> buffers)
272 {
273 130 auto& s = *static_cast<S*>(source);
274 130 ::new(storage) Awaitable(s.read(buffers));
275
276 static constexpr awaitable_ops ops = {
277 130 +[](void* p) {
278 130 return static_cast<Awaitable*>(p)->await_ready();
279 },
280 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
281 return detail::call_await_suspend(
282 static_cast<Awaitable*>(p), h, ex, token);
283 },
284 130 +[](void* p) {
285 130 return static_cast<Awaitable*>(p)->await_resume();
286 },
287 130 +[](void* p) noexcept {
288 130 static_cast<Awaitable*>(p)->~Awaitable();
289 }
290 };
291 130 return &ops;
292 }
293
294 static constexpr vtable value = {
295 &do_destroy_impl,
296 sizeof(Awaitable),
297 alignof(Awaitable),
298 &construct_awaitable_impl
299 };
300 };
301
302 //----------------------------------------------------------
303
304 inline
305 91 any_read_source::~any_read_source()
306 {
307
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 91 times.
91 if(storage_)
308 {
309 vt_->destroy(source_);
310 ::operator delete(storage_);
311 }
312
2/2
✓ Branch 0 taken 88 times.
✓ Branch 1 taken 3 times.
91 if(cached_awaitable_)
313 88 ::operator delete(cached_awaitable_);
314 91 }
315
316 inline any_read_source&
317 1 any_read_source::operator=(any_read_source&& other) noexcept
318 {
319
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(this != &other)
320 {
321
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(storage_)
322 {
323 vt_->destroy(source_);
324 ::operator delete(storage_);
325 }
326
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(cached_awaitable_)
327 ::operator delete(cached_awaitable_);
328 1 source_ = std::exchange(other.source_, nullptr);
329 1 vt_ = std::exchange(other.vt_, nullptr);
330 1 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
331 1 storage_ = std::exchange(other.storage_, nullptr);
332 1 active_ops_ = std::exchange(other.active_ops_, nullptr);
333 }
334 1 return *this;
335 }
336
337 template<ReadSource S>
338 requires (!std::same_as<std::decay_t<S>, any_read_source>)
339 any_read_source::any_read_source(S s)
340 : vt_(&vtable_for_impl<S>::value)
341 {
342 struct guard {
343 any_read_source* self;
344 bool committed = false;
345 ~guard() {
346 if(!committed && self->storage_) {
347 self->vt_->destroy(self->source_);
348 ::operator delete(self->storage_);
349 self->storage_ = nullptr;
350 self->source_ = nullptr;
351 }
352 }
353 } g{this};
354
355 storage_ = ::operator new(sizeof(S));
356 source_ = ::new(storage_) S(std::move(s));
357
358 // Preallocate the awaitable storage
359 cached_awaitable_ = ::operator new(vt_->awaitable_size);
360
361 g.committed = true;
362 }
363
364 template<ReadSource S>
365 88 any_read_source::any_read_source(S* s)
366 88 : source_(s)
367 88 , vt_(&vtable_for_impl<S>::value)
368 {
369 // Preallocate the awaitable storage
370 88 cached_awaitable_ = ::operator new(vt_->awaitable_size);
371 88 }
372
373 //----------------------------------------------------------
374
375 inline auto
376 130 any_read_source::read_some_(std::span<mutable_buffer const> buffers)
377 {
378 struct awaitable
379 {
380 any_read_source* self_;
381 std::span<mutable_buffer const> buffers_;
382
383 bool
384 130 await_ready() const noexcept
385 {
386 130 return false;
387 }
388
389 coro
390 130 await_suspend(coro h, executor_ref ex, std::stop_token token)
391 {
392 // Construct the underlying awaitable into cached storage
393 260 self_->active_ops_ = self_->vt_->construct_awaitable(
394 130 self_->source_,
395 130 self_->cached_awaitable_,
396 buffers_);
397
398 // Check if underlying is immediately ready
399
1/2
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
130 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
400 130 return h;
401
402 // Forward to underlying awaitable
403 return self_->active_ops_->await_suspend(
404 self_->cached_awaitable_, h, ex, token);
405 }
406
407 io_result<std::size_t>
408 130 await_resume()
409 {
410 struct guard {
411 any_read_source* self;
412 130 ~guard() {
413 130 self->active_ops_->destroy(self->cached_awaitable_);
414 130 self->active_ops_ = nullptr;
415 130 }
416 130 } g{self_};
417 130 return self_->active_ops_->await_resume(
418
1/1
✓ Branch 1 taken 99 times.
229 self_->cached_awaitable_);
419 130 }
420 };
421 130 return awaitable{this, buffers};
422 }
423
424 template<MutableBufferSequence MB>
425 task<io_result<std::size_t>>
426
1/1
✓ Branch 1 taken 106 times.
106 any_read_source::read(MB buffers)
427 {
428 buffer_param<MB> bp(std::move(buffers));
429 std::size_t total = 0;
430
431 for(;;)
432 {
433 auto bufs = bp.data();
434 if(bufs.empty())
435 break;
436
437 auto [ec, n] = co_await read_some_(bufs);
438 total += n;
439 if(ec)
440 co_return {ec, total};
441 bp.consume(n);
442 }
443
444 co_return {{}, total};
445 212 }
446
447 } // namespace capy
448 } // namespace boost
449
450 #endif
451