Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_param.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/write_sink.hpp>
19 : #include <boost/capy/coro.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/io_result.hpp>
22 : #include <boost/capy/task.hpp>
23 :
24 : #include <concepts>
25 : #include <coroutine>
26 : #include <cstddef>
27 : #include <exception>
28 : #include <new>
29 : #include <span>
30 : #include <stop_token>
31 : #include <system_error>
32 : #include <utility>
33 :
34 : namespace boost {
35 : namespace capy {
36 :
37 : /** Type-erased wrapper for any WriteSink.
38 :
39 : This class provides type erasure for any type satisfying the
40 : @ref WriteSink concept, enabling runtime polymorphism for
41 : sink write operations. It uses cached awaitable storage to achieve
42 : zero steady-state allocation after construction.
43 :
44 : The wrapper supports two construction modes:
45 : - **Owning**: Pass by value to transfer ownership. The wrapper
46 : allocates storage and owns the sink.
47 : - **Reference**: Pass a pointer to wrap without ownership. The
48 : pointed-to sink must outlive this wrapper.
49 :
50 : @par Awaitable Preallocation
51 : The constructor preallocates storage for the type-erased awaitable.
52 : This reserves all virtual address space at server startup
53 : so memory usage can be measured up front, rather than
54 : allocating piecemeal as traffic arrives.
55 :
56 : @par Thread Safety
57 : Not thread-safe. Concurrent operations on the same wrapper
58 : are undefined behavior.
59 :
60 : @par Example
61 : @code
62 : // Owning - takes ownership of the sink
63 : any_write_sink ws(some_sink{args...});
64 :
65 : // Reference - wraps without ownership
66 : some_sink sink;
67 : any_write_sink ws(&sink);
68 :
69 : const_buffer buf(data, size);
70 : auto [ec, n] = co_await ws.write(std::span(&buf, 1));
71 : auto [ec2] = co_await ws.write_eof();
72 : @endcode
73 :
74 : @see any_write_stream, WriteSink
75 : */
76 : class any_write_sink
77 : {
78 : struct vtable;
79 : struct write_awaitable_ops;
80 : struct eof_awaitable_ops;
81 :
82 : template<WriteSink S>
83 : struct vtable_for_impl;
84 :
85 : void* sink_ = nullptr;
86 : vtable const* vt_ = nullptr;
87 : void* cached_awaitable_ = nullptr;
88 : void* storage_ = nullptr;
89 : write_awaitable_ops const* active_write_ops_ = nullptr;
90 : eof_awaitable_ops const* active_eof_ops_ = nullptr;
91 :
92 : public:
93 : /** Destructor.
94 :
95 : Destroys the owned sink (if any) and releases the cached
96 : awaitable storage.
97 : */
98 : ~any_write_sink();
99 :
100 : /** Default constructor.
101 :
102 : Constructs an empty wrapper. Operations on a default-constructed
103 : wrapper result in undefined behavior.
104 : */
105 : any_write_sink() = default;
106 :
107 : /** Non-copyable.
108 :
109 : The awaitable cache is per-instance and cannot be shared.
110 : */
111 : any_write_sink(any_write_sink const&) = delete;
112 : any_write_sink& operator=(any_write_sink const&) = delete;
113 :
114 : /** Move constructor.
115 :
116 : Transfers ownership of the wrapped sink (if owned) and
117 : cached awaitable storage from `other`. After the move, `other` is
118 : in a default-constructed state.
119 :
120 : @param other The wrapper to move from.
121 : */
122 1 : any_write_sink(any_write_sink&& other) noexcept
123 1 : : sink_(std::exchange(other.sink_, nullptr))
124 1 : , vt_(std::exchange(other.vt_, nullptr))
125 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
126 1 : , storage_(std::exchange(other.storage_, nullptr))
127 1 : , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
128 1 : , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
129 : {
130 1 : }
131 :
132 : /** Move assignment operator.
133 :
134 : Destroys any owned sink and releases existing resources,
135 : then transfers ownership from `other`.
136 :
137 : @param other The wrapper to move from.
138 : @return Reference to this wrapper.
139 : */
140 : any_write_sink&
141 : operator=(any_write_sink&& other) noexcept;
142 :
143 : /** Construct by taking ownership of a WriteSink.
144 :
145 : Allocates storage and moves the sink into this wrapper.
146 : The wrapper owns the sink and will destroy it.
147 :
148 : @param s The sink to take ownership of.
149 : */
150 : template<WriteSink S>
151 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
152 : any_write_sink(S s);
153 :
154 : /** Construct by wrapping a WriteSink without ownership.
155 :
156 : Wraps the given sink by pointer. The sink must remain
157 : valid for the lifetime of this wrapper.
158 :
159 : @param s Pointer to the sink to wrap.
160 : */
161 : template<WriteSink S>
162 : any_write_sink(S* s);
163 :
164 : /** Check if the wrapper contains a valid sink.
165 :
166 : @return `true` if wrapping a sink, `false` if default-constructed
167 : or moved-from.
168 : */
169 : bool
170 9 : has_value() const noexcept
171 : {
172 9 : return sink_ != nullptr;
173 : }
174 :
175 : /** Check if the wrapper contains a valid sink.
176 :
177 : @return `true` if wrapping a sink, `false` if default-constructed
178 : or moved-from.
179 : */
180 : explicit
181 2 : operator bool() const noexcept
182 : {
183 2 : return has_value();
184 : }
185 :
186 : /** Initiate an asynchronous write operation.
187 :
188 : Writes data from the provided buffer sequence. The operation
189 : completes when all bytes have been consumed, or an error
190 : occurs.
191 :
192 : @param buffers The buffer sequence containing data to write.
193 : Passed by value to ensure the sequence lives in the
194 : coroutine frame across suspension points.
195 :
196 : @return An awaitable yielding `(error_code,std::size_t)`.
197 :
198 : @par Preconditions
199 : The wrapper must contain a valid sink (`has_value() == true`).
200 : */
201 : template<ConstBufferSequence CB>
202 : task<io_result<std::size_t>>
203 : write(CB buffers);
204 :
205 : /** Initiate an asynchronous write operation with optional EOF.
206 :
207 : Writes data from the provided buffer sequence, optionally
208 : finalizing the sink afterwards. The operation completes when
209 : all bytes have been consumed and (if eof is true) the sink
210 : is finalized, or an error occurs.
211 :
212 : @param buffers The buffer sequence containing data to write.
213 : Passed by value to ensure the sequence lives in the
214 : coroutine frame across suspension points.
215 :
216 : @param eof If `true`, the sink is finalized after writing
217 : the data.
218 :
219 : @return An awaitable yielding `(error_code,std::size_t)`.
220 :
221 : @par Preconditions
222 : The wrapper must contain a valid sink (`has_value() == true`).
223 : */
224 : template<ConstBufferSequence CB>
225 : task<io_result<std::size_t>>
226 : write(CB buffers, bool eof);
227 :
228 : /** Signal end of data.
229 :
230 : Indicates that no more data will be written to the sink.
231 : The operation completes when the sink is finalized, or
232 : an error occurs.
233 :
234 : @return An awaitable yielding `(error_code)`.
235 :
236 : @par Preconditions
237 : The wrapper must contain a valid sink (`has_value() == true`).
238 : */
239 : auto
240 : write_eof();
241 :
242 : protected:
243 : /** Rebind to a new sink after move.
244 :
245 : Updates the internal pointer to reference a new sink object.
246 : Used by owning wrappers after move assignment when the owned
247 : object has moved to a new location.
248 :
249 : @param new_sink The new sink to bind to. Must be the same
250 : type as the original sink.
251 :
252 : @note Terminates if called with a sink of different type
253 : than the original.
254 : */
255 : template<WriteSink S>
256 : void
257 : rebind(S& new_sink) noexcept
258 : {
259 : if(vt_ != &vtable_for_impl<S>::value)
260 : std::terminate();
261 : sink_ = &new_sink;
262 : }
263 :
264 : private:
265 : auto
266 : write_some_(std::span<const_buffer const> buffers, bool eof);
267 : };
268 :
269 : //----------------------------------------------------------
270 :
271 : struct any_write_sink::write_awaitable_ops
272 : {
273 : bool (*await_ready)(void*);
274 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
275 : io_result<std::size_t> (*await_resume)(void*);
276 : void (*destroy)(void*) noexcept;
277 : };
278 :
279 : struct any_write_sink::eof_awaitable_ops
280 : {
281 : bool (*await_ready)(void*);
282 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
283 : io_result<> (*await_resume)(void*);
284 : void (*destroy)(void*) noexcept;
285 : };
286 :
287 : struct any_write_sink::vtable
288 : {
289 : void (*destroy)(void*) noexcept;
290 : std::size_t awaitable_size;
291 : std::size_t awaitable_align;
292 : write_awaitable_ops const* (*construct_write_awaitable)(
293 : void* sink,
294 : void* storage,
295 : std::span<const_buffer const> buffers,
296 : bool eof);
297 : eof_awaitable_ops const* (*construct_eof_awaitable)(
298 : void* sink,
299 : void* storage);
300 : };
301 :
302 : template<WriteSink S>
303 : struct any_write_sink::vtable_for_impl
304 : {
305 : using WriteAwaitable = decltype(std::declval<S&>().write(
306 : std::span<const_buffer const>{}, false));
307 : using EofAwaitable = decltype(std::declval<S&>().write_eof());
308 :
309 : static void
310 0 : do_destroy_impl(void* sink) noexcept
311 : {
312 0 : static_cast<S*>(sink)->~S();
313 0 : }
314 :
315 : static write_awaitable_ops const*
316 126 : construct_write_awaitable_impl(
317 : void* sink,
318 : void* storage,
319 : std::span<const_buffer const> buffers,
320 : bool eof)
321 : {
322 126 : auto& s = *static_cast<S*>(sink);
323 126 : ::new(storage) WriteAwaitable(s.write(buffers, eof));
324 :
325 : static constexpr write_awaitable_ops ops = {
326 126 : +[](void* p) {
327 126 : return static_cast<WriteAwaitable*>(p)->await_ready();
328 : },
329 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
330 0 : return detail::call_await_suspend(
331 0 : static_cast<WriteAwaitable*>(p), h, ex, token);
332 : },
333 126 : +[](void* p) {
334 126 : return static_cast<WriteAwaitable*>(p)->await_resume();
335 : },
336 126 : +[](void* p) noexcept {
337 126 : static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
338 : }
339 : };
340 126 : return &ops;
341 : }
342 :
343 : static eof_awaitable_ops const*
344 24 : construct_eof_awaitable_impl(
345 : void* sink,
346 : void* storage)
347 : {
348 24 : auto& s = *static_cast<S*>(sink);
349 24 : ::new(storage) EofAwaitable(s.write_eof());
350 :
351 : static constexpr eof_awaitable_ops ops = {
352 24 : +[](void* p) {
353 24 : return static_cast<EofAwaitable*>(p)->await_ready();
354 : },
355 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
356 0 : return detail::call_await_suspend(
357 0 : static_cast<EofAwaitable*>(p), h, ex, token);
358 : },
359 24 : +[](void* p) {
360 24 : return static_cast<EofAwaitable*>(p)->await_resume();
361 : },
362 24 : +[](void* p) noexcept {
363 24 : static_cast<EofAwaitable*>(p)->~EofAwaitable();
364 : }
365 : };
366 24 : return &ops;
367 : }
368 :
369 : static constexpr std::size_t max_awaitable_size =
370 : sizeof(WriteAwaitable) > sizeof(EofAwaitable)
371 : ? sizeof(WriteAwaitable)
372 : : sizeof(EofAwaitable);
373 :
374 : static constexpr std::size_t max_awaitable_align =
375 : alignof(WriteAwaitable) > alignof(EofAwaitable)
376 : ? alignof(WriteAwaitable)
377 : : alignof(EofAwaitable);
378 :
379 : static constexpr vtable value = {
380 : &do_destroy_impl,
381 : max_awaitable_size,
382 : max_awaitable_align,
383 : &construct_write_awaitable_impl,
384 : &construct_eof_awaitable_impl
385 : };
386 : };
387 :
388 : //----------------------------------------------------------
389 :
390 : inline
391 99 : any_write_sink::~any_write_sink()
392 : {
393 99 : if(storage_)
394 : {
395 0 : vt_->destroy(sink_);
396 0 : ::operator delete(storage_);
397 : }
398 99 : if(cached_awaitable_)
399 96 : ::operator delete(cached_awaitable_);
400 99 : }
401 :
402 : inline any_write_sink&
403 1 : any_write_sink::operator=(any_write_sink&& other) noexcept
404 : {
405 1 : if(this != &other)
406 : {
407 1 : if(storage_)
408 : {
409 0 : vt_->destroy(sink_);
410 0 : ::operator delete(storage_);
411 : }
412 1 : if(cached_awaitable_)
413 0 : ::operator delete(cached_awaitable_);
414 1 : sink_ = std::exchange(other.sink_, nullptr);
415 1 : vt_ = std::exchange(other.vt_, nullptr);
416 1 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
417 1 : storage_ = std::exchange(other.storage_, nullptr);
418 1 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
419 1 : active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
420 : }
421 1 : return *this;
422 : }
423 :
424 : template<WriteSink S>
425 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
426 : any_write_sink::any_write_sink(S s)
427 : : vt_(&vtable_for_impl<S>::value)
428 : {
429 : struct guard {
430 : any_write_sink* self;
431 : bool committed = false;
432 : ~guard() {
433 : if(!committed && self->storage_) {
434 : self->vt_->destroy(self->sink_);
435 : ::operator delete(self->storage_);
436 : self->storage_ = nullptr;
437 : self->sink_ = nullptr;
438 : }
439 : }
440 : } g{this};
441 :
442 : storage_ = ::operator new(sizeof(S));
443 : sink_ = ::new(storage_) S(std::move(s));
444 :
445 : // Preallocate the awaitable storage (sized for max of write/eof)
446 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
447 :
448 : g.committed = true;
449 : }
450 :
451 : template<WriteSink S>
452 96 : any_write_sink::any_write_sink(S* s)
453 96 : : sink_(s)
454 96 : , vt_(&vtable_for_impl<S>::value)
455 : {
456 : // Preallocate the awaitable storage (sized for max of write/eof)
457 96 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
458 96 : }
459 :
460 : //----------------------------------------------------------
461 :
462 : inline auto
463 126 : any_write_sink::write_some_(
464 : std::span<const_buffer const> buffers,
465 : bool eof)
466 : {
467 : struct awaitable
468 : {
469 : any_write_sink* self_;
470 : std::span<const_buffer const> buffers_;
471 : bool eof_;
472 :
473 : bool
474 126 : await_ready() const noexcept
475 : {
476 126 : return false;
477 : }
478 :
479 : coro
480 126 : await_suspend(coro h, executor_ref ex, std::stop_token token)
481 : {
482 : // Construct the underlying awaitable into cached storage
483 252 : self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
484 126 : self_->sink_,
485 126 : self_->cached_awaitable_,
486 : buffers_,
487 126 : eof_);
488 :
489 : // Check if underlying is immediately ready
490 126 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
491 126 : return h;
492 :
493 : // Forward to underlying awaitable
494 0 : return self_->active_write_ops_->await_suspend(
495 0 : self_->cached_awaitable_, h, ex, token);
496 : }
497 :
498 : io_result<std::size_t>
499 126 : await_resume()
500 : {
501 : struct guard {
502 : any_write_sink* self;
503 126 : ~guard() {
504 126 : self->active_write_ops_->destroy(self->cached_awaitable_);
505 126 : self->active_write_ops_ = nullptr;
506 126 : }
507 126 : } g{self_};
508 126 : return self_->active_write_ops_->await_resume(
509 224 : self_->cached_awaitable_);
510 126 : }
511 : };
512 126 : return awaitable{this, buffers, eof};
513 : }
514 :
515 : inline auto
516 24 : any_write_sink::write_eof()
517 : {
518 : struct awaitable
519 : {
520 : any_write_sink* self_;
521 :
522 : bool
523 24 : await_ready() const noexcept
524 : {
525 24 : return false;
526 : }
527 :
528 : coro
529 24 : await_suspend(coro h, executor_ref ex, std::stop_token token)
530 : {
531 : // Construct the underlying awaitable into cached storage
532 48 : self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
533 24 : self_->sink_,
534 24 : self_->cached_awaitable_);
535 :
536 : // Check if underlying is immediately ready
537 24 : if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
538 24 : return h;
539 :
540 : // Forward to underlying awaitable
541 0 : return self_->active_eof_ops_->await_suspend(
542 0 : self_->cached_awaitable_, h, ex, token);
543 : }
544 :
545 : io_result<>
546 24 : await_resume()
547 : {
548 : struct guard {
549 : any_write_sink* self;
550 24 : ~guard() {
551 24 : self->active_eof_ops_->destroy(self->cached_awaitable_);
552 24 : self->active_eof_ops_ = nullptr;
553 24 : }
554 24 : } g{self_};
555 24 : return self_->active_eof_ops_->await_resume(
556 41 : self_->cached_awaitable_);
557 24 : }
558 : };
559 24 : return awaitable{this};
560 : }
561 :
562 : template<ConstBufferSequence CB>
563 : task<io_result<std::size_t>>
564 66 : any_write_sink::write(CB buffers)
565 : {
566 66 : return write(buffers, false);
567 : }
568 :
569 : template<ConstBufferSequence CB>
570 : task<io_result<std::size_t>>
571 98 : any_write_sink::write(CB buffers, bool eof)
572 : {
573 : buffer_param<CB> bp(buffers);
574 : std::size_t total = 0;
575 :
576 : for(;;)
577 : {
578 : auto bufs = bp.data();
579 : if(bufs.empty())
580 : break;
581 :
582 : auto [ec, n] = co_await write_some_(bufs, false);
583 : if(ec)
584 : co_return {ec, total + n};
585 : bp.consume(n);
586 : total += n;
587 : }
588 :
589 : if(eof)
590 : {
591 : auto [ec] = co_await write_eof();
592 : if(ec)
593 : co_return {ec, total};
594 : }
595 :
596 : co_return {{}, total};
597 196 : }
598 :
599 : } // namespace capy
600 : } // namespace boost
601 :
602 : #endif
|