// // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // // Official repository: https://github.com/boostorg/beast // //------------------------------------------------------------------------------ // // wsload // // Measure the performance of a WebSocket server // //------------------------------------------------------------------------------ #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace beast = boost::beast; // from namespace http = beast::http; // from namespace websocket = beast::websocket; // from namespace net = boost::asio; // from using tcp = boost::asio::ip::tcp; // from class test_buffer { char data_[4096]; net::const_buffer b_; public: using const_iterator = net::const_buffer const*; using value_type = net::const_buffer; test_buffer() : b_(data_, sizeof(data_)) { std::mt19937_64 rng; std::uniform_int_distribution dist; for(auto& c : data_) c = static_cast(dist(rng)); } const_iterator begin() const { return &b_; } const_iterator end() const { return begin() + 1; } }; class report { std::mutex m_; std::size_t bytes_ = 0; std::size_t messages_ = 0; public: void insert(std::size_t messages, std::size_t bytes) { std::lock_guard lock(m_); bytes_ += bytes; messages_ += messages; } std::size_t bytes() const { return bytes_; } std::size_t messages() const { return messages_; } }; void fail(beast::error_code ec, char const* what) { std::cerr << what << ": " << ec.message() << "\n"; } class connection : public std::enable_shared_from_this { websocket::stream ws_; tcp::endpoint ep_; std::size_t messages_; report& rep_; test_buffer const& tb_; net::strand< net::io_context::executor_type> strand_; beast::flat_buffer buffer_; std::mt19937_64 rng_; std::size_t count_ = 0; std::size_t bytes_ = 0; public: connection( net::io_context& ioc, tcp::endpoint const& ep, std::size_t messages, bool deflate, report& rep, test_buffer const& tb) : ws_(ioc) , ep_(ep) , messages_(messages) , rep_(rep) , tb_(tb) , strand_(ioc.get_executor()) { websocket::permessage_deflate pmd; pmd.client_enable = deflate; ws_.set_option(pmd); ws_.binary(true); ws_.auto_fragment(false); ws_.write_buffer_bytes(64 * 1024); } ~connection() { rep_.insert(count_, bytes_); } void run() { ws_.next_layer().async_connect(ep_, beast::bind_front_handler( &connection::on_connect, this->shared_from_this())); } private: void on_connect(beast::error_code ec) { if(ec) return fail(ec, "on_connect"); ws_.async_handshake( ep_.address().to_string() + ":" + std::to_string(ep_.port()), "/", beast::bind_front_handler( &connection::on_handshake, this->shared_from_this())); } void on_handshake(beast::error_code ec) { if(ec) return fail(ec, "handshake"); do_write(); } void do_write() { std::geometric_distribution dist{ double(4) / beast::buffer_bytes(tb_)}; ws_.async_write_some(true, beast::buffers_prefix(dist(rng_), tb_), beast::bind_front_handler( &connection::on_write, this->shared_from_this())); } void on_write(beast::error_code ec, std::size_t) { if(ec) return fail(ec, "write"); if(messages_--) return do_read(); ws_.async_close({}, beast::bind_front_handler( &connection::on_close, this->shared_from_this())); } void do_read() { ws_.async_read(buffer_, beast::bind_front_handler( &connection::on_read, this->shared_from_this())); } void on_read(beast::error_code ec, std::size_t) { if(ec) return fail(ec, "read"); ++count_; bytes_ += buffer_.size(); buffer_.consume(buffer_.size()); do_write(); } void on_close(beast::error_code ec) { if(ec) return fail(ec, "close"); } }; class timer { using clock_type = std::chrono::system_clock; clock_type::time_point when_; public: using duration = clock_type::duration; timer() : when_(clock_type::now()) { } duration elapsed() const { return clock_type::now() - when_; } }; inline std::uint64_t throughput( std::chrono::duration const& elapsed, std::uint64_t items) { using namespace std::chrono; return static_cast( 1 / (elapsed/items).count()); } int main(int argc, char** argv) { beast::unit_test::dstream dout(std::cerr); try { // Check command line arguments. if(argc != 8) { std::cerr << "Usage: bench-wsload
"; return EXIT_FAILURE; } auto const address = net::ip::make_address(argv[1]); auto const port = static_cast(std::atoi(argv[2])); auto const trials = static_cast(std::atoi(argv[3])); auto const messages= static_cast(std::atoi(argv[4])); auto const workers = static_cast(std::atoi(argv[5])); auto const threads = static_cast(std::atoi(argv[6])); auto const deflate = std::atoi(argv[7]) != 0; auto const work = (messages + workers - 1) / workers; test_buffer tb; for(auto i = trials; i != 0; --i) { report rep; net::io_context ioc{1}; for(auto j = workers; j; --j) { auto sp = std::make_shared( ioc, tcp::endpoint{address, port}, work, deflate, rep, tb); sp->run(); } timer clock; std::vector tv; if(threads > 1) { tv.reserve(threads); tv.emplace_back([&ioc]{ ioc.run(); }); } ioc.run(); for(auto& t : tv) t.join(); auto const elapsed = clock.elapsed(); dout << throughput(elapsed, rep.bytes()) << " bytes/s in " << (std::chrono::duration_cast< std::chrono::milliseconds>( elapsed).count() / 1000.) << "ms and " << rep.bytes() << " bytes" << std::endl; } } catch(std::exception const& e) { std::cerr << "Error: " << e.what() << std::endl; return EXIT_FAILURE; } return EXIT_SUCCESS; }