libs/capy/include/boost/capy/io/any_buffer_source.hpp

83.5% Lines (66/79) 89.5% Functions (17/19) 57.1% Branches (8/14)
libs/capy/include/boost/capy/io/any_buffer_source.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_copy.hpp>
17 #include <boost/capy/buffers/slice.hpp>
18 #include <boost/capy/concept/buffer_source.hpp>
19 #include <boost/capy/concept/io_awaitable.hpp>
20 #include <boost/capy/concept/read_source.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/error.hpp>
23 #include <boost/capy/ex/executor_ref.hpp>
24 #include <boost/capy/io_result.hpp>
25 #include <boost/capy/task.hpp>
26
27 #include <concepts>
28 #include <coroutine>
29 #include <cstddef>
30 #include <exception>
31 #include <new>
32 #include <span>
33 #include <stop_token>
34 #include <system_error>
35 #include <utility>
36
37 namespace boost {
38 namespace capy {
39
40 /** Type-erased wrapper for any BufferSource.
41
42 This class provides type erasure for any type satisfying the
43 @ref BufferSource concept, enabling runtime polymorphism for
44 buffer pull operations. The wrapper also satisfies @ref ReadSource,
45 allowing it to be used with code expecting either interface.
46 It uses cached awaitable storage to achieve zero steady-state
47 allocation after construction.
48
49 The wrapper also satisfies @ref ReadSource through the templated
50 @ref read method. This method copies data from the source's
51 internal buffers into the caller's buffers, incurring one extra
52 buffer copy compared to using @ref pull and @ref consume directly.
53
54 The wrapper supports two construction modes:
55 - **Owning**: Pass by value to transfer ownership. The wrapper
56 allocates storage and owns the source.
57 - **Reference**: Pass a pointer to wrap without ownership. The
58 pointed-to source must outlive this wrapper.
59
60 @par Awaitable Preallocation
61 The constructor preallocates storage for the type-erased awaitable.
62 This reserves all virtual address space at server startup
63 so memory usage can be measured up front, rather than
64 allocating piecemeal as traffic arrives.
65
66 @par Thread Safety
67 Not thread-safe. Concurrent operations on the same wrapper
68 are undefined behavior.
69
70 @par Example
71 @code
72 // Owning - takes ownership of the source
73 any_buffer_source abs(some_buffer_source{args...});
74
75 // Reference - wraps without ownership
76 some_buffer_source src;
77 any_buffer_source abs(&src);
78
79 const_buffer arr[16];
80 auto [ec, bufs] = co_await abs.pull(arr);
81 @endcode
82
83 @see any_buffer_sink, BufferSource, ReadSource
84 */
85 class any_buffer_source
86 {
87 struct vtable;
88 struct awaitable_ops;
89
90 template<BufferSource S>
91 struct vtable_for_impl;
92
93 void* source_ = nullptr;
94 vtable const* vt_ = nullptr;
95 void* cached_awaitable_ = nullptr;
96 void* storage_ = nullptr;
97 awaitable_ops const* active_ops_ = nullptr;
98
99 public:
100 /** Destructor.
101
102 Destroys the owned source (if any) and releases the cached
103 awaitable storage.
104 */
105 ~any_buffer_source();
106
107 /** Default constructor.
108
109 Constructs an empty wrapper. Operations on a default-constructed
110 wrapper result in undefined behavior.
111 */
112 any_buffer_source() = default;
113
114 /** Non-copyable.
115
116 The awaitable cache is per-instance and cannot be shared.
117 */
118 any_buffer_source(any_buffer_source const&) = delete;
119 any_buffer_source& operator=(any_buffer_source const&) = delete;
120
121 /** Move constructor.
122
123 Transfers ownership of the wrapped source (if owned) and
124 cached awaitable storage from `other`. After the move, `other` is
125 in a default-constructed state.
126
127 @param other The wrapper to move from.
128 */
129 1 any_buffer_source(any_buffer_source&& other) noexcept
130 1 : source_(std::exchange(other.source_, nullptr))
131 1 , vt_(std::exchange(other.vt_, nullptr))
132 1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
133 1 , storage_(std::exchange(other.storage_, nullptr))
134 1 , active_ops_(std::exchange(other.active_ops_, nullptr))
135 {
136 1 }
137
138 /** Move assignment operator.
139
140 Destroys any owned source and releases existing resources,
141 then transfers ownership from `other`.
142
143 @param other The wrapper to move from.
144 @return Reference to this wrapper.
145 */
146 any_buffer_source&
147 operator=(any_buffer_source&& other) noexcept;
148
149 /** Construct by taking ownership of a BufferSource.
150
151 Allocates storage and moves the source into this wrapper.
152 The wrapper owns the source and will destroy it.
153
154 @param s The source to take ownership of.
155 */
156 template<BufferSource S>
157 requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
158 any_buffer_source(S s);
159
160 /** Construct by wrapping a BufferSource without ownership.
161
162 Wraps the given source by pointer. The source must remain
163 valid for the lifetime of this wrapper.
164
165 @param s Pointer to the source to wrap.
166 */
167 template<BufferSource S>
168 any_buffer_source(S* s);
169
170 /** Check if the wrapper contains a valid source.
171
172 @return `true` if wrapping a source, `false` if default-constructed
173 or moved-from.
174 */
175 bool
176 9 has_value() const noexcept
177 {
178 9 return source_ != nullptr;
179 }
180
181 /** Check if the wrapper contains a valid source.
182
183 @return `true` if wrapping a source, `false` if default-constructed
184 or moved-from.
185 */
186 explicit
187 2 operator bool() const noexcept
188 {
189 2 return has_value();
190 }
191
192 /** Consume bytes from the source.
193
194 Advances the internal read position of the underlying source
195 by the specified number of bytes. The next call to @ref pull
196 returns data starting after the consumed bytes.
197
198 @param n The number of bytes to consume. Must not exceed the
199 total size of buffers returned by the previous @ref pull.
200
201 @par Preconditions
202 The wrapper must contain a valid source (`has_value() == true`).
203 */
204 void
205 consume(std::size_t n) noexcept;
206
207 /** Pull buffer data from the source.
208
209 Fills the provided span with buffer descriptors from the
210 underlying source. The operation completes when data is
211 available, the source is exhausted, or an error occurs.
212
213 @param dest Span of const_buffer to fill.
214
215 @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
216 On success with data, a non-empty span of filled buffers.
217 On success with empty span, source is exhausted.
218
219 @par Preconditions
220 The wrapper must contain a valid source (`has_value() == true`).
221 */
222 auto
223 pull(std::span<const_buffer> dest);
224
225 /** Read data into a mutable buffer sequence.
226
227 Fills the provided buffer sequence by pulling data from the
228 underlying source and copying it into the caller's buffers.
229 This satisfies @ref ReadSource but incurs a copy; for zero-copy
230 access, use @ref pull and @ref consume instead.
231
232 @note This operation copies data from the source's internal
233 buffers into the caller's buffers. For zero-copy reads,
234 use @ref pull and @ref consume directly.
235
236 @param buffers The buffer sequence to fill.
237
238 @return An awaitable yielding `(error_code,std::size_t)`.
239 On success, `n == buffer_size(buffers)`.
240 On EOF, `ec == error::eof` and `n` is bytes transferred.
241
242 @par Preconditions
243 The wrapper must contain a valid source (`has_value() == true`).
244
245 @see pull, consume
246 */
247 template<MutableBufferSequence MB>
248 task<io_result<std::size_t>>
249 read(MB buffers);
250
251 protected:
252 /** Rebind to a new source after move.
253
254 Updates the internal pointer to reference a new source object.
255 Used by owning wrappers after move assignment when the owned
256 object has moved to a new location.
257
258 @param new_source The new source to bind to. Must be the same
259 type as the original source.
260
261 @note Terminates if called with a source of different type
262 than the original.
263 */
264 template<BufferSource S>
265 void
266 rebind(S& new_source) noexcept
267 {
268 if(vt_ != &vtable_for_impl<S>::value)
269 std::terminate();
270 source_ = &new_source;
271 }
272 };
273
274 //----------------------------------------------------------
275
276 struct any_buffer_source::awaitable_ops
277 {
278 bool (*await_ready)(void*);
279 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
280 io_result<std::span<const_buffer>> (*await_resume)(void*);
281 void (*destroy)(void*) noexcept;
282 };
283
284 struct any_buffer_source::vtable
285 {
286 void (*destroy)(void*) noexcept;
287 void (*do_consume)(void* source, std::size_t n) noexcept;
288 std::size_t awaitable_size;
289 std::size_t awaitable_align;
290 awaitable_ops const* (*construct_awaitable)(
291 void* source,
292 void* storage,
293 std::span<const_buffer> dest);
294 };
295
296 template<BufferSource S>
297 struct any_buffer_source::vtable_for_impl
298 {
299 using Awaitable = decltype(std::declval<S&>().pull(
300 std::declval<std::span<const_buffer>>()));
301
302 static void
303 do_destroy_impl(void* source) noexcept
304 {
305 static_cast<S*>(source)->~S();
306 }
307
308 static void
309 39 do_consume_impl(void* source, std::size_t n) noexcept
310 {
311 39 static_cast<S*>(source)->consume(n);
312 39 }
313
314 static awaitable_ops const*
315 92 construct_awaitable_impl(
316 void* source,
317 void* storage,
318 std::span<const_buffer> dest)
319 {
320 92 auto& s = *static_cast<S*>(source);
321 92 ::new(storage) Awaitable(s.pull(dest));
322
323 static constexpr awaitable_ops ops = {
324 92 +[](void* p) {
325 92 return static_cast<Awaitable*>(p)->await_ready();
326 },
327 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
328 return detail::call_await_suspend(
329 static_cast<Awaitable*>(p), h, ex, token);
330 },
331 92 +[](void* p) {
332 92 return static_cast<Awaitable*>(p)->await_resume();
333 },
334 92 +[](void* p) noexcept {
335 92 static_cast<Awaitable*>(p)->~Awaitable();
336 }
337 };
338 92 return &ops;
339 }
340
341 static constexpr vtable value = {
342 &do_destroy_impl,
343 &do_consume_impl,
344 sizeof(Awaitable),
345 alignof(Awaitable),
346 &construct_awaitable_impl
347 };
348 };
349
350 //----------------------------------------------------------
351
352 inline
353 59 any_buffer_source::~any_buffer_source()
354 {
355
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 59 times.
59 if(storage_)
356 {
357 vt_->destroy(source_);
358 ::operator delete(storage_);
359 }
360
2/2
✓ Branch 0 taken 56 times.
✓ Branch 1 taken 3 times.
59 if(cached_awaitable_)
361 56 ::operator delete(cached_awaitable_);
362 59 }
363
364 inline any_buffer_source&
365 1 any_buffer_source::operator=(any_buffer_source&& other) noexcept
366 {
367
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(this != &other)
368 {
369
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(storage_)
370 {
371 vt_->destroy(source_);
372 ::operator delete(storage_);
373 }
374
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(cached_awaitable_)
375 ::operator delete(cached_awaitable_);
376 1 source_ = std::exchange(other.source_, nullptr);
377 1 vt_ = std::exchange(other.vt_, nullptr);
378 1 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
379 1 storage_ = std::exchange(other.storage_, nullptr);
380 1 active_ops_ = std::exchange(other.active_ops_, nullptr);
381 }
382 1 return *this;
383 }
384
385 template<BufferSource S>
386 requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
387 any_buffer_source::any_buffer_source(S s)
388 : vt_(&vtable_for_impl<S>::value)
389 {
390 struct guard {
391 any_buffer_source* self;
392 bool committed = false;
393 ~guard() {
394 if(!committed && self->storage_) {
395 self->vt_->destroy(self->source_);
396 ::operator delete(self->storage_);
397 self->storage_ = nullptr;
398 self->source_ = nullptr;
399 }
400 }
401 } g{this};
402
403 storage_ = ::operator new(sizeof(S));
404 source_ = ::new(storage_) S(std::move(s));
405
406 // Preallocate the awaitable storage
407 cached_awaitable_ = ::operator new(vt_->awaitable_size);
408
409 g.committed = true;
410 }
411
412 template<BufferSource S>
413 56 any_buffer_source::any_buffer_source(S* s)
414 56 : source_(s)
415 56 , vt_(&vtable_for_impl<S>::value)
416 {
417 // Preallocate the awaitable storage
418 56 cached_awaitable_ = ::operator new(vt_->awaitable_size);
419 56 }
420
421 //----------------------------------------------------------
422
423 inline void
424 39 any_buffer_source::consume(std::size_t n) noexcept
425 {
426 39 vt_->do_consume(source_, n);
427 39 }
428
429 inline auto
430 92 any_buffer_source::pull(std::span<const_buffer> dest)
431 {
432 struct awaitable
433 {
434 any_buffer_source* self_;
435 std::span<const_buffer> dest_;
436
437 bool
438 92 await_ready() const noexcept
439 {
440 92 return false;
441 }
442
443 coro
444 92 await_suspend(coro h, executor_ref ex, std::stop_token token)
445 {
446 // Construct the underlying awaitable into cached storage
447 184 self_->active_ops_ = self_->vt_->construct_awaitable(
448 92 self_->source_,
449 92 self_->cached_awaitable_,
450 dest_);
451
452 // Check if underlying is immediately ready
453
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
92 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
454 92 return h;
455
456 // Forward to underlying awaitable
457 return self_->active_ops_->await_suspend(
458 self_->cached_awaitable_, h, ex, token);
459 }
460
461 io_result<std::span<const_buffer>>
462 92 await_resume()
463 {
464 struct guard {
465 any_buffer_source* self;
466 92 ~guard() {
467 92 self->active_ops_->destroy(self->cached_awaitable_);
468 92 self->active_ops_ = nullptr;
469 92 }
470 92 } g{self_};
471 92 return self_->active_ops_->await_resume(
472
1/1
✓ Branch 1 taken 73 times.
165 self_->cached_awaitable_);
473 92 }
474 };
475 92 return awaitable{this, dest};
476 }
477
478 template<MutableBufferSequence MB>
479 task<io_result<std::size_t>>
480 any_buffer_source::read(MB buffers)
481 {
482 std::size_t total = 0;
483 auto dest = sans_prefix(buffers, 0);
484
485 while(!buffer_empty(dest))
486 {
487 const_buffer arr[detail::max_iovec_];
488 auto [ec, bufs] = co_await pull(arr);
489
490 if(ec)
491 co_return {ec, total};
492
493 if(bufs.empty())
494 co_return {error::eof, total};
495
496 auto n = buffer_copy(dest, bufs);
497 consume(n);
498 total += n;
499 dest = sans_prefix(dest, n);
500 }
501
502 co_return {{}, total};
503 }
504
505 //----------------------------------------------------------
506
507 static_assert(BufferSource<any_buffer_source>);
508 static_assert(ReadSource<any_buffer_source>);
509
510 } // namespace capy
511 } // namespace boost
512
513 #endif
514