libs/capy/include/boost/capy/io/any_buffer_sink.hpp

84.3% Lines (97/115) 90.0% Functions (27/30) 55.6% Branches (10/18)
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_copy.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/buffer_sink.hpp>
19 #include <boost/capy/concept/io_awaitable.hpp>
20 #include <boost/capy/concept/write_sink.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <boost/capy/io_result.hpp>
24 #include <boost/capy/task.hpp>
25
26 #include <concepts>
27 #include <coroutine>
28 #include <cstddef>
29 #include <exception>
30 #include <new>
31 #include <stop_token>
32 #include <system_error>
33 #include <utility>
34
35 namespace boost {
36 namespace capy {
37
38 /** Type-erased wrapper for any BufferSink.
39
40 This class provides type erasure for any type satisfying the
41 @ref BufferSink concept, enabling runtime polymorphism for
42 buffer sink operations. It uses cached awaitable storage to achieve
43 zero steady-state allocation after construction.
44
45 The wrapper also satisfies @ref WriteSink through templated
46 @ref write methods. These methods copy data from the caller's
47 buffers into the sink's internal storage, incurring one extra
48 buffer copy compared to using @ref prepare and @ref commit
49 directly.
50
51 The wrapper supports two construction modes:
52 - **Owning**: Pass by value to transfer ownership. The wrapper
53 allocates storage and owns the sink.
54 - **Reference**: Pass a pointer to wrap without ownership. The
55 pointed-to sink must outlive this wrapper.
56
57 @par Awaitable Preallocation
58 The constructor preallocates storage for the type-erased awaitable.
59 This moves the allocation to construction time (typically server
60 startup) so memory usage can be measured up front, rather than
61 allocating piecemeal as traffic arrives.
62
63 @par Thread Safety
64 Not thread-safe. Concurrent operations on the same wrapper
65 are undefined behavior.
66
67 @par Example
68 @code
69 // Owning - takes ownership of the sink
70 any_buffer_sink abs(some_buffer_sink{args...});
71
72 // Reference - wraps without ownership
73 some_buffer_sink sink;
74 any_buffer_sink abs(&sink);
75
76 mutable_buffer arr[16];
77 auto bufs = abs.prepare(arr);
78 // Write data into bufs[0..bufs.size())
79 auto [ec] = co_await abs.commit(bytes_written);
80 auto [ec2] = co_await abs.commit_eof();
81 @endcode
82
83 @see any_buffer_source, BufferSink, WriteSink
84 */
85 class any_buffer_sink
86 {
87 struct vtable;
88 struct awaitable_ops;
89
90 template<BufferSink S>
91 struct vtable_for_impl;
92
93 void* sink_ = nullptr;
94 vtable const* vt_ = nullptr;
95 void* cached_awaitable_ = nullptr;
96 void* storage_ = nullptr;
97 awaitable_ops const* active_ops_ = nullptr;
98
99 public:
100 /** Destructor.
101
102 Destroys the owned sink (if any) and releases the cached
103 awaitable storage.
104 */
105 ~any_buffer_sink();
106
107 /** Default constructor.
108
109 Constructs an empty wrapper. Except for @ref has_value and the
110 boolean conversion, operations on an empty wrapper result in undefined behavior.
111 */
112 any_buffer_sink() = default;
113
114 /** Non-copyable.
115
116 The awaitable cache is per-instance and cannot be shared.
117 */
118 any_buffer_sink(any_buffer_sink const&) = delete;
119 any_buffer_sink& operator=(any_buffer_sink const&) = delete;
120
121 /** Move constructor.
122
123 Transfers ownership of the wrapped sink (if owned) and
124 cached awaitable storage from `other`. After the move, `other` is
125 in a default-constructed state.
126
127 @param other The wrapper to move from.
128 */
129 1 any_buffer_sink(any_buffer_sink&& other) noexcept
130 1 : sink_(std::exchange(other.sink_, nullptr))
131 1 , vt_(std::exchange(other.vt_, nullptr))
132 1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
133 1 , storage_(std::exchange(other.storage_, nullptr))
134 1 , active_ops_(std::exchange(other.active_ops_, nullptr))
135 {
136 1 }
137
138 /** Move assignment operator.
139
140 Destroys any owned sink and releases existing resources,
141 then transfers ownership from `other`.
142
143 @param other The wrapper to move from.
144 @return Reference to this wrapper.
145 */
146 any_buffer_sink&
147 operator=(any_buffer_sink&& other) noexcept;
148
149 /** Construct by taking ownership of a BufferSink.
150
151 Allocates storage and moves the sink into this wrapper.
152 The wrapper owns the sink and will destroy it.
153
154 @param s The sink to take ownership of.
155 */
156 template<BufferSink S>
157 requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
158 any_buffer_sink(S s);
159
160 /** Construct by wrapping a BufferSink without ownership.
161
162 Wraps the given sink by pointer. The sink must remain
163 valid for the lifetime of this wrapper.
164
165 @param s Pointer to the sink to wrap.
166 */
167 template<BufferSink S>
168 any_buffer_sink(S* s);
169
170 /** Check if the wrapper contains a valid sink.
171
172 @return `true` if wrapping a sink, `false` if default-constructed
173 or moved-from.
174 */
175 bool
176 9 has_value() const noexcept
177 {
178 9 return sink_ != nullptr;
179 }
180
181 /** Check if the wrapper contains a valid sink.
182
183 @return `true` if wrapping a sink, `false` if default-constructed
184 or moved-from.
185 */
186 explicit
187 2 operator bool() const noexcept
188 {
189 2 return has_value();
190 }
191
192 /** Prepare writable buffers.
193
194 Fills the provided span with mutable buffer descriptors
195 pointing to the underlying sink's internal storage. This
196 operation is synchronous.
197
198 @param dest Span of mutable_buffer to fill.
199
200 @return A span of filled buffers.
201
202 @par Preconditions
203 The wrapper must contain a valid sink (`has_value() == true`).
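
    @par Example
    A minimal sketch; `abs` is assumed to wrap a valid sink:
    @code
    mutable_buffer arr[8];
    auto bufs = abs.prepare(arr);
    // bufs now references the sink's internal writable storage
    @endcode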
204 */
205 std::span<mutable_buffer>
206 prepare(std::span<mutable_buffer> dest);
207
208 /** Commit bytes written to the prepared buffers.
209
210 Commits `n` bytes written to the buffers returned by the
211 most recent call to @ref prepare. The operation may trigger
212 underlying I/O.
213
214 @param n The number of bytes to commit.
215
216 @return An awaitable yielding `(error_code)`.
217
218 @par Preconditions
219 The wrapper must contain a valid sink (`has_value() == true`).
220 */
221 auto
222 commit(std::size_t n);
223
224 /** Commit bytes written with optional end-of-stream.
225
226 Commits `n` bytes written to the buffers returned by the
227 most recent call to @ref prepare. If `eof` is true, also
228 signals end-of-stream.
229
230 @param n The number of bytes to commit.
231 @param eof If true, signals end-of-stream after committing.
232
233 @return An awaitable yielding `(error_code)`.
234
235 @par Preconditions
236 The wrapper must contain a valid sink (`has_value() == true`).
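
    @par Example
    A minimal sketch; `abs` and `bytes_written` are assumed from the
    surrounding code:
    @code
    auto [ec] = co_await abs.commit(bytes_written, true); // commit and finalize
    @endcode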
237 */
238 auto
239 commit(std::size_t n, bool eof);
240
241 /** Signal end-of-stream.
242
243 Indicates that no more data will be written to the sink.
244 The operation completes when the sink has been finalized or an
245 error occurs.
246
247 @return An awaitable yielding `(error_code)`.
248
249 @par Preconditions
250 The wrapper must contain a valid sink (`has_value() == true`).
251 */
252 auto
253 commit_eof();
254
255 /** Write data from a buffer sequence.
256
257 Writes all data from the buffer sequence to the underlying
258 sink. This method satisfies the @ref WriteSink concept.
259
260 @note This operation copies data from the caller's buffers
261 into the sink's internal buffers. For zero-copy writes,
262 use @ref prepare and @ref commit directly.
263
264 @param buffers The buffer sequence to write.
265
266 @return An awaitable yielding `(error_code,std::size_t)`.
267
268 @par Preconditions
269 The wrapper must contain a valid sink (`has_value() == true`).
270 */
271 template<ConstBufferSequence CB>
272 task<io_result<std::size_t>>
273 write(CB buffers);
274
275 /** Write data with optional end-of-stream.
276
277 Writes all data from the buffer sequence to the underlying
278 sink, optionally finalizing it afterwards. This method
279 satisfies the @ref WriteSink concept.
280
281 @note This operation copies data from the caller's buffers
282 into the sink's internal buffers. For zero-copy writes,
283 use @ref prepare and @ref commit directly.
284
285 @param buffers The buffer sequence to write.
286 @param eof If true, finalize the sink after writing.
287
288 @return An awaitable yielding `(error_code,std::size_t)`.
289
290 @par Preconditions
291 The wrapper must contain a valid sink (`has_value() == true`).
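
    @par Example
    A minimal sketch, assuming `abs` wraps a valid sink and `data`,
    `size` name caller-owned storage:
    @code
    auto [ec, n] = co_await abs.write(const_buffer(data, size), true);
    @endcode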
292 */
293 template<ConstBufferSequence CB>
294 task<io_result<std::size_t>>
295 write(CB buffers, bool eof);
296
297 /** Signal end-of-stream.
298
299 Indicates that no more data will be written to the sink.
300 This method satisfies the @ref WriteSink concept.
301
302 @return An awaitable yielding `(error_code)`.
303
304 @par Preconditions
305 The wrapper must contain a valid sink (`has_value() == true`).
306 */
307 auto
308 write_eof();
309
310 protected:
311 /** Rebind to a new sink after move.
312
313 Updates the internal pointer to reference a new sink object.
314 Used by owning wrappers after move assignment when the owned
315 object has moved to a new location.
316
317 @param new_sink The new sink to bind to. Must be the same
318 type as the original sink.
319
320 @note Terminates if called with a sink of different type
321 than the original.
322 */
323 template<BufferSink S>
324 void
325 rebind(S& new_sink) noexcept
326 {
327 if(vt_ != &vtable_for_impl<S>::value)
328 std::terminate();
329 sink_ = &new_sink;
330 }
331 };
332
333 //----------------------------------------------------------
334
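// Function-pointer table for a type-erased awaitable that has been
// placement-constructed into the cached storage block; each entry
// forwards to the concrete awaitable's member of the same name.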
335 struct any_buffer_sink::awaitable_ops
336 {
337 bool (*await_ready)(void*);
338 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
339 io_result<> (*await_resume)(void*);
340 void (*destroy)(void*) noexcept;
341 };
342
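// Per-sink-type dispatch table: destroy and prepare thunks, the size and
// alignment needed to hold either the commit or commit_eof awaitable, and
// factories that placement-construct those awaitables into cached storage.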
343 struct any_buffer_sink::vtable
344 {
345 void (*destroy)(void*) noexcept;
346 std::span<mutable_buffer> (*do_prepare)(
347 void* sink,
348 std::span<mutable_buffer> dest);
349 std::size_t awaitable_size;
350 std::size_t awaitable_align;
351 awaitable_ops const* (*construct_commit_awaitable)(
352 void* sink,
353 void* storage,
354 std::size_t n,
355 bool eof);
356 awaitable_ops const* (*construct_eof_awaitable)(
357 void* sink,
358 void* storage);
359 };
360
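// Builds the vtable (and awaitable thunks) for a concrete BufferSink type S.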
361 template<BufferSink S>
362 struct any_buffer_sink::vtable_for_impl
363 {
364 using CommitAwaitable = decltype(std::declval<S&>().commit(
365 std::size_t{}, false));
366 using EofAwaitable = decltype(std::declval<S&>().commit_eof());
367
368 static void
369 do_destroy_impl(void* sink) noexcept
370 {
371 static_cast<S*>(sink)->~S();
372 }
373
374 static std::span<mutable_buffer>
375 68 do_prepare_impl(
376 void* sink,
377 std::span<mutable_buffer> dest)
378 {
379 68 auto& s = *static_cast<S*>(sink);
380 68 return s.prepare(dest);
381 }
382
383 static awaitable_ops const*
384 48 construct_commit_awaitable_impl(
385 void* sink,
386 void* storage,
387 std::size_t n,
388 bool eof)
389 {
390 48 auto& s = *static_cast<S*>(sink);
391 48 ::new(storage) CommitAwaitable(s.commit(n, eof));
392
393 static constexpr awaitable_ops ops = {
394 48 +[](void* p) {
395 48 return static_cast<CommitAwaitable*>(p)->await_ready();
396 },
397 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
398 return detail::call_await_suspend(
399 static_cast<CommitAwaitable*>(p), h, ex, token);
400 },
401 48 +[](void* p) {
402 48 return static_cast<CommitAwaitable*>(p)->await_resume();
403 },
404 48 +[](void* p) noexcept {
405 48 static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
406 }
407 };
408 48 return &ops;
409 }
410
411 static awaitable_ops const*
412 18 construct_eof_awaitable_impl(
413 void* sink,
414 void* storage)
415 {
416 18 auto& s = *static_cast<S*>(sink);
417 18 ::new(storage) EofAwaitable(s.commit_eof());
418
419 static constexpr awaitable_ops ops = {
420 18 +[](void* p) {
421 18 return static_cast<EofAwaitable*>(p)->await_ready();
422 },
423 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
424 return detail::call_await_suspend(
425 static_cast<EofAwaitable*>(p), h, ex, token);
426 },
427 18 +[](void* p) {
428 18 return static_cast<EofAwaitable*>(p)->await_resume();
429 },
430 18 +[](void* p) noexcept {
431 18 static_cast<EofAwaitable*>(p)->~EofAwaitable();
432 }
433 };
434 18 return &ops;
435 }
436
437 static constexpr std::size_t max_awaitable_size =
438 sizeof(CommitAwaitable) > sizeof(EofAwaitable)
439 ? sizeof(CommitAwaitable)
440 : sizeof(EofAwaitable);
441
442 static constexpr std::size_t max_awaitable_align =
443 alignof(CommitAwaitable) > alignof(EofAwaitable)
444 ? alignof(CommitAwaitable)
445 : alignof(EofAwaitable);
446
447 static constexpr vtable value = {
448 &do_destroy_impl,
449 &do_prepare_impl,
450 max_awaitable_size,
451 max_awaitable_align,
452 &construct_commit_awaitable_impl,
453 &construct_eof_awaitable_impl
454 };
455 };
456
457 //----------------------------------------------------------
458
459 inline
460 63 any_buffer_sink::~any_buffer_sink()
461 {
462 1/2 63 if(storage_)
463 {
464 vt_->destroy(sink_);
465 ::operator delete(storage_);
466 }
467 2/2 63 if(cached_awaitable_)
468 60 ::operator delete(cached_awaitable_);
469 63 }
470
471 inline any_buffer_sink&
472 1 any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
473 {
474 1/2 1 if(this != &other)
475 {
476 1/2 1 if(storage_)
477 {
478 vt_->destroy(sink_);
479 ::operator delete(storage_);
480 }
481 1/2 1 if(cached_awaitable_)
482 ::operator delete(cached_awaitable_);
483 1 sink_ = std::exchange(other.sink_, nullptr);
484 1 vt_ = std::exchange(other.vt_, nullptr);
485 1 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
486 1 storage_ = std::exchange(other.storage_, nullptr);
487 1 active_ops_ = std::exchange(other.active_ops_, nullptr);
488 }
489 1 return *this;
490 }
491
492 template<BufferSink S>
493 requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
494 any_buffer_sink::any_buffer_sink(S s)
495 : vt_(&vtable_for_impl<S>::value)
496 {
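    // Exception guard: if a later allocation throws, destroy the
    // already-constructed sink and release its storage so the wrapper
    // is left empty rather than half-initialized.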
497 struct guard {
498 any_buffer_sink* self;
499 bool committed = false;
500 ~guard() {
501 if(!committed && self->storage_) {
502 self->vt_->destroy(self->sink_);
503 ::operator delete(self->storage_);
504 self->storage_ = nullptr;
505 self->sink_ = nullptr;
506 }
507 }
508 } g{this};
509
510 storage_ = ::operator new(sizeof(S));
511 sink_ = ::new(storage_) S(std::move(s));
512
513 // Preallocate the awaitable storage (sized for max of commit/eof)
514 cached_awaitable_ = ::operator new(vt_->awaitable_size);
515
516 g.committed = true;
517 }
518
519 template<BufferSink S>
520 60 any_buffer_sink::any_buffer_sink(S* s)
521 60 : sink_(s)
522 60 , vt_(&vtable_for_impl<S>::value)
523 {
524 // Preallocate the awaitable storage (sized for max of commit/eof)
525 60 cached_awaitable_ = ::operator new(vt_->awaitable_size);
526 60 }
527
528 //----------------------------------------------------------
529
530 inline std::span<mutable_buffer>
531 68 any_buffer_sink::prepare(std::span<mutable_buffer> dest)
532 {
533 68 return vt_->do_prepare(sink_, dest);
534 }
535
536 inline auto
537 48 any_buffer_sink::commit(std::size_t n, bool eof)
538 {
539 struct awaitable
540 {
541 any_buffer_sink* self_;
542 std::size_t n_;
543 bool eof_;
544
545 bool
546 48 await_ready() const noexcept
547 {
548 48 return false;
549 }
550
551 coro
552 48 await_suspend(coro h, executor_ref ex, std::stop_token token)
553 {
554 // Construct the underlying awaitable into cached storage
555 96 self_->active_ops_ = self_->vt_->construct_commit_awaitable(
556 48 self_->sink_,
557 48 self_->cached_awaitable_,
558 n_,
559 48 eof_);
560
561 // Check if underlying is immediately ready
562 1/2 48 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
563 48 return h;
564
565 // Forward to underlying awaitable
566 return self_->active_ops_->await_suspend(
567 self_->cached_awaitable_, h, ex, token);
568 }
569
570 io_result<>
571 48 await_resume()
572 {
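    // Destroy the cached underlying awaitable after its result is
    // taken, even if await_resume throws.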
573 struct guard {
574 any_buffer_sink* self;
575 48 ~guard() {
576 48 self->active_ops_->destroy(self->cached_awaitable_);
577 48 self->active_ops_ = nullptr;
578 48 }
579 48 } g{self_};
580 48 return self_->active_ops_->await_resume(
581 1/1 85 self_->cached_awaitable_);
582 48 }
583 };
584 48 return awaitable{this, n, eof};
585 }
586
587 inline auto
588 38 any_buffer_sink::commit(std::size_t n)
589 {
590 38 return commit(n, false);
591 }
592
593 inline auto
594 18 any_buffer_sink::commit_eof()
595 {
596 struct awaitable
597 {
598 any_buffer_sink* self_;
599
600 bool
601 18 await_ready() const noexcept
602 {
603 18 return false;
604 }
605
606 coro
607 18 await_suspend(coro h, executor_ref ex, std::stop_token token)
608 {
609 // Construct the underlying awaitable into cached storage
610 36 self_->active_ops_ = self_->vt_->construct_eof_awaitable(
611 18 self_->sink_,
612 18 self_->cached_awaitable_);
613
614 // Check if underlying is immediately ready
615 1/2 18 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
616 18 return h;
617
618 // Forward to underlying awaitable
619 return self_->active_ops_->await_suspend(
620 self_->cached_awaitable_, h, ex, token);
621 }
622
623 io_result<>
624 18 await_resume()
625 {
626 struct guard {
627 any_buffer_sink* self;
628 18 ~guard() {
629 18 self->active_ops_->destroy(self->cached_awaitable_);
630 18 self->active_ops_ = nullptr;
631 18 }
632 18 } g{self_};
633 18 return self_->active_ops_->await_resume(
634 1/1 31 self_->cached_awaitable_);
635 18 }
636 };
637 18 return awaitable{this};
638 }
639
640 //----------------------------------------------------------
641
642 template<ConstBufferSequence CB>
643 task<io_result<std::size_t>>
644 any_buffer_sink::write(CB buffers)
645 {
646 return write(buffers, false);
647 }
648
649 template<ConstBufferSequence CB>
650 task<io_result<std::size_t>>
651 any_buffer_sink::write(CB buffers, bool eof)
652 {
653 buffer_param<CB> bp(buffers);
654 std::size_t total = 0;
655
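    // Copy loop: expose the sink's writable buffers, copy from the
    // caller's sequence, then commit. An empty prepare() result means
    // the sink has no writable space right now, so commit(0) is awaited
    // to give it a chance to drain before retrying.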
656 for(;;)
657 {
658 auto src = bp.data();
659 if(src.empty())
660 break;
661
662 mutable_buffer arr[detail::max_iovec_];
663 auto dst_bufs = prepare(arr);
664 if(dst_bufs.empty())
665 {
666 auto [ec] = co_await commit(0);
667 if(ec)
668 co_return {ec, total};
669 continue;
670 }
671
672 auto n = buffer_copy(dst_bufs, src);
673 auto [ec] = co_await commit(n);
674 if(ec)
675 co_return {ec, total};
676 bp.consume(n);
677 total += n;
678 }
679
680 if(eof)
681 {
682 auto [ec] = co_await commit_eof();
683 if(ec)
684 co_return {ec, total};
685 }
686
687 co_return {{}, total};
688 }
689
690 inline auto
691 any_buffer_sink::write_eof()
692 {
693 return commit_eof();
694 }
695
696 //----------------------------------------------------------
697
698 static_assert(WriteSink<any_buffer_sink>);
699
700 } // namespace capy
701 } // namespace boost
702
703 #endif
704