// websocket_server_async.cpp
//
// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/beast
//
//------------------------------------------------------------------------------
//
// Example: WebSocket server, asynchronous
//
//------------------------------------------------------------------------------
  14. #include <boost/beast/core.hpp>
  15. #include <boost/beast/websocket.hpp>
  16. #include <boost/asio/dispatch.hpp>
  17. #include <boost/asio/strand.hpp>
  18. #include <algorithm>
  19. #include <cstdlib>
  20. #include <functional>
  21. #include <iostream>
  22. #include <memory>
  23. #include <string>
  24. #include <thread>
  25. #include <vector>
  26. namespace beast = boost::beast; // from <boost/beast.hpp>
  27. namespace http = beast::http; // from <boost/beast/http.hpp>
  28. namespace websocket = beast::websocket; // from <boost/beast/websocket.hpp>
  29. namespace net = boost::asio; // from <boost/asio.hpp>
  30. using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
  31. //------------------------------------------------------------------------------
  32. // Report a failure
  33. void
  34. fail(beast::error_code ec, char const* what)
  35. {
  36. std::cerr << what << ": " << ec.message() << "\n";
  37. }
  38. // Echoes back all received WebSocket messages
  39. class session : public std::enable_shared_from_this<session>
  40. {
  41. websocket::stream<beast::tcp_stream> ws_;
  42. beast::flat_buffer buffer_;
  43. public:
  44. // Take ownership of the socket
  45. explicit
  46. session(tcp::socket&& socket)
  47. : ws_(std::move(socket))
  48. {
  49. }
  50. // Get on the correct executor
  51. void
  52. run()
  53. {
  54. // We need to be executing within a strand to perform async operations
  55. // on the I/O objects in this session. Although not strictly necessary
  56. // for single-threaded contexts, this example code is written to be
  57. // thread-safe by default.
  58. net::dispatch(ws_.get_executor(),
  59. beast::bind_front_handler(
  60. &session::on_run,
  61. shared_from_this()));
  62. }
  63. // Start the asynchronous operation
  64. void
  65. on_run()
  66. {
  67. // Set suggested timeout settings for the websocket
  68. ws_.set_option(
  69. websocket::stream_base::timeout::suggested(
  70. beast::role_type::server));
  71. // Set a decorator to change the Server of the handshake
  72. ws_.set_option(websocket::stream_base::decorator(
  73. [](websocket::response_type& res)
  74. {
  75. res.set(http::field::server,
  76. std::string(BOOST_BEAST_VERSION_STRING) +
  77. " websocket-server-async");
  78. }));
  79. // Accept the websocket handshake
  80. ws_.async_accept(
  81. beast::bind_front_handler(
  82. &session::on_accept,
  83. shared_from_this()));
  84. }
  85. void
  86. on_accept(beast::error_code ec)
  87. {
  88. if(ec)
  89. return fail(ec, "accept");
  90. // Read a message
  91. do_read();
  92. }
  93. void
  94. do_read()
  95. {
  96. // Read a message into our buffer
  97. ws_.async_read(
  98. buffer_,
  99. beast::bind_front_handler(
  100. &session::on_read,
  101. shared_from_this()));
  102. }
  103. void
  104. on_read(
  105. beast::error_code ec,
  106. std::size_t bytes_transferred)
  107. {
  108. boost::ignore_unused(bytes_transferred);
  109. // This indicates that the session was closed
  110. if(ec == websocket::error::closed)
  111. return;
  112. if(ec)
  113. fail(ec, "read");
  114. // Echo the message
  115. ws_.text(ws_.got_text());
  116. ws_.async_write(
  117. buffer_.data(),
  118. beast::bind_front_handler(
  119. &session::on_write,
  120. shared_from_this()));
  121. }
  122. void
  123. on_write(
  124. beast::error_code ec,
  125. std::size_t bytes_transferred)
  126. {
  127. boost::ignore_unused(bytes_transferred);
  128. if(ec)
  129. return fail(ec, "write");
  130. // Clear the buffer
  131. buffer_.consume(buffer_.size());
  132. // Do another read
  133. do_read();
  134. }
  135. };
  136. //------------------------------------------------------------------------------
  137. // Accepts incoming connections and launches the sessions
  138. class listener : public std::enable_shared_from_this<listener>
  139. {
  140. net::io_context& ioc_;
  141. tcp::acceptor acceptor_;
  142. public:
  143. listener(
  144. net::io_context& ioc,
  145. tcp::endpoint endpoint)
  146. : ioc_(ioc)
  147. , acceptor_(ioc)
  148. {
  149. beast::error_code ec;
  150. // Open the acceptor
  151. acceptor_.open(endpoint.protocol(), ec);
  152. if(ec)
  153. {
  154. fail(ec, "open");
  155. return;
  156. }
  157. // Allow address reuse
  158. acceptor_.set_option(net::socket_base::reuse_address(true), ec);
  159. if(ec)
  160. {
  161. fail(ec, "set_option");
  162. return;
  163. }
  164. // Bind to the server address
  165. acceptor_.bind(endpoint, ec);
  166. if(ec)
  167. {
  168. fail(ec, "bind");
  169. return;
  170. }
  171. // Start listening for connections
  172. acceptor_.listen(
  173. net::socket_base::max_listen_connections, ec);
  174. if(ec)
  175. {
  176. fail(ec, "listen");
  177. return;
  178. }
  179. }
  180. // Start accepting incoming connections
  181. void
  182. run()
  183. {
  184. do_accept();
  185. }
  186. private:
  187. void
  188. do_accept()
  189. {
  190. // The new connection gets its own strand
  191. acceptor_.async_accept(
  192. net::make_strand(ioc_),
  193. beast::bind_front_handler(
  194. &listener::on_accept,
  195. shared_from_this()));
  196. }
  197. void
  198. on_accept(beast::error_code ec, tcp::socket socket)
  199. {
  200. if(ec)
  201. {
  202. fail(ec, "accept");
  203. }
  204. else
  205. {
  206. // Create the session and run it
  207. std::make_shared<session>(std::move(socket))->run();
  208. }
  209. // Accept another connection
  210. do_accept();
  211. }
  212. };
  213. //------------------------------------------------------------------------------
  214. int main(int argc, char* argv[])
  215. {
  216. // Check command line arguments.
  217. if (argc != 4)
  218. {
  219. std::cerr <<
  220. "Usage: websocket-server-async <address> <port> <threads>\n" <<
  221. "Example:\n" <<
  222. " websocket-server-async 0.0.0.0 8080 1\n";
  223. return EXIT_FAILURE;
  224. }
  225. auto const address = net::ip::make_address(argv[1]);
  226. auto const port = static_cast<unsigned short>(std::atoi(argv[2]));
  227. auto const threads = std::max<int>(1, std::atoi(argv[3]));
  228. // The io_context is required for all I/O
  229. net::io_context ioc{threads};
  230. // Create and launch a listening port
  231. std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run();
  232. // Run the I/O service on the requested number of threads
  233. std::vector<std::thread> v;
  234. v.reserve(threads - 1);
  235. for(auto i = threads - 1; i > 0; --i)
  236. v.emplace_back(
  237. [&ioc]
  238. {
  239. ioc.run();
  240. });
  241. ioc.run();
  242. return EXIT_SUCCESS;
  243. }