Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_param.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/write_stream.hpp>
19 : #include <boost/capy/coro.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/io_result.hpp>
22 :
23 : #include <concepts>
24 : #include <coroutine>
25 : #include <cstddef>
26 : #include <new>
27 : #include <span>
28 : #include <stop_token>
29 : #include <system_error>
30 : #include <utility>
31 :
32 : namespace boost {
33 : namespace capy {
34 :
35 : /** Type-erased wrapper for any WriteStream.
36 :
37 : This class provides type erasure for any type satisfying the
38 : @ref WriteStream concept, enabling runtime polymorphism for
39 : write operations. It uses cached awaitable storage to achieve
40 : zero steady-state allocation after construction.
41 :
42 : The wrapper supports two construction modes:
43 : - **Owning**: Pass by value to transfer ownership. The wrapper
44 : allocates storage and owns the stream.
45 : - **Reference**: Pass a pointer to wrap without ownership. The
46 : pointed-to stream must outlive this wrapper.
47 :
48 : @par Awaitable Preallocation
49 : The constructor preallocates storage for the type-erased awaitable.
50 : This reserves all virtual address space at server startup
51 : so memory usage can be measured up front, rather than
52 : allocating piecemeal as traffic arrives.
53 :
54 : @par Thread Safety
55 : Not thread-safe. Concurrent operations on the same wrapper
56 : are undefined behavior.
57 :
58 : @par Example
59 : @code
60 : // Owning - takes ownership of the stream
61 : any_write_stream stream(socket{ioc});
62 :
63 : // Reference - wraps without ownership
64 : socket sock(ioc);
65 : any_write_stream stream(&sock);
66 :
67 : const_buffer buf(data, size);
68 : auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
69 : @endcode
70 :
71 : @see any_read_stream, any_stream, WriteStream
72 : */
73 : class any_write_stream
74 : {
75 : struct vtable;
76 : struct awaitable_ops;
77 :
78 : template<WriteStream S>
79 : struct vtable_for_impl;
80 :
81 : void* stream_ = nullptr;
82 : vtable const* vt_ = nullptr;
83 : void* cached_awaitable_ = nullptr;
84 : void* storage_ = nullptr;
85 : awaitable_ops const* active_ops_ = nullptr;
86 :
87 : public:
88 : /** Destructor.
89 :
90 : Destroys the owned stream (if any) and releases the cached
91 : awaitable storage.
92 : */
93 : ~any_write_stream();
94 :
95 : /** Default constructor.
96 :
97 : Constructs an empty wrapper. Operations on a default-constructed
98 : wrapper result in undefined behavior.
99 : */
100 1 : any_write_stream() = default;
101 :
102 : /** Non-copyable.
103 :
104 : The awaitable cache is per-instance and cannot be shared.
105 : */
106 : any_write_stream(any_write_stream const&) = delete;
107 : any_write_stream& operator=(any_write_stream const&) = delete;
108 :
109 : /** Move constructor.
110 :
111 : Transfers ownership of the wrapped stream (if owned) and
112 : cached awaitable storage from `other`. After the move, `other` is
113 : in a default-constructed state.
114 :
115 : @param other The wrapper to move from.
116 : */
117 2 : any_write_stream(any_write_stream&& other) noexcept
118 2 : : stream_(std::exchange(other.stream_, nullptr))
119 2 : , vt_(std::exchange(other.vt_, nullptr))
120 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
121 2 : , storage_(std::exchange(other.storage_, nullptr))
122 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
123 : {
124 2 : }
125 :
126 : /** Move assignment operator.
127 :
128 : Destroys any owned stream and releases existing resources,
129 : then transfers ownership from `other`.
130 :
131 : @param other The wrapper to move from.
132 : @return Reference to this wrapper.
133 : */
134 : any_write_stream&
135 : operator=(any_write_stream&& other) noexcept;
136 :
137 : /** Construct by taking ownership of a WriteStream.
138 :
139 : Allocates storage and moves the stream into this wrapper.
140 : The wrapper owns the stream and will destroy it.
141 :
142 : @param s The stream to take ownership of.
143 : */
144 : template<WriteStream S>
145 : requires (!std::same_as<std::decay_t<S>, any_write_stream>)
146 : any_write_stream(S s);
147 :
148 : /** Construct by wrapping a WriteStream without ownership.
149 :
150 : Wraps the given stream by pointer. The stream must remain
151 : valid for the lifetime of this wrapper.
152 :
153 : @param s Pointer to the stream to wrap.
154 : */
155 : template<WriteStream S>
156 : any_write_stream(S* s);
157 :
158 : /** Check if the wrapper contains a valid stream.
159 :
160 : @return `true` if wrapping a stream, `false` if default-constructed
161 : or moved-from.
162 : */
163 : bool
164 15 : has_value() const noexcept
165 : {
166 15 : return stream_ != nullptr;
167 : }
168 :
169 : /** Check if the wrapper contains a valid stream.
170 :
171 : @return `true` if wrapping a stream, `false` if default-constructed
172 : or moved-from.
173 : */
174 : explicit
175 2 : operator bool() const noexcept
176 : {
177 2 : return has_value();
178 : }
179 :
180 : /** Initiate an asynchronous write operation.
181 :
182 : Writes data from the provided buffer sequence. The operation
183 : completes when at least one byte has been written, or an error
184 : occurs.
185 :
186 : @param buffers The buffer sequence containing data to write.
187 : Passed by value to ensure the sequence lives in the
188 : coroutine frame across suspension points.
189 :
190 : @return An awaitable yielding `(error_code,std::size_t)`.
191 :
192 : @par Preconditions
193 : The wrapper must contain a valid stream (`has_value() == true`).
194 : */
195 : template<ConstBufferSequence CB>
196 : auto
197 : write_some(CB buffers);
198 :
199 : protected:
200 : /** Rebind to a new stream after move.
201 :
202 : Updates the internal pointer to reference a new stream object.
203 : Used by owning wrappers after move assignment when the owned
204 : object has moved to a new location.
205 :
206 : @param new_stream The new stream to bind to. Must be the same
207 : type as the original stream.
208 :
209 : @note Terminates if called with a stream of different type
210 : than the original.
211 : */
212 : template<WriteStream S>
213 : void
214 : rebind(S& new_stream) noexcept
215 : {
216 : if(vt_ != &vtable_for_impl<S>::value)
217 : std::terminate();
218 : stream_ = &new_stream;
219 : }
220 : };
221 :
222 : //----------------------------------------------------------
223 :
224 : struct any_write_stream::awaitable_ops
225 : {
226 : bool (*await_ready)(void*);
227 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
228 : io_result<std::size_t> (*await_resume)(void*);
229 : void (*destroy)(void*) noexcept;
230 : };
231 :
232 : struct any_write_stream::vtable
233 : {
234 : void (*destroy)(void*) noexcept;
235 : std::size_t awaitable_size;
236 : std::size_t awaitable_align;
237 : awaitable_ops const* (*construct_awaitable)(
238 : void* stream,
239 : void* storage,
240 : std::span<const_buffer const> buffers);
241 : };
242 :
243 : template<WriteStream S>
244 : struct any_write_stream::vtable_for_impl
245 : {
246 : using Awaitable = decltype(std::declval<S&>().write_some(
247 : std::span<const_buffer const>{}));
248 :
249 : static void
250 0 : do_destroy_impl(void* stream) noexcept
251 : {
252 0 : static_cast<S*>(stream)->~S();
253 0 : }
254 :
255 : static awaitable_ops const*
256 60 : construct_awaitable_impl(
257 : void* stream,
258 : void* storage,
259 : std::span<const_buffer const> buffers)
260 : {
261 60 : auto& s = *static_cast<S*>(stream);
262 60 : ::new(storage) Awaitable(s.write_some(buffers));
263 :
264 : static constexpr awaitable_ops ops = {
265 60 : +[](void* p) {
266 60 : return static_cast<Awaitable*>(p)->await_ready();
267 : },
268 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
269 0 : return detail::call_await_suspend(
270 0 : static_cast<Awaitable*>(p), h, ex, token);
271 : },
272 60 : +[](void* p) {
273 60 : return static_cast<Awaitable*>(p)->await_resume();
274 : },
275 60 : +[](void* p) noexcept {
276 60 : static_cast<Awaitable*>(p)->~Awaitable();
277 : }
278 : };
279 60 : return &ops;
280 : }
281 :
282 : static constexpr vtable value = {
283 : &do_destroy_impl,
284 : sizeof(Awaitable),
285 : alignof(Awaitable),
286 : &construct_awaitable_impl
287 : };
288 : };
289 :
290 : //----------------------------------------------------------
291 :
292 : inline
293 72 : any_write_stream::~any_write_stream()
294 : {
295 72 : if(storage_)
296 : {
297 0 : vt_->destroy(stream_);
298 0 : ::operator delete(storage_);
299 : }
300 72 : if(cached_awaitable_)
301 65 : ::operator delete(cached_awaitable_);
302 72 : }
303 :
304 : inline any_write_stream&
305 3 : any_write_stream::operator=(any_write_stream&& other) noexcept
306 : {
307 3 : if(this != &other)
308 : {
309 3 : if(storage_)
310 : {
311 0 : vt_->destroy(stream_);
312 0 : ::operator delete(storage_);
313 : }
314 3 : if(cached_awaitable_)
315 0 : ::operator delete(cached_awaitable_);
316 3 : stream_ = std::exchange(other.stream_, nullptr);
317 3 : vt_ = std::exchange(other.vt_, nullptr);
318 3 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
319 3 : storage_ = std::exchange(other.storage_, nullptr);
320 3 : active_ops_ = std::exchange(other.active_ops_, nullptr);
321 : }
322 3 : return *this;
323 : }
324 :
325 : template<WriteStream S>
326 : requires (!std::same_as<std::decay_t<S>, any_write_stream>)
327 : any_write_stream::any_write_stream(S s)
328 : : vt_(&vtable_for_impl<S>::value)
329 : {
330 : struct guard {
331 : any_write_stream* self;
332 : bool committed = false;
333 : ~guard() {
334 : if(!committed && self->storage_) {
335 : self->vt_->destroy(self->stream_);
336 : ::operator delete(self->storage_);
337 : self->storage_ = nullptr;
338 : self->stream_ = nullptr;
339 : }
340 : }
341 : } g{this};
342 :
343 : storage_ = ::operator new(sizeof(S));
344 : stream_ = ::new(storage_) S(std::move(s));
345 :
346 : // Preallocate the awaitable storage
347 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
348 :
349 : g.committed = true;
350 : }
351 :
352 : template<WriteStream S>
353 65 : any_write_stream::any_write_stream(S* s)
354 65 : : stream_(s)
355 65 : , vt_(&vtable_for_impl<S>::value)
356 : {
357 : // Preallocate the awaitable storage
358 65 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
359 65 : }
360 :
361 : //----------------------------------------------------------
362 :
363 : template<ConstBufferSequence CB>
364 : auto
365 60 : any_write_stream::write_some(CB buffers)
366 : {
367 : struct awaitable
368 : {
369 : any_write_stream* self_;
370 : buffer_param<CB> bp_;
371 :
372 : bool
373 60 : await_ready() const noexcept
374 : {
375 60 : return false;
376 : }
377 :
378 : coro
379 60 : await_suspend(coro h, executor_ref ex, std::stop_token token)
380 : {
381 : // Construct the underlying awaitable into cached storage
382 60 : self_->active_ops_ = self_->vt_->construct_awaitable(
383 60 : self_->stream_,
384 60 : self_->cached_awaitable_,
385 60 : bp_.data());
386 :
387 : // Check if underlying is immediately ready
388 60 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
389 60 : return h;
390 :
391 : // Forward to underlying awaitable
392 0 : return self_->active_ops_->await_suspend(
393 0 : self_->cached_awaitable_, h, ex, token);
394 : }
395 :
396 : io_result<std::size_t>
397 60 : await_resume()
398 : {
399 : struct guard {
400 : any_write_stream* self;
401 60 : ~guard() {
402 60 : self->active_ops_->destroy(self->cached_awaitable_);
403 60 : self->active_ops_ = nullptr;
404 60 : }
405 60 : } g{self_};
406 60 : return self_->active_ops_->await_resume(
407 103 : self_->cached_awaitable_);
408 60 : }
409 : };
410 60 : return awaitable{this, buffer_param<CB>(buffers)};
411 : }
412 :
413 : } // namespace capy
414 : } // namespace boost
415 :
416 : #endif
|