123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381 |
- // Copyright Oliver Kowalke 2015.
- // Distributed under the Boost Software License, Version 1.0.
- // (See accompanying file LICENSE_1_0.txt or copy at
- // http://www.boost.org/LICENSE_1_0.txt)
- #include <cstddef>
- #include <cstdlib>
- #include <map>
- #include <memory>
- #include <set>
- #include <iostream>
- #include <string>
- #include <boost/asio.hpp>
- #include <boost/utility.hpp>
- #include <boost/fiber/all.hpp>
- #include "../round_robin.hpp"
- #include "../yield.hpp"
- using boost::asio::ip::tcp;
- const std::size_t max_length = 1024;
- class subscriber_session;
- typedef std::shared_ptr< subscriber_session > subscriber_session_ptr;
- // a queue has n subscribers (subscriptions)
- // this class holds a list of subcribers for one queue
- class subscriptions {
- public:
- ~subscriptions();
- // subscribe to this queue
- void subscribe( subscriber_session_ptr const& s) {
- subscribers_.insert( s);
- }
- // unsubscribe from this queue
- void unsubscribe( subscriber_session_ptr const& s) {
- subscribers_.erase(s);
- }
- // publish a message, e.g. push this message to all subscribers
- void publish( std::string const& msg);
- private:
- // list of subscribers
- std::set< subscriber_session_ptr > subscribers_;
- };
- // a class to register queues and to subsribe clients to this queues
- class registry : private boost::noncopyable {
- private:
- typedef std::map< std::string, std::shared_ptr< subscriptions > > queues_cont;
- typedef queues_cont::iterator queues_iter;
- boost::fibers::mutex mtx_;
- queues_cont queues_;
- void register_queue_( std::string const& queue) {
- if ( queues_.end() != queues_.find( queue) ) {
- throw std::runtime_error("queue already exists");
- }
- queues_[queue] = std::make_shared< subscriptions >();
- std::cout << "new queue '" << queue << "' registered" << std::endl;
- }
- void unregister_queue_( std::string const& queue) {
- queues_.erase( queue);
- std::cout << "queue '" << queue << "' unregistered" << std::endl;
- }
- void subscribe_( std::string const& queue, subscriber_session_ptr s) {
- queues_iter iter = queues_.find( queue);
- if ( queues_.end() == iter ) {
- throw std::runtime_error("queue does not exist");
- }
- iter->second->subscribe( s);
- std::cout << "new subscription to queue '" << queue << "'" << std::endl;
- }
- void unsubscribe_( std::string const& queue, subscriber_session_ptr s) {
- queues_iter iter = queues_.find( queue);
- if ( queues_.end() != iter ) {
- iter->second->unsubscribe( s);
- }
- }
- void publish_( std::string const& queue, std::string const& msg) {
- queues_iter iter = queues_.find( queue);
- if ( queues_.end() == iter ) {
- throw std::runtime_error("queue does not exist");
- }
- iter->second->publish( msg);
- std::cout << "message '" << msg << "' to publish on queue '" << queue << "'" << std::endl;
- }
- public:
- // add a queue to registry
- void register_queue( std::string const& queue) {
- std::unique_lock< boost::fibers::mutex > lk( mtx_);
- register_queue_( queue);
- }
- // remove a queue from registry
- void unregister_queue( std::string const& queue) {
- std::unique_lock< boost::fibers::mutex > lk( mtx_);
- unregister_queue_( queue);
- }
- // subscribe to a queue
- void subscribe( std::string const& queue, subscriber_session_ptr s) {
- std::unique_lock< boost::fibers::mutex > lk( mtx_);
- subscribe_( queue, s);
- }
- // unsubscribe from a queue
- void unsubscribe( std::string const& queue, subscriber_session_ptr s) {
- std::unique_lock< boost::fibers::mutex > lk( mtx_);
- unsubscribe_( queue, s);
- }
- // publish a message to all subscribers registerd to the queue
- void publish( std::string const& queue, std::string const& msg) {
- std::unique_lock< boost::fibers::mutex > lk( mtx_);
- publish_( queue, msg);
- }
- };
- // a subscriber subscribes to a given queue in order to receive messages published on this queue
- class subscriber_session : public std::enable_shared_from_this< subscriber_session > {
- public:
- explicit subscriber_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
- socket_( * io_service),
- reg_( reg) {
- }
- tcp::socket& socket() {
- return socket_;
- }
- // this function is executed inside the fiber
- void run() {
- std::string queue;
- try {
- boost::system::error_code ec;
- // read first message == queue name
- // async_ready() returns if the the complete message is read
- // until this the fiber is suspended until the complete message
- // is read int the given buffer 'data'
- boost::asio::async_read(
- socket_,
- boost::asio::buffer( data_),
- boost::fibers::asio::yield[ec]);
- if ( ec) {
- throw std::runtime_error("no queue from subscriber");
- }
- // first message ist equal to the queue name the publisher
- // publishes to
- queue = data_;
- // subscribe to new queue
- reg_.subscribe( queue, shared_from_this() );
- // read published messages
- for (;;) {
- // wait for a conditon-variable for new messages
- // the fiber will be suspended until the condtion
- // becomes true and the fiber is resumed
- // published message is stored in buffer 'data_'
- std::unique_lock< boost::fibers::mutex > lk( mtx_);
- cond_.wait( lk);
- std::string data( data_);
- lk.unlock();
- // message '<fini>' terminates subscription
- if ( "<fini>" == data) {
- break;
- }
- // async. write message to socket connected with
- // subscriber
- // async_write() returns if the complete message was writen
- // the fiber is suspended in the meanwhile
- boost::asio::async_write(
- socket_,
- boost::asio::buffer( data, data.size() ),
- boost::fibers::asio::yield[ec]);
- if ( ec == boost::asio::error::eof) {
- break; //connection closed cleanly by peer
- } else if ( ec) {
- throw boost::system::system_error( ec); //some other error
- }
- std::cout << "subscriber::run(): '" << data << "' written" << std::endl;
- }
- } catch ( std::exception const& e) {
- std::cerr << "subscriber [" << queue << "] failed: " << e.what() << std::endl;
- }
- // close socket
- socket_.close();
- // unregister queue
- reg_.unsubscribe( queue, shared_from_this() );
- }
- // called from publisher_session (running in other fiber)
- void publish( std::string const& msg) {
- std::unique_lock< boost::fibers::mutex > lk( mtx_);
- std::memset( data_, '\0', sizeof( data_));
- std::memcpy( data_, msg.c_str(), (std::min)(max_length, msg.size()));
- cond_.notify_one();
- }
- private:
- tcp::socket socket_;
- registry & reg_;
- boost::fibers::mutex mtx_;
- boost::fibers::condition_variable cond_;
- // fixed size message
- char data_[max_length];
- };
- subscriptions::~subscriptions() {
- for ( subscriber_session_ptr s : subscribers_) {
- s->publish("<fini>");
- }
- }
- void
- subscriptions::publish( std::string const& msg) {
- for ( subscriber_session_ptr s : subscribers_) {
- s->publish( msg);
- }
- }
- // a publisher publishes messages on its queue
- // subscriber might register to this queue to get the published messages
- class publisher_session : public std::enable_shared_from_this< publisher_session > {
- public:
- explicit publisher_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
- socket_( * io_service),
- reg_( reg) {
- }
- tcp::socket& socket() {
- return socket_;
- }
- // this function is executed inside the fiber
- void run() {
- std::string queue;
- try {
- boost::system::error_code ec;
- // fixed size message
- char data[max_length];
- // read first message == queue name
- // async_ready() returns if the the complete message is read
- // until this the fiber is suspended until the complete message
- // is read int the given buffer 'data'
- boost::asio::async_read(
- socket_,
- boost::asio::buffer( data),
- boost::fibers::asio::yield[ec]);
- if ( ec) {
- throw std::runtime_error("no queue from publisher");
- }
- // first message ist equal to the queue name the publisher
- // publishes to
- queue = data;
- // register the new queue
- reg_.register_queue( queue);
- // start publishing messages
- for (;;) {
- // read message from publisher asyncronous
- // async_read() suspends this fiber until the complete emssage is read
- // and stored in the given buffer 'data'
- boost::asio::async_read(
- socket_,
- boost::asio::buffer( data),
- boost::fibers::asio::yield[ec]);
- if ( ec == boost::asio::error::eof) {
- break; //connection closed cleanly by peer
- } else if ( ec) {
- throw boost::system::system_error( ec); //some other error
- }
- // publish message to all subscribers
- reg_.publish( queue, std::string( data) );
- }
- } catch ( std::exception const& e) {
- std::cerr << "publisher [" << queue << "] failed: " << e.what() << std::endl;
- }
- // close socket
- socket_.close();
- // unregister queue
- reg_.unregister_queue( queue);
- }
- private:
- tcp::socket socket_;
- registry & reg_;
- };
- typedef std::shared_ptr< publisher_session > publisher_session_ptr;
- // function accepts connections requests from clients acting as a publisher
- void accept_publisher( std::shared_ptr< boost::asio::io_service > const& io_service,
- unsigned short port,
- registry & reg) {
- // create TCP-acceptor
- tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
- // loop for accepting connection requests
- for (;;) {
- boost::system::error_code ec;
- // create new publisher-session
- // this instance will be associated with one publisher
- publisher_session_ptr new_publisher_session =
- std::make_shared< publisher_session >( io_service, std::ref( reg) );
- // async. accept of new connection request
- // this function will suspend this execution context (fiber) until a
- // connection was established, after returning from this function a new client (publisher)
- // is connected
- acceptor.async_accept(
- new_publisher_session->socket(),
- boost::fibers::asio::yield[ec]);
- if ( ! ec) {
- // run the new publisher in its own fiber (one fiber for one client)
- boost::fibers::fiber(
- std::bind( & publisher_session::run, new_publisher_session) ).detach();
- }
- }
- }
- // function accepts connections requests from clients acting as a subscriber
- void accept_subscriber( std::shared_ptr< boost::asio::io_service > const& io_service,
- unsigned short port,
- registry & reg) {
- // create TCP-acceptor
- tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
- // loop for accepting connection requests
- for (;;) {
- boost::system::error_code ec;
- // create new subscriber-session
- // this instance will be associated with one subscriber
- subscriber_session_ptr new_subscriber_session =
- std::make_shared< subscriber_session >( io_service, std::ref( reg) );
- // async. accept of new connection request
- // this function will suspend this execution context (fiber) until a
- // connection was established, after returning from this function a new client (subscriber)
- // is connected
- acceptor.async_accept(
- new_subscriber_session->socket(),
- boost::fibers::asio::yield[ec]);
- if ( ! ec) {
- // run the new subscriber in its own fiber (one fiber for one client)
- boost::fibers::fiber(
- std::bind( & subscriber_session::run, new_subscriber_session) ).detach();
- }
- }
- }
- int main( int argc, char* argv[]) {
- try {
- // create io_service for async. I/O
- std::shared_ptr< boost::asio::io_service > io_service = std::make_shared< boost::asio::io_service >();
- // register asio scheduler
- boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service);
- // registry for queues and its subscription
- registry reg;
- // create an acceptor for publishers, run it as fiber
- boost::fibers::fiber(
- accept_publisher, std::ref( io_service), 9997, std::ref( reg) ).detach();
- // create an acceptor for subscribers, run it as fiber
- boost::fibers::fiber(
- accept_subscriber, std::ref( io_service), 9998, std::ref( reg) ).detach();
- // dispatch
- io_service->run();
- return EXIT_SUCCESS;
- } catch ( std::exception const& e) {
- std::cerr << "Exception: " << e.what() << "\n";
- }
- return EXIT_FAILURE;
- }
|