server.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381
  1. // Copyright Oliver Kowalke 2015.
  2. // Distributed under the Boost Software License, Version 1.0.
  3. // (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <set>
#include <stdexcept>
#include <string>

#include <boost/asio.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/all.hpp>

#include "../round_robin.hpp"
#include "../yield.hpp"
  17. using boost::asio::ip::tcp;
  18. const std::size_t max_length = 1024;
  19. class subscriber_session;
  20. typedef std::shared_ptr< subscriber_session > subscriber_session_ptr;
  21. // a queue has n subscribers (subscriptions)
  22. // this class holds a list of subcribers for one queue
  23. class subscriptions {
  24. public:
  25. ~subscriptions();
  26. // subscribe to this queue
  27. void subscribe( subscriber_session_ptr const& s) {
  28. subscribers_.insert( s);
  29. }
  30. // unsubscribe from this queue
  31. void unsubscribe( subscriber_session_ptr const& s) {
  32. subscribers_.erase(s);
  33. }
  34. // publish a message, e.g. push this message to all subscribers
  35. void publish( std::string const& msg);
  36. private:
  37. // list of subscribers
  38. std::set< subscriber_session_ptr > subscribers_;
  39. };
  40. // a class to register queues and to subsribe clients to this queues
  41. class registry : private boost::noncopyable {
  42. private:
  43. typedef std::map< std::string, std::shared_ptr< subscriptions > > queues_cont;
  44. typedef queues_cont::iterator queues_iter;
  45. boost::fibers::mutex mtx_;
  46. queues_cont queues_;
  47. void register_queue_( std::string const& queue) {
  48. if ( queues_.end() != queues_.find( queue) ) {
  49. throw std::runtime_error("queue already exists");
  50. }
  51. queues_[queue] = std::make_shared< subscriptions >();
  52. std::cout << "new queue '" << queue << "' registered" << std::endl;
  53. }
  54. void unregister_queue_( std::string const& queue) {
  55. queues_.erase( queue);
  56. std::cout << "queue '" << queue << "' unregistered" << std::endl;
  57. }
  58. void subscribe_( std::string const& queue, subscriber_session_ptr s) {
  59. queues_iter iter = queues_.find( queue);
  60. if ( queues_.end() == iter ) {
  61. throw std::runtime_error("queue does not exist");
  62. }
  63. iter->second->subscribe( s);
  64. std::cout << "new subscription to queue '" << queue << "'" << std::endl;
  65. }
  66. void unsubscribe_( std::string const& queue, subscriber_session_ptr s) {
  67. queues_iter iter = queues_.find( queue);
  68. if ( queues_.end() != iter ) {
  69. iter->second->unsubscribe( s);
  70. }
  71. }
  72. void publish_( std::string const& queue, std::string const& msg) {
  73. queues_iter iter = queues_.find( queue);
  74. if ( queues_.end() == iter ) {
  75. throw std::runtime_error("queue does not exist");
  76. }
  77. iter->second->publish( msg);
  78. std::cout << "message '" << msg << "' to publish on queue '" << queue << "'" << std::endl;
  79. }
  80. public:
  81. // add a queue to registry
  82. void register_queue( std::string const& queue) {
  83. std::unique_lock< boost::fibers::mutex > lk( mtx_);
  84. register_queue_( queue);
  85. }
  86. // remove a queue from registry
  87. void unregister_queue( std::string const& queue) {
  88. std::unique_lock< boost::fibers::mutex > lk( mtx_);
  89. unregister_queue_( queue);
  90. }
  91. // subscribe to a queue
  92. void subscribe( std::string const& queue, subscriber_session_ptr s) {
  93. std::unique_lock< boost::fibers::mutex > lk( mtx_);
  94. subscribe_( queue, s);
  95. }
  96. // unsubscribe from a queue
  97. void unsubscribe( std::string const& queue, subscriber_session_ptr s) {
  98. std::unique_lock< boost::fibers::mutex > lk( mtx_);
  99. unsubscribe_( queue, s);
  100. }
  101. // publish a message to all subscribers registerd to the queue
  102. void publish( std::string const& queue, std::string const& msg) {
  103. std::unique_lock< boost::fibers::mutex > lk( mtx_);
  104. publish_( queue, msg);
  105. }
  106. };
  107. // a subscriber subscribes to a given queue in order to receive messages published on this queue
  108. class subscriber_session : public std::enable_shared_from_this< subscriber_session > {
  109. public:
  110. explicit subscriber_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
  111. socket_( * io_service),
  112. reg_( reg) {
  113. }
  114. tcp::socket& socket() {
  115. return socket_;
  116. }
  117. // this function is executed inside the fiber
  118. void run() {
  119. std::string queue;
  120. try {
  121. boost::system::error_code ec;
  122. // read first message == queue name
  123. // async_ready() returns if the the complete message is read
  124. // until this the fiber is suspended until the complete message
  125. // is read int the given buffer 'data'
  126. boost::asio::async_read(
  127. socket_,
  128. boost::asio::buffer( data_),
  129. boost::fibers::asio::yield[ec]);
  130. if ( ec) {
  131. throw std::runtime_error("no queue from subscriber");
  132. }
  133. // first message ist equal to the queue name the publisher
  134. // publishes to
  135. queue = data_;
  136. // subscribe to new queue
  137. reg_.subscribe( queue, shared_from_this() );
  138. // read published messages
  139. for (;;) {
  140. // wait for a conditon-variable for new messages
  141. // the fiber will be suspended until the condtion
  142. // becomes true and the fiber is resumed
  143. // published message is stored in buffer 'data_'
  144. std::unique_lock< boost::fibers::mutex > lk( mtx_);
  145. cond_.wait( lk);
  146. std::string data( data_);
  147. lk.unlock();
  148. // message '<fini>' terminates subscription
  149. if ( "<fini>" == data) {
  150. break;
  151. }
  152. // async. write message to socket connected with
  153. // subscriber
  154. // async_write() returns if the complete message was writen
  155. // the fiber is suspended in the meanwhile
  156. boost::asio::async_write(
  157. socket_,
  158. boost::asio::buffer( data, data.size() ),
  159. boost::fibers::asio::yield[ec]);
  160. if ( ec == boost::asio::error::eof) {
  161. break; //connection closed cleanly by peer
  162. } else if ( ec) {
  163. throw boost::system::system_error( ec); //some other error
  164. }
  165. std::cout << "subscriber::run(): '" << data << "' written" << std::endl;
  166. }
  167. } catch ( std::exception const& e) {
  168. std::cerr << "subscriber [" << queue << "] failed: " << e.what() << std::endl;
  169. }
  170. // close socket
  171. socket_.close();
  172. // unregister queue
  173. reg_.unsubscribe( queue, shared_from_this() );
  174. }
  175. // called from publisher_session (running in other fiber)
  176. void publish( std::string const& msg) {
  177. std::unique_lock< boost::fibers::mutex > lk( mtx_);
  178. std::memset( data_, '\0', sizeof( data_));
  179. std::memcpy( data_, msg.c_str(), (std::min)(max_length, msg.size()));
  180. cond_.notify_one();
  181. }
  182. private:
  183. tcp::socket socket_;
  184. registry & reg_;
  185. boost::fibers::mutex mtx_;
  186. boost::fibers::condition_variable cond_;
  187. // fixed size message
  188. char data_[max_length];
  189. };
  190. subscriptions::~subscriptions() {
  191. for ( subscriber_session_ptr s : subscribers_) {
  192. s->publish("<fini>");
  193. }
  194. }
  195. void
  196. subscriptions::publish( std::string const& msg) {
  197. for ( subscriber_session_ptr s : subscribers_) {
  198. s->publish( msg);
  199. }
  200. }
  201. // a publisher publishes messages on its queue
  202. // subscriber might register to this queue to get the published messages
  203. class publisher_session : public std::enable_shared_from_this< publisher_session > {
  204. public:
  205. explicit publisher_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
  206. socket_( * io_service),
  207. reg_( reg) {
  208. }
  209. tcp::socket& socket() {
  210. return socket_;
  211. }
  212. // this function is executed inside the fiber
  213. void run() {
  214. std::string queue;
  215. try {
  216. boost::system::error_code ec;
  217. // fixed size message
  218. char data[max_length];
  219. // read first message == queue name
  220. // async_ready() returns if the the complete message is read
  221. // until this the fiber is suspended until the complete message
  222. // is read int the given buffer 'data'
  223. boost::asio::async_read(
  224. socket_,
  225. boost::asio::buffer( data),
  226. boost::fibers::asio::yield[ec]);
  227. if ( ec) {
  228. throw std::runtime_error("no queue from publisher");
  229. }
  230. // first message ist equal to the queue name the publisher
  231. // publishes to
  232. queue = data;
  233. // register the new queue
  234. reg_.register_queue( queue);
  235. // start publishing messages
  236. for (;;) {
  237. // read message from publisher asyncronous
  238. // async_read() suspends this fiber until the complete emssage is read
  239. // and stored in the given buffer 'data'
  240. boost::asio::async_read(
  241. socket_,
  242. boost::asio::buffer( data),
  243. boost::fibers::asio::yield[ec]);
  244. if ( ec == boost::asio::error::eof) {
  245. break; //connection closed cleanly by peer
  246. } else if ( ec) {
  247. throw boost::system::system_error( ec); //some other error
  248. }
  249. // publish message to all subscribers
  250. reg_.publish( queue, std::string( data) );
  251. }
  252. } catch ( std::exception const& e) {
  253. std::cerr << "publisher [" << queue << "] failed: " << e.what() << std::endl;
  254. }
  255. // close socket
  256. socket_.close();
  257. // unregister queue
  258. reg_.unregister_queue( queue);
  259. }
  260. private:
  261. tcp::socket socket_;
  262. registry & reg_;
  263. };
  264. typedef std::shared_ptr< publisher_session > publisher_session_ptr;
  265. // function accepts connections requests from clients acting as a publisher
  266. void accept_publisher( std::shared_ptr< boost::asio::io_service > const& io_service,
  267. unsigned short port,
  268. registry & reg) {
  269. // create TCP-acceptor
  270. tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
  271. // loop for accepting connection requests
  272. for (;;) {
  273. boost::system::error_code ec;
  274. // create new publisher-session
  275. // this instance will be associated with one publisher
  276. publisher_session_ptr new_publisher_session =
  277. std::make_shared< publisher_session >( io_service, std::ref( reg) );
  278. // async. accept of new connection request
  279. // this function will suspend this execution context (fiber) until a
  280. // connection was established, after returning from this function a new client (publisher)
  281. // is connected
  282. acceptor.async_accept(
  283. new_publisher_session->socket(),
  284. boost::fibers::asio::yield[ec]);
  285. if ( ! ec) {
  286. // run the new publisher in its own fiber (one fiber for one client)
  287. boost::fibers::fiber(
  288. std::bind( & publisher_session::run, new_publisher_session) ).detach();
  289. }
  290. }
  291. }
  292. // function accepts connections requests from clients acting as a subscriber
  293. void accept_subscriber( std::shared_ptr< boost::asio::io_service > const& io_service,
  294. unsigned short port,
  295. registry & reg) {
  296. // create TCP-acceptor
  297. tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
  298. // loop for accepting connection requests
  299. for (;;) {
  300. boost::system::error_code ec;
  301. // create new subscriber-session
  302. // this instance will be associated with one subscriber
  303. subscriber_session_ptr new_subscriber_session =
  304. std::make_shared< subscriber_session >( io_service, std::ref( reg) );
  305. // async. accept of new connection request
  306. // this function will suspend this execution context (fiber) until a
  307. // connection was established, after returning from this function a new client (subscriber)
  308. // is connected
  309. acceptor.async_accept(
  310. new_subscriber_session->socket(),
  311. boost::fibers::asio::yield[ec]);
  312. if ( ! ec) {
  313. // run the new subscriber in its own fiber (one fiber for one client)
  314. boost::fibers::fiber(
  315. std::bind( & subscriber_session::run, new_subscriber_session) ).detach();
  316. }
  317. }
  318. }
  319. int main( int argc, char* argv[]) {
  320. try {
  321. // create io_service for async. I/O
  322. std::shared_ptr< boost::asio::io_service > io_service = std::make_shared< boost::asio::io_service >();
  323. // register asio scheduler
  324. boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service);
  325. // registry for queues and its subscription
  326. registry reg;
  327. // create an acceptor for publishers, run it as fiber
  328. boost::fibers::fiber(
  329. accept_publisher, std::ref( io_service), 9997, std::ref( reg) ).detach();
  330. // create an acceptor for subscribers, run it as fiber
  331. boost::fibers::fiber(
  332. accept_subscriber, std::ref( io_service), 9998, std::ref( reg) ).detach();
  333. // dispatch
  334. io_service->run();
  335. return EXIT_SUCCESS;
  336. } catch ( std::exception const& e) {
  337. std::cerr << "Exception: " << e.what() << "\n";
  338. }
  339. return EXIT_FAILURE;
  340. }