#include #include #include #include #include #include #include #include #include #include #include using boost::asio::defer; using boost::asio::executor; using boost::asio::post; using boost::asio::strand; using boost::asio::system_executor; //------------------------------------------------------------------------------ // A tiny actor framework // ~~~~~~~~~~~~~~~~~~~~~~ class actor; // Used to identify the sender and recipient of messages. typedef actor* actor_address; // Base class for all registered message handlers. class message_handler_base { public: virtual ~message_handler_base() {} // Used to determine which message handlers receive an incoming message. virtual const std::type_info& message_id() const = 0; }; // Base class for a handler for a specific message type. template class message_handler : public message_handler_base { public: // Handle an incoming message. virtual void handle_message(Message msg, actor_address from) = 0; }; // Concrete message handler for a specific message type. template class mf_message_handler : public message_handler { public: // Construct a message handler to invoke the specified member function. mf_message_handler(void (Actor::* mf)(Message, actor_address), Actor* a) : function_(mf), actor_(a) { } // Used to determine which message handlers receive an incoming message. virtual const std::type_info& message_id() const { return typeid(Message); } // Handle an incoming message. virtual void handle_message(Message msg, actor_address from) { (actor_->*function_)(std::move(msg), from); } // Determine whether the message handler represents the specified function. bool is_function(void (Actor::* mf)(Message, actor_address)) const { return mf == function_; } private: void (Actor::* function_)(Message, actor_address); Actor* actor_; }; // Base class for all actors. class actor { public: virtual ~actor() { } // Obtain the actor's address for use as a message sender or recipient. actor_address address() { return this; } // Send a message from one actor to another. template friend void send(Message msg, actor_address from, actor_address to) { // Execute the message handler in the context of the target's executor. post(to->executor_, [=] { to->call_handler(std::move(msg), from); }); } protected: // Construct the actor to use the specified executor for all message handlers. actor(executor e) : executor_(std::move(e)) { } // Register a handler for a specific message type. Duplicates are permitted. template void register_handler(void (Actor::* mf)(Message, actor_address)) { handlers_.push_back( std::make_shared>( mf, static_cast(this))); } // Deregister a handler. Removes only the first matching handler. template void deregister_handler(void (Actor::* mf)(Message, actor_address)) { const std::type_info& id = typeid(message_handler); for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter) { if ((*iter)->message_id() == id) { auto mh = static_cast*>(iter->get()); if (mh->is_function(mf)) { handlers_.erase(iter); return; } } } } // Send a message from within a message handler. template void tail_send(Message msg, actor_address to) { // Execute the message handler in the context of the target's executor. actor* from = this; defer(to->executor_, [=] { to->call_handler(std::move(msg), from); }); } private: // Find the matching message handlers, if any, and call them. template void call_handler(Message msg, actor_address from) { const std::type_info& message_id = typeid(Message); for (auto& h: handlers_) { if (h->message_id() == message_id) { auto mh = static_cast*>(h.get()); mh->handle_message(msg, from); } } } // All messages associated with a single actor object should be processed // non-concurrently. We use a strand to ensure non-concurrent execution even // if the underlying executor may use multiple threads. strand executor_; std::vector> handlers_; }; // A concrete actor that allows synchronous message retrieval. template class receiver : public actor { public: receiver() : actor(system_executor()) { register_handler(&receiver::message_handler); } // Block until a message has been received. Message wait() { std::unique_lock lock(mutex_); condition_.wait(lock, [this]{ return !message_queue_.empty(); }); Message msg(std::move(message_queue_.front())); message_queue_.pop_front(); return msg; } private: // Handle a new message by adding it to the queue and waking a waiter. void message_handler(Message msg, actor_address /* from */) { std::lock_guard lock(mutex_); message_queue_.push_back(std::move(msg)); condition_.notify_one(); } std::mutex mutex_; std::condition_variable condition_; std::deque message_queue_; }; //------------------------------------------------------------------------------ #include #include using boost::asio::thread_pool; class member : public actor { public: explicit member(executor e) : actor(std::move(e)) { register_handler(&member::init_handler); } private: void init_handler(actor_address next, actor_address from) { next_ = next; caller_ = from; register_handler(&member::token_handler); deregister_handler(&member::init_handler); } void token_handler(int token, actor_address /*from*/) { int msg(token); actor_address to(caller_); if (token > 0) { msg = token - 1; to = next_; } tail_send(msg, to); } actor_address next_; actor_address caller_; }; int main() { const std::size_t num_threads = 16; const int num_hops = 50000000; const std::size_t num_actors = 503; const int token_value = (num_hops + num_actors - 1) / num_actors; const std::size_t actors_per_thread = num_actors / num_threads; struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} }; single_thread_pool pools[num_threads]; std::vector> members(num_actors); receiver rcvr; // Create the member actors. for (std::size_t i = 0; i < num_actors; ++i) members[i] = std::make_shared(pools[(i / actors_per_thread) % num_threads].get_executor()); // Initialise the actors by passing each one the address of the next actor in the ring. for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i) send(members[next_i]->address(), rcvr.address(), members[i - 1]->address()); // Send exactly one token to each actor, all with the same initial value, rounding up if required. for (std::size_t i = 0; i < num_actors; ++i) send(token_value, rcvr.address(), members[i]->address()); // Wait for all signal messages, indicating the tokens have all reached zero. for (std::size_t i = 0; i < num_actors; ++i) rcvr.wait(); }