close.hpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
  11. #include <boost/beast/websocket/teardown.hpp>
  12. #include <boost/beast/websocket/detail/mask.hpp>
  13. #include <boost/beast/websocket/impl/stream_impl.hpp>
  14. #include <boost/beast/core/async_base.hpp>
  15. #include <boost/beast/core/flat_static_buffer.hpp>
  16. #include <boost/beast/core/stream_traits.hpp>
  17. #include <boost/beast/core/detail/bind_continuation.hpp>
  18. #include <boost/asio/coroutine.hpp>
  19. #include <boost/asio/post.hpp>
  20. #include <boost/throw_exception.hpp>
  21. #include <memory>
  22. namespace boost {
  23. namespace beast {
  24. namespace websocket {
  25. /* Close the WebSocket Connection
  26. This composed operation sends the close frame if it hasn't already
  27. been sent, then reads and discards frames until receiving a close
  28. frame. Finally it invokes the teardown operation to shut down the
  29. underlying connection.
  30. */
  31. template<class NextLayer, bool deflateSupported>
  32. template<class Handler>
  33. class stream<NextLayer, deflateSupported>::close_op
  34. : public beast::stable_async_base<
  35. Handler, beast::executor_type<stream>>
  36. , public asio::coroutine
  37. {
  38. boost::weak_ptr<impl_type> wp_;
  39. error_code ev_;
  40. detail::frame_buffer& fb_;
  41. public:
  42. static constexpr int id = 5; // for soft_mutex
  43. template<class Handler_>
  44. close_op(
  45. Handler_&& h,
  46. boost::shared_ptr<impl_type> const& sp,
  47. close_reason const& cr)
  48. : stable_async_base<Handler,
  49. beast::executor_type<stream>>(
  50. std::forward<Handler_>(h),
  51. sp->stream().get_executor())
  52. , wp_(sp)
  53. , fb_(beast::allocate_stable<
  54. detail::frame_buffer>(*this))
  55. {
  56. // Serialize the close frame
  57. sp->template write_close<
  58. flat_static_buffer_base>(fb_, cr);
  59. (*this)({}, 0, false);
  60. }
  61. void
  62. operator()(
  63. error_code ec = {},
  64. std::size_t bytes_transferred = 0,
  65. bool cont = true)
  66. {
  67. using beast::detail::clamp;
  68. auto sp = wp_.lock();
  69. if(! sp)
  70. {
  71. ec = net::error::operation_aborted;
  72. return this->complete(cont, ec);
  73. }
  74. auto& impl = *sp;
  75. BOOST_ASIO_CORO_REENTER(*this)
  76. {
  77. // Acquire the write lock
  78. if(! impl.wr_block.try_lock(this))
  79. {
  80. BOOST_ASIO_CORO_YIELD
  81. impl.op_close.emplace(std::move(*this));
  82. impl.wr_block.lock(this);
  83. BOOST_ASIO_CORO_YIELD
  84. net::post(std::move(*this));
  85. BOOST_ASSERT(impl.wr_block.is_locked(this));
  86. }
  87. if(impl.check_stop_now(ec))
  88. goto upcall;
  89. // Can't call close twice
  90. // TODO return a custom error code
  91. BOOST_ASSERT(! impl.wr_close);
  92. // Send close frame
  93. impl.wr_close = true;
  94. impl.change_status(status::closing);
  95. impl.update_timer(this->get_executor());
  96. BOOST_ASIO_CORO_YIELD
  97. net::async_write(impl.stream(), fb_.data(),
  98. beast::detail::bind_continuation(std::move(*this)));
  99. if(impl.check_stop_now(ec))
  100. goto upcall;
  101. if(impl.rd_close)
  102. {
  103. // This happens when the read_op gets a close frame
  104. // at the same time close_op is sending the close frame.
  105. // The read_op will be suspended on the write block.
  106. goto teardown;
  107. }
  108. // Acquire the read lock
  109. if(! impl.rd_block.try_lock(this))
  110. {
  111. BOOST_ASIO_CORO_YIELD
  112. impl.op_r_close.emplace(std::move(*this));
  113. impl.rd_block.lock(this);
  114. BOOST_ASIO_CORO_YIELD
  115. net::post(std::move(*this));
  116. BOOST_ASSERT(impl.rd_block.is_locked(this));
  117. if(impl.check_stop_now(ec))
  118. goto upcall;
  119. BOOST_ASSERT(! impl.rd_close);
  120. }
  121. // Read until a receiving a close frame
  122. // TODO There should be a timeout on this
  123. if(impl.rd_remain > 0)
  124. goto read_payload;
  125. for(;;)
  126. {
  127. // Read frame header
  128. while(! impl.parse_fh(
  129. impl.rd_fh, impl.rd_buf, ev_))
  130. {
  131. if(ev_)
  132. goto teardown;
  133. BOOST_ASIO_CORO_YIELD
  134. impl.stream().async_read_some(
  135. impl.rd_buf.prepare(read_size(
  136. impl.rd_buf, impl.rd_buf.max_size())),
  137. beast::detail::bind_continuation(std::move(*this)));
  138. impl.rd_buf.commit(bytes_transferred);
  139. if(impl.check_stop_now(ec))
  140. goto upcall;
  141. }
  142. if(detail::is_control(impl.rd_fh.op))
  143. {
  144. // Discard ping or pong frame
  145. if(impl.rd_fh.op != detail::opcode::close)
  146. {
  147. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  148. continue;
  149. }
  150. // Process close frame
  151. // TODO Should we invoke the control callback?
  152. BOOST_ASSERT(! impl.rd_close);
  153. impl.rd_close = true;
  154. auto const mb = buffers_prefix(
  155. clamp(impl.rd_fh.len),
  156. impl.rd_buf.data());
  157. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  158. detail::mask_inplace(mb, impl.rd_key);
  159. detail::read_close(impl.cr, mb, ev_);
  160. if(ev_)
  161. goto teardown;
  162. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  163. goto teardown;
  164. }
  165. read_payload:
  166. // Discard message frame
  167. while(impl.rd_buf.size() < impl.rd_remain)
  168. {
  169. impl.rd_remain -= impl.rd_buf.size();
  170. impl.rd_buf.consume(impl.rd_buf.size());
  171. BOOST_ASIO_CORO_YIELD
  172. impl.stream().async_read_some(
  173. impl.rd_buf.prepare(read_size(
  174. impl.rd_buf, impl.rd_buf.max_size())),
  175. beast::detail::bind_continuation(std::move(*this)));
  176. impl.rd_buf.commit(bytes_transferred);
  177. if(impl.check_stop_now(ec))
  178. goto upcall;
  179. }
  180. BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
  181. impl.rd_buf.consume(clamp(impl.rd_remain));
  182. impl.rd_remain = 0;
  183. }
  184. teardown:
  185. // Teardown
  186. BOOST_ASSERT(impl.wr_block.is_locked(this));
  187. using beast::websocket::async_teardown;
  188. BOOST_ASIO_CORO_YIELD
  189. async_teardown(impl.role, impl.stream(),
  190. beast::detail::bind_continuation(std::move(*this)));
  191. BOOST_ASSERT(impl.wr_block.is_locked(this));
  192. if(ec == net::error::eof)
  193. {
  194. // Rationale:
  195. // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
  196. ec = {};
  197. }
  198. if(! ec)
  199. ec = ev_;
  200. if(ec)
  201. impl.change_status(status::failed);
  202. else
  203. impl.change_status(status::closed);
  204. impl.close();
  205. upcall:
  206. impl.wr_block.unlock(this);
  207. impl.rd_block.try_unlock(this)
  208. && impl.op_r_rd.maybe_invoke();
  209. impl.op_rd.maybe_invoke()
  210. || impl.op_idle_ping.maybe_invoke()
  211. || impl.op_ping.maybe_invoke()
  212. || impl.op_wr.maybe_invoke();
  213. this->complete(cont, ec);
  214. }
  215. }
  216. };
  217. template<class NextLayer, bool deflateSupported>
  218. struct stream<NextLayer, deflateSupported>::
  219. run_close_op
  220. {
  221. template<class CloseHandler>
  222. void
  223. operator()(
  224. CloseHandler&& h,
  225. boost::shared_ptr<impl_type> const& sp,
  226. close_reason const& cr)
  227. {
  228. // If you get an error on the following line it means
  229. // that your handler does not meet the documented type
  230. // requirements for the handler.
  231. static_assert(
  232. beast::detail::is_invocable<CloseHandler,
  233. void(error_code)>::value,
  234. "CloseHandler type requirements not met");
  235. close_op<
  236. typename std::decay<CloseHandler>::type>(
  237. std::forward<CloseHandler>(h),
  238. sp,
  239. cr);
  240. }
  241. };
  242. //------------------------------------------------------------------------------
  243. template<class NextLayer, bool deflateSupported>
  244. void
  245. stream<NextLayer, deflateSupported>::
  246. close(close_reason const& cr)
  247. {
  248. static_assert(is_sync_stream<next_layer_type>::value,
  249. "SyncStream type requirements not met");
  250. error_code ec;
  251. close(cr, ec);
  252. if(ec)
  253. BOOST_THROW_EXCEPTION(system_error{ec});
  254. }
  255. template<class NextLayer, bool deflateSupported>
  256. void
  257. stream<NextLayer, deflateSupported>::
  258. close(close_reason const& cr, error_code& ec)
  259. {
  260. static_assert(is_sync_stream<next_layer_type>::value,
  261. "SyncStream type requirements not met");
  262. using beast::detail::clamp;
  263. auto& impl = *impl_;
  264. ec = {};
  265. if(impl.check_stop_now(ec))
  266. return;
  267. BOOST_ASSERT(! impl.rd_close);
  268. // Can't call close twice
  269. // TODO return a custom error code
  270. BOOST_ASSERT(! impl.wr_close);
  271. // Send close frame
  272. {
  273. impl.wr_close = true;
  274. impl.change_status(status::closing);
  275. detail::frame_buffer fb;
  276. impl.template write_close<flat_static_buffer_base>(fb, cr);
  277. net::write(impl.stream(), fb.data(), ec);
  278. if(impl.check_stop_now(ec))
  279. return;
  280. }
  281. // Read until a receiving a close frame
  282. error_code ev;
  283. if(impl.rd_remain > 0)
  284. goto read_payload;
  285. for(;;)
  286. {
  287. // Read frame header
  288. while(! impl.parse_fh(
  289. impl.rd_fh, impl.rd_buf, ev))
  290. {
  291. if(ev)
  292. {
  293. // Protocol violation
  294. return do_fail(close_code::none, ev, ec);
  295. }
  296. impl.rd_buf.commit(impl.stream().read_some(
  297. impl.rd_buf.prepare(read_size(
  298. impl.rd_buf, impl.rd_buf.max_size())), ec));
  299. if(impl.check_stop_now(ec))
  300. return;
  301. }
  302. if(detail::is_control(impl.rd_fh.op))
  303. {
  304. // Discard ping/pong frame
  305. if(impl.rd_fh.op != detail::opcode::close)
  306. {
  307. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  308. continue;
  309. }
  310. // Handle close frame
  311. // TODO Should we invoke the control callback?
  312. BOOST_ASSERT(! impl.rd_close);
  313. impl.rd_close = true;
  314. auto const mb = buffers_prefix(
  315. clamp(impl.rd_fh.len),
  316. impl.rd_buf.data());
  317. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  318. detail::mask_inplace(mb, impl.rd_key);
  319. detail::read_close(impl.cr, mb, ev);
  320. if(ev)
  321. {
  322. // Protocol violation
  323. return do_fail(close_code::none, ev, ec);
  324. }
  325. impl.rd_buf.consume(clamp(impl.rd_fh.len));
  326. break;
  327. }
  328. read_payload:
  329. // Discard message frame
  330. while(impl.rd_buf.size() < impl.rd_remain)
  331. {
  332. impl.rd_remain -= impl.rd_buf.size();
  333. impl.rd_buf.consume(impl.rd_buf.size());
  334. impl.rd_buf.commit(
  335. impl.stream().read_some(
  336. impl.rd_buf.prepare(
  337. read_size(
  338. impl.rd_buf,
  339. impl.rd_buf.max_size())),
  340. ec));
  341. if(impl.check_stop_now(ec))
  342. return;
  343. }
  344. BOOST_ASSERT(
  345. impl.rd_buf.size() >= impl.rd_remain);
  346. impl.rd_buf.consume(clamp(impl.rd_remain));
  347. impl.rd_remain = 0;
  348. }
  349. // _Close the WebSocket Connection_
  350. do_fail(close_code::none, error::closed, ec);
  351. if(ec == error::closed)
  352. ec = {};
  353. }
  354. template<class NextLayer, bool deflateSupported>
  355. template<class CloseHandler>
  356. BOOST_BEAST_ASYNC_RESULT1(CloseHandler)
  357. stream<NextLayer, deflateSupported>::
  358. async_close(close_reason const& cr, CloseHandler&& handler)
  359. {
  360. static_assert(is_async_stream<next_layer_type>::value,
  361. "AsyncStream type requirements not met");
  362. return net::async_initiate<
  363. CloseHandler,
  364. void(error_code)>(
  365. run_close_op{},
  366. handler,
  367. impl_,
  368. cr);
  369. }
  370. } // websocket
  371. } // beast
  372. } // boost
  373. #endif