actor.cpp 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. #include <boost/asio/defer.hpp>
  2. #include <boost/asio/executor.hpp>
  3. #include <boost/asio/post.hpp>
  4. #include <boost/asio/strand.hpp>
  5. #include <boost/asio/system_executor.hpp>
  6. #include <condition_variable>
  7. #include <deque>
  8. #include <memory>
  9. #include <mutex>
  10. #include <typeinfo>
  11. #include <vector>
  12. using boost::asio::defer;
  13. using boost::asio::executor;
  14. using boost::asio::post;
  15. using boost::asio::strand;
  16. using boost::asio::system_executor;
  17. //------------------------------------------------------------------------------
  18. // A tiny actor framework
  19. // ~~~~~~~~~~~~~~~~~~~~~~
  20. class actor;
  21. // Used to identify the sender and recipient of messages.
  22. typedef actor* actor_address;
  23. // Base class for all registered message handlers.
  24. class message_handler_base
  25. {
  26. public:
  27. virtual ~message_handler_base() {}
  28. // Used to determine which message handlers receive an incoming message.
  29. virtual const std::type_info& message_id() const = 0;
  30. };
  31. // Base class for a handler for a specific message type.
  32. template <class Message>
  33. class message_handler : public message_handler_base
  34. {
  35. public:
  36. // Handle an incoming message.
  37. virtual void handle_message(Message msg, actor_address from) = 0;
  38. };
  39. // Concrete message handler for a specific message type.
  40. template <class Actor, class Message>
  41. class mf_message_handler : public message_handler<Message>
  42. {
  43. public:
  44. // Construct a message handler to invoke the specified member function.
  45. mf_message_handler(void (Actor::* mf)(Message, actor_address), Actor* a)
  46. : function_(mf), actor_(a)
  47. {
  48. }
  49. // Used to determine which message handlers receive an incoming message.
  50. virtual const std::type_info& message_id() const
  51. {
  52. return typeid(Message);
  53. }
  54. // Handle an incoming message.
  55. virtual void handle_message(Message msg, actor_address from)
  56. {
  57. (actor_->*function_)(std::move(msg), from);
  58. }
  59. // Determine whether the message handler represents the specified function.
  60. bool is_function(void (Actor::* mf)(Message, actor_address)) const
  61. {
  62. return mf == function_;
  63. }
  64. private:
  65. void (Actor::* function_)(Message, actor_address);
  66. Actor* actor_;
  67. };
  68. // Base class for all actors.
  69. class actor
  70. {
  71. public:
  72. virtual ~actor()
  73. {
  74. }
  75. // Obtain the actor's address for use as a message sender or recipient.
  76. actor_address address()
  77. {
  78. return this;
  79. }
  80. // Send a message from one actor to another.
  81. template <class Message>
  82. friend void send(Message msg, actor_address from, actor_address to)
  83. {
  84. // Execute the message handler in the context of the target's executor.
  85. post(to->executor_,
  86. [=]
  87. {
  88. to->call_handler(std::move(msg), from);
  89. });
  90. }
  91. protected:
  92. // Construct the actor to use the specified executor for all message handlers.
  93. actor(executor e)
  94. : executor_(std::move(e))
  95. {
  96. }
  97. // Register a handler for a specific message type. Duplicates are permitted.
  98. template <class Actor, class Message>
  99. void register_handler(void (Actor::* mf)(Message, actor_address))
  100. {
  101. handlers_.push_back(
  102. std::make_shared<mf_message_handler<Actor, Message>>(
  103. mf, static_cast<Actor*>(this)));
  104. }
  105. // Deregister a handler. Removes only the first matching handler.
  106. template <class Actor, class Message>
  107. void deregister_handler(void (Actor::* mf)(Message, actor_address))
  108. {
  109. const std::type_info& id = typeid(message_handler<Message>);
  110. for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter)
  111. {
  112. if ((*iter)->message_id() == id)
  113. {
  114. auto mh = static_cast<mf_message_handler<Actor, Message>*>(iter->get());
  115. if (mh->is_function(mf))
  116. {
  117. handlers_.erase(iter);
  118. return;
  119. }
  120. }
  121. }
  122. }
  123. // Send a message from within a message handler.
  124. template <class Message>
  125. void tail_send(Message msg, actor_address to)
  126. {
  127. // Execute the message handler in the context of the target's executor.
  128. actor* from = this;
  129. defer(to->executor_,
  130. [=]
  131. {
  132. to->call_handler(std::move(msg), from);
  133. });
  134. }
  135. private:
  136. // Find the matching message handlers, if any, and call them.
  137. template <class Message>
  138. void call_handler(Message msg, actor_address from)
  139. {
  140. const std::type_info& message_id = typeid(Message);
  141. for (auto& h: handlers_)
  142. {
  143. if (h->message_id() == message_id)
  144. {
  145. auto mh = static_cast<message_handler<Message>*>(h.get());
  146. mh->handle_message(msg, from);
  147. }
  148. }
  149. }
  150. // All messages associated with a single actor object should be processed
  151. // non-concurrently. We use a strand to ensure non-concurrent execution even
  152. // if the underlying executor may use multiple threads.
  153. strand<executor> executor_;
  154. std::vector<std::shared_ptr<message_handler_base>> handlers_;
  155. };
  156. // A concrete actor that allows synchronous message retrieval.
  157. template <class Message>
  158. class receiver : public actor
  159. {
  160. public:
  161. receiver()
  162. : actor(system_executor())
  163. {
  164. register_handler(&receiver::message_handler);
  165. }
  166. // Block until a message has been received.
  167. Message wait()
  168. {
  169. std::unique_lock<std::mutex> lock(mutex_);
  170. condition_.wait(lock, [this]{ return !message_queue_.empty(); });
  171. Message msg(std::move(message_queue_.front()));
  172. message_queue_.pop_front();
  173. return msg;
  174. }
  175. private:
  176. // Handle a new message by adding it to the queue and waking a waiter.
  177. void message_handler(Message msg, actor_address /* from */)
  178. {
  179. std::lock_guard<std::mutex> lock(mutex_);
  180. message_queue_.push_back(std::move(msg));
  181. condition_.notify_one();
  182. }
  183. std::mutex mutex_;
  184. std::condition_variable condition_;
  185. std::deque<Message> message_queue_;
  186. };
  187. //------------------------------------------------------------------------------
  188. #include <boost/asio/thread_pool.hpp>
  189. #include <iostream>
  190. using boost::asio::thread_pool;
  191. class member : public actor
  192. {
  193. public:
  194. explicit member(executor e)
  195. : actor(std::move(e))
  196. {
  197. register_handler(&member::init_handler);
  198. }
  199. private:
  200. void init_handler(actor_address next, actor_address from)
  201. {
  202. next_ = next;
  203. caller_ = from;
  204. register_handler(&member::token_handler);
  205. deregister_handler(&member::init_handler);
  206. }
  207. void token_handler(int token, actor_address /*from*/)
  208. {
  209. int msg(token);
  210. actor_address to(caller_);
  211. if (token > 0)
  212. {
  213. msg = token - 1;
  214. to = next_;
  215. }
  216. tail_send(msg, to);
  217. }
  218. actor_address next_;
  219. actor_address caller_;
  220. };
  221. int main()
  222. {
  223. const std::size_t num_threads = 16;
  224. const int num_hops = 50000000;
  225. const std::size_t num_actors = 503;
  226. const int token_value = (num_hops + num_actors - 1) / num_actors;
  227. const std::size_t actors_per_thread = num_actors / num_threads;
  228. struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} };
  229. single_thread_pool pools[num_threads];
  230. std::vector<std::shared_ptr<member>> members(num_actors);
  231. receiver<int> rcvr;
  232. // Create the member actors.
  233. for (std::size_t i = 0; i < num_actors; ++i)
  234. members[i] = std::make_shared<member>(pools[(i / actors_per_thread) % num_threads].get_executor());
  235. // Initialise the actors by passing each one the address of the next actor in the ring.
  236. for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i)
  237. send(members[next_i]->address(), rcvr.address(), members[i - 1]->address());
  238. // Send exactly one token to each actor, all with the same initial value, rounding up if required.
  239. for (std::size_t i = 0; i < num_actors; ++i)
  240. send(token_value, rcvr.address(), members[i]->address());
  241. // Wait for all signal messages, indicating the tokens have all reached zero.
  242. for (std::size_t i = 0; i < num_actors; ++i)
  243. rcvr.wait();
  244. }