websocket_server_stackless.cpp 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. //------------------------------------------------------------------------------
  10. //
  11. // Example: WebSocket server, stackless coroutine
  12. //
  13. //------------------------------------------------------------------------------
  14. #include <boost/beast/core.hpp>
  15. #include <boost/beast/websocket.hpp>
  16. #include <boost/asio/coroutine.hpp>
  17. #include <boost/asio/strand.hpp>
  18. #include <boost/asio/dispatch.hpp>
  19. #include <algorithm>
  20. #include <cstdlib>
  21. #include <functional>
  22. #include <iostream>
  23. #include <memory>
  24. #include <string>
  25. #include <thread>
  26. #include <vector>
  27. namespace beast = boost::beast; // from <boost/beast.hpp>
  28. namespace http = beast::http; // from <boost/beast/http.hpp>
  29. namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
  30. namespace net = boost::asio; // from <boost/asio.hpp>
  31. using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
  32. //------------------------------------------------------------------------------
  33. // Report a failure
  34. void
  35. fail(beast::error_code ec, char const* what)
  36. {
  37. std::cerr << what << ": " << ec.message() << "\n";
  38. }
  39. // Echoes back all received WebSocket messages
  40. class session
  41. : public boost::asio::coroutine
  42. , public std::enable_shared_from_this<session>
  43. {
  44. websocket::stream<beast::tcp_stream> ws_;
  45. beast::flat_buffer buffer_;
  46. public:
  47. // Take ownership of the socket
  48. explicit
  49. session(tcp::socket socket)
  50. : ws_(std::move(socket))
  51. {
  52. }
  53. // Start the asynchronous operation
  54. void
  55. run()
  56. {
  57. // We need to be executing within a strand to perform async operations
  58. // on the I/O objects in this session. Although not strictly necessary
  59. // for single-threaded contexts, this example code is written to be
  60. // thread-safe by default.
  61. net::dispatch(ws_.get_executor(),
  62. beast::bind_front_handler(&session::loop,
  63. shared_from_this(),
  64. beast::error_code{},
  65. 0));
  66. }
  67. #include <boost/asio/yield.hpp>
  68. void
  69. loop(
  70. beast::error_code ec,
  71. std::size_t bytes_transferred)
  72. {
  73. boost::ignore_unused(bytes_transferred);
  74. reenter(*this)
  75. {
  76. // Set suggested timeout settings for the websocket
  77. ws_.set_option(
  78. websocket::stream_base::timeout::suggested(
  79. beast::role_type::server));
  80. // Set a decorator to change the Server of the handshake
  81. ws_.set_option(websocket::stream_base::decorator(
  82. [](websocket::response_type& res)
  83. {
  84. res.set(http::field::server,
  85. std::string(BOOST_BEAST_VERSION_STRING) +
  86. " websocket-server-stackless");
  87. }));
  88. // Accept the websocket handshake
  89. yield ws_.async_accept(
  90. std::bind(
  91. &session::loop,
  92. shared_from_this(),
  93. std::placeholders::_1,
  94. 0));
  95. if(ec)
  96. return fail(ec, "accept");
  97. for(;;)
  98. {
  99. // Read a message into our buffer
  100. yield ws_.async_read(
  101. buffer_,
  102. std::bind(
  103. &session::loop,
  104. shared_from_this(),
  105. std::placeholders::_1,
  106. std::placeholders::_2));
  107. if(ec == websocket::error::closed)
  108. {
  109. // This indicates that the session was closed
  110. return;
  111. }
  112. if(ec)
  113. fail(ec, "read");
  114. // Echo the message
  115. ws_.text(ws_.got_text());
  116. yield ws_.async_write(
  117. buffer_.data(),
  118. std::bind(
  119. &session::loop,
  120. shared_from_this(),
  121. std::placeholders::_1,
  122. std::placeholders::_2));
  123. if(ec)
  124. return fail(ec, "write");
  125. // Clear the buffer
  126. buffer_.consume(buffer_.size());
  127. }
  128. }
  129. }
  130. #include <boost/asio/unyield.hpp>
  131. };
  132. //------------------------------------------------------------------------------
  133. // Accepts incoming connections and launches the sessions
  134. class listener
  135. : public boost::asio::coroutine
  136. , public std::enable_shared_from_this<listener>
  137. {
  138. net::io_context& ioc_;
  139. tcp::acceptor acceptor_;
  140. tcp::socket socket_;
  141. public:
  142. listener(
  143. net::io_context& ioc,
  144. tcp::endpoint endpoint)
  145. : ioc_(ioc)
  146. , acceptor_(net::make_strand(ioc))
  147. , socket_(net::make_strand(ioc))
  148. {
  149. beast::error_code ec;
  150. // Open the acceptor
  151. acceptor_.open(endpoint.protocol(), ec);
  152. if(ec)
  153. {
  154. fail(ec, "open");
  155. return;
  156. }
  157. // Allow address reuse
  158. acceptor_.set_option(net::socket_base::reuse_address(true), ec);
  159. if(ec)
  160. {
  161. fail(ec, "set_option");
  162. return;
  163. }
  164. // Bind to the server address
  165. acceptor_.bind(endpoint, ec);
  166. if(ec)
  167. {
  168. fail(ec, "bind");
  169. return;
  170. }
  171. // Start listening for connections
  172. acceptor_.listen(
  173. net::socket_base::max_listen_connections, ec);
  174. if(ec)
  175. {
  176. fail(ec, "listen");
  177. return;
  178. }
  179. }
  180. // Start accepting incoming connections
  181. void
  182. run()
  183. {
  184. loop();
  185. }
  186. private:
  187. #include <boost/asio/yield.hpp>
  188. void
  189. loop(beast::error_code ec = {})
  190. {
  191. reenter(*this)
  192. {
  193. for(;;)
  194. {
  195. yield acceptor_.async_accept(
  196. socket_,
  197. std::bind(
  198. &listener::loop,
  199. shared_from_this(),
  200. std::placeholders::_1));
  201. if(ec)
  202. {
  203. fail(ec, "accept");
  204. }
  205. else
  206. {
  207. // Create the session and run it
  208. std::make_shared<session>(std::move(socket_))->run();
  209. }
  210. // Make sure each session gets its own strand
  211. socket_ = tcp::socket(net::make_strand(ioc_));
  212. }
  213. }
  214. }
  215. #include <boost/asio/unyield.hpp>
  216. };
  217. //------------------------------------------------------------------------------
  218. int main(int argc, char* argv[])
  219. {
  220. // Check command line arguments.
  221. if (argc != 4)
  222. {
  223. std::cerr <<
  224. "Usage: websocket-server-stackless <address> <port> <threads>\n" <<
  225. "Example:\n" <<
  226. " websocket-server-stackless 0.0.0.0 8080 1\n";
  227. return EXIT_FAILURE;
  228. }
  229. auto const address = net::ip::make_address(argv[1]);
  230. auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
  231. auto const threads = std::max<int>(1, std::atoi(argv[3]));
  232. // The io_context is required for all I/O
  233. net::io_context ioc{threads};
  234. // Create and launch a listening port
  235. std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run();
  236. // Run the I/O service on the requested number of threads
  237. std::vector<std::thread> v;
  238. v.reserve(threads - 1);
  239. for(auto i = threads - 1; i > 0; --i)
  240. v.emplace_back(
  241. [&ioc]
  242. {
  243. ioc.run();
  244. });
  245. ioc.run();
  246. return EXIT_SUCCESS;
  247. }