queue.hpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. // Copyright (C) 2004-2006 The Trustees of Indiana University.
  2. // Use, modification and distribution is subject to the Boost Software
  3. // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. // Authors: Douglas Gregor
  6. // Andrew Lumsdaine
  7. #ifndef BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
  8. #define BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP
  9. #ifndef BOOST_GRAPH_USE_MPI
  10. #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
  11. #endif
  12. #include <boost/graph/parallel/process_group.hpp>
  13. #include <boost/optional.hpp>
  14. #include <boost/shared_ptr.hpp>
  15. #include <vector>
  16. namespace boost { namespace graph { namespace distributed {
  /**
   * Unary predicate that accepts every value it is handed.
   *
   * Used as the default @c UnaryPredicate template argument of
   * @c distributed_queue. The call operator ignores its argument and
   * unconditionally yields @c true.
   */
  struct always_push
  {
    template<typename Value>
    bool operator()(const Value& /*ignored*/) const
    {
      return true;
    }
  };
  22. /** A distributed queue adaptor.
  23. *
  24. * Class template @c distributed_queue implements a distributed queue
  25. * across a process group. The distributed queue is an adaptor over an
  26. * existing (local) queue, which must model the @ref Buffer
  27. * concept. Each process stores a distinct copy of the local queue,
  28. * from which it draws or removes elements via the @ref pop and @ref
  29. * top members.
  30. *
  31. * The value type of the local queue must be a model of the @ref
  32. * GlobalDescriptor concept. The @ref push operation of the
  33. * distributed queue passes (via a message) the value to its owning
  34. * processor. Thus, the elements within a particular local queue are
  35. * guaranteed to have the process owning that local queue as an owner.
  36. *
  37. * Synchronization of distributed queues occurs in the @ref empty and
  38. * @ref size functions, which will only return "empty" values (true or
  39. * 0, respectively) when the entire distributed queue is empty. If the
  40. * local queue is empty but the distributed queue is not, the
  41. * operation will block until either condition changes. When the @ref
  42. * size function of a nonempty queue returns, it returns the size of
  43. * the local queue. These semantics were selected so that sequential
  44. * code that processes elements in the queue via the following idiom
  45. * can be parallelized via introduction of a distributed queue:
  46. *
  47. * distributed_queue<...> Q;
  48. * Q.push(x);
  49. * while (!Q.empty()) {
  50. * // do something, that may push a value onto Q
  51. * }
  52. *
  53. * In the parallel version, the initial @ref push operation will place
  54. * the value @c x onto its owner's queue. All processes will
  55. * synchronize at the call to empty, and only the process owning @c x
  56. * will be allowed to execute the loop (@ref Q.empty() returns
  57. * false). This iteration may in turn push values onto other remote
  58. * queues, so when that process finishes execution of the loop body
  59. * and all processes synchronize again in @ref empty, more processes
  60. * may have nonempty local queues to execute. Once all local queues
  61. * are empty, @ref Q.empty() returns @c false for all processes.
  62. *
  63. * The distributed queue can receive messages at two different times:
  64. * during synchronization and when polling @ref empty. Messages are
  65. * always received during synchronization, to ensure that accurate
  66. * local queue sizes can be determines. However, whether @ref empty
  67. * should poll for messages is specified as an option to the
  68. * constructor. Polling may be desired when the order in which
  69. * elements in the queue are processed is not important, because it
  70. * permits fewer synchronization steps and less communication
  71. * overhead. However, when more strict ordering guarantees are
  72. * required, polling may be semantically incorrect. By disabling
  73. * polling, one ensures that parallel execution using the idiom above
  74. * will not process an element at a later "level" before an earlier
  75. * "level".
  76. *
  77. * The distributed queue nearly models the @ref Buffer
  78. * concept. However, the @ref push routine does not necessarily
  79. * increase the result of @c size() by one (although the size of the
  80. * global queue does increase by one).
  81. */
  82. template<typename ProcessGroup, typename OwnerMap, typename Buffer,
  83. typename UnaryPredicate = always_push>
  84. class distributed_queue
  85. {
  86. typedef distributed_queue self_type;
  87. enum {
  88. /** Message indicating a remote push. The message contains a
  89. * single value x of type value_type that is to be pushed on the
  90. * receiver's queue.
  91. */
  92. msg_push,
  93. /** Push many elements at once. */
  94. msg_multipush
  95. };
  96. public:
  97. typedef ProcessGroup process_group_type;
  98. typedef Buffer buffer_type;
  99. typedef typename buffer_type::value_type value_type;
  100. typedef typename buffer_type::size_type size_type;
  101. /** Construct a new distributed queue.
  102. *
  103. * Build a new distributed queue that communicates over the given @p
  104. * process_group, whose local queue is initialized via @p buffer and
  105. * which may or may not poll for messages.
  106. */
  107. explicit
  108. distributed_queue(const ProcessGroup& process_group,
  109. const OwnerMap& owner,
  110. const Buffer& buffer,
  111. bool polling = false);
  112. /** Construct a new distributed queue.
  113. *
  114. * Build a new distributed queue that communicates over the given @p
  115. * process_group, whose local queue is initialized via @p buffer and
  116. * which may or may not poll for messages.
  117. */
  118. explicit
  119. distributed_queue(const ProcessGroup& process_group = ProcessGroup(),
  120. const OwnerMap& owner = OwnerMap(),
  121. const Buffer& buffer = Buffer(),
  122. const UnaryPredicate& pred = UnaryPredicate(),
  123. bool polling = false);
  124. /** Construct a new distributed queue.
  125. *
  126. * Build a new distributed queue that communicates over the given @p
  127. * process_group, whose local queue is default-initalized and which
  128. * may or may not poll for messages.
  129. */
  130. distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
  131. const UnaryPredicate& pred, bool polling = false);
  132. /** Virtual destructor required with virtual functions.
  133. *
  134. */
  135. virtual ~distributed_queue() {}
  136. /** Push an element onto the distributed queue.
  137. *
  138. * The element will be sent to its owner process to be added to that
  139. * process's local queue. If polling is enabled for this queue and
  140. * the owner process is the current process, the value will be
  141. * immediately pushed onto the local queue.
  142. *
  143. * Complexity: O(1) messages of size O(sizeof(value_type)) will be
  144. * transmitted.
  145. */
  146. void push(const value_type& x);
  147. /** Pop an element off the local queue.
  148. *
  149. * @p @c !empty()
  150. */
  151. void pop() { buffer.pop(); }
  152. /**
  153. * Return the element at the top of the local queue.
  154. *
  155. * @p @c !empty()
  156. */
  157. value_type& top() { return buffer.top(); }
  158. /**
  159. * \overload
  160. */
  161. const value_type& top() const { return buffer.top(); }
  162. /** Determine if the queue is empty.
  163. *
  164. * When the local queue is nonempty, returns @c true. If the local
  165. * queue is empty, synchronizes with all other processes in the
  166. * process group until either (1) the local queue is nonempty
  167. * (returns @c true) (2) the entire distributed queue is empty
  168. * (returns @c false).
  169. */
  170. bool empty() const;
  171. /** Determine the size of the local queue.
  172. *
  173. * The behavior of this routine is equivalent to the behavior of
  174. * @ref empty, except that when @ref empty returns true this
  175. * function returns the size of the local queue and when @ref empty
  176. * returns false this function returns zero.
  177. */
  178. size_type size() const;
  179. // private:
  180. /** Synchronize the distributed queue and determine if all queues
  181. * are empty.
  182. *
  183. * \returns \c true when all local queues are empty, or false if at least
  184. * one of the local queues is nonempty.
  185. * Defined as virtual for derived classes like depth_limited_distributed_queue.
  186. */
  187. virtual bool do_synchronize() const;
  188. private:
  189. // Setup triggers
  190. void setup_triggers();
  191. // Message handlers
  192. void
  193. handle_push(int source, int tag, const value_type& value,
  194. trigger_receive_context);
  195. void
  196. handle_multipush(int source, int tag, const std::vector<value_type>& values,
  197. trigger_receive_context);
  198. mutable ProcessGroup process_group;
  199. OwnerMap owner;
  200. mutable Buffer buffer;
  201. UnaryPredicate pred;
  202. bool polling;
  203. typedef std::vector<value_type> outgoing_buffer_t;
  204. typedef std::vector<outgoing_buffer_t> outgoing_buffers_t;
  205. shared_ptr<outgoing_buffers_t> outgoing_buffers;
  206. };
  /// Names-only template-parameter list for distributed_queue. It is used
  /// by synchronize() below and presumably by the out-of-line definitions
  /// in detail/queue.ipp. It is #undef'd again at the end of this header.
  #define BOOST_DISTRIBUTED_QUEUE_PARMS                                  \
    typename ProcessGroup, typename OwnerMap,                            \
    typename Buffer, typename UnaryPredicate

  /// Template-id that spells distributed_queue applied to the parameters
  /// introduced by BOOST_DISTRIBUTED_QUEUE_PARMS.
  #define BOOST_DISTRIBUTED_QUEUE_TYPE                                   \
    distributed_queue<ProcessGroup, OwnerMap, Buffer, UnaryPredicate>
  216. /** Synchronize all processes involved with the given distributed queue.
  217. *
  218. * This function will synchronize all of the local queues for a given
  219. * distributed queue, by ensuring that no additional messages are in
  220. * transit. It is rarely required by the user, because most
  221. * synchronization of distributed queues occurs via the @c empty or @c
  222. * size methods.
  223. */
  224. template<BOOST_DISTRIBUTED_QUEUE_PARMS>
  225. inline void
  226. synchronize(const BOOST_DISTRIBUTED_QUEUE_TYPE& Q)
  227. { Q.do_synchronize(); }
  228. /// Construct a new distributed queue.
  229. template<typename ProcessGroup, typename OwnerMap, typename Buffer>
  230. inline distributed_queue<ProcessGroup, OwnerMap, Buffer>
  231. make_distributed_queue(const ProcessGroup& process_group,
  232. const OwnerMap& owner,
  233. const Buffer& buffer,
  234. bool polling = false)
  235. {
  236. typedef distributed_queue<ProcessGroup, OwnerMap, Buffer> result_type;
  237. return result_type(process_group, owner, buffer, polling);
  238. }
  239. } } } // end namespace boost::graph::distributed
  240. #include <boost/graph/distributed/detail/queue.ipp>
  241. #undef BOOST_DISTRIBUTED_QUEUE_TYPE
  242. #undef BOOST_DISTRIBUTED_QUEUE_PARMS
  243. #endif // BOOST_GRAPH_DISTRIBUTED_QUEUE_HPP