123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433 |
- //
- // server.cpp
- // ~~~~~~~~~~
- //
- // Copyright (c) 2003-2019 Christopher M. Kohlhoff (chris at kohlhoff dot com)
- //
- // Distributed under the Boost Software License, Version 1.0. (See accompanying
- // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
- //
- #include <algorithm>
- #include <cstdlib>
- #include <deque>
- #include <iostream>
- #include <memory>
- #include <set>
- #include <string>
- #include <boost/asio/buffer.hpp>
- #include <boost/asio/io_context.hpp>
- #include <boost/asio/ip/tcp.hpp>
- #include <boost/asio/ip/udp.hpp>
- #include <boost/asio/read_until.hpp>
- #include <boost/asio/steady_timer.hpp>
- #include <boost/asio/write.hpp>
- using boost::asio::steady_timer;
- using boost::asio::ip::tcp;
- using boost::asio::ip::udp;
- //----------------------------------------------------------------------
// Interface implemented by anything that wants to receive the messages
// published on a channel.
class subscriber
{
public:
  // Virtual so that concrete subscribers are destroyed correctly when the last
  // subscriber_ptr referring to them goes away.
  virtual ~subscriber() = default;

  // Hands a single message to this subscriber.
  virtual void deliver(const std::string& msg) = 0;
};

// Shared-ownership handle through which subscribers are stored and passed.
using subscriber_ptr = std::shared_ptr<subscriber>;
- //----------------------------------------------------------------------
- class channel
- {
- public:
- void join(subscriber_ptr subscriber)
- {
- subscribers_.insert(subscriber);
- }
- void leave(subscriber_ptr subscriber)
- {
- subscribers_.erase(subscriber);
- }
- void deliver(const std::string& msg)
- {
- for (const auto& s : subscribers_)
- {
- s->deliver(msg);
- }
- }
- private:
- std::set<subscriber_ptr> subscribers_;
- };
- //----------------------------------------------------------------------
//
// This class manages socket timeouts by applying the concept of a deadline.
// Some asynchronous operations are given deadlines by which they must complete.
// Deadlines are enforced by two "actors" that persist for the lifetime of the
// session object, one for input and one for output:
//
//  +----------------+                      +----------------+
//  |                |                      |                |
//  | check_deadline |<-------+             | check_deadline |<-------+
//  |                |        |             |                |        |
//  +----------------+        |             +----------------+        |
//               |            |                          |            |
//  async_wait() |    +----------------+    async_wait() |    +----------------+
//   on input    |    |     lambda     |     on output   |    |     lambda     |
//   deadline    +--->|       in       |     deadline    +--->|       in       |
//                    | check_deadline |                      | check_deadline |
//                    +----------------+                      +----------------+
//
// If either deadline actor determines that the corresponding deadline has
// expired, the socket is closed and any outstanding operations are cancelled.
//
// The input actor reads messages from the socket, where messages are delimited
// by the newline character:
//
//  +-------------+
//  |             |
//  |  read_line  |<----+
//  |             |     |
//  +-------------+     |
//          |           |
//  async_- |    +-------------+
//   read_- |    |   lambda    |
//  until() +--->|     in      |
//               |  read_line  |
//               +-------------+
//
// The deadline for receiving a complete message is 30 seconds. If a non-empty
// message is received, it is delivered to all subscribers. If a heartbeat (a
// message that consists of a single newline character) is received, a heartbeat
// is enqueued for the client, provided there are no other messages waiting to
// be sent.
//
// The output actor is responsible for sending messages to the client:
//
//  +----------------+
//  |                |<---------------------+
//  |  await_output  |                      |
//  |                |<-------+             |
//  +----------------+        |             |
//    |            |          |             |
//    |    async_- |  +----------------+    |
//    |     wait() |  |     lambda     |    |
//    |            +->|       in       |    |
//    |               |  await_output  |    |
//    |               +----------------+    |
//    V                                     |
//  +--------------+               +--------------+
//  |              | async_write() |    lambda    |
//  |  write_line  |-------------->|      in      |
//  |              |               |  write_line  |
//  +--------------+               +--------------+
//
// The output actor first waits for an output message to be enqueued. It does
// this by using a steady_timer as an asynchronous condition variable. The
// steady_timer will be signalled whenever the output queue is non-empty.
//
// Once a message is available, it is sent to the client. The deadline for
// sending a complete message is 30 seconds. After the message is successfully
// sent, the output actor again waits for the output queue to become non-empty.
//
- class tcp_session
- : public subscriber,
- public std::enable_shared_from_this<tcp_session>
- {
- public:
- tcp_session(tcp::socket socket, channel& ch)
- : channel_(ch),
- socket_(std::move(socket))
- {
- input_deadline_.expires_at(steady_timer::time_point::max());
- output_deadline_.expires_at(steady_timer::time_point::max());
- // The non_empty_output_queue_ steady_timer is set to the maximum time
- // point whenever the output queue is empty. This ensures that the output
- // actor stays asleep until a message is put into the queue.
- non_empty_output_queue_.expires_at(steady_timer::time_point::max());
- }
- // Called by the server object to initiate the four actors.
- void start()
- {
- channel_.join(shared_from_this());
- read_line();
- check_deadline(input_deadline_);
- await_output();
- check_deadline(output_deadline_);
- }
- private:
- void stop()
- {
- channel_.leave(shared_from_this());
- boost::system::error_code ignored_error;
- socket_.close(ignored_error);
- input_deadline_.cancel();
- non_empty_output_queue_.cancel();
- output_deadline_.cancel();
- }
- bool stopped() const
- {
- return !socket_.is_open();
- }
- void deliver(const std::string& msg) override
- {
- output_queue_.push_back(msg + "\n");
- // Signal that the output queue contains messages. Modifying the expiry
- // will wake the output actor, if it is waiting on the timer.
- non_empty_output_queue_.expires_at(steady_timer::time_point::min());
- }
- void read_line()
- {
- // Set a deadline for the read operation.
- input_deadline_.expires_after(std::chrono::seconds(30));
- // Start an asynchronous operation to read a newline-delimited message.
- auto self(shared_from_this());
- boost::asio::async_read_until(socket_,
- boost::asio::dynamic_buffer(input_buffer_), '\n',
- [this, self](const boost::system::error_code& error, std::size_t n)
- {
- // Check if the session was stopped while the operation was pending.
- if (stopped())
- return;
- if (!error)
- {
- // Extract the newline-delimited message from the buffer.
- std::string msg(input_buffer_.substr(0, n - 1));
- input_buffer_.erase(0, n);
- if (!msg.empty())
- {
- channel_.deliver(msg);
- }
- else
- {
- // We received a heartbeat message from the client. If there's
- // nothing else being sent or ready to be sent, send a heartbeat
- // right back.
- if (output_queue_.empty())
- {
- output_queue_.push_back("\n");
- // Signal that the output queue contains messages. Modifying
- // the expiry will wake the output actor, if it is waiting on
- // the timer.
- non_empty_output_queue_.expires_at(
- steady_timer::time_point::min());
- }
- }
- read_line();
- }
- else
- {
- stop();
- }
- });
- }
- void await_output()
- {
- auto self(shared_from_this());
- non_empty_output_queue_.async_wait(
- [this, self](const boost::system::error_code& /*error*/)
- {
- // Check if the session was stopped while the operation was pending.
- if (stopped())
- return;
- if (output_queue_.empty())
- {
- // There are no messages that are ready to be sent. The actor goes
- // to sleep by waiting on the non_empty_output_queue_ timer. When a
- // new message is added, the timer will be modified and the actor
- // will wake.
- non_empty_output_queue_.expires_at(steady_timer::time_point::max());
- await_output();
- }
- else
- {
- write_line();
- }
- });
- }
- void write_line()
- {
- // Set a deadline for the write operation.
- output_deadline_.expires_after(std::chrono::seconds(30));
- // Start an asynchronous operation to send a message.
- auto self(shared_from_this());
- boost::asio::async_write(socket_,
- boost::asio::buffer(output_queue_.front()),
- [this, self](const boost::system::error_code& error, std::size_t /*n*/)
- {
- // Check if the session was stopped while the operation was pending.
- if (stopped())
- return;
- if (!error)
- {
- output_queue_.pop_front();
- await_output();
- }
- else
- {
- stop();
- }
- });
- }
- void check_deadline(steady_timer& deadline)
- {
- auto self(shared_from_this());
- deadline.async_wait(
- [this, self, &deadline](const boost::system::error_code& /*error*/)
- {
- // Check if the session was stopped while the operation was pending.
- if (stopped())
- return;
- // Check whether the deadline has passed. We compare the deadline
- // against the current time since a new asynchronous operation may
- // have moved the deadline before this actor had a chance to run.
- if (deadline.expiry() <= steady_timer::clock_type::now())
- {
- // The deadline has passed. Stop the session. The other actors will
- // terminate as soon as possible.
- stop();
- }
- else
- {
- // Put the actor back to sleep.
- check_deadline(deadline);
- }
- });
- }
- channel& channel_;
- tcp::socket socket_;
- std::string input_buffer_;
- steady_timer input_deadline_{socket_.get_executor()};
- std::deque<std::string> output_queue_;
- steady_timer non_empty_output_queue_{socket_.get_executor()};
- steady_timer output_deadline_{socket_.get_executor()};
- };
- typedef std::shared_ptr<tcp_session> tcp_session_ptr;
- //----------------------------------------------------------------------
- class udp_broadcaster
- : public subscriber
- {
- public:
- udp_broadcaster(boost::asio::io_context& io_context,
- const udp::endpoint& broadcast_endpoint)
- : socket_(io_context)
- {
- socket_.connect(broadcast_endpoint);
- socket_.set_option(udp::socket::broadcast(true));
- }
- private:
- void deliver(const std::string& msg)
- {
- boost::system::error_code ignored_error;
- socket_.send(boost::asio::buffer(msg), 0, ignored_error);
- }
- udp::socket socket_;
- };
- //----------------------------------------------------------------------
- class server
- {
- public:
- server(boost::asio::io_context& io_context,
- const tcp::endpoint& listen_endpoint,
- const udp::endpoint& broadcast_endpoint)
- : io_context_(io_context),
- acceptor_(io_context, listen_endpoint)
- {
- channel_.join(
- std::make_shared<udp_broadcaster>(
- io_context_, broadcast_endpoint));
- accept();
- }
- private:
- void accept()
- {
- acceptor_.async_accept(
- [this](const boost::system::error_code& error, tcp::socket socket)
- {
- if (!error)
- {
- std::make_shared<tcp_session>(std::move(socket), channel_)->start();
- }
- accept();
- });
- }
- boost::asio::io_context& io_context_;
- tcp::acceptor acceptor_;
- channel channel_;
- };
- //----------------------------------------------------------------------
- int main(int argc, char* argv[])
- {
- try
- {
- using namespace std; // For atoi.
- if (argc != 4)
- {
- std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
- return 1;
- }
- boost::asio::io_context io_context;
- tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
- udp::endpoint broadcast_endpoint(
- boost::asio::ip::make_address(argv[2]), atoi(argv[3]));
- server s(io_context, listen_endpoint, broadcast_endpoint);
- io_context.run();
- }
- catch (std::exception& e)
- {
- std::cerr << "Exception: " << e.what() << "\n";
- }
- return 0;
- }
|