I have a proxy server written with the asynchronous Boost.Asio API, that is, async_* functions and callbacks.

The full code is in this answer.
It can be schematically described as:

The loop that accepts incoming connections and connects to the destination server:

    void accept_loop() {
        acceptor.async_accept(src_socket, [](auto err) {
            accept_loop(); // repeat accept
            dst_socket.async_connect(dst_endpoint, [](auto err) {
                proxy_loop(src_socket, dst_socket);
                proxy_loop(dst_socket, src_socket);
            });
        });
    }

The data transfer loop, two per connection:

    void proxy_loop(socket src, socket dst) {
        async_read(src, buf, [](auto error, auto n) {
            async_write(dst, buf, [](auto error, auto n) {
                proxy_loop(src, dst); // repeat read
            });
        });
    }

How can this server be rewritten using C++ coroutines?

    1 answer

    There is currently no standard Future class that is compatible with coroutines and co_await.
    Boost.Asio also does not yet support co_await out of the box.

    So we will write both ourselves, in about a hundred lines of code.

    Let's start with a generic Future that can be returned from a coroutine and awaited with co_await.

        #include <atomic>
        #include <experimental/coroutine>
        #include <memory>
        #include <utility>

        template<typename T>
        struct Future {
            // We will not try to save memory, so the data lives in a
            // "shared state" and the result is copied (or moved).
            struct SharedState {
                T value;
                std::experimental::coroutine_handle<> h;
                std::atomic<bool> is_ready{false};
            };

            // Support for using Future as the return type of a coroutine.
            struct promise_type {
                std::shared_ptr<SharedState> s = std::make_shared<SharedState>();
                Future<T> get_return_object() { return {s}; }
                std::experimental::suspend_never initial_suspend() { return {}; }
                // SharedState outlives the destruction of promise_type
                // when the coroutine finishes.
                std::experimental::suspend_never final_suspend() { return {}; }
                void return_value(T value) const {
                    s->value = std::move(value);
                    // If the awaiter got there first, it is suspended: resume it.
                    if (s->is_ready.exchange(true))
                        s->h.resume();
                }
            };

            std::shared_ptr<SharedState> s;

            // Support for co_await.
            bool await_ready() noexcept { return false; }
            bool await_suspend(std::experimental::coroutine_handle<> h) noexcept {
                s->h = h;
                // Suspend only if the value has not been set yet.
                return !s->is_ready.exchange(true);
            }
            T await_resume() { return std::move(s->value); }
        };

    Now the same thing again, specialized for the case where the coroutine does not return a value.

        template<>
        struct Future<void> {
            struct SharedState {
                std::experimental::coroutine_handle<> h;
                std::atomic<bool> is_ready{false};
            };

            struct promise_type {
                std::shared_ptr<SharedState> s = std::make_shared<SharedState>();
                Future<void> get_return_object() { return {s}; }
                std::experimental::suspend_never initial_suspend() { return {}; }
                std::experimental::suspend_never final_suspend() { return {}; }
                void return_void() const {
                    if (s->is_ready.exchange(true))
                        s->h.resume();
                }
            };

            std::shared_ptr<SharedState> s;

            bool await_ready() noexcept { return false; }
            bool await_suspend(std::experimental::coroutine_handle<> h) noexcept {
                s->h = h;
                return !s->is_ready.exchange(true);
            }
            void await_resume() {}
        };

    That completes our Future.


    Now we write wrappers around the Boost.Asio async_* functions.

    We can use Future::promise_type directly, in the same way a coroutine would.

        #include <boost/asio.hpp>

        Future<boost::system::error_code> coro_accept(boost::asio::ip::tcp::acceptor& acceptor,
                                                      boost::asio::ip::tcp::socket& socket) {
            Future<boost::system::error_code>::promise_type p;
            acceptor.async_accept(socket, [p](auto error) { p.return_value(error); });
            return p.get_return_object();
        }

        Future<boost::system::error_code> coro_connect(boost::asio::ip::tcp::socket& socket,
                                                       boost::asio::ip::tcp::endpoint endpoint) {
            Future<boost::system::error_code>::promise_type p;
            socket.async_connect(endpoint, [p](auto error) { p.return_value(error); });
            return p.get_return_object();
        }

    If the callback takes more than one parameter, the extra ones can be returned through out-parameters.
    In C++17, the wrapper could instead return a tuple, which the caller unpacks with structured bindings.

        template<typename Buffers>
        Future<boost::system::error_code> coro_read(boost::asio::ip::tcp::socket& socket,
                                                    Buffers bufs, std::size_t& bytes_read) {
            Future<boost::system::error_code>::promise_type p;
            socket.async_read_some(bufs, [p, &bytes_read](auto error, auto n) {
                bytes_read = n;
                p.return_value(error);
            });
            return p.get_return_object();
        }

        template<typename Buffers>
        Future<boost::system::error_code> coro_write_all(boost::asio::ip::tcp::socket& socket,
                                                         Buffers bufs, std::size_t& bytes_written) {
            Future<boost::system::error_code>::promise_type p;
            async_write(socket, bufs, boost::asio::transfer_all(),
                        [p, &bytes_written](auto error, auto n) {
                bytes_written = n;
                p.return_value(error);
            });
            return p.get_return_object();
        }

    And finally, the server code itself, the same ~50 lines as in the original:

        #include <iostream>

        boost::asio::io_service io_service;
        boost::asio::ip::tcp::resolver resolver(io_service);
        boost::asio::ip::tcp::resolver::query dst_query("arrowd.name", "80");
        boost::asio::ip::tcp::resolver::iterator dst_iterator = resolver.resolve(dst_query);
        boost::asio::ip::tcp::endpoint dst_endpoint = *dst_iterator;

        Future<void> proxy(boost::asio::ip::tcp::socket& src, boost::asio::ip::tcp::socket& dst) {
            char buf[4096];
            for (;;) {
                std::size_t bytes_read;
                auto error = co_await coro_read(src, boost::asio::buffer(buf), bytes_read);
                std::cout << "read " << bytes_read << ' ' << error << '\n';
                if (error) break;

                std::size_t bytes_written;
                error = co_await coro_write_all(dst, boost::asio::buffer(buf, bytes_read), bytes_written);
                std::cout << "write " << bytes_written << ' ' << error << '\n';
                if (error) break;
            }
            // Closing the sockets causes an error in the coroutine
            // that pumps data in the other direction.
            src.close();
            dst.close();
        }

        Future<void> connect(boost::asio::ip::tcp::socket src) {
            boost::asio::ip::tcp::socket dst(io_service);
            auto error = co_await coro_connect(dst, dst_endpoint);
            std::cout << "connect " << error << '\n';
            if (error) co_return;

            auto _ = proxy(src, dst); // start the first coroutine without waiting
            co_await proxy(dst, src); // start the second one and wait for it
            // We left the second one with some error.
            co_await _;               // wait for the first one to finish
        }

        Future<void> accept_loop() {
            boost::asio::ip::tcp::endpoint src_endpoint(
                boost::asio::ip::address_v4::loopback(), 8080); // localhost:8080
            boost::asio::ip::tcp::acceptor acceptor{io_service, src_endpoint};
            for (;;) {
                boost::asio::ip::tcp::socket src(io_service);
                auto error = co_await coro_accept(acceptor, src);
                std::cout << "accept " << error << '\n';
                if (error) co_return;
                connect(std::move(src));
            }
        }

        int main() {
            accept_loop();
            io_service.run(); // can be run in parallel on several threads
        }

    The code has become much cleaner: the callbacks are gone.
    connect had to be moved out of accept_loop into a separate function, since it is a separate coroutine.

    Everything that used to be explicitly allocated in dynamic memory now looks like local variables. It still lives in dynamic memory, inside the coroutine state. This is what makes it possible to pass the sockets to proxy by reference: their lifetime is tied to connect.

    There is no exception handling in this code.
    To support exceptions, Future would need to carry them to the awaiting coroutine as a std::exception_ptr.

    • Wow, beautiful. And once Boost.Asio itself ships async wrappers like these, it will be great all round. - VladD
    • Asio can be made to return Future<T> via async_result, and there already is boost::asio::use_future. So an alternative approach is to add co_await support to boost::future. - Abyx