Streamable I/O: A Gentle Introduction

Data flows through programs like water through pipes. Sometimes you need a quick sip—just a few bytes at a time. Other times you need to fill an entire bucket before moving on. And occasionally, you want the pipe itself to tell you where to pour rather than bringing your own container.

This tutorial introduces Capy’s streaming abstractions: a family of concepts and type-erased wrappers that make asynchronous I/O both flexible and efficient. By the end, you’ll understand when to use each abstraction and how to compose them into powerful data pipelines.

The Two Fundamental Questions

When designing I/O operations, two questions shape everything:

  1. How much data per operation? Do you want "give me whatever’s available" or "fill this entire buffer"?

  2. Who owns the buffers? Does the caller provide memory, or does the I/O object provide it?

These questions yield four patterns, and Capy provides concepts for each. But before diving into the matrix, let’s start with the simplest case.

Streams: The Familiar Pattern

If you’ve worked with sockets, files, or serial ports, you know streams. A stream is an I/O channel where read_some returns at least one byte (or an error), and write_some sends at least one byte. You might ask for 1024 bytes but get only 17—the network decided that’s all it had ready.

template<ReadStream Stream>
task<> read_all(Stream& s, char* buf, std::size_t size)
{
    std::size_t total = 0;
    while (total < size)
    {
        auto [ec, n] = co_await s.read_some(
            mutable_buffer(buf + total, size - total));
        if (ec)
            co_return;
        total += n;
    }
}

Notice the loop. Streams give you partial results, so you loop until you have everything. This is the ReadStream concept—the result decomposes to (error_code, size_t), and on success, n is at least 1.
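The partial-read loop can be seen in a plain synchronous sketch. The `toy_stream` and `read_all` names below are illustrative inventions, not Capy's API; the stream deliberately hands back at most a few bytes per call, mimicking a socket's partial reads:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Toy synchronous analogue of a ReadStream: returns at most `chunk`
// bytes per call, mimicking a socket's partial reads. Not Capy's API.
struct toy_stream
{
    std::string data;
    std::size_t pos = 0;
    std::size_t chunk = 3;  // artificially small "network" reads

    // Returns bytes copied; 0 means end-of-stream.
    std::size_t read_some(char* buf, std::size_t size)
    {
        std::size_t n = std::min({chunk, size, data.size() - pos});
        std::memcpy(buf, data.data() + pos, n);
        pos += n;
        return n;
    }
};

// Same shape as the coroutine loop above: keep calling read_some
// until the caller's buffer is full or the stream ends.
std::size_t read_all(toy_stream& s, char* buf, std::size_t size)
{
    std::size_t total = 0;
    while (total < size)
    {
        std::size_t n = s.read_some(buf + total, size - total);
        if (n == 0)
            break;  // end-of-stream
        total += n;
    }
    return total;
}
```

Each `read_some` call here delivers 3 bytes at most, so filling an 11-byte buffer takes four iterations; the caller never sees the chunking because the loop hides it.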

WriteStream is the mirror image:

template<WriteStream Stream>
task<> write_all(Stream& s, char const* buf, std::size_t size)
{
    std::size_t total = 0;
    while (total < size)
    {
        auto [ec, n] = co_await s.write_some(
            const_buffer(buf + total, size - total));
        if (ec)
            co_return;
        total += n;
    }
}

When a type satisfies both ReadStream and WriteStream, it satisfies Stream:

template<Stream S>
task<> echo(S& stream)
{
    char buf[1024];
    auto [ec, n] = co_await stream.read_some(mutable_buffer(buf));
    if (ec)
        co_return;
    co_await stream.write_some(const_buffer(buf, n));
}

Streams are the workhorses of I/O. They map directly to operating system primitives and give you fine-grained control. But sometimes that control becomes a burden.

Sources and Sinks: Complete Transfers

Imagine reading an HTTP response body. You don’t care about partial reads—you want the whole thing. Enter ReadSource:

template<ReadSource Source>
task<std::string> read_body(Source& source)
{
    std::string result;
    char buf[4096];
    for (;;)
    {
        auto [ec, n] = co_await source.read(mutable_buffer(buf));
        if (ec && ec != cond::eof)
            co_return {};
        result.append(buf, n);  // EOF can arrive with a final partial read
        if (ec == cond::eof)
            break;
    }
    co_return result;
}

A ReadSource fills your entire buffer or tells you it hit end-of-stream. No partial results in the success path—either you get everything you asked for, or you get EOF/error with a count of what was transferred.
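This "whole buffer or EOF" contract is what you get by looping over a partial reader internally. The sketch below is illustrative (the `partial_reader` and `full_read_result` names are not Capy's): it shows how a source-style `read` hides partial results from the caller:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>

// Illustrative synchronous sketch, not Capy's API: a partial reader
// that hands back at most 4 bytes per call.
struct partial_reader
{
    std::string data;
    std::size_t pos = 0;

    std::size_t read_some(char* buf, std::size_t size)
    {
        std::size_t n = std::min({std::size_t(4), size, data.size() - pos});
        std::memcpy(buf, data.data() + pos, n);
        pos += n;
        return n;
    }
};

struct full_read_result
{
    bool eof;
    std::size_t n;  // bytes transferred, even when eof is true
};

// Source-style read: fills the whole buffer, or reports EOF together
// with the count of bytes transferred so far.
full_read_result read(partial_reader& r, char* buf, std::size_t size)
{
    std::size_t total = 0;
    while (total < size)
    {
        std::size_t n = r.read_some(buf + total, size - total);
        if (n == 0)
            return {true, total};
        total += n;
    }
    return {false, total};
}
```

Note the EOF result still carries a byte count: a stream ending mid-buffer reports `{true, 3}` rather than discarding the final partial read.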

WriteSink completes the picture for output. It adds a crucial capability: signaling end-of-stream:

template<WriteSink Sink>
task<> send_body(Sink& sink, std::string_view data)
{
    auto [ec, n] = co_await sink.write(make_buffer(data));
    if (ec)
        co_return;
    auto [ec2] = co_await sink.write_eof();
    if (ec2)
        handle_error(ec2);
}

The write_eof() call tells the sink no more data is coming. This matters for protocols like HTTP where the receiver needs to know when the body ends. There’s also a combined form:

auto [ec, n] = co_await sink.write(make_buffer(data), true);  // true = EOF

The distinction between streams and sources/sinks reflects their use cases:

Type          Partial results?   EOF signal?          Typical use
────────────  ─────────────────  ───────────────────  ──────────────────────────
ReadStream    Yes                Via error code       Raw sockets, files
WriteStream   Yes                No                   Raw sockets, files
ReadSource    No                 Via error code       HTTP bodies, decompression
WriteSink     No                 Explicit write_eof   HTTP bodies, compression

Buffer Ownership: A Deeper Cut

So far, you’ve always provided your own buffers. But consider a compression sink that needs to write to its internal compression dictionary. Or a network stack that manages its own packet buffers. These want to say "here’s where you should write" rather than copying from your buffer into theirs.

This is the "callee owns buffers" pattern.

BufferSource: Pull Model

A BufferSource gives you buffer descriptors pointing to data it already has:

template<BufferSource Source, WriteStream Stream>
task<io_result<std::size_t>> transfer(Source& source, Stream& stream)
{
    const_buffer arr[16];
    std::size_t total = 0;
    for (;;)
    {
        auto [ec, bufs] = co_await source.pull(arr);
        if (ec)
            co_return {ec, total};
        if (bufs.empty())
            co_return {{}, total};  // Done

        auto [write_ec, n] = co_await stream.write_some(bufs);
        if (write_ec)
            co_return {write_ec, total};
        source.consume(n);
        total += n;
    }
}

The key operations are pull and consume. When you pull, the source fills your span with buffer descriptors pointing to its internal data. When you’re done with some of that data, you consume it. The next pull returns data starting after what you consumed.

Think of it like a conveyor belt: pull shows you what’s on the belt, consume moves the belt forward.
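The conveyor-belt behavior is easy to model synchronously. In this sketch (`memory_source` is an illustrative name, and a single pointer/size pair stands in for a span of buffer descriptors), `pull` is non-destructive and `consume` is what advances the belt:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Conveyor-belt sketch (illustrative, not Capy's API): pull() shows the
// unconsumed bytes without removing them; consume() advances past them.
struct memory_source
{
    std::string data;
    std::size_t pos = 0;

    // Returns a descriptor over the data currently "on the belt".
    // Calling pull() twice without consuming returns the same data.
    std::pair<char const*, std::size_t> pull() const
    {
        return {data.data() + pos, data.size() - pos};
    }

    // Moves the belt forward by n bytes; the next pull() starts there.
    void consume(std::size_t n)
    {
        pos += n;
    }
};
```

This split is what makes partial downstream writes safe: if `write_some` only accepted 2 of 6 bytes, you `consume(2)` and the next `pull` re-presents the remaining 4.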

BufferSink: Push Model

A BufferSink provides writable buffers for you to fill:

template<ReadStream Stream, BufferSink Sink>
task<io_result<std::size_t>> transfer(Stream& stream, Sink& sink)
{
    mutable_buffer arr[16];
    std::size_t total = 0;

    for (;;)
    {
        auto dst_bufs = sink.prepare(arr);  // Synchronous!
        if (dst_bufs.empty())
        {
            auto [ec] = co_await sink.commit(0);  // Flush
            if (ec)
                co_return {ec, total};
            continue;
        }

        auto [ec, n] = co_await stream.read_some(dst_bufs);
        if (n > 0)
        {
            auto [commit_ec] = co_await sink.commit(n);
            if (commit_ec)
                co_return {commit_ec, total};
            total += n;
        }

        if (ec == cond::eof)
        {
            auto [eof_ec] = co_await sink.commit_eof();
            co_return {eof_ec, total};
        }
        if (ec)
            co_return {ec, total};
    }
}

Notice prepare is synchronous—it just gives you buffer descriptors. The asynchronous work happens in commit, which may flush data downstream.

The workflow: prepare → write into buffers → commit. Repeat until done, then commit_eof.
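A minimal synchronous model of that workflow, assuming an invented `memory_sink` type (a single pointer/size pair again stands in for a span of buffer descriptors, and nothing here is Capy's actual implementation):

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <utility>

// Illustrative prepare/commit sketch, not Capy's API: the sink owns a
// scratch region; prepare() exposes writable space, commit(n) accepts
// the first n bytes written into it, commit_eof() finalizes the output.
struct memory_sink
{
    char scratch[8];
    std::string committed;
    bool finished = false;

    // Hand the caller writable space that the sink owns.
    std::pair<char*, std::size_t> prepare()
    {
        return {scratch, sizeof(scratch)};
    }

    // Accept the first n bytes of the prepared space.
    void commit(std::size_t n)
    {
        committed.append(scratch, n);
    }

    void commit_eof() { finished = true; }
};
```

The caller never allocates: it writes into memory the sink handed out, and `commit` tells the sink how much of that memory is now meaningful.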

The Complete Picture

Here’s how all seven concepts relate:

                  Caller Owns Buffers    Callee Owns Buffers
                  ───────────────────    ───────────────────
Reading (partial)    ReadStream               BufferSource
                     read_some(bufs)          pull(dest) + consume(n)

Reading (complete)   ReadSource               BufferSource
                     read(bufs)               (same interface)

Writing (partial)    WriteStream              BufferSink
                     write_some(bufs)         prepare(dest) + commit(n)

Writing (complete)   WriteSink                BufferSink
                     write(bufs) + write_eof  prepare + commit + commit_eof

The "callee owns buffers" patterns (BufferSource, BufferSink) enable zero-copy transfers when the I/O object has its own memory. The "caller owns buffers" patterns (Read/WriteStream, Read/WriteSink) are simpler and work everywhere.

Type Erasure: Runtime Flexibility

Sometimes you don’t know the concrete type at compile time. Maybe you’re building a plugin system, or you want to store different stream types in a container. Capy provides type-erased wrappers:

Wrapper            Wraps
─────────────────  ────────────────────────
any_read_stream    Any ReadStream
any_write_stream   Any WriteStream
any_stream         Any bidirectional Stream
any_read_source    Any ReadSource
any_write_sink     Any WriteSink
any_buffer_source  Any BufferSource
any_buffer_sink    Any BufferSink

Each wrapper can either own the underlying object or reference it:

// Owning - takes ownership of the socket
any_read_stream stream(socket{ioc});

// Reference - wraps without ownership
socket sock(ioc);
any_read_stream stream(&sock);  // sock must outlive stream

The owning form moves the object into heap storage. The reference form just stores a pointer—perfect when the concrete object has a well-defined lifetime.
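The owning/reference split is a standard type-erasure pattern: one virtual interface, two adapter templates. This is a generic sketch of the technique with invented names (`any_reader`, `reader_iface`), not Capy's implementation; unlike the real wrappers, the reference form here still allocates a small adapter node:

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Type-erasure sketch: one wrapper, two constructors (owning and
// referencing). Illustrative names only, not Capy's implementation.
struct reader_iface
{
    virtual std::string read() = 0;
    virtual ~reader_iface() = default;
};

// Owning adapter: stores the wrapped object by value.
template<class T>
struct reader_model : reader_iface
{
    T obj;
    explicit reader_model(T o) : obj(std::move(o)) {}
    std::string read() override { return obj.read(); }
};

// Referencing adapter: stores only a pointer; *obj must outlive it.
template<class T>
struct reader_ref : reader_iface
{
    T* obj;
    explicit reader_ref(T* o) : obj(o) {}
    std::string read() override { return obj->read(); }
};

struct any_reader
{
    std::unique_ptr<reader_iface> impl;

    // Owning form: moves the object into heap storage.
    template<class T>
    explicit any_reader(T obj)
        : impl(std::make_unique<reader_model<T>>(std::move(obj))) {}

    // Reference form: partial ordering prefers this overload for T*.
    template<class T>
    explicit any_reader(T* obj)
        : impl(std::make_unique<reader_ref<T>>(obj)) {}

    std::string read() { return impl->read(); }
};
```

Both forms expose the same interface; only the lifetime contract differs, exactly as with the `any_*` wrappers above.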

Zero Steady-State Allocation

A clever detail: these wrappers preallocate awaitable storage at construction time. When you co_await an operation, the underlying awaitable is constructed into this pre-allocated space. No heap allocation per operation—just one at wrapper construction.

This matters for servers handling thousands of connections. Memory usage is predictable from startup rather than growing with traffic.
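The underlying trick is placement new into storage reserved up front. This sketch shows the general idea with an invented `op_slot` type; it is not how Capy sizes or manages its awaitable storage:

```cpp
#include <cassert>
#include <cstddef>
#include <new>

// Preallocation sketch (illustrative): reserve aligned raw storage once,
// then construct each short-lived operation object into it with
// placement new instead of touching the heap per operation.
struct operation
{
    int id;
    explicit operation(int i) : id(i) {}
};

struct op_slot
{
    alignas(operation) unsigned char storage[sizeof(operation)];

    // Construct an operation in the preallocated slot; no heap allocation.
    operation* start(int id)
    {
        return ::new (storage) operation(id);
    }

    // Destroy it so the slot can be reused by the next operation.
    void finish(operation* op)
    {
        op->~operation();
    }
};
```

Construction and destruction cycle through the same bytes, so a long-lived wrapper can run millions of operations with exactly one allocation: the one that created the slot.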

Dual Interfaces

Some wrappers provide multiple interfaces. any_buffer_sink wraps a BufferSink but also satisfies WriteSink:

any_buffer_sink abs(some_buffer_sink{});

// Use BufferSink interface (zero-copy)
auto bufs = abs.prepare(arr);
// write into bufs...
co_await abs.commit(n);

// Or use WriteSink interface (copies data)
co_await abs.write(const_buffer(data, size));

The WriteSink interface copies from your buffers into the sink’s internal storage. It’s convenient but loses the zero-copy benefit. Choose based on your performance requirements.

Similarly, any_buffer_source wraps a BufferSource but satisfies ReadSource. The read method copies from the source’s internal buffers into yours.

Transfer Functions: Composing Sources and Sinks

With all these abstractions, you’ll often want to pipe data from one to another. Capy provides two utility functions:

push_to: BufferSource to Sink

template<BufferSource Src, WriteSink Sink>
task<io_result<std::size_t>>
push_to(Src& source, Sink& sink);

This pulls data from the source and writes it to the sink:

task<> transfer_body(BufferSource auto& source, WriteSink auto& sink)
{
    auto [ec, n] = co_await push_to(source, sink);
    if (ec)
        handle_error(ec);
    // n bytes transferred
}

When the source is exhausted, push_to calls write_eof() on the sink.
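A synchronous sketch of that pull/write/consume loop, with invented `sketch_source` and `sketch_sink` types (not Capy's implementation); the sink accepts at most 4 bytes per call to simulate partial writes:

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Illustrative source: exposes unconsumed data and a consume() cursor.
struct sketch_source
{
    std::string data;
    std::size_t pos = 0;
    char const* pull_data() const { return data.data() + pos; }
    std::size_t pull_size() const { return data.size() - pos; }
    void consume(std::size_t n) { pos += n; }
};

// Illustrative sink: partial writes (at most 4 bytes) plus an EOF signal.
struct sketch_sink
{
    std::string received;
    bool eof = false;

    std::size_t write(char const* p, std::size_t n)
    {
        std::size_t take = n < 4 ? n : 4;
        received.append(p, take);
        return take;
    }
    void write_eof() { eof = true; }
};

// The shape of push_to: pull, write, consume what was accepted,
// and signal EOF once the source runs dry.
std::size_t push_to_sketch(sketch_source& src, sketch_sink& sink)
{
    std::size_t total = 0;
    while (src.pull_size() > 0)
    {
        std::size_t n = sink.write(src.pull_data(), src.pull_size());
        src.consume(n);  // advance only by what the sink accepted
        total += n;
    }
    sink.write_eof();  // source exhausted: finalize the sink
    return total;
}
```

Because `consume` advances only by the accepted count, partial writes downstream are handled naturally: unaccepted bytes are simply re-presented on the next iteration.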

There’s also an overload for WriteStream destinations:

template<BufferSource Src, WriteStream Stream>
task<io_result<std::size_t>>
push_to(Src& source, Stream& stream);

This version doesn’t call write_eof because streams don’t have that concept.

pull_from: Source to BufferSink

template<ReadSource Src, BufferSink Sink>
task<io_result<std::size_t>>
pull_from(Src& source, Sink& sink);

This reads from the source directly into the sink’s internal buffers:

task<> transfer_body(ReadSource auto& source, BufferSink auto& sink)
{
    auto [ec, n] = co_await pull_from(source, sink);
    // ...
}

The sink provides buffers via prepare(), the source reads into them, and commit() pushes data downstream. When the source hits EOF, commit_eof() finalizes the sink.

An overload accepts ReadStream sources:

template<ReadStream Src, BufferSink Sink>
task<io_result<std::size_t>>
pull_from(Src& source, Sink& sink);

This handles partial reads internally, looping until EOF.

Choosing the Right Abstraction

With seven concepts and seven wrappers, how do you choose?

Start with your constraints:

  1. Do you control the buffer memory?

    • Yes → Use Read/WriteStream or Read/WriteSink

    • No, the I/O object has internal buffers → Use BufferSource/BufferSink

  2. Do you need partial results or complete transfers?

    • Partial is fine (low-level I/O) → Use ReadStream/WriteStream

    • Want complete transfers (protocol bodies) → Use ReadSource/WriteSink

  3. Need to signal end-of-stream on writes?

    • Yes → Use WriteSink or BufferSink

    • No → WriteStream suffices

  4. Need runtime polymorphism?

    • Yes → Use any_* wrappers

    • No → Use concepts directly for zero overhead

Common patterns:

  • Raw socket I/O: Stream (both read and write)

  • HTTP request body input: ReadSource or BufferSource

  • HTTP response body output: WriteSink or BufferSink

  • Compression/decompression layers: BufferSource/BufferSink (they have internal dictionaries)

  • Plugin architectures: any_* wrappers

A Complete Example

Let’s build a function that transfers an HTTP body from a decompressor to a network socket:

task<> send_decompressed_body(
    any_buffer_source& decompressor,  // Has internal decompression buffers
    any_write_stream& socket)         // Raw socket for output
{
    auto [ec, total] = co_await push_to(decompressor, socket);
    if (ec)
        throw std::system_error(ec);
    log("Sent {} decompressed bytes", total);
}

The decompressor satisfies BufferSource—it produces decompressed data into internal buffers. The socket satisfies WriteStream—it sends partial chunks to the network. push_to bridges them, handling the pull/consume loop internally.

Type erasure (any_buffer_source, any_write_stream) lets this function work with any decompressor or socket implementation. The caller decides the concrete types:

zlib_decompressor decomp(compressed_data);
tcp_socket sock(io_context);

any_buffer_source src(std::move(decomp));  // Takes ownership
any_write_stream dst(&sock);               // Wraps by reference

co_await send_decompressed_body(src, dst);

Buffer Lifetime: A Critical Detail

One warning recurs throughout the concepts: buffer lifetime. When you pass a buffer sequence to an async operation, that memory must remain valid until co_await returns.

This sounds obvious, but coroutines make it subtle. Consider:

// Question: is buf still alive when the read completes?
task<> risky(ReadStream auto& stream)
{
    char buf[1024];
    auto [ec, n] = co_await stream.read_some(mutable_buffer(buf));
    // ...
}

This is actually fine—buf lives in the coroutine frame. But this is not:

// DANGER: the temporary string dies before the write completes
task<> broken(WriteStream auto& stream)
{
    auto op = stream.write_some(make_buffer(get_data()));  // temporary destroyed at end of this statement
    auto [ec, n] = co_await op;  // op now refers to freed memory
}

If you implement your own types satisfying these concepts, accept buffer parameters by value, not reference. This copies the buffer descriptor (not the data!) into the coroutine frame, ensuring it survives suspension points.
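Taking the descriptor by value is cheap precisely because a buffer descriptor is just a pointer and a size. The `const_buffer_like` type below is an illustrative stand-in for such a descriptor, not Capy's actual `const_buffer`:

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// A buffer descriptor is just {pointer, size}. Copying it copies the
// pair, never the bytes it points at. Illustrative sketch only.
struct const_buffer_like
{
    char const* data;
    std::size_t size;
};

// Taking the descriptor by value stores a private copy of the
// pointer/size pair, so the parameter stays valid even if the caller's
// own descriptor variable goes away.
std::string first_bytes(const_buffer_like b, std::size_t n)
{
    return std::string(b.data, n < b.size ? n : b.size);
}
```

The copy protects the descriptor, not the data: the caller is still responsible for keeping the pointed-to memory alive across suspension points.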

What You’ve Learned

Capy’s streaming abstractions decompose I/O into orthogonal choices:

  • Partial vs complete transfers (streams vs sources/sinks)

  • Caller vs callee buffer ownership

  • Compile-time vs runtime polymorphism (concepts vs type erasure)

The concepts define contracts. The any_* wrappers provide runtime flexibility with zero steady-state allocation. The push_to and pull_from utilities compose these pieces into data pipelines.

Whether you’re building a high-performance server, a protocol implementation, or a data processing pipeline, these abstractions give you the flexibility to express your intent clearly while maintaining efficiency.

The water flows through the pipes. Now you can shape those pipes however you need.