async_tcp_client.cpp 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. //
  2. // async_tcp_client.cpp
  3. // ~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #include <boost/asio/buffer.hpp>
  11. #include <boost/asio/io_context.hpp>
  12. #include <boost/asio/ip/tcp.hpp>
  13. #include <boost/asio/read_until.hpp>
  14. #include <boost/asio/steady_timer.hpp>
  15. #include <boost/asio/write.hpp>
  16. #include <functional>
  17. #include <iostream>
  18. #include <string>
  19. using boost::asio::steady_timer;
  20. using boost::asio::ip::tcp;
  21. using std::placeholders::_1;
  22. using std::placeholders::_2;
  23. //
  24. // This class manages socket timeouts by applying the concept of a deadline.
  25. // Some asynchronous operations are given deadlines by which they must complete.
  26. // Deadlines are enforced by an "actor" that persists for the lifetime of the
  27. // client object:
  28. //
  //  +----------------+
  //  |                |
  //  | check_deadline |<---+
  //  |                |    |
  //  +----------------+    | async_wait()
  //              |         |
  //              +---------+
  36. //
  37. // If the deadline actor determines that the deadline has expired, the socket
  38. // is closed and any outstanding operations are consequently cancelled.
  39. //
  40. // Connection establishment involves trying each endpoint in turn until a
  41. // connection is successful, or the available endpoints are exhausted. If the
  42. // deadline actor closes the socket, the connect actor is woken up and moves to
  43. // the next endpoint.
  44. //
  //  +---------------+
  //  |               |
  //  | start_connect |<---+
  //  |               |    |
  //  +---------------+    |
  //           |           |
  //  async_-  |    +----------------+
  // connect() |    |                |
  //           +--->| handle_connect |
  //                |                |
  //                +----------------+
  //                          :
  // Once a connection is     :
  // made, the connect        :
  // actor forks in two -     :
  //                          :
  // an actor for reading     :       and an actor for
  // inbound messages:        :       sending heartbeats:
  //                          :
  //  +------------+          :          +-------------+
  //  |            |<- - - - -+- - - - ->|             |
  //  | start_read |                     | start_write |<---+
  //  |            |<---+                |             |    |
  //  +------------+    |                +-------------+    | async_wait()
  //          |         |                        |          |
  //  async_- |    +-------------+       async_- |    +--------------+
  //   read_- |    |             |       write() |    |              |
  //  until() +--->| handle_read |               +--->| handle_write |
  //               |             |                    |              |
  //               +-------------+                    +--------------+
  75. //
  76. // The input actor reads messages from the socket, where messages are delimited
  77. // by the newline character. The deadline for a complete message is 30 seconds.
  78. //
  79. // The heartbeat actor sends a heartbeat (a message that consists of a single
  80. // newline character) every 10 seconds. In this example, no deadline is applied
  81. // to message sending.
  82. //
  83. class client
  84. {
  85. public:
  86. client(boost::asio::io_context& io_context)
  87. : socket_(io_context),
  88. deadline_(io_context),
  89. heartbeat_timer_(io_context)
  90. {
  91. }
  92. // Called by the user of the client class to initiate the connection process.
  93. // The endpoints will have been obtained using a tcp::resolver.
  94. void start(tcp::resolver::results_type endpoints)
  95. {
  96. // Start the connect actor.
  97. endpoints_ = endpoints;
  98. start_connect(endpoints_.begin());
  99. // Start the deadline actor. You will note that we're not setting any
  100. // particular deadline here. Instead, the connect and input actors will
  101. // update the deadline prior to each asynchronous operation.
  102. deadline_.async_wait(std::bind(&client::check_deadline, this));
  103. }
  104. // This function terminates all the actors to shut down the connection. It
  105. // may be called by the user of the client class, or by the class itself in
  106. // response to graceful termination or an unrecoverable error.
  107. void stop()
  108. {
  109. stopped_ = true;
  110. boost::system::error_code ignored_error;
  111. socket_.close(ignored_error);
  112. deadline_.cancel();
  113. heartbeat_timer_.cancel();
  114. }
  115. private:
  116. void start_connect(tcp::resolver::results_type::iterator endpoint_iter)
  117. {
  118. if (endpoint_iter != endpoints_.end())
  119. {
  120. std::cout << "Trying " << endpoint_iter->endpoint() << "...\n";
  121. // Set a deadline for the connect operation.
  122. deadline_.expires_after(std::chrono::seconds(60));
  123. // Start the asynchronous connect operation.
  124. socket_.async_connect(endpoint_iter->endpoint(),
  125. std::bind(&client::handle_connect,
  126. this, _1, endpoint_iter));
  127. }
  128. else
  129. {
  130. // There are no more endpoints to try. Shut down the client.
  131. stop();
  132. }
  133. }
  134. void handle_connect(const boost::system::error_code& error,
  135. tcp::resolver::results_type::iterator endpoint_iter)
  136. {
  137. if (stopped_)
  138. return;
  139. // The async_connect() function automatically opens the socket at the start
  140. // of the asynchronous operation. If the socket is closed at this time then
  141. // the timeout handler must have run first.
  142. if (!socket_.is_open())
  143. {
  144. std::cout << "Connect timed out\n";
  145. // Try the next available endpoint.
  146. start_connect(++endpoint_iter);
  147. }
  148. // Check if the connect operation failed before the deadline expired.
  149. else if (error)
  150. {
  151. std::cout << "Connect error: " << error.message() << "\n";
  152. // We need to close the socket used in the previous connection attempt
  153. // before starting a new one.
  154. socket_.close();
  155. // Try the next available endpoint.
  156. start_connect(++endpoint_iter);
  157. }
  158. // Otherwise we have successfully established a connection.
  159. else
  160. {
  161. std::cout << "Connected to " << endpoint_iter->endpoint() << "\n";
  162. // Start the input actor.
  163. start_read();
  164. // Start the heartbeat actor.
  165. start_write();
  166. }
  167. }
  168. void start_read()
  169. {
  170. // Set a deadline for the read operation.
  171. deadline_.expires_after(std::chrono::seconds(30));
  172. // Start an asynchronous operation to read a newline-delimited message.
  173. boost::asio::async_read_until(socket_,
  174. boost::asio::dynamic_buffer(input_buffer_), '\n',
  175. std::bind(&client::handle_read, this, _1, _2));
  176. }
  177. void handle_read(const boost::system::error_code& error, std::size_t n)
  178. {
  179. if (stopped_)
  180. return;
  181. if (!error)
  182. {
  183. // Extract the newline-delimited message from the buffer.
  184. std::string line(input_buffer_.substr(0, n - 1));
  185. input_buffer_.erase(0, n);
  186. // Empty messages are heartbeats and so ignored.
  187. if (!line.empty())
  188. {
  189. std::cout << "Received: " << line << "\n";
  190. }
  191. start_read();
  192. }
  193. else
  194. {
  195. std::cout << "Error on receive: " << error.message() << "\n";
  196. stop();
  197. }
  198. }
  199. void start_write()
  200. {
  201. if (stopped_)
  202. return;
  203. // Start an asynchronous operation to send a heartbeat message.
  204. boost::asio::async_write(socket_, boost::asio::buffer("\n", 1),
  205. std::bind(&client::handle_write, this, _1));
  206. }
  207. void handle_write(const boost::system::error_code& error)
  208. {
  209. if (stopped_)
  210. return;
  211. if (!error)
  212. {
  213. // Wait 10 seconds before sending the next heartbeat.
  214. heartbeat_timer_.expires_after(std::chrono::seconds(10));
  215. heartbeat_timer_.async_wait(std::bind(&client::start_write, this));
  216. }
  217. else
  218. {
  219. std::cout << "Error on heartbeat: " << error.message() << "\n";
  220. stop();
  221. }
  222. }
  223. void check_deadline()
  224. {
  225. if (stopped_)
  226. return;
  227. // Check whether the deadline has passed. We compare the deadline against
  228. // the current time since a new asynchronous operation may have moved the
  229. // deadline before this actor had a chance to run.
  230. if (deadline_.expiry() <= steady_timer::clock_type::now())
  231. {
  232. // The deadline has passed. The socket is closed so that any outstanding
  233. // asynchronous operations are cancelled.
  234. socket_.close();
  235. // There is no longer an active deadline. The expiry is set to the
  236. // maximum time point so that the actor takes no action until a new
  237. // deadline is set.
  238. deadline_.expires_at(steady_timer::time_point::max());
  239. }
  240. // Put the actor back to sleep.
  241. deadline_.async_wait(std::bind(&client::check_deadline, this));
  242. }
  243. private:
  244. bool stopped_ = false;
  245. tcp::resolver::results_type endpoints_;
  246. tcp::socket socket_;
  247. std::string input_buffer_;
  248. steady_timer deadline_;
  249. steady_timer heartbeat_timer_;
  250. };
  251. int main(int argc, char* argv[])
  252. {
  253. try
  254. {
  255. if (argc != 3)
  256. {
  257. std::cerr << "Usage: client <host> <port>\n";
  258. return 1;
  259. }
  260. boost::asio::io_context io_context;
  261. tcp::resolver r(io_context);
  262. client c(io_context);
  263. c.start(r.resolve(argv[1], argv[2]));
  264. io_context.run();
  265. }
  266. catch (std::exception& e)
  267. {
  268. std::cerr << "Exception: " << e.what() << "\n";
  269. }
  270. return 0;
  271. }