LCOV - code coverage report
Current view: top level - capy/io - any_buffer_sink.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 84.3 % 115 97
Test Date: 2026-02-02 05:00:52 Functions: 90.0 % 30 27

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
      11              : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_copy.hpp>
      17              : #include <boost/capy/buffers/buffer_param.hpp>
      18              : #include <boost/capy/concept/buffer_sink.hpp>
      19              : #include <boost/capy/concept/io_awaitable.hpp>
      20              : #include <boost/capy/concept/write_sink.hpp>
      21              : #include <boost/capy/coro.hpp>
      22              : #include <boost/capy/ex/executor_ref.hpp>
      23              : #include <boost/capy/io_result.hpp>
      24              : #include <boost/capy/task.hpp>
      25              : 
      26              : #include <concepts>
      27              : #include <coroutine>
      28              : #include <cstddef>
      29              : #include <exception>
      30              : #include <new>
      31              : #include <stop_token>
      32              : #include <system_error>
      33              : #include <utility>
      34              : 
      35              : namespace boost {
      36              : namespace capy {
      37              : 
      38              : /** Type-erased wrapper for any BufferSink.
      39              : 
      40              :     This class provides type erasure for any type satisfying the
      41              :     @ref BufferSink concept, enabling runtime polymorphism for
      42              :     buffer sink operations. It uses cached awaitable storage to achieve
      43              :     zero steady-state allocation after construction.
      44              : 
      45              :     The wrapper also satisfies @ref WriteSink through templated
      46              :     @ref write methods. These methods copy data from the caller's
      47              :     buffers into the sink's internal storage, incurring one extra
      48              :     buffer copy compared to using @ref prepare and @ref commit
      49              :     directly.
      50              : 
      51              :     The wrapper supports two construction modes:
      52              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      53              :       allocates storage and owns the sink.
      54              :     - **Reference**: Pass a pointer to wrap without ownership. The
      55              :       pointed-to sink must outlive this wrapper.
      56              : 
      57              :     @par Awaitable Preallocation
      58              :     The constructor preallocates storage for the type-erased awaitable.
      59              :     This reserves all virtual address space at server startup
      60              :     so memory usage can be measured up front, rather than
      61              :     allocating piecemeal as traffic arrives.
      62              : 
      63              :     @par Thread Safety
      64              :     Not thread-safe. Concurrent operations on the same wrapper
      65              :     are undefined behavior.
      66              : 
      67              :     @par Example
      68              :     @code
      69              :     // Owning - takes ownership of the sink
      70              :     any_buffer_sink abs(some_buffer_sink{args...});
      71              : 
      72              :     // Reference - wraps without ownership
      73              :     some_buffer_sink sink;
      74              :     any_buffer_sink abs(&sink);
      75              : 
      76              :     mutable_buffer arr[16];
      77              :     auto bufs = abs.prepare(arr);
      78              :     // Write data into bufs[0..bufs.size())
      79              :     auto [ec] = co_await abs.commit(bytes_written);
      80              :     auto [ec2] = co_await abs.commit_eof();
      81              :     @endcode
      82              : 
      83              :     @see any_buffer_source, BufferSink, WriteSink
      84              : */
      85              : class any_buffer_sink
      86              : {
      87              :     struct vtable;
      88              :     struct awaitable_ops;
      89              : 
      90              :     template<BufferSink S>
      91              :     struct vtable_for_impl;
      92              : 
      93              :     void* sink_ = nullptr;
      94              :     vtable const* vt_ = nullptr;
      95              :     void* cached_awaitable_ = nullptr;
      96              :     void* storage_ = nullptr;
      97              :     awaitable_ops const* active_ops_ = nullptr;
      98              : 
      99              : public:
     100              :     /** Destructor.
     101              : 
     102              :         Destroys the owned sink (if any) and releases the cached
     103              :         awaitable storage.
     104              :     */
     105              :     ~any_buffer_sink();
     106              : 
     107              :     /** Default constructor.
     108              : 
     109              :         Constructs an empty wrapper. Operations on a default-constructed
     110              :         wrapper result in undefined behavior.
     111              :     */
     112              :     any_buffer_sink() = default;
     113              : 
     114              :     /** Non-copyable.
     115              : 
     116              :         The awaitable cache is per-instance and cannot be shared.
     117              :     */
     118              :     any_buffer_sink(any_buffer_sink const&) = delete;
     119              :     any_buffer_sink& operator=(any_buffer_sink const&) = delete;
     120              : 
     121              :     /** Move constructor.
     122              : 
     123              :         Transfers ownership of the wrapped sink (if owned) and
     124              :         cached awaitable storage from `other`. After the move, `other` is
     125              :         in a default-constructed state.
     126              : 
     127              :         @param other The wrapper to move from.
     128              :     */
     129            1 :     any_buffer_sink(any_buffer_sink&& other) noexcept
     130            1 :         : sink_(std::exchange(other.sink_, nullptr))
     131            1 :         , vt_(std::exchange(other.vt_, nullptr))
     132            1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     133            1 :         , storage_(std::exchange(other.storage_, nullptr))
     134            1 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     135              :     {
     136            1 :     }
     137              : 
     138              :     /** Move assignment operator.
     139              : 
     140              :         Destroys any owned sink and releases existing resources,
     141              :         then transfers ownership from `other`.
     142              : 
     143              :         @param other The wrapper to move from.
     144              :         @return Reference to this wrapper.
     145              :     */
     146              :     any_buffer_sink&
     147              :     operator=(any_buffer_sink&& other) noexcept;
     148              : 
     149              :     /** Construct by taking ownership of a BufferSink.
     150              : 
     151              :         Allocates storage and moves the sink into this wrapper.
     152              :         The wrapper owns the sink and will destroy it.
     153              : 
     154              :         @param s The sink to take ownership of.
     155              :     */
     156              :     template<BufferSink S>
     157              :         requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
     158              :     any_buffer_sink(S s);
     159              : 
     160              :     /** Construct by wrapping a BufferSink without ownership.
     161              : 
     162              :         Wraps the given sink by pointer. The sink must remain
     163              :         valid for the lifetime of this wrapper.
     164              : 
     165              :         @param s Pointer to the sink to wrap.
     166              :     */
     167              :     template<BufferSink S>
     168              :     any_buffer_sink(S* s);
     169              : 
     170              :     /** Check if the wrapper contains a valid sink.
     171              : 
     172              :         @return `true` if wrapping a sink, `false` if default-constructed
     173              :             or moved-from.
     174              :     */
     175              :     bool
     176            9 :     has_value() const noexcept
     177              :     {
     178            9 :         return sink_ != nullptr;
     179              :     }
     180              : 
     181              :     /** Check if the wrapper contains a valid sink.
     182              : 
     183              :         @return `true` if wrapping a sink, `false` if default-constructed
     184              :             or moved-from.
     185              :     */
     186              :     explicit
     187            2 :     operator bool() const noexcept
     188              :     {
     189            2 :         return has_value();
     190              :     }
     191              : 
     192              :     /** Prepare writable buffers.
     193              : 
     194              :         Fills the provided span with mutable buffer descriptors
     195              :         pointing to the underlying sink's internal storage. This
     196              :         operation is synchronous.
     197              : 
     198              :         @param dest Span of mutable_buffer to fill.
     199              : 
     200              :         @return A span of filled buffers.
     201              : 
     202              :         @par Preconditions
     203              :         The wrapper must contain a valid sink (`has_value() == true`).
     204              :     */
     205              :     std::span<mutable_buffer>
     206              :     prepare(std::span<mutable_buffer> dest);
     207              : 
     208              :     /** Commit bytes written to the prepared buffers.
     209              : 
     210              :         Commits `n` bytes written to the buffers returned by the
     211              :         most recent call to @ref prepare. The operation may trigger
     212              :         underlying I/O.
     213              : 
     214              :         @param n The number of bytes to commit.
     215              : 
     216              :         @return An awaitable yielding `(error_code)`.
     217              : 
     218              :         @par Preconditions
     219              :         The wrapper must contain a valid sink (`has_value() == true`).
     220              :     */
     221              :     auto
     222              :     commit(std::size_t n);
     223              : 
     224              :     /** Commit bytes written with optional end-of-stream.
     225              : 
     226              :         Commits `n` bytes written to the buffers returned by the
     227              :         most recent call to @ref prepare. If `eof` is true, also
     228              :         signals end-of-stream.
     229              : 
     230              :         @param n The number of bytes to commit.
     231              :         @param eof If true, signals end-of-stream after committing.
     232              : 
     233              :         @return An awaitable yielding `(error_code)`.
     234              : 
     235              :         @par Preconditions
     236              :         The wrapper must contain a valid sink (`has_value() == true`).
     237              :     */
     238              :     auto
     239              :     commit(std::size_t n, bool eof);
     240              : 
     241              :     /** Signal end-of-stream.
     242              : 
     243              :         Indicates that no more data will be written to the sink.
     244              :         The operation completes when the sink is finalized, or
     245              :         an error occurs.
     246              : 
     247              :         @return An awaitable yielding `(error_code)`.
     248              : 
     249              :         @par Preconditions
     250              :         The wrapper must contain a valid sink (`has_value() == true`).
     251              :     */
     252              :     auto
     253              :     commit_eof();
     254              : 
     255              :     /** Write data from a buffer sequence.
     256              : 
     257              :         Writes all data from the buffer sequence to the underlying
     258              :         sink. This method satisfies the @ref WriteSink concept.
     259              : 
     260              :         @note This operation copies data from the caller's buffers
     261              :         into the sink's internal buffers. For zero-copy writes,
     262              :         use @ref prepare and @ref commit directly.
     263              : 
     264              :         @param buffers The buffer sequence to write.
     265              : 
     266              :         @return An awaitable yielding `(error_code,std::size_t)`.
     267              : 
     268              :         @par Preconditions
     269              :         The wrapper must contain a valid sink (`has_value() == true`).
     270              :     */
     271              :     template<ConstBufferSequence CB>
     272              :     task<io_result<std::size_t>>
     273              :     write(CB buffers);
     274              : 
     275              :     /** Write data with optional end-of-stream.
     276              : 
     277              :         Writes all data from the buffer sequence to the underlying
     278              :         sink, optionally finalizing it afterwards. This method
     279              :         satisfies the @ref WriteSink concept.
     280              : 
     281              :         @note This operation copies data from the caller's buffers
     282              :         into the sink's internal buffers. For zero-copy writes,
     283              :         use @ref prepare and @ref commit directly.
     284              : 
     285              :         @param buffers The buffer sequence to write.
     286              :         @param eof If true, finalize the sink after writing.
     287              : 
     288              :         @return An awaitable yielding `(error_code,std::size_t)`.
     289              : 
     290              :         @par Preconditions
     291              :         The wrapper must contain a valid sink (`has_value() == true`).
     292              :     */
     293              :     template<ConstBufferSequence CB>
     294              :     task<io_result<std::size_t>>
     295              :     write(CB buffers, bool eof);
     296              : 
     297              :     /** Signal end-of-stream.
     298              : 
     299              :         Indicates that no more data will be written to the sink.
     300              :         This method satisfies the @ref WriteSink concept.
     301              : 
     302              :         @return An awaitable yielding `(error_code)`.
     303              : 
     304              :         @par Preconditions
     305              :         The wrapper must contain a valid sink (`has_value() == true`).
     306              :     */
     307              :     auto
     308              :     write_eof();
     309              : 
     310              : protected:
     311              :     /** Rebind to a new sink after move.
     312              : 
     313              :         Updates the internal pointer to reference a new sink object.
     314              :         Used by owning wrappers after move assignment when the owned
     315              :         object has moved to a new location.
     316              : 
     317              :         @param new_sink The new sink to bind to. Must be the same
     318              :             type as the original sink.
     319              : 
     320              :         @note Terminates if called with a sink of different type
     321              :             than the original.
     322              :     */
     323              :     template<BufferSink S>
     324              :     void
     325              :     rebind(S& new_sink) noexcept
     326              :     {
     327              :         if(vt_ != &vtable_for_impl<S>::value)
     328              :             std::terminate();
     329              :         sink_ = &new_sink;
     330              :     }
     331              : };
     332              : 
     333              : //----------------------------------------------------------
     334              : 
     335              : struct any_buffer_sink::awaitable_ops
     336              : {
     337              :     bool (*await_ready)(void*);
     338              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
     339              :     io_result<> (*await_resume)(void*);
     340              :     void (*destroy)(void*) noexcept;
     341              : };
     342              : 
     343              : struct any_buffer_sink::vtable
     344              : {
     345              :     void (*destroy)(void*) noexcept;
     346              :     std::span<mutable_buffer> (*do_prepare)(
     347              :         void* sink,
     348              :         std::span<mutable_buffer> dest);
     349              :     std::size_t awaitable_size;
     350              :     std::size_t awaitable_align;
     351              :     awaitable_ops const* (*construct_commit_awaitable)(
     352              :         void* sink,
     353              :         void* storage,
     354              :         std::size_t n,
     355              :         bool eof);
     356              :     awaitable_ops const* (*construct_eof_awaitable)(
     357              :         void* sink,
     358              :         void* storage);
     359              : };
     360              : 
     361              : template<BufferSink S>
     362              : struct any_buffer_sink::vtable_for_impl
     363              : {
     364              :     using CommitAwaitable = decltype(std::declval<S&>().commit(
     365              :         std::size_t{}, false));
     366              :     using EofAwaitable = decltype(std::declval<S&>().commit_eof());
     367              : 
     368              :     static void
     369            0 :     do_destroy_impl(void* sink) noexcept
     370              :     {
     371            0 :         static_cast<S*>(sink)->~S();
     372            0 :     }
     373              : 
     374              :     static std::span<mutable_buffer>
     375           68 :     do_prepare_impl(
     376              :         void* sink,
     377              :         std::span<mutable_buffer> dest)
     378              :     {
     379           68 :         auto& s = *static_cast<S*>(sink);
     380           68 :         return s.prepare(dest);
     381              :     }
     382              : 
     383              :     static awaitable_ops const*
     384           48 :     construct_commit_awaitable_impl(
     385              :         void* sink,
     386              :         void* storage,
     387              :         std::size_t n,
     388              :         bool eof)
     389              :     {
     390           48 :         auto& s = *static_cast<S*>(sink);
     391           48 :         ::new(storage) CommitAwaitable(s.commit(n, eof));
     392              : 
     393              :         static constexpr awaitable_ops ops = {
     394           48 :             +[](void* p) {
     395           48 :                 return static_cast<CommitAwaitable*>(p)->await_ready();
     396              :             },
     397            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     398            0 :                 return detail::call_await_suspend(
     399            0 :                     static_cast<CommitAwaitable*>(p), h, ex, token);
     400              :             },
     401           48 :             +[](void* p) {
     402           48 :                 return static_cast<CommitAwaitable*>(p)->await_resume();
     403              :             },
     404           48 :             +[](void* p) noexcept {
     405           48 :                 static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
     406              :             }
     407              :         };
     408           48 :         return &ops;
     409              :     }
     410              : 
     411              :     static awaitable_ops const*
     412           18 :     construct_eof_awaitable_impl(
     413              :         void* sink,
     414              :         void* storage)
     415              :     {
     416           18 :         auto& s = *static_cast<S*>(sink);
     417           18 :         ::new(storage) EofAwaitable(s.commit_eof());
     418              : 
     419              :         static constexpr awaitable_ops ops = {
     420           18 :             +[](void* p) {
     421           18 :                 return static_cast<EofAwaitable*>(p)->await_ready();
     422              :             },
     423            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     424            0 :                 return detail::call_await_suspend(
     425            0 :                     static_cast<EofAwaitable*>(p), h, ex, token);
     426              :             },
     427           18 :             +[](void* p) {
     428           18 :                 return static_cast<EofAwaitable*>(p)->await_resume();
     429              :             },
     430           18 :             +[](void* p) noexcept {
     431           18 :                 static_cast<EofAwaitable*>(p)->~EofAwaitable();
     432              :             }
     433              :         };
     434           18 :         return &ops;
     435              :     }
     436              : 
     437              :     static constexpr std::size_t max_awaitable_size =
     438              :         sizeof(CommitAwaitable) > sizeof(EofAwaitable)
     439              :             ? sizeof(CommitAwaitable)
     440              :             : sizeof(EofAwaitable);
     441              : 
     442              :     static constexpr std::size_t max_awaitable_align =
     443              :         alignof(CommitAwaitable) > alignof(EofAwaitable)
     444              :             ? alignof(CommitAwaitable)
     445              :             : alignof(EofAwaitable);
     446              : 
     447              :     static constexpr vtable value = {
     448              :         &do_destroy_impl,
     449              :         &do_prepare_impl,
     450              :         max_awaitable_size,
     451              :         max_awaitable_align,
     452              :         &construct_commit_awaitable_impl,
     453              :         &construct_eof_awaitable_impl
     454              :     };
     455              : };
     456              : 
     457              : //----------------------------------------------------------
     458              : 
     459              : inline
     460           63 : any_buffer_sink::~any_buffer_sink()
     461              : {
     462           63 :     if(storage_)
     463              :     {
     464            0 :         vt_->destroy(sink_);
     465            0 :         ::operator delete(storage_);
     466              :     }
     467           63 :     if(cached_awaitable_)
     468           60 :         ::operator delete(cached_awaitable_);
     469           63 : }
     470              : 
     471              : inline any_buffer_sink&
     472            1 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
     473              : {
     474            1 :     if(this != &other)
     475              :     {
     476            1 :         if(storage_)
     477              :         {
     478            0 :             vt_->destroy(sink_);
     479            0 :             ::operator delete(storage_);
     480              :         }
     481            1 :         if(cached_awaitable_)
     482            0 :             ::operator delete(cached_awaitable_);
     483            1 :         sink_ = std::exchange(other.sink_, nullptr);
     484            1 :         vt_ = std::exchange(other.vt_, nullptr);
     485            1 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     486            1 :         storage_ = std::exchange(other.storage_, nullptr);
     487            1 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     488              :     }
     489            1 :     return *this;
     490              : }
     491              : 
     492              : template<BufferSink S>
     493              :     requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
     494              : any_buffer_sink::any_buffer_sink(S s)
     495              :     : vt_(&vtable_for_impl<S>::value)
     496              : {
     497              :     struct guard {
     498              :         any_buffer_sink* self;
     499              :         bool committed = false;
     500              :         ~guard() {
     501              :             if(!committed && self->storage_) {
     502              :                 self->vt_->destroy(self->sink_);
     503              :                 ::operator delete(self->storage_);
     504              :                 self->storage_ = nullptr;
     505              :                 self->sink_ = nullptr;
     506              :             }
     507              :         }
     508              :     } g{this};
     509              : 
     510              :     storage_ = ::operator new(sizeof(S));
     511              :     sink_ = ::new(storage_) S(std::move(s));
     512              : 
     513              :     // Preallocate the awaitable storage (sized for max of commit/eof)
     514              :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     515              : 
     516              :     g.committed = true;
     517              : }
     518              : 
     519              : template<BufferSink S>
     520           60 : any_buffer_sink::any_buffer_sink(S* s)
     521           60 :     : sink_(s)
     522           60 :     , vt_(&vtable_for_impl<S>::value)
     523              : {
     524              :     // Preallocate the awaitable storage (sized for max of commit/eof)
     525           60 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     526           60 : }
     527              : 
     528              : //----------------------------------------------------------
     529              : 
     530              : inline std::span<mutable_buffer>
     531           68 : any_buffer_sink::prepare(std::span<mutable_buffer> dest)
     532              : {
     533           68 :     return vt_->do_prepare(sink_, dest);
     534              : }
     535              : 
     536              : inline auto
     537           48 : any_buffer_sink::commit(std::size_t n, bool eof)
     538              : {
     539              :     struct awaitable
     540              :     {
     541              :         any_buffer_sink* self_;
     542              :         std::size_t n_;
     543              :         bool eof_;
     544              : 
     545              :         bool
     546           48 :         await_ready() const noexcept
     547              :         {
     548           48 :             return false;
     549              :         }
     550              : 
     551              :         coro
     552           48 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     553              :         {
     554              :             // Construct the underlying awaitable into cached storage
     555           96 :             self_->active_ops_ = self_->vt_->construct_commit_awaitable(
     556           48 :                 self_->sink_,
     557           48 :                 self_->cached_awaitable_,
     558              :                 n_,
     559           48 :                 eof_);
     560              : 
     561              :             // Check if underlying is immediately ready
     562           48 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     563           48 :                 return h;
     564              : 
     565              :             // Forward to underlying awaitable
     566            0 :             return self_->active_ops_->await_suspend(
     567            0 :                 self_->cached_awaitable_, h, ex, token);
     568              :         }
     569              : 
     570              :         io_result<>
     571           48 :         await_resume()
     572              :         {
     573              :             struct guard {
     574              :                 any_buffer_sink* self;
     575           48 :                 ~guard() {
     576           48 :                     self->active_ops_->destroy(self->cached_awaitable_);
     577           48 :                     self->active_ops_ = nullptr;
     578           48 :                 }
     579           48 :             } g{self_};
     580           48 :             return self_->active_ops_->await_resume(
     581           85 :                 self_->cached_awaitable_);
     582           48 :         }
     583              :     };
     584           48 :     return awaitable{this, n, eof};
     585              : }
     586              : 
     587              : inline auto
     588           38 : any_buffer_sink::commit(std::size_t n)
     589              : {
     590           38 :     return commit(n, false);
     591              : }
     592              : 
     593              : inline auto  // Returns an awaitable that drives the EOF awaitable the vtable builds for sink_.
     594           18 : any_buffer_sink::commit_eof()
     595              : {
     596              :     struct awaitable
     597              :     {
     598              :         any_buffer_sink* self_;  // the any_buffer_sink that commit_eof() was called on
     599              :         // Always reports not-ready, so every co_await reaches await_suspend below.
     600              :         bool
     601           18 :         await_ready() const noexcept
     602              :         {
     603           18 :             return false;
     604              :         }
     605              :         // Builds the type-erased awaitable, then either returns h or defers to it.
     606              :         coro
     607           18 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     608              :         {
     609              :             // Construct the underlying EOF awaitable into cached storage and keep the ops object returned for it
     610           36 :             self_->active_ops_ = self_->vt_->construct_eof_awaitable(
     611           18 :                 self_->sink_,
     612           18 :                 self_->cached_awaitable_);
     613              : 
     614              :             // If the underlying awaitable is already ready, return h instead of suspending on it
     615           18 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     616           18 :                 return h;  // presumably resumes the awaiting coroutine at once (assumes coro is a coroutine handle) - confirm
     617              : 
     618              :             // Otherwise forward h, the executor and the stop token to the underlying awaitable's await_suspend
     619            0 :             return self_->active_ops_->await_suspend(
     620            0 :                 self_->cached_awaitable_, h, ex, token);
     621              :         }
     622              :         // Produces the underlying result, then tears down the cached awaitable.
     623              :         io_result<>
     624           18 :         await_resume()
     625              :         {
     626              :             struct guard {  // local cleanup object; its destructor runs when await_resume exits, by return or by exception
     627              :                 any_buffer_sink* self;
     628           18 :                 ~guard() {
     629           18 :                     self->active_ops_->destroy(self->cached_awaitable_);
     630           18 :                     self->active_ops_ = nullptr;  // cleared after the cached awaitable has been destroyed
     631           18 :                 }
     632           18 :             } g{self_};
     633           18 :             return self_->active_ops_->await_resume(  // evaluated before g's destructor runs
     634           31 :                 self_->cached_awaitable_);
     635           18 :         }
     636              :     };
     637           18 :     return awaitable{this};
     638              : }
     639              : 
     640              : //----------------------------------------------------------
     641              : 
     642              : template<ConstBufferSequence CB>  // Overload of write() that takes no eof flag.
     643              : task<io_result<std::size_t>>
     644              : any_buffer_sink::write(CB buffers)
     645              : {
     646              :     return write(buffers, false);  // delegates to the two-argument overload with eof = false
     647              : }
     648              : 
     649              : template<ConstBufferSequence CB>  // Coroutine: copies `buffers` into the sink chunk by chunk; calls commit_eof() at the end when eof is true.
     650              : task<io_result<std::size_t>>
     651              : any_buffer_sink::write(CB buffers, bool eof)
     652              : {
     653              :     buffer_param<CB> bp(buffers);  // source cursor: data() is read on each pass, consume(n) is called after a successful commit
     654              :     std::size_t total = 0;  // running sum of the counts that were copied and then committed without error
     655              :     // Repeat until bp.data() comes back empty.
     656              :     for(;;)
     657              :     {
     658              :         auto src = bp.data();
     659              :         if(src.empty())
     660              :             break;
     661              :         // Hand prepare() a scratch array of detail::max_iovec_ buffers; dst_bufs is the copy destination.
     662              :         mutable_buffer arr[detail::max_iovec_];
     663              :         auto dst_bufs = prepare(arr);
     664              :         if(dst_bufs.empty())
     665              :         {
     666              :             auto [ec] = co_await commit(0);  // no destination buffers: commit zero, then retry (presumably lets the sink make room - confirm)
     667              :             if(ec)
     668              :                 co_return {ec, total};
     669              :             continue;
     670              :         }
     671              :         // Copy into the prepared buffers and commit exactly the count buffer_copy reports.
     672              :         auto n = buffer_copy(dst_bufs, src);
     673              :         auto [ec] = co_await commit(n);
     674              :         if(ec)
     675              :             co_return {ec, total};  // total does not include the n whose commit failed
     676              :         bp.consume(n);  // the source advances only after the commit succeeded
     677              :         total += n;
     678              :     }
     679              :     // Source exhausted: finish with commit_eof() if the caller asked for it.
     680              :     if(eof)
     681              :     {
     682              :         auto [ec] = co_await commit_eof();
     683              :         if(ec)
     684              :             co_return {ec, total};
     685              :     }
     686              :     // Success: value-initialized first member plus the accumulated total.
     687              :     co_return {{}, total};
     688              : }
     689              : 
     690              : inline auto  // Same operation as commit_eof() under another name.
     691              : any_buffer_sink::write_eof()
     692              : {
     693              :     return commit_eof();  // returns the commit_eof() awaitable unchanged
     694              : }
     695              : 
     696              : //----------------------------------------------------------
     697              : 
     698              : static_assert(WriteSink<any_buffer_sink>);  // compile-time check that any_buffer_sink satisfies WriteSink
     699              : 
     700              : } // namespace capy
     701              : } // namespace boost
     702              : 
     703              : #endif
        

Generated by: LCOV version 2.3