server.cpp 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. //
  2. // server.cpp
  3. // ~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #include <boost/asio.hpp>
  11. #include <boost/bind.hpp>
  12. #include <boost/shared_ptr.hpp>
  13. #include <cmath>
  14. #include <cstdlib>
  15. #include <exception>
  16. #include <iostream>
  17. #include <set>
  18. #include "protocol.hpp"
  // Shorthand names for the TCP and UDP protocol classes used throughout this
  // file (tcp::socket, tcp::acceptor, tcp::endpoint, udp::socket, udp::endpoint).
  19. using boost::asio::ip::tcp;
  20. using boost::asio::ip::udp;
  // Reference-counted handles for objects that take part in asynchronous
  // operations. The server binds a copy of the handle into each completion
  // handler, which keeps the pointed-to object alive while the operation is
  // still pending.
  //   tcp_socket_ptr      - a control connection's TCP socket.
  //   timer_ptr           - a per-request delay timer.
  //   control_request_ptr - the buffer a control request is read into
  //                         (control_request is declared in protocol.hpp).
  21. typedef boost::shared_ptr<tcp::socket> tcp_socket_ptr;
  22. typedef boost::shared_ptr<boost::asio::steady_timer> timer_ptr;
  23. typedef boost::shared_ptr<control_request> control_request_ptr;
  24. class server
  25. {
  26. public:
  27. // Construct the server to wait for incoming control connections.
  28. server(boost::asio::io_context& io_context, unsigned short port)
  29. : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
  30. timer_(io_context),
  31. udp_socket_(io_context, udp::endpoint(udp::v4(), 0)),
  32. next_frame_number_(1)
  33. {
  34. // Start waiting for a new control connection.
  35. tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor()));
  36. acceptor_.async_accept(*new_socket,
  37. boost::bind(&server::handle_accept, this,
  38. boost::asio::placeholders::error, new_socket));
  39. // Start the timer used to generate outgoing frames.
  40. timer_.expires_after(boost::asio::chrono::milliseconds(100));
  41. timer_.async_wait(boost::bind(&server::handle_timer, this));
  42. }
  43. // Handle a new control connection.
  44. void handle_accept(const boost::system::error_code& ec, tcp_socket_ptr socket)
  45. {
  46. if (!ec)
  47. {
  48. // Start receiving control requests on the connection.
  49. control_request_ptr request(new control_request);
  50. boost::asio::async_read(*socket, request->to_buffers(),
  51. boost::bind(&server::handle_control_request, this,
  52. boost::asio::placeholders::error, socket, request));
  53. }
  54. // Start waiting for a new control connection.
  55. tcp_socket_ptr new_socket(new tcp::socket(acceptor_.get_executor()));
  56. acceptor_.async_accept(*new_socket,
  57. boost::bind(&server::handle_accept, this,
  58. boost::asio::placeholders::error, new_socket));
  59. }
  60. // Handle a new control request.
  61. void handle_control_request(const boost::system::error_code& ec,
  62. tcp_socket_ptr socket, control_request_ptr request)
  63. {
  64. if (!ec)
  65. {
  66. // Delay handling of the control request to simulate network latency.
  67. timer_ptr delay_timer(
  68. new boost::asio::steady_timer(acceptor_.get_executor()));
  69. delay_timer->expires_after(boost::asio::chrono::seconds(2));
  70. delay_timer->async_wait(
  71. boost::bind(&server::handle_control_request_timer, this,
  72. socket, request, delay_timer));
  73. }
  74. }
  75. void handle_control_request_timer(tcp_socket_ptr socket,
  76. control_request_ptr request, timer_ptr /*delay_timer*/)
  77. {
  78. // Determine what address this client is connected from, since
  79. // subscriptions must be stored on the server as a complete endpoint, not
  80. // just a port. We use the non-throwing overload of remote_endpoint() since
  81. // it may fail if the socket is no longer connected.
  82. boost::system::error_code ec;
  83. tcp::endpoint remote_endpoint = socket->remote_endpoint(ec);
  84. if (!ec)
  85. {
  86. // Remove old port subscription, if any.
  87. if (unsigned short old_port = request->old_port())
  88. {
  89. udp::endpoint old_endpoint(remote_endpoint.address(), old_port);
  90. subscribers_.erase(old_endpoint);
  91. std::cout << "Removing subscription " << old_endpoint << std::endl;
  92. }
  93. // Add new port subscription, if any.
  94. if (unsigned short new_port = request->new_port())
  95. {
  96. udp::endpoint new_endpoint(remote_endpoint.address(), new_port);
  97. subscribers_.insert(new_endpoint);
  98. std::cout << "Adding subscription " << new_endpoint << std::endl;
  99. }
  100. }
  101. // Wait for next control request on this connection.
  102. boost::asio::async_read(*socket, request->to_buffers(),
  103. boost::bind(&server::handle_control_request, this,
  104. boost::asio::placeholders::error, socket, request));
  105. }
  106. // Every time the timer fires we will generate a new frame and send it to all
  107. // subscribers.
  108. void handle_timer()
  109. {
  110. // Generate payload.
  111. double x = next_frame_number_ * 0.2;
  112. double y = std::sin(x);
  113. int char_index = static_cast<int>((y + 1.0) * (frame::payload_size / 2));
  114. std::string payload;
  115. for (int i = 0; i < frame::payload_size; ++i)
  116. payload += (i == char_index ? '*' : '.');
  117. // Create the frame to be sent to all subscribers.
  118. frame f(next_frame_number_++, payload);
  119. // Send frame to all subscribers. We can use synchronous calls here since
  120. // UDP send operations typically do not block.
  121. std::set<udp::endpoint>::iterator j;
  122. for (j = subscribers_.begin(); j != subscribers_.end(); ++j)
  123. {
  124. boost::system::error_code ec;
  125. udp_socket_.send_to(f.to_buffers(), *j, 0, ec);
  126. }
  127. // Wait for next timeout.
  128. timer_.expires_after(boost::asio::chrono::milliseconds(100));
  129. timer_.async_wait(boost::bind(&server::handle_timer, this));
  130. }
  131. private:
  132. // The acceptor used to accept incoming control connections.
  133. tcp::acceptor acceptor_;
  134. // The timer used for generating data.
  135. boost::asio::steady_timer timer_;
  136. // The socket used to send data to subscribers.
  137. udp::socket udp_socket_;
  138. // The next frame number.
  139. unsigned long next_frame_number_;
  140. // The set of endpoints that are subscribed.
  141. std::set<udp::endpoint> subscribers_;
  142. };
  143. int main(int argc, char* argv[])
  144. {
  145. try
  146. {
  147. if (argc != 2)
  148. {
  149. std::cerr << "Usage: server <port>\n";
  150. return 1;
  151. }
  152. boost::asio::io_context io_context;
  153. using namespace std; // For atoi.
  154. server s(io_context, atoi(argv[1]));
  155. io_context.run();
  156. }
  157. catch (std::exception& e)
  158. {
  159. std::cerr << "Exception: " << e.what() << std::endl;
  160. }
  161. return 0;
  162. }