There is currently no standard class Future that would be compatible with coroutines and co_await .
Also Boost.Asio still does not support co_await out of the box.
Therefore, we will write both, in just a hundred lines of code.
Let's start with the universal Future , which can be returned from the coroutine, and which can be expected in co_await .
template<typename T> struct Future { // ΠΠ° ΠΏΠ°ΠΌΡΡΠΈ ΡΠΊΠΎΠ½ΠΎΠΌΠΈΡΡ Π½Π΅ Π±ΡΠ΄Π΅ΠΌ, // ΠΏΠΎΡΡΠΎΠΌΡ Π΄Π°Π½Π½ΡΠ΅ Π±ΡΠ΄Π΅ΠΌ Ρ
ΡΠ°Π½ΠΈΡΡ Π² Π½Π΅ΠΊΠΎΡΠΎΡΠΎΠΌ "ΠΎΠ±ΡΠ΅ΠΌ ΡΠΎΡΡΠΎΡΠ½ΠΈΠΈ", // ΡΠ΅Π·ΡΠ»ΡΡΠ°Ρ Π±ΡΠ΄Π΅ΠΌ ΠΊΠΎΠΏΠΈΡΠΎΠ²Π°ΡΡ (ΠΈΠ»ΠΈ ΠΏΠ΅ΡΠ΅ΠΌΠ΅ΡΠ°ΡΡ). struct SharedState { T value; std::experimental::coroutine_handle<> h; std::atomic<bool> is_ready; }; // ΠΠΎΠ΄Π΄Π΅ΡΠΆΠΊΠ° ΠΈΡΠΏΠΎΠ»ΡΠ·ΠΎΠ²Π°Π½ΠΈΡ Future ΠΊΠ°ΠΊ ΡΠ΅Π·ΡΠ»ΡΡΠ°ΡΠ° ΡΠΎΠΏΡΠΎΠ³ΡΠ°ΠΌΠΌΡ. struct promise_type { std::shared_ptr<SharedState> s = std::make_shared<SharedState>(); Future<T> get_return_object() { return {s}; } std::experimental::suspend_never initial_suspend() { return {}; } // SharedState ΠΏΠ΅ΡΠ΅ΠΆΠΈΠ²Π΅Ρ ΡΠ΄Π°Π»Π΅Π½ΠΈΠ΅ promise_type Π² ΠΊΠΎΠ½ΡΠ΅ ΡΠ°Π±ΠΎΡΡ ΡΠΎΠΏΡΠΎΠ³ΡΠ°ΠΌΠΌΡ std::experimental::suspend_never final_suspend() { return {}; } void return_value(T value) const { s->value = std::move(value); if (s->is_ready.exchange(true)) s->h.resume(); } }; std::shared_ptr<SharedState> s; // ΠΠΎΠ΄Π΄Π΅ΡΠΆΠΊΠ° co_await. bool await_ready() noexcept { return false; } bool await_suspend(std::experimental::coroutine_handle<> h) noexcept { s->h = h; return !s->is_ready.exchange(true); } T await_resume() { return std::move(s->value); } };
Now we write everything the same, but for the case when the coroutine does not return values.
template<> struct Future<void> { struct SharedState { std::experimental::coroutine_handle<> h; std::atomic<bool> is_ready; }; struct promise_type { std::shared_ptr<SharedState> s = std::make_shared<SharedState>(); Future<void> get_return_object() { return {s}; } std::experimental::suspend_never initial_suspend() { return {}; } std::experimental::suspend_never final_suspend() { return {}; } void return_void() const { if (s->is_ready.exchange(true)) s->h.resume(); } }; std::shared_ptr<SharedState> s; bool await_ready() noexcept { return false; } bool await_suspend(std::experimental::coroutine_handle<> h) noexcept { s->h = h; return !s->is_ready.exchange(true); } void await_resume() {} };
This was our future.
Now we write wrappers over async_* Boost.Asio functions.
We can use the same Future :: promise_type as if it were a coroutine.
Future<boost::system::error_code> coro_accept(boost::asio::ip::tcp::acceptor& acceptor, boost::asio::ip::tcp::socket& socket) { Future<boost::system::error_code>::promise_type p; acceptor.async_accept(socket, [p](auto error) { p.return_value(error); }); return p.get_return_object(); } Future<boost::system::error_code> coro_connect(boost::asio::ip::tcp::socket& socket, boost::asio::ip::tcp::endpoint endpoint) { Future<boost::system::error_code>::promise_type p; socket.async_connect(endpoint, [p](auto error) { p.return_value(error); }); return p.get_return_object(); }
If the callback accepts more than one parameter, they can be made out-parameters.
In C ++ 17, tuple and structured bindings can be used to decompress.
template<typename Buffers> Future<boost::system::error_code> coro_read(boost::asio::ip::tcp::socket& socket, Buffers bufs, std::size_t& bytes_read) { Future<boost::system::error_code>::promise_type p; socket.async_read_some(bufs, [p, &bytes_read](auto error, auto n) { bytes_read = n; p.return_value(error); }); return p.get_return_object(); } template<typename Buffers> Future<boost::system::error_code> coro_write_all(boost::asio::ip::tcp::socket& socket, Buffers bufs, std::size_t& bytes_written) { Future<boost::system::error_code>::promise_type p; async_write(socket, bufs, boost::asio::transfer_all(), [p, &bytes_written](auto error, auto n) { bytes_written = n; p.return_value(error); }); return p.get_return_object(); }
And finally, the server code itself, the same ~ 50 lines as in the original
boost::asio::io_service io_service; boost::asio::ip::tcp::resolver resolver(io_service); boost::asio::ip::tcp::resolver::query dst_query("arrowd.name", "80"); boost::asio::ip::tcp::resolver::iterator dst_iterator = resolver.resolve(dst_query); boost::asio::ip::tcp::endpoint dst_endpoint = *dst_iterator; Future<void> proxy(boost::asio::ip::tcp::socket& src, boost::asio::ip::tcp::socket& dst) { char buf[4096]; for (;;) { std::size_t bytes_read; auto error = co_await coro_read(src, boost::asio::buffer(buf), bytes_read); std::cout << "read " << bytes_read << ' ' << error << '\n'; if (error) break; std::size_t bytes_written; error = co_await coro_write_all(dst, boost::asio::buffer(buf, bytes_read), bytes_written); std::cout << "write " << bytes_written << ' ' << error << '\n'; if (error) break; } // ΠΠ°ΠΊΡΡΡΠΈΠ΅ ΡΠΎΠΊΠ΅ΡΠΎΠ² Π²ΡΠ·ΠΎΠ²Π΅Ρ ΠΎΡΠΈΠ±ΠΊΡ Π² ΡΠΎΠΏΡΠΎΠ³ΡΠ°ΠΌΠΌΠ΅ ΠΊΠΎΡΠΎΡΠ°Ρ ΠΊΠ°ΡΠ°Π΅Ρ Π² Π΄ΡΡΠ³ΡΡ ΡΡΠΎΡΠΎΠ½Ρ src.close(); dst.close(); } Future<void> connect(boost::asio::ip::tcp::socket src) { boost::asio::ip::tcp::socket dst(io_service); auto error = co_await coro_connect(dst, dst_endpoint); std::cout << "connect " << error << '\n'; if (error) co_return; auto _ = proxy(src, dst); // ΠΠ°ΠΏΡΡΠΊΠ°Π΅ΠΌ ΠΏΠ΅ΡΠ²ΡΡ ΡΠΎΠΏΡΠΎΠ³ΡΠ°ΠΌΠΌΡ Π±Π΅Π· ΠΎΠΆΠΈΠ΄Π°Π½ΠΈΡ co_await proxy(dst, src); // ΠΠ°ΠΏΡΡΠΊΠ°Π΅ΠΌ Π²ΡΠΎΡΡΡ ΠΈ ΠΆΠ΄Π΅ΠΌ // ΠΡ Π²ΡΡΠ»ΠΈ ΠΈΠ· Π²ΡΠΎΡΠΎΠΉ Ρ ΠΊΠ°ΠΊΠΎΠΉ-ΡΠΎ ΠΎΡΠΈΠ±ΠΊΠΎΠΉ co_await _; // ΠΠ΄Π΅ΠΌ Π·Π°Π²Π΅ΡΡΠ΅Π½ΠΈΡ ΠΏΠ΅ΡΠ²ΠΎΠΉ } Future<void> accept_loop() { boost::asio::ip::tcp::endpoint src_endpoint( boost::asio::ip::address_v4::loopback(), 8080); // localhost:8080 boost::asio::ip::tcp::acceptor acceptor{io_service, src_endpoint}; for (;;) { boost::asio::ip::tcp::socket src(io_service); auto error = co_await coro_accept(acceptor, src); std::cout << "accept " << error << '\n'; if (error) co_return; connect(std::move(src)); } } int main() { accept_loop(); io_service.run(); // ΠΠΎΠΆΠ½ΠΎ Π·Π°ΠΏΡΡΡΠΈΡΡ ΠΏΠ°ΡΠ°Π»Π»Π΅Π»ΡΠ½ΠΎ Π² Π½Π΅ΡΠΊΠΎΠ»ΡΠΊΠΈΡ
ΠΏΠΎΡΠΎΠΊΠ°Ρ
}
The code has become much cleaner - callbacks are gone.
connect had to be removed from accept_loop into a separate function, since This is a separate coroutine.
Everything that was clearly allocated in dynamic memory now looks like local variables. At the same time, it lives in a dynamic memory, in a coroutine-state . This allows you to send sockets to the proxy by reference - their lifetime is tied to connect .
There is no exception handling in this code.
To support exceptions, Future should be able to throw exceptions with std::exception_ptr .