LCOV - code coverage report
Current view: top level - capy/io - any_buffer_source.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 83.5 % 79 66
Test Date: 2026-02-02 05:00:52 Functions: 89.5 % 19 17

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
      11              : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_copy.hpp>
      17              : #include <boost/capy/buffers/slice.hpp>
      18              : #include <boost/capy/concept/buffer_source.hpp>
      19              : #include <boost/capy/concept/io_awaitable.hpp>
      20              : #include <boost/capy/concept/read_source.hpp>
      21              : #include <boost/capy/coro.hpp>
      22              : #include <boost/capy/error.hpp>
      23              : #include <boost/capy/ex/executor_ref.hpp>
      24              : #include <boost/capy/io_result.hpp>
      25              : #include <boost/capy/task.hpp>
      26              : 
      27              : #include <concepts>
      28              : #include <coroutine>
      29              : #include <cstddef>
      30              : #include <exception>
      31              : #include <new>
      32              : #include <span>
      33              : #include <stop_token>
      34              : #include <system_error>
      35              : #include <utility>
      36              : 
      37              : namespace boost {
      38              : namespace capy {
      39              : 
      40              : /** Type-erased wrapper for any BufferSource.
      41              : 
      42              :     This class provides type erasure for any type satisfying the
      43              :     @ref BufferSource concept, enabling runtime polymorphism for
      44              :     buffer pull operations. The wrapper also satisfies @ref ReadSource,
      45              :     allowing it to be used with code expecting either interface.
      46              :     It uses cached awaitable storage to achieve zero steady-state
      47              :     allocation after construction.
      48              : 
      49              :     The @ref ReadSource interface is provided by the templated
      50              :     @ref read method. This method copies data from the source's
      51              :     internal buffers into the caller's buffers, incurring one extra
      52              :     buffer copy compared to using @ref pull and @ref consume directly.
      53              : 
      54              :     The wrapper supports two construction modes:
      55              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      56              :       allocates storage and owns the source.
      57              :     - **Reference**: Pass a pointer to wrap without ownership. The
      58              :       pointed-to source must outlive this wrapper.
      59              : 
      60              :     @par Awaitable Preallocation
      61              :     The constructor preallocates storage for the type-erased awaitable.
      62              :     This lets a server reserve that memory at startup,
      63              :     so memory usage can be measured up front, rather than
      64              :     allocating piecemeal as traffic arrives.
      65              : 
      66              :     @par Thread Safety
      67              :     Not thread-safe. Concurrent operations on the same wrapper
      68              :     are undefined behavior.
      69              : 
      70              :     @par Example
      71              :     @code
      72              :     // Owning - takes ownership of the source
      73              :     any_buffer_source abs(some_buffer_source{args...});
      74              : 
      75              :     // Reference - wraps without ownership
      76              :     some_buffer_source src;
      77              :     any_buffer_source abs(&src);
      78              : 
      79              :     const_buffer arr[16];
      80              :     auto [ec, bufs] = co_await abs.pull(arr);
      81              :     @endcode
      82              : 
      83              :     @see any_buffer_sink, BufferSource, ReadSource
      84              : */
      85              : class any_buffer_source
      86              : {
      87              :     struct vtable; // per-source-type function table (defined after the class)
      88              :     struct awaitable_ops; // function table for one constructed pull awaitable
      89              : 
      90              :     template<BufferSource S>
      91              :     struct vtable_for_impl;
      92              : 
      93              :     void* source_ = nullptr; // the wrapped source, owned or referenced; null when empty
      94              :     vtable const* vt_ = nullptr; // operations for the concrete source type
      95              :     void* cached_awaitable_ = nullptr; // raw storage in which each pull awaitable is built
      96              :     void* storage_ = nullptr; // allocation holding an owned source; stays null in reference mode
      97              :     awaitable_ops const* active_ops_ = nullptr; // set by pull's await_suspend, cleared by its await_resume
      98              : 
      99              : public:
     100              :     /** Destructor.
     101              : 
     102              :         Destroys the owned source (if any) and releases the cached
     103              :         awaitable storage.
     104              :     */
     105              :     ~any_buffer_source();
     106              : 
     107              :     /** Default constructor.
     108              : 
     109              :         Constructs an empty wrapper (`has_value() == false`). Calling
     110              :         @ref consume, @ref pull or @ref read on it is undefined behavior.
     111              :     */
     112              :     any_buffer_source() = default;
     113              : 
     114              :     /** Non-copyable.
     115              : 
     116              :         The awaitable cache is per-instance and cannot be shared.
     117              :     */
     118              :     any_buffer_source(any_buffer_source const&) = delete;
     119              :     any_buffer_source& operator=(any_buffer_source const&) = delete;
     120              : 
     121              :     /** Move constructor.
     122              : 
     123              :         Transfers ownership of the wrapped source (if owned) and
     124              :         cached awaitable storage from `other`. After the move, `other` is
     125              :         in a default-constructed state.
     126              : 
     127              :         @param other The wrapper to move from.
     128              :     */
     129            1 :     any_buffer_source(any_buffer_source&& other) noexcept
     130            1 :         : source_(std::exchange(other.source_, nullptr))
     131            1 :         , vt_(std::exchange(other.vt_, nullptr))
     132            1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     133            1 :         , storage_(std::exchange(other.storage_, nullptr))
     134            1 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     135              :     {
     136            1 :     }
     137              : 
     138              :     /** Move assignment operator.
     139              : 
     140              :         Destroys any owned source and releases existing resources,
     141              :         then transfers ownership from `other`. Self-assignment is a no-op.
     142              : 
     143              :         @param other The wrapper to move from.
     144              :         @return Reference to this wrapper.
     145              :     */
     146              :     any_buffer_source&
     147              :     operator=(any_buffer_source&& other) noexcept;
     148              : 
     149              :     /** Construct by taking ownership of a BufferSource.
     150              : 
     151              :         Allocates storage and moves the source into this wrapper.
     152              :         The wrapper owns the source and will destroy it.
     153              : 
     154              :         @param s The source to take ownership of.
     155              :     */
     156              :     template<BufferSource S>
     157              :         requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
     158              :     any_buffer_source(S s);
     159              : 
     160              :     /** Construct by wrapping a BufferSource without ownership.
     161              : 
     162              :         Wraps the given source by pointer. The source must remain
     163              :         valid for the lifetime of this wrapper.
     164              : 
     165              :         @param s Pointer to the source to wrap.
     166              :     */
     167              :     template<BufferSource S>
     168              :     any_buffer_source(S* s);
     169              : 
     170              :     /** Check if the wrapper contains a valid source.
     171              : 
     172              :         @return `true` if wrapping a source, `false` if default-constructed,
     173              :             moved-from, or constructed from a null pointer.
     174              :     */
     175              :     bool
     176            9 :     has_value() const noexcept
     177              :     {
     178            9 :         return source_ != nullptr;
     179              :     }
     180              : 
     181              :     /** Check if the wrapper contains a valid source.
     182              : 
     183              :         @return The same value as @ref has_value: `true` if wrapping a
     184              :             source, `false` otherwise.
     185              :     */
     186              :     explicit
     187            2 :     operator bool() const noexcept
     188              :     {
     189            2 :         return has_value();
     190              :     }
     191              : 
     192              :     /** Consume bytes from the source.
     193              : 
     194              :         Advances the internal read position of the underlying source
     195              :         by the specified number of bytes. The next call to @ref pull
     196              :         returns data starting after the consumed bytes.
     197              : 
     198              :         @param n The number of bytes to consume. Must not exceed the
     199              :         total size of buffers returned by the previous @ref pull.
     200              : 
     201              :         @par Preconditions
     202              :         The wrapper must contain a valid source (`has_value() == true`).
     203              :     */
     204              :     void
     205              :     consume(std::size_t n) noexcept;
     206              : 
     207              :     /** Pull buffer data from the source.
     208              : 
     209              :         Fills the provided span with buffer descriptors from the
     210              :         underlying source. The operation completes when data is
     211              :         available, the source is exhausted, or an error occurs.
     212              : 
     213              :         @param dest Span of const_buffer to fill.
     214              : 
     215              :         @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
     216              :             On success with data, a non-empty span of filled buffers.
     217              :             On success with empty span, source is exhausted.
     218              : 
     219              :         @par Preconditions
     220              :         The wrapper must contain a valid source (`has_value() == true`).
     221              :     */
     222              :     auto
     223              :     pull(std::span<const_buffer> dest);
     224              : 
     225              :     /** Read data into a mutable buffer sequence.
     226              : 
     227              :         Fills the provided buffer sequence by pulling data from the
     228              :         underlying source and copying it into the caller's buffers.
     229              :         This satisfies @ref ReadSource but incurs a copy; for zero-copy
     230              :         access, use @ref pull and @ref consume instead.
     231              : 
     232              :         @note This operation copies data from the source's internal
     233              :         buffers into the caller's buffers. For zero-copy reads,
     234              :         use @ref pull and @ref consume directly.
     235              : 
     236              :         @param buffers The buffer sequence to fill.
     237              : 
     238              :         @return An awaitable yielding `(error_code,std::size_t)`.
     239              :             On success, `n == buffer_size(buffers)`.
     240              :             On EOF, `ec == error::eof` and `n` is bytes transferred.
     241              : 
     242              :         @par Preconditions
     243              :         The wrapper must contain a valid source (`has_value() == true`).
     244              : 
     245              :         @see pull, consume
     246              :     */
     247              :     template<MutableBufferSequence MB>
     248              :     task<io_result<std::size_t>>
     249              :     read(MB buffers);
     250              : 
     251              : protected:
     252              :     /** Rebind to a new source after move.
     253              : 
     254              :         Updates the internal pointer to reference a new source object.
     255              :         Used by owning wrappers after move assignment when the owned
     256              :         object has moved to a new location.
     257              : 
     258              :         @param new_source The new source to bind to. Must be the same
     259              :             type as the original source.
     260              : 
     261              :         @note Terminates if called with a source of different type
     262              :             than the original.
     263              :     */
     264              :     template<BufferSource S>
     265              :     void
     266              :     rebind(S& new_source) noexcept
     267              :     {
     268              :         if(vt_ != &vtable_for_impl<S>::value) // type check by vtable address: one table per S
     269              :             std::terminate();
     270              :         source_ = &new_source; // only the pointer changes; storage_ is left as it was
     271              :     }
     272              : };
     273              : 
     274              : //----------------------------------------------------------
     275              : 
     276              : struct any_buffer_source::awaitable_ops // type-erased calls on the awaitable built in cached storage
     277              : { // member order must match the aggregate initializer in construct_awaitable_impl
     278              :     bool (*await_ready)(void*); // forwards to the concrete awaitable's await_ready()
     279              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token); // forwards via detail::call_await_suspend
     280              :     io_result<std::span<const_buffer>> (*await_resume)(void*); // forwards to await_resume()
     281              :     void (*destroy)(void*) noexcept; // runs the awaitable's destructor; does not free the storage
     282              : };
     283              : 
     284              : struct any_buffer_source::vtable // per-source-type operations; one constant instance per S
     285              : { // member order must match the initializer of vtable_for_impl<S>::value
     286              :     void (*destroy)(void*) noexcept; // runs the source's destructor in place
     287              :     void (*do_consume)(void* source, std::size_t n) noexcept; // forwards to source.consume(n)
     288              :     std::size_t awaitable_size; // sizeof the awaitable returned by the source's pull(); sizes cached storage
     289              :     std::size_t awaitable_align; // alignof that awaitable; NOTE(review): recorded but never read in this listing
     290              :     awaitable_ops const* (*construct_awaitable)( // builds the pull awaitable in `storage`, returns its ops
     291              :         void* source,
     292              :         void* storage,
     293              :         std::span<const_buffer> dest);
     294              : };
     295              : 
     296              : template<BufferSource S> // supplies the vtable and awaitable_ops for one concrete source type S
     297              : struct any_buffer_source::vtable_for_impl
     298              : {
     299              :     using Awaitable = decltype(std::declval<S&>().pull(
     300              :         std::declval<std::span<const_buffer>>())); // the type returned by S::pull
     301              :     static_assert(alignof(Awaitable) <= __STDCPP_DEFAULT_NEW_ALIGNMENT__, "awaitable over-aligned for ::operator new");
     302              :     static void
     303            0 :     do_destroy_impl(void* source) noexcept
     304              :     {
     305            0 :         static_cast<S*>(source)->~S(); // destructor only; the caller frees the memory
     306            0 :     }
     307              : 
     308              :     static void
     309           39 :     do_consume_impl(void* source, std::size_t n) noexcept
     310              :     {
     311           39 :         static_cast<S*>(source)->consume(n);
     312           39 :     }
     313              : 
     314              :     static awaitable_ops const*
     315           92 :     construct_awaitable_impl(
     316              :         void* source,
     317              :         void* storage,
     318              :         std::span<const_buffer> dest)
     319              :     {
     320           92 :         auto& s = *static_cast<S*>(source);
     321           92 :         ::new(storage) Awaitable(s.pull(dest)); // `storage` must hold sizeof(Awaitable) bytes
     322              : 
     323              :         static constexpr awaitable_ops ops = { // order follows the awaitable_ops members
     324           92 :             +[](void* p) {
     325           92 :                 return static_cast<Awaitable*>(p)->await_ready();
     326              :             },
     327            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     328            0 :                 return detail::call_await_suspend(
     329            0 :                     static_cast<Awaitable*>(p), h, ex, token);
     330              :             },
     331           92 :             +[](void* p) {
     332           92 :                 return static_cast<Awaitable*>(p)->await_resume();
     333              :             },
     334           92 :             +[](void* p) noexcept {
     335           92 :                 static_cast<Awaitable*>(p)->~Awaitable(); // storage itself is reused, not freed
     336              :             }
     337              :         };
     338           92 :         return &ops;
     339              :     }
     340              : 
     341              :     static constexpr vtable value = { // order follows the vtable members
     342              :         &do_destroy_impl,
     343              :         &do_consume_impl,
     344              :         sizeof(Awaitable),
     345              :         alignof(Awaitable),
     346              :         &construct_awaitable_impl
     347              :     };
     348              : };
     349              : 
     350              : //----------------------------------------------------------
     351              : 
     352              : inline // NOTE(review): active_ops_ is not consulted; an awaitable still alive in the cache would not be destroyed
     353           59 : any_buffer_source::~any_buffer_source()
     354              : {
     355           59 :     if(storage_) // non-null only when the wrapper owns its source
     356              :     {
     357            0 :         vt_->destroy(source_); // run the owned source's destructor
     358            0 :         ::operator delete(storage_); // then release the memory it lived in
     359              :     }
     360           59 :     if(cached_awaitable_)
     361           56 :         ::operator delete(cached_awaitable_); // raw storage only
     362           59 : }
     363              : 
     364              : inline any_buffer_source& // releases this wrapper's resources, then takes over those of `other`
     365            1 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
     366              : {
     367            1 :     if(this != &other) // self-assignment leaves the wrapper untouched
     368              :     {
     369            1 :         if(storage_) // same teardown as the destructor: owned source first
     370              :         {
     371            0 :             vt_->destroy(source_);
     372            0 :             ::operator delete(storage_);
     373              :         }
     374            1 :         if(cached_awaitable_)
     375            0 :             ::operator delete(cached_awaitable_);
     376            1 :         source_ = std::exchange(other.source_, nullptr); // each exchange nulls the member in `other`,
     377            1 :         vt_ = std::exchange(other.vt_, nullptr); // leaving it in the default-constructed state
     378            1 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     379            1 :         storage_ = std::exchange(other.storage_, nullptr);
     380            1 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     381              :     }
     382            1 :     return *this;
     383              : }
     384              : 
     385              : template<BufferSource S> // owning mode: moves `s` into storage allocated and owned by the wrapper
     386              :     requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
     387              : any_buffer_source::any_buffer_source(S s)
     388              :     : vt_(&vtable_for_impl<S>::value)
     389              : {
     390              :     struct guard { // rolls back a partially built wrapper if anything below throws
     391              :         any_buffer_source* self;
     392              :         bool committed = false;
     393              :         ~guard() {
     394              :             if(!committed && self->storage_) {
     395              :                 if(self->source_) // still null if S's move constructor threw
     396              :                     self->vt_->destroy(self->source_);
     397              :                 ::operator delete(self->storage_);
     398              :                 self->storage_ = self->source_ = nullptr;
     399              :             }
     400              :         }
     401              :     } g{this};
     402              :     static_assert(alignof(S) <= __STDCPP_DEFAULT_NEW_ALIGNMENT__, "source over-aligned for ::operator new");
     403              :     storage_ = ::operator new(sizeof(S)); // plain new: default alignment only, hence the assert
     404              :     source_ = ::new(storage_) S(std::move(s)); // source_ is assigned only if construction succeeds
     405              : 
     406              :     // Preallocate the awaitable storage
     407              :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     408              : 
     409              :     g.committed = true; // nothing after this point can throw
     410              : }
     411              : 
     412              : template<BufferSource S> // reference mode: stores the pointer; storage_ stays null so nothing is owned
     413           56 : any_buffer_source::any_buffer_source(S* s)
     414           56 :     : source_(s)
     415           56 :     , vt_(&vtable_for_impl<S>::value)
     416              : {
     417              :     // Preallocate the awaitable storage (allocated even when `s` is null)
     418           56 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     419           56 : }
     420              : 
     421              : //----------------------------------------------------------
     422              : 
     423              : inline void // forwards to the wrapped source's consume(n) through the vtable
     424           39 : any_buffer_source::consume(std::size_t n) noexcept
     425              : {
     426           39 :     vt_->do_consume(source_, n); // vt_ is not checked: an empty wrapper dereferences null here
     427           39 : }
     428              : 
     429              : inline auto // returns a small awaitable that defers all work to await_suspend / await_resume
     430           92 : any_buffer_source::pull(std::span<const_buffer> dest)
     431              : {
     432              :     struct awaitable
     433              :     {
     434              :         any_buffer_source* self_; // the wrapper; must stay alive and unmoved while awaited
     435              :         std::span<const_buffer> dest_; // caller's span, handed to the source's pull()
     436              : 
     437              :         bool
     438           92 :         await_ready() const noexcept
     439              :         {
     440           92 :             return false; // always suspend: the underlying awaitable is only built in await_suspend
     441              :         }
     442              : 
     443              :         coro
     444           92 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     445              :         {
     446              :             // Construct the underlying awaitable into cached storage
     447          184 :             self_->active_ops_ = self_->vt_->construct_awaitable(
     448           92 :                 self_->source_,
     449           92 :                 self_->cached_awaitable_,
     450              :                 dest_);
     451              : 
     452              :             // Check if underlying is immediately ready
     453           92 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     454           92 :                 return h; // presumably resumes the awaiting coroutine at once; confirm what `coro` is
     455              : 
     456              :             // Forward to underlying awaitable
     457            0 :             return self_->active_ops_->await_suspend(
     458            0 :                 self_->cached_awaitable_, h, ex, token);
     459              :         }
     460              : 
     461              :         io_result<std::span<const_buffer>>
     462           92 :         await_resume()
     463              :         {
     464              :             struct guard { // destroys the underlying awaitable once its result is produced, even on throw
     465              :                 any_buffer_source* self;
     466           92 :                 ~guard() {
     467           92 :                     self->active_ops_->destroy(self->cached_awaitable_);
     468           92 :                     self->active_ops_ = nullptr; // the cache is free for the next pull
     469           92 :                 }
     470           92 :             } g{self_};
     471           92 :             return self_->active_ops_->await_resume(
     472          165 :                 self_->cached_awaitable_);
     473           92 :         }
     474              :     };
     475           92 :     return awaitable{this, dest};
     476              : }
     477              : 
     478              : template<MutableBufferSequence MB> // coroutine: repeats pull, copy, consume until `buffers` is full
     479              : task<io_result<std::size_t>>
     480              : any_buffer_source::read(MB buffers)
     481              : {
     482              :     std::size_t total = 0; // bytes copied into `buffers` so far
     483              :     auto dest = sans_prefix(buffers, 0); // presumably the not-yet-filled part of `buffers`; confirm sans_prefix
     484              : 
     485              :     while(!buffer_empty(dest))
     486              :     {
     487              :         const_buffer arr[detail::max_iovec_]; // scratch descriptors for this pull
     488              :         auto [ec, bufs] = co_await pull(arr);
     489              : 
     490              :         if(ec)
     491              :             co_return {ec, total}; // report the error together with the bytes already copied
     492              : 
     493              :         if(bufs.empty())
     494              :             co_return {error::eof, total}; // an empty span from pull means the source is exhausted
     495              : 
     496              :         auto n = buffer_copy(dest, bufs); // NOTE(review): if this can return 0 for non-empty bufs, the loop stalls
     497              :         consume(n); // consume only what was actually copied
     498              :         total += n;
     499              :         dest = sans_prefix(dest, n);
     500              :     }
     501              : 
     502              :     co_return {{}, total}; // `buffers` filled: no error, total bytes copied
     503              : }
     504              : 
     505              : //----------------------------------------------------------
     506              : 
     507              : static_assert(BufferSource<any_buffer_source>); // the wrapper itself models BufferSource (pull/consume)
     508              : static_assert(ReadSource<any_buffer_source>); // and ReadSource, through the templated read()
     509              : 
     510              : } // namespace capy
     511              : } // namespace boost
     512              : 
     513              : #endif
        

Generated by: LCOV version 2.3