server.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. //
  2. // server.cpp
  3. // ~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #include <algorithm>
  11. #include <cstdlib>
  12. #include <deque>
  13. #include <iostream>
  14. #include <memory>
  15. #include <set>
  16. #include <string>
  17. #include <boost/asio/buffer.hpp>
  18. #include <boost/asio/io_context.hpp>
  19. #include <boost/asio/ip/tcp.hpp>
  20. #include <boost/asio/ip/udp.hpp>
  21. #include <boost/asio/read_until.hpp>
  22. #include <boost/asio/steady_timer.hpp>
  23. #include <boost/asio/write.hpp>
  24. using boost::asio::steady_timer;
  25. using boost::asio::ip::tcp;
  26. using boost::asio::ip::udp;
  27. //----------------------------------------------------------------------
  28. class subscriber
  29. {
  30. public:
  31. virtual ~subscriber() = default;
  32. virtual void deliver(const std::string& msg) = 0;
  33. };
  34. typedef std::shared_ptr<subscriber> subscriber_ptr;
  35. //----------------------------------------------------------------------
  36. class channel
  37. {
  38. public:
  39. void join(subscriber_ptr subscriber)
  40. {
  41. subscribers_.insert(subscriber);
  42. }
  43. void leave(subscriber_ptr subscriber)
  44. {
  45. subscribers_.erase(subscriber);
  46. }
  47. void deliver(const std::string& msg)
  48. {
  49. for (const auto& s : subscribers_)
  50. {
  51. s->deliver(msg);
  52. }
  53. }
  54. private:
  55. std::set<subscriber_ptr> subscribers_;
  56. };
  57. //----------------------------------------------------------------------
  58. //
  59. // This class manages socket timeouts by applying the concept of a deadline.
  60. // Some asynchronous operations are given deadlines by which they must complete.
  61. // Deadlines are enforced by two "actors" that persist for the lifetime of the
  62. // session object, one for input and one for output:
  63. //
  64. // +----------------+ +----------------+
  65. // | | | |
  66. // | check_deadline |<-------+ | check_deadline |<-------+
  67. // | | | | | |
  68. // +----------------+ | +----------------+ |
  69. // | | | |
  70. // async_wait() | +----------------+ async_wait() | +----------------+
  71. // on input | | lambda | on output | | lambda |
  72. // deadline +--->| in | deadline +--->| in |
  73. // | check_deadline | | check_deadline |
  74. // +----------------+ +----------------+
  75. //
  76. // If either deadline actor determines that the corresponding deadline has
  77. // expired, the socket is closed and any outstanding operations are cancelled.
  78. //
  79. // The input actor reads messages from the socket, where messages are delimited
  80. // by the newline character:
  81. //
  82. // +-------------+
  83. // | |
  84. // | read_line |<----+
  85. // | | |
  86. // +-------------+ |
  87. // | |
  88. // async_- | +-------------+
  89. // read_- | | lambda |
  90. // until() +--->| in |
  91. // | read_line |
  92. // +-------------+
  93. //
  94. // The deadline for receiving a complete message is 30 seconds. If a non-empty
  95. // message is received, it is delivered to all subscribers. If a heartbeat (a
  96. // message that consists of a single newline character) is received, a heartbeat
  97. // is enqueued for the client, provided there are no other messages waiting to
  98. // be sent.
  99. //
  100. // The output actor is responsible for sending messages to the client:
  101. //
  102. // +----------------+
  103. // | |<---------------------+
  104. // | await_output | |
  105. // | |<-------+ |
  106. // +----------------+ | |
  107. // | | | |
  108. // | async_- | +----------------+ |
  109. // | wait() | | lambda | |
  110. // | +->| in | |
  111. // | | await_output | |
  112. // | +----------------+ |
  113. // V |
  114. // +--------------+ +--------------+
  115. // | | async_write() | lambda |
  116. // | write_line |-------------->| in |
  117. // | | | write_line |
  118. // +--------------+ +--------------+
  119. //
  120. // The output actor first waits for an output message to be enqueued. It does
  121. // this by using a steady_timer as an asynchronous condition variable. The
  122. // steady_timer will be signalled whenever the output queue is non-empty.
  123. //
  124. // Once a message is available, it is sent to the client. The deadline for
  125. // sending a complete message is 30 seconds. After the message is successfully
  126. // sent, the output actor again waits for the output queue to become non-empty.
  127. //
  128. class tcp_session
  129. : public subscriber,
  130. public std::enable_shared_from_this<tcp_session>
  131. {
  132. public:
  133. tcp_session(tcp::socket socket, channel& ch)
  134. : channel_(ch),
  135. socket_(std::move(socket))
  136. {
  137. input_deadline_.expires_at(steady_timer::time_point::max());
  138. output_deadline_.expires_at(steady_timer::time_point::max());
  139. // The non_empty_output_queue_ steady_timer is set to the maximum time
  140. // point whenever the output queue is empty. This ensures that the output
  141. // actor stays asleep until a message is put into the queue.
  142. non_empty_output_queue_.expires_at(steady_timer::time_point::max());
  143. }
  144. // Called by the server object to initiate the four actors.
  145. void start()
  146. {
  147. channel_.join(shared_from_this());
  148. read_line();
  149. check_deadline(input_deadline_);
  150. await_output();
  151. check_deadline(output_deadline_);
  152. }
  153. private:
  154. void stop()
  155. {
  156. channel_.leave(shared_from_this());
  157. boost::system::error_code ignored_error;
  158. socket_.close(ignored_error);
  159. input_deadline_.cancel();
  160. non_empty_output_queue_.cancel();
  161. output_deadline_.cancel();
  162. }
  163. bool stopped() const
  164. {
  165. return !socket_.is_open();
  166. }
  167. void deliver(const std::string& msg) override
  168. {
  169. output_queue_.push_back(msg + "\n");
  170. // Signal that the output queue contains messages. Modifying the expiry
  171. // will wake the output actor, if it is waiting on the timer.
  172. non_empty_output_queue_.expires_at(steady_timer::time_point::min());
  173. }
  174. void read_line()
  175. {
  176. // Set a deadline for the read operation.
  177. input_deadline_.expires_after(std::chrono::seconds(30));
  178. // Start an asynchronous operation to read a newline-delimited message.
  179. auto self(shared_from_this());
  180. boost::asio::async_read_until(socket_,
  181. boost::asio::dynamic_buffer(input_buffer_), '\n',
  182. [this, self](const boost::system::error_code& error, std::size_t n)
  183. {
  184. // Check if the session was stopped while the operation was pending.
  185. if (stopped())
  186. return;
  187. if (!error)
  188. {
  189. // Extract the newline-delimited message from the buffer.
  190. std::string msg(input_buffer_.substr(0, n - 1));
  191. input_buffer_.erase(0, n);
  192. if (!msg.empty())
  193. {
  194. channel_.deliver(msg);
  195. }
  196. else
  197. {
  198. // We received a heartbeat message from the client. If there's
  199. // nothing else being sent or ready to be sent, send a heartbeat
  200. // right back.
  201. if (output_queue_.empty())
  202. {
  203. output_queue_.push_back("\n");
  204. // Signal that the output queue contains messages. Modifying
  205. // the expiry will wake the output actor, if it is waiting on
  206. // the timer.
  207. non_empty_output_queue_.expires_at(
  208. steady_timer::time_point::min());
  209. }
  210. }
  211. read_line();
  212. }
  213. else
  214. {
  215. stop();
  216. }
  217. });
  218. }
  219. void await_output()
  220. {
  221. auto self(shared_from_this());
  222. non_empty_output_queue_.async_wait(
  223. [this, self](const boost::system::error_code& /*error*/)
  224. {
  225. // Check if the session was stopped while the operation was pending.
  226. if (stopped())
  227. return;
  228. if (output_queue_.empty())
  229. {
  230. // There are no messages that are ready to be sent. The actor goes
  231. // to sleep by waiting on the non_empty_output_queue_ timer. When a
  232. // new message is added, the timer will be modified and the actor
  233. // will wake.
  234. non_empty_output_queue_.expires_at(steady_timer::time_point::max());
  235. await_output();
  236. }
  237. else
  238. {
  239. write_line();
  240. }
  241. });
  242. }
  243. void write_line()
  244. {
  245. // Set a deadline for the write operation.
  246. output_deadline_.expires_after(std::chrono::seconds(30));
  247. // Start an asynchronous operation to send a message.
  248. auto self(shared_from_this());
  249. boost::asio::async_write(socket_,
  250. boost::asio::buffer(output_queue_.front()),
  251. [this, self](const boost::system::error_code& error, std::size_t /*n*/)
  252. {
  253. // Check if the session was stopped while the operation was pending.
  254. if (stopped())
  255. return;
  256. if (!error)
  257. {
  258. output_queue_.pop_front();
  259. await_output();
  260. }
  261. else
  262. {
  263. stop();
  264. }
  265. });
  266. }
  267. void check_deadline(steady_timer& deadline)
  268. {
  269. auto self(shared_from_this());
  270. deadline.async_wait(
  271. [this, self, &deadline](const boost::system::error_code& /*error*/)
  272. {
  273. // Check if the session was stopped while the operation was pending.
  274. if (stopped())
  275. return;
  276. // Check whether the deadline has passed. We compare the deadline
  277. // against the current time since a new asynchronous operation may
  278. // have moved the deadline before this actor had a chance to run.
  279. if (deadline.expiry() <= steady_timer::clock_type::now())
  280. {
  281. // The deadline has passed. Stop the session. The other actors will
  282. // terminate as soon as possible.
  283. stop();
  284. }
  285. else
  286. {
  287. // Put the actor back to sleep.
  288. check_deadline(deadline);
  289. }
  290. });
  291. }
  292. channel& channel_;
  293. tcp::socket socket_;
  294. std::string input_buffer_;
  295. steady_timer input_deadline_{socket_.get_executor()};
  296. std::deque<std::string> output_queue_;
  297. steady_timer non_empty_output_queue_{socket_.get_executor()};
  298. steady_timer output_deadline_{socket_.get_executor()};
  299. };
  300. typedef std::shared_ptr<tcp_session> tcp_session_ptr;
  301. //----------------------------------------------------------------------
  302. class udp_broadcaster
  303. : public subscriber
  304. {
  305. public:
  306. udp_broadcaster(boost::asio::io_context& io_context,
  307. const udp::endpoint& broadcast_endpoint)
  308. : socket_(io_context)
  309. {
  310. socket_.connect(broadcast_endpoint);
  311. socket_.set_option(udp::socket::broadcast(true));
  312. }
  313. private:
  314. void deliver(const std::string& msg)
  315. {
  316. boost::system::error_code ignored_error;
  317. socket_.send(boost::asio::buffer(msg), 0, ignored_error);
  318. }
  319. udp::socket socket_;
  320. };
  321. //----------------------------------------------------------------------
  322. class server
  323. {
  324. public:
  325. server(boost::asio::io_context& io_context,
  326. const tcp::endpoint& listen_endpoint,
  327. const udp::endpoint& broadcast_endpoint)
  328. : io_context_(io_context),
  329. acceptor_(io_context, listen_endpoint)
  330. {
  331. channel_.join(
  332. std::make_shared<udp_broadcaster>(
  333. io_context_, broadcast_endpoint));
  334. accept();
  335. }
  336. private:
  337. void accept()
  338. {
  339. acceptor_.async_accept(
  340. [this](const boost::system::error_code& error, tcp::socket socket)
  341. {
  342. if (!error)
  343. {
  344. std::make_shared<tcp_session>(std::move(socket), channel_)->start();
  345. }
  346. accept();
  347. });
  348. }
  349. boost::asio::io_context& io_context_;
  350. tcp::acceptor acceptor_;
  351. channel channel_;
  352. };
  353. //----------------------------------------------------------------------
  354. int main(int argc, char* argv[])
  355. {
  356. try
  357. {
  358. using namespace std; // For atoi.
  359. if (argc != 4)
  360. {
  361. std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
  362. return 1;
  363. }
  364. boost::asio::io_context io_context;
  365. tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
  366. udp::endpoint broadcast_endpoint(
  367. boost::asio::ip::make_address(argv[2]), atoi(argv[3]));
  368. server s(io_context, listen_endpoint, broadcast_endpoint);
  369. io_context.run();
  370. }
  371. catch (std::exception& e)
  372. {
  373. std::cerr << "Exception: " << e.what() << "\n";
  374. }
  375. return 0;
  376. }