libs/capy/include/boost/capy/io/any_write_sink.hpp

84.2% Lines (96/114) 90.3% Functions (28/31) 57.9% Branches (11/19)
libs/capy/include/boost/capy/io/any_write_sink.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_param.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_sink.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22 #include <boost/capy/task.hpp>
23
24 #include <concepts>
25 #include <coroutine>
26 #include <cstddef>
27 #include <exception>
28 #include <new>
29 #include <span>
30 #include <stop_token>
31 #include <system_error>
32 #include <utility>
33
34 namespace boost {
35 namespace capy {
36
37 /** Type-erased wrapper for any WriteSink.
38
39 This class provides type erasure for any type satisfying the
40 @ref WriteSink concept, enabling runtime polymorphism for
41 sink write operations. It uses cached awaitable storage to achieve
42 zero steady-state allocation after construction.
43
44 The wrapper supports two construction modes:
45 - **Owning**: Pass by value to transfer ownership. The wrapper
46 allocates storage and owns the sink.
47 - **Reference**: Pass a pointer to wrap without ownership. The
48 pointed-to sink must outlive this wrapper.
49
50 @par Awaitable Preallocation
51 The constructor preallocates storage for the type-erased awaitable.
52 This reserves all virtual address space at server startup
53 so memory usage can be measured up front, rather than
54 allocating piecemeal as traffic arrives.
55
56 @par Thread Safety
57 Not thread-safe. Concurrent operations on the same wrapper
58 are undefined behavior.
59
60 @par Example
61 @code
62 // Owning - takes ownership of the sink
63 any_write_sink ws(some_sink{args...});
64
65 // Reference - wraps without ownership
66 some_sink sink;
67 any_write_sink ws(&sink);
68
69 const_buffer buf(data, size);
70 auto [ec, n] = co_await ws.write(std::span(&buf, 1));
71 auto [ec2] = co_await ws.write_eof();
72 @endcode
73
74 @see any_write_stream, WriteSink
75 */
76 class any_write_sink
77 {
78 struct vtable;
79 struct write_awaitable_ops;
80 struct eof_awaitable_ops;
81
82 template<WriteSink S>
83 struct vtable_for_impl;
84
85 void* sink_ = nullptr;
86 vtable const* vt_ = nullptr;
87 void* cached_awaitable_ = nullptr;
88 void* storage_ = nullptr;
89 write_awaitable_ops const* active_write_ops_ = nullptr;
90 eof_awaitable_ops const* active_eof_ops_ = nullptr;
91
92 public:
93 /** Destructor.
94
95 Destroys the owned sink (if any) and releases the cached
96 awaitable storage.
97 */
98 ~any_write_sink();
99
100 /** Default constructor.
101
102 Constructs an empty wrapper. Operations on a default-constructed
103 wrapper result in undefined behavior.
104 */
105 any_write_sink() = default;
106
107 /** Non-copyable.
108
109 The awaitable cache is per-instance and cannot be shared.
110 */
111 any_write_sink(any_write_sink const&) = delete;
112 any_write_sink& operator=(any_write_sink const&) = delete;
113
114 /** Move constructor.
115
116 Transfers ownership of the wrapped sink (if owned) and
117 cached awaitable storage from `other`. After the move, `other` is
118 in a default-constructed state.
119
120 @param other The wrapper to move from.
121 */
122 1 any_write_sink(any_write_sink&& other) noexcept
123 1 : sink_(std::exchange(other.sink_, nullptr))
124 1 , vt_(std::exchange(other.vt_, nullptr))
125 1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126 1 , storage_(std::exchange(other.storage_, nullptr))
127 1 , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
128 1 , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
129 {
130 1 }
131
132 /** Move assignment operator.
133
134 Destroys any owned sink and releases existing resources,
135 then transfers ownership from `other`.
136
137 @param other The wrapper to move from.
138 @return Reference to this wrapper.
139 */
140 any_write_sink&
141 operator=(any_write_sink&& other) noexcept;
142
143 /** Construct by taking ownership of a WriteSink.
144
145 Allocates storage and moves the sink into this wrapper.
146 The wrapper owns the sink and will destroy it.
147
148 @param s The sink to take ownership of.
149 */
150 template<WriteSink S>
151 requires (!std::same_as<std::decay_t<S>, any_write_sink>)
152 any_write_sink(S s);
153
154 /** Construct by wrapping a WriteSink without ownership.
155
156 Wraps the given sink by pointer. The sink must remain
157 valid for the lifetime of this wrapper.
158
159 @param s Pointer to the sink to wrap.
160 */
161 template<WriteSink S>
162 any_write_sink(S* s);
163
164 /** Check if the wrapper contains a valid sink.
165
166 @return `true` if wrapping a sink, `false` if default-constructed
167 or moved-from.
168 */
169 bool
170 9 has_value() const noexcept
171 {
172 9 return sink_ != nullptr;
173 }
174
175 /** Check if the wrapper contains a valid sink.
176
177 @return `true` if wrapping a sink, `false` if default-constructed
178 or moved-from.
179 */
180 explicit
181 2 operator bool() const noexcept
182 {
183 2 return has_value();
184 }
185
186 /** Initiate an asynchronous write operation.
187
188 Writes data from the provided buffer sequence. The operation
189 completes when all bytes have been consumed, or an error
190 occurs.
191
192 @param buffers The buffer sequence containing data to write.
193 Passed by value to ensure the sequence lives in the
194 coroutine frame across suspension points.
195
196 @return An awaitable yielding `(error_code,std::size_t)`.
197
198 @par Preconditions
199 The wrapper must contain a valid sink (`has_value() == true`).
200 */
201 template<ConstBufferSequence CB>
202 task<io_result<std::size_t>>
203 write(CB buffers);
204
205 /** Initiate an asynchronous write operation with optional EOF.
206
207 Writes data from the provided buffer sequence, optionally
208 finalizing the sink afterwards. The operation completes when
209 all bytes have been consumed and (if eof is true) the sink
210 is finalized, or an error occurs.
211
212 @param buffers The buffer sequence containing data to write.
213 Passed by value to ensure the sequence lives in the
214 coroutine frame across suspension points.
215
216 @param eof If `true`, the sink is finalized after writing
217 the data.
218
219 @return An awaitable yielding `(error_code,std::size_t)`.
220
221 @par Preconditions
222 The wrapper must contain a valid sink (`has_value() == true`).
223 */
224 template<ConstBufferSequence CB>
225 task<io_result<std::size_t>>
226 write(CB buffers, bool eof);
227
228 /** Signal end of data.
229
230 Indicates that no more data will be written to the sink.
231 The operation completes when the sink is finalized, or
232 an error occurs.
233
234 @return An awaitable yielding `(error_code)`.
235
236 @par Preconditions
237 The wrapper must contain a valid sink (`has_value() == true`).
238 */
239 auto
240 write_eof();
241
242 protected:
243 /** Rebind to a new sink after move.
244
245 Updates the internal pointer to reference a new sink object.
246 Used by owning wrappers after move assignment when the owned
247 object has moved to a new location.
248
249 @param new_sink The new sink to bind to. Must be the same
250 type as the original sink.
251
252 @note Terminates if called with a sink of different type
253 than the original.
254 */
255 template<WriteSink S>
256 void
257 rebind(S& new_sink) noexcept
258 {
259 if(vt_ != &vtable_for_impl<S>::value)
260 std::terminate();
261 sink_ = &new_sink;
262 }
263
264 private:
265 auto
266 write_some_(std::span<const_buffer const> buffers, bool eof);
267 };
268
269 //----------------------------------------------------------
270
271 struct any_write_sink::write_awaitable_ops
272 {
273 bool (*await_ready)(void*);
274 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
275 io_result<std::size_t> (*await_resume)(void*);
276 void (*destroy)(void*) noexcept;
277 };
278
279 struct any_write_sink::eof_awaitable_ops
280 {
281 bool (*await_ready)(void*);
282 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
283 io_result<> (*await_resume)(void*);
284 void (*destroy)(void*) noexcept;
285 };
286
287 struct any_write_sink::vtable
288 {
289 void (*destroy)(void*) noexcept;
290 std::size_t awaitable_size;
291 std::size_t awaitable_align;
292 write_awaitable_ops const* (*construct_write_awaitable)(
293 void* sink,
294 void* storage,
295 std::span<const_buffer const> buffers,
296 bool eof);
297 eof_awaitable_ops const* (*construct_eof_awaitable)(
298 void* sink,
299 void* storage);
300 };
301
302 template<WriteSink S>
303 struct any_write_sink::vtable_for_impl
304 {
305 using WriteAwaitable = decltype(std::declval<S&>().write(
306 std::span<const_buffer const>{}, false));
307 using EofAwaitable = decltype(std::declval<S&>().write_eof());
308
309 static void
310 do_destroy_impl(void* sink) noexcept
311 {
312 static_cast<S*>(sink)->~S();
313 }
314
315 static write_awaitable_ops const*
316 126 construct_write_awaitable_impl(
317 void* sink,
318 void* storage,
319 std::span<const_buffer const> buffers,
320 bool eof)
321 {
322 126 auto& s = *static_cast<S*>(sink);
323 126 ::new(storage) WriteAwaitable(s.write(buffers, eof));
324
325 static constexpr write_awaitable_ops ops = {
326 126 +[](void* p) {
327 126 return static_cast<WriteAwaitable*>(p)->await_ready();
328 },
329 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
330 return detail::call_await_suspend(
331 static_cast<WriteAwaitable*>(p), h, ex, token);
332 },
333 126 +[](void* p) {
334 126 return static_cast<WriteAwaitable*>(p)->await_resume();
335 },
336 126 +[](void* p) noexcept {
337 126 static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
338 }
339 };
340 126 return &ops;
341 }
342
343 static eof_awaitable_ops const*
344 24 construct_eof_awaitable_impl(
345 void* sink,
346 void* storage)
347 {
348 24 auto& s = *static_cast<S*>(sink);
349 24 ::new(storage) EofAwaitable(s.write_eof());
350
351 static constexpr eof_awaitable_ops ops = {
352 24 +[](void* p) {
353 24 return static_cast<EofAwaitable*>(p)->await_ready();
354 },
355 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
356 return detail::call_await_suspend(
357 static_cast<EofAwaitable*>(p), h, ex, token);
358 },
359 24 +[](void* p) {
360 24 return static_cast<EofAwaitable*>(p)->await_resume();
361 },
362 24 +[](void* p) noexcept {
363 24 static_cast<EofAwaitable*>(p)->~EofAwaitable();
364 }
365 };
366 24 return &ops;
367 }
368
369 static constexpr std::size_t max_awaitable_size =
370 sizeof(WriteAwaitable) > sizeof(EofAwaitable)
371 ? sizeof(WriteAwaitable)
372 : sizeof(EofAwaitable);
373
374 static constexpr std::size_t max_awaitable_align =
375 alignof(WriteAwaitable) > alignof(EofAwaitable)
376 ? alignof(WriteAwaitable)
377 : alignof(EofAwaitable);
378
379 static constexpr vtable value = {
380 &do_destroy_impl,
381 max_awaitable_size,
382 max_awaitable_align,
383 &construct_write_awaitable_impl,
384 &construct_eof_awaitable_impl
385 };
386 };
387
388 //----------------------------------------------------------
389
390 inline
391 99 any_write_sink::~any_write_sink()
392 {
393
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 99 times.
99 if(storage_)
394 {
395 vt_->destroy(sink_);
396 ::operator delete(storage_);
397 }
398
2/2
✓ Branch 0 taken 96 times.
✓ Branch 1 taken 3 times.
99 if(cached_awaitable_)
399 96 ::operator delete(cached_awaitable_);
400 99 }
401
402 inline any_write_sink&
403 1 any_write_sink::operator=(any_write_sink&& other) noexcept
404 {
405
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(this != &other)
406 {
407
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(storage_)
408 {
409 vt_->destroy(sink_);
410 ::operator delete(storage_);
411 }
412
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(cached_awaitable_)
413 ::operator delete(cached_awaitable_);
414 1 sink_ = std::exchange(other.sink_, nullptr);
415 1 vt_ = std::exchange(other.vt_, nullptr);
416 1 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
417 1 storage_ = std::exchange(other.storage_, nullptr);
418 1 active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
419 1 active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
420 }
421 1 return *this;
422 }
423
424 template<WriteSink S>
425 requires (!std::same_as<std::decay_t<S>, any_write_sink>)
426 any_write_sink::any_write_sink(S s)
427 : vt_(&vtable_for_impl<S>::value)
428 {
429 struct guard {
430 any_write_sink* self;
431 bool committed = false;
432 ~guard() {
433 if(!committed && self->storage_) {
434 self->vt_->destroy(self->sink_);
435 ::operator delete(self->storage_);
436 self->storage_ = nullptr;
437 self->sink_ = nullptr;
438 }
439 }
440 } g{this};
441
442 storage_ = ::operator new(sizeof(S));
443 sink_ = ::new(storage_) S(std::move(s));
444
445 // Preallocate the awaitable storage (sized for max of write/eof)
446 cached_awaitable_ = ::operator new(vt_->awaitable_size);
447
448 g.committed = true;
449 }
450
451 template<WriteSink S>
452 96 any_write_sink::any_write_sink(S* s)
453 96 : sink_(s)
454 96 , vt_(&vtable_for_impl<S>::value)
455 {
456 // Preallocate the awaitable storage (sized for max of write/eof)
457 96 cached_awaitable_ = ::operator new(vt_->awaitable_size);
458 96 }
459
460 //----------------------------------------------------------
461
462 inline auto
463 126 any_write_sink::write_some_(
464 std::span<const_buffer const> buffers,
465 bool eof)
466 {
467 struct awaitable
468 {
469 any_write_sink* self_;
470 std::span<const_buffer const> buffers_;
471 bool eof_;
472
473 bool
474 126 await_ready() const noexcept
475 {
476 126 return false;
477 }
478
479 coro
480 126 await_suspend(coro h, executor_ref ex, std::stop_token token)
481 {
482 // Construct the underlying awaitable into cached storage
483 252 self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
484 126 self_->sink_,
485 126 self_->cached_awaitable_,
486 buffers_,
487 126 eof_);
488
489 // Check if underlying is immediately ready
490
1/2
✓ Branch 1 taken 126 times.
✗ Branch 2 not taken.
126 if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
491 126 return h;
492
493 // Forward to underlying awaitable
494 return self_->active_write_ops_->await_suspend(
495 self_->cached_awaitable_, h, ex, token);
496 }
497
498 io_result<std::size_t>
499 126 await_resume()
500 {
501 struct guard {
502 any_write_sink* self;
503 126 ~guard() {
504 126 self->active_write_ops_->destroy(self->cached_awaitable_);
505 126 self->active_write_ops_ = nullptr;
506 126 }
507 126 } g{self_};
508 126 return self_->active_write_ops_->await_resume(
509
1/1
✓ Branch 1 taken 98 times.
224 self_->cached_awaitable_);
510 126 }
511 };
512 126 return awaitable{this, buffers, eof};
513 }
514
515 inline auto
516 24 any_write_sink::write_eof()
517 {
518 struct awaitable
519 {
520 any_write_sink* self_;
521
522 bool
523 24 await_ready() const noexcept
524 {
525 24 return false;
526 }
527
528 coro
529 24 await_suspend(coro h, executor_ref ex, std::stop_token token)
530 {
531 // Construct the underlying awaitable into cached storage
532 48 self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
533 24 self_->sink_,
534 24 self_->cached_awaitable_);
535
536 // Check if underlying is immediately ready
537
1/2
✓ Branch 1 taken 24 times.
✗ Branch 2 not taken.
24 if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
538 24 return h;
539
540 // Forward to underlying awaitable
541 return self_->active_eof_ops_->await_suspend(
542 self_->cached_awaitable_, h, ex, token);
543 }
544
545 io_result<>
546 24 await_resume()
547 {
548 struct guard {
549 any_write_sink* self;
550 24 ~guard() {
551 24 self->active_eof_ops_->destroy(self->cached_awaitable_);
552 24 self->active_eof_ops_ = nullptr;
553 24 }
554 24 } g{self_};
555 24 return self_->active_eof_ops_->await_resume(
556
1/1
✓ Branch 1 taken 17 times.
41 self_->cached_awaitable_);
557 24 }
558 };
559 24 return awaitable{this};
560 }
561
562 template<ConstBufferSequence CB>
563 task<io_result<std::size_t>>
564 66 any_write_sink::write(CB buffers)
565 {
566 66 return write(buffers, false);
567 }
568
569 template<ConstBufferSequence CB>
570 task<io_result<std::size_t>>
571
1/1
✓ Branch 1 taken 98 times.
98 any_write_sink::write(CB buffers, bool eof)
572 {
573 buffer_param<CB> bp(buffers);
574 std::size_t total = 0;
575
576 for(;;)
577 {
578 auto bufs = bp.data();
579 if(bufs.empty())
580 break;
581
582 auto [ec, n] = co_await write_some_(bufs, false);
583 if(ec)
584 co_return {ec, total + n};
585 bp.consume(n);
586 total += n;
587 }
588
589 if(eof)
590 {
591 auto [ec] = co_await write_eof();
592 if(ec)
593 co_return {ec, total};
594 }
595
596 co_return {{}, total};
597 196 }
598
599 } // namespace capy
600 } // namespace boost
601
602 #endif
603