Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_TEST_BUFFER_SOURCE_HPP
11 : #define BOOST_CAPY_TEST_BUFFER_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/make_buffer.hpp>
16 : #include <boost/capy/coro.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <boost/capy/io_result.hpp>
19 : #include <boost/capy/test/fuse.hpp>
20 :
21 : #include <algorithm>
22 : #include <stop_token>
23 : #include <string>
24 : #include <string_view>
25 :
26 : namespace boost {
27 : namespace capy {
28 : namespace test {
29 :
30 : /** A mock buffer source for testing push operations.
31 :
32 : Use this to verify code that transfers data from a buffer source to
33 : a sink without needing real I/O. Call @ref provide to supply data,
34 : then @ref pull to retrieve buffer descriptors. The associated
35 : @ref fuse enables error injection at controlled points.
36 :
37 : This class satisfies the @ref BufferSource concept by providing
38 : a pull interface that fills an array of buffer descriptors and
39 : a consume interface to indicate bytes used.
40 :
41 : @par Thread Safety
42 : Not thread-safe.
43 :
44 : @par Example
45 : @code
46 : fuse f;
47 : buffer_source bs( f );
48 : bs.provide( "Hello, " );
49 : bs.provide( "World!" );
50 :
51 : auto r = f.armed( [&]( fuse& ) -> task<void> {
52 : const_buffer arr[16];
53 : auto [ec, count] = co_await bs.pull( arr, 16 );
54 : if( ec )
55 : co_return;
56 : // arr[0..count) contains buffer descriptors
57 : std::size_t n = buffer_size( std::span( arr, count ) );
58 : bs.consume( n );
59 : } );
60 : @endcode
61 :
62 : @see fuse, BufferSource
63 : */
64 : class buffer_source
65 : {
66 : fuse* f_;
67 : std::string data_;
68 : std::size_t pos_ = 0;
69 : std::size_t max_pull_size_;
70 :
71 : public:
72 : /** Construct a buffer source.
73 :
74 : @param f The fuse used to inject errors during pulls.
75 :
76 : @param max_pull_size Maximum bytes returned per pull.
77 : Use to simulate chunked delivery.
78 : */
79 288 : explicit buffer_source(
80 : fuse& f,
81 : std::size_t max_pull_size = std::size_t(-1)) noexcept
82 288 : : f_(&f)
83 288 : , max_pull_size_(max_pull_size)
84 : {
85 288 : }
86 :
87 : /** Append data to be returned by subsequent pulls.
88 :
89 : Multiple calls accumulate data that @ref pull returns.
90 :
91 : @param sv The data to append.
92 : */
93 : void
94 302 : provide(std::string_view sv)
95 : {
96 302 : data_.append(sv);
97 302 : }
98 :
99 : /// Clear all data and reset the read position.
100 : void
101 : clear() noexcept
102 : {
103 : data_.clear();
104 : pos_ = 0;
105 : }
106 :
107 : /// Return the number of bytes available for pulling.
108 : std::size_t
109 : available() const noexcept
110 : {
111 : return data_.size() - pos_;
112 : }
113 :
114 : /** Consume bytes from the source.
115 :
116 : Advances the internal read position by the specified number
117 : of bytes. The next call to @ref pull returns data starting
118 : after the consumed bytes.
119 :
120 : @param n The number of bytes to consume. Must not exceed the
121 : total size of buffers returned by the previous @ref pull.
122 : */
123 : void
124 267 : consume(std::size_t n) noexcept
125 : {
126 267 : pos_ += n;
127 267 : }
128 :
129 : /** Pull buffer data from the source.
130 :
131 : Fills the provided span with buffer descriptors pointing to
132 : internal data starting from the current unconsumed position.
133 : Returns a span of filled buffers. When no data remains,
134 : returns an empty span to signal completion.
135 :
136 : Calling pull multiple times without intervening @ref consume
137 : returns the same data. Use consume to advance past processed
138 : bytes.
139 :
140 : @param dest Span of const_buffer to fill.
141 :
142 : @return An awaitable yielding `(error_code,std::span<const_buffer>)`.
143 :
144 : @see consume, fuse
145 : */
146 : auto
147 540 : pull(std::span<const_buffer> dest)
148 : {
149 : struct awaitable
150 : {
151 : buffer_source* self_;
152 : std::span<const_buffer> dest_;
153 :
154 540 : bool await_ready() const noexcept { return true; }
155 :
156 0 : void await_suspend(
157 : coro,
158 : executor_ref,
159 : std::stop_token) const noexcept
160 : {
161 0 : }
162 :
163 : io_result<std::span<const_buffer>>
164 540 : await_resume()
165 : {
166 540 : auto ec = self_->f_->maybe_fail();
167 459 : if(ec)
168 81 : return {ec, {}};
169 :
170 378 : if(self_->pos_ >= self_->data_.size())
171 66 : return {{}, {}}; // Source exhausted
172 :
173 312 : std::size_t avail = self_->data_.size() - self_->pos_;
174 312 : std::size_t to_return = (std::min)(avail, self_->max_pull_size_);
175 :
176 312 : if(dest_.empty())
177 0 : return {{}, {}};
178 :
179 : // Fill a single buffer descriptor
180 312 : dest_[0] = make_buffer(
181 312 : self_->data_.data() + self_->pos_,
182 : to_return);
183 :
184 312 : return {{}, dest_.first(1)};
185 : }
186 : };
187 540 : return awaitable{this, dest};
188 : }
189 : };
190 :
191 : } // test
192 : } // capy
193 : } // boost
194 :
195 : #endif
|