libs/capy/include/boost/capy/io/any_write_stream.hpp

82.7% Lines (62/75) 84.0% Functions (21/25) 62.5% Branches (10/16)
libs/capy/include/boost/capy/io/any_write_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_param.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <concepts>
24 #include <coroutine>
25 #include <cstddef>
26 #include <new>
27 #include <span>
28 #include <stop_token>
29 #include <system_error>
30 #include <utility>
31
32 namespace boost {
33 namespace capy {
34
35 /** Type-erased wrapper for any WriteStream.
36
37 This class provides type erasure for any type satisfying the
38 @ref WriteStream concept, enabling runtime polymorphism for
39 write operations. It uses cached awaitable storage to achieve
40 zero steady-state allocation after construction.
41
42 The wrapper supports two construction modes:
43 - **Owning**: Pass by value to transfer ownership. The wrapper
44 allocates storage and owns the stream.
45 - **Reference**: Pass a pointer to wrap without ownership. The
46 pointed-to stream must outlive this wrapper.
47
48 @par Awaitable Preallocation
49 The constructor preallocates storage for the type-erased awaitable.
50 This reserves all virtual address space at server startup
51 so memory usage can be measured up front, rather than
52 allocating piecemeal as traffic arrives.
53
54 @par Thread Safety
55 Not thread-safe. Concurrent operations on the same wrapper
56 are undefined behavior.
57
58 @par Example
59 @code
60 // Owning - takes ownership of the stream
61 any_write_stream stream(socket{ioc});
62
63 // Reference - wraps without ownership
64 socket sock(ioc);
65 any_write_stream stream(&sock);
66
67 const_buffer buf(data, size);
68 auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
69 @endcode
70
71 @see any_read_stream, any_stream, WriteStream
72 */
73 class any_write_stream
74 {
75 struct vtable;
76 struct awaitable_ops;
77
78 template<WriteStream S>
79 struct vtable_for_impl;
80
81 void* stream_ = nullptr;
82 vtable const* vt_ = nullptr;
83 void* cached_awaitable_ = nullptr;
84 void* storage_ = nullptr;
85 awaitable_ops const* active_ops_ = nullptr;
86
87 public:
88 /** Destructor.
89
90 Destroys the owned stream (if any) and releases the cached
91 awaitable storage.
92 */
93 ~any_write_stream();
94
95 /** Default constructor.
96
97 Constructs an empty wrapper. Operations on a default-constructed
98 wrapper result in undefined behavior.
99 */
100 1 any_write_stream() = default;
101
102 /** Non-copyable.
103
104 The awaitable cache is per-instance and cannot be shared.
105 */
106 any_write_stream(any_write_stream const&) = delete;
107 any_write_stream& operator=(any_write_stream const&) = delete;
108
109 /** Move constructor.
110
111 Transfers ownership of the wrapped stream (if owned) and
112 cached awaitable storage from `other`. After the move, `other` is
113 in a default-constructed state.
114
115 @param other The wrapper to move from.
116 */
117 2 any_write_stream(any_write_stream&& other) noexcept
118 2 : stream_(std::exchange(other.stream_, nullptr))
119 2 , vt_(std::exchange(other.vt_, nullptr))
120 2 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
121 2 , storage_(std::exchange(other.storage_, nullptr))
122 2 , active_ops_(std::exchange(other.active_ops_, nullptr))
123 {
124 2 }
125
126 /** Move assignment operator.
127
128 Destroys any owned stream and releases existing resources,
129 then transfers ownership from `other`.
130
131 @param other The wrapper to move from.
132 @return Reference to this wrapper.
133 */
134 any_write_stream&
135 operator=(any_write_stream&& other) noexcept;
136
137 /** Construct by taking ownership of a WriteStream.
138
139 Allocates storage and moves the stream into this wrapper.
140 The wrapper owns the stream and will destroy it.
141
142 @param s The stream to take ownership of.
143 */
144 template<WriteStream S>
145 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
146 any_write_stream(S s);
147
148 /** Construct by wrapping a WriteStream without ownership.
149
150 Wraps the given stream by pointer. The stream must remain
151 valid for the lifetime of this wrapper.
152
153 @param s Pointer to the stream to wrap.
154 */
155 template<WriteStream S>
156 any_write_stream(S* s);
157
158 /** Check if the wrapper contains a valid stream.
159
160 @return `true` if wrapping a stream, `false` if default-constructed
161 or moved-from.
162 */
163 bool
164 15 has_value() const noexcept
165 {
166 15 return stream_ != nullptr;
167 }
168
169 /** Check if the wrapper contains a valid stream.
170
171 @return `true` if wrapping a stream, `false` if default-constructed
172 or moved-from.
173 */
174 explicit
175 2 operator bool() const noexcept
176 {
177 2 return has_value();
178 }
179
180 /** Initiate an asynchronous write operation.
181
182 Writes data from the provided buffer sequence. The operation
183 completes when at least one byte has been written, or an error
184 occurs.
185
186 @param buffers The buffer sequence containing data to write.
187 Passed by value to ensure the sequence lives in the
188 coroutine frame across suspension points.
189
190 @return An awaitable yielding `(error_code,std::size_t)`.
191
192 @par Preconditions
193 The wrapper must contain a valid stream (`has_value() == true`).
194 */
195 template<ConstBufferSequence CB>
196 auto
197 write_some(CB buffers);
198
199 protected:
200 /** Rebind to a new stream after move.
201
202 Updates the internal pointer to reference a new stream object.
203 Used by owning wrappers after move assignment when the owned
204 object has moved to a new location.
205
206 @param new_stream The new stream to bind to. Must be the same
207 type as the original stream.
208
209 @note Terminates if called with a stream of different type
210 than the original.
211 */
212 template<WriteStream S>
213 void
214 rebind(S& new_stream) noexcept
215 {
216 if(vt_ != &vtable_for_impl<S>::value)
217 std::terminate();
218 stream_ = &new_stream;
219 }
220 };
221
222 //----------------------------------------------------------
223
224 struct any_write_stream::awaitable_ops
225 {
226 bool (*await_ready)(void*);
227 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
228 io_result<std::size_t> (*await_resume)(void*);
229 void (*destroy)(void*) noexcept;
230 };
231
232 struct any_write_stream::vtable
233 {
234 void (*destroy)(void*) noexcept;
235 std::size_t awaitable_size;
236 std::size_t awaitable_align;
237 awaitable_ops const* (*construct_awaitable)(
238 void* stream,
239 void* storage,
240 std::span<const_buffer const> buffers);
241 };
242
243 template<WriteStream S>
244 struct any_write_stream::vtable_for_impl
245 {
246 using Awaitable = decltype(std::declval<S&>().write_some(
247 std::span<const_buffer const>{}));
248
249 static void
250 do_destroy_impl(void* stream) noexcept
251 {
252 static_cast<S*>(stream)->~S();
253 }
254
255 static awaitable_ops const*
256 60 construct_awaitable_impl(
257 void* stream,
258 void* storage,
259 std::span<const_buffer const> buffers)
260 {
261 60 auto& s = *static_cast<S*>(stream);
262 60 ::new(storage) Awaitable(s.write_some(buffers));
263
264 static constexpr awaitable_ops ops = {
265 60 +[](void* p) {
266 60 return static_cast<Awaitable*>(p)->await_ready();
267 },
268 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
269 return detail::call_await_suspend(
270 static_cast<Awaitable*>(p), h, ex, token);
271 },
272 60 +[](void* p) {
273 60 return static_cast<Awaitable*>(p)->await_resume();
274 },
275 60 +[](void* p) noexcept {
276 60 static_cast<Awaitable*>(p)->~Awaitable();
277 }
278 };
279 60 return &ops;
280 }
281
282 static constexpr vtable value = {
283 &do_destroy_impl,
284 sizeof(Awaitable),
285 alignof(Awaitable),
286 &construct_awaitable_impl
287 };
288 };
289
290 //----------------------------------------------------------
291
292 inline
293 72 any_write_stream::~any_write_stream()
294 {
295
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 72 times.
72 if(storage_)
296 {
297 vt_->destroy(stream_);
298 ::operator delete(storage_);
299 }
300
2/2
✓ Branch 0 taken 65 times.
✓ Branch 1 taken 7 times.
72 if(cached_awaitable_)
301 65 ::operator delete(cached_awaitable_);
302 72 }
303
304 inline any_write_stream&
305 3 any_write_stream::operator=(any_write_stream&& other) noexcept
306 {
307
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if(this != &other)
308 {
309
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(storage_)
310 {
311 vt_->destroy(stream_);
312 ::operator delete(storage_);
313 }
314
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(cached_awaitable_)
315 ::operator delete(cached_awaitable_);
316 3 stream_ = std::exchange(other.stream_, nullptr);
317 3 vt_ = std::exchange(other.vt_, nullptr);
318 3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
319 3 storage_ = std::exchange(other.storage_, nullptr);
320 3 active_ops_ = std::exchange(other.active_ops_, nullptr);
321 }
322 3 return *this;
323 }
324
325 template<WriteStream S>
326 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
327 any_write_stream::any_write_stream(S s)
328 : vt_(&vtable_for_impl<S>::value)
329 {
330 struct guard {
331 any_write_stream* self;
332 bool committed = false;
333 ~guard() {
334 if(!committed && self->storage_) {
335 self->vt_->destroy(self->stream_);
336 ::operator delete(self->storage_);
337 self->storage_ = nullptr;
338 self->stream_ = nullptr;
339 }
340 }
341 } g{this};
342
343 storage_ = ::operator new(sizeof(S));
344 stream_ = ::new(storage_) S(std::move(s));
345
346 // Preallocate the awaitable storage
347 cached_awaitable_ = ::operator new(vt_->awaitable_size);
348
349 g.committed = true;
350 }
351
352 template<WriteStream S>
353 65 any_write_stream::any_write_stream(S* s)
354 65 : stream_(s)
355 65 , vt_(&vtable_for_impl<S>::value)
356 {
357 // Preallocate the awaitable storage
358 65 cached_awaitable_ = ::operator new(vt_->awaitable_size);
359 65 }
360
361 //----------------------------------------------------------
362
363 template<ConstBufferSequence CB>
364 auto
365 60 any_write_stream::write_some(CB buffers)
366 {
367 struct awaitable
368 {
369 any_write_stream* self_;
370 buffer_param<CB> bp_;
371
372 bool
373 60 await_ready() const noexcept
374 {
375 60 return false;
376 }
377
378 coro
379 60 await_suspend(coro h, executor_ref ex, std::stop_token token)
380 {
381 // Construct the underlying awaitable into cached storage
382 60 self_->active_ops_ = self_->vt_->construct_awaitable(
383 60 self_->stream_,
384
1/1
✓ Branch 1 taken 10 times.
60 self_->cached_awaitable_,
385
1/1
✓ Branch 1 taken 10 times.
60 bp_.data());
386
387 // Check if underlying is immediately ready
388
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
60 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
389 60 return h;
390
391 // Forward to underlying awaitable
392 return self_->active_ops_->await_suspend(
393 self_->cached_awaitable_, h, ex, token);
394 }
395
396 io_result<std::size_t>
397 60 await_resume()
398 {
399 struct guard {
400 any_write_stream* self;
401 60 ~guard() {
402 60 self->active_ops_->destroy(self->cached_awaitable_);
403 60 self->active_ops_ = nullptr;
404 60 }
405 60 } g{self_};
406 60 return self_->active_ops_->await_resume(
407
1/1
✓ Branch 1 taken 7 times.
103 self_->cached_awaitable_);
408 60 }
409 };
410 60 return awaitable{this, buffer_param<CB>(buffers)};
411 }
412
413 } // namespace capy
414 } // namespace boost
415
416 #endif
417