queue.ipp 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. // Copyright (C) 2004-2006 The Trustees of Indiana University.
  2. // Use, modification and distribution is subject to the Boost Software
  3. // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. // Authors: Douglas Gregor
  6. // Andrew Lumsdaine
  7. #include <boost/optional.hpp>
  8. #include <cassert>
  9. #include <boost/graph/parallel/algorithm.hpp>
  10. #include <boost/graph/parallel/process_group.hpp>
  11. #include <functional>
  12. #include <algorithm>
  13. #include <boost/graph/parallel/simple_trigger.hpp>
  14. #ifndef BOOST_GRAPH_USE_MPI
  15. #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
  16. #endif
  17. namespace boost { namespace graph { namespace distributed {
  18. template<BOOST_DISTRIBUTED_QUEUE_PARMS>
  19. BOOST_DISTRIBUTED_QUEUE_TYPE::
  20. distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
  21. const Buffer& buffer, bool polling)
  22. : process_group(process_group, attach_distributed_object()),
  23. owner(owner),
  24. buffer(buffer),
  25. polling(polling)
  26. {
  27. if (!polling)
  28. outgoing_buffers.reset(
  29. new outgoing_buffers_t(num_processes(process_group)));
  30. setup_triggers();
  31. }
  32. template<BOOST_DISTRIBUTED_QUEUE_PARMS>
  33. BOOST_DISTRIBUTED_QUEUE_TYPE::
  34. distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
  35. const Buffer& buffer, const UnaryPredicate& pred,
  36. bool polling)
  37. : process_group(process_group, attach_distributed_object()),
  38. owner(owner),
  39. buffer(buffer),
  40. pred(pred),
  41. polling(polling)
  42. {
  43. if (!polling)
  44. outgoing_buffers.reset(
  45. new outgoing_buffers_t(num_processes(process_group)));
  46. setup_triggers();
  47. }
  48. template<BOOST_DISTRIBUTED_QUEUE_PARMS>
  49. BOOST_DISTRIBUTED_QUEUE_TYPE::
  50. distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,
  51. const UnaryPredicate& pred, bool polling)
  52. : process_group(process_group, attach_distributed_object()),
  53. owner(owner),
  54. pred(pred),
  55. polling(polling)
  56. {
  57. if (!polling)
  58. outgoing_buffers.reset(
  59. new outgoing_buffers_t(num_processes(process_group)));
  60. setup_triggers();
  61. }
  62. template<BOOST_DISTRIBUTED_QUEUE_PARMS>
  63. void
  64. BOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x)
  65. {
  66. typename ProcessGroup::process_id_type dest = get(owner, x);
  67. if (outgoing_buffers)
  68. outgoing_buffers->at(dest).push_back(x);
  69. else if (dest == process_id(process_group))
  70. buffer.push(x);
  71. else
  72. send(process_group, get(owner, x), msg_push, x);
  73. }
  74. template<BOOST_DISTRIBUTED_QUEUE_PARMS>
  75. bool
  76. BOOST_DISTRIBUTED_QUEUE_TYPE::empty() const
  77. {
  78. /* Processes will stay here until the buffer is nonempty or
  79. synchronization with the other processes indicates that all local
  80. buffers are empty (and no messages are in transit).
  81. */
  82. while (buffer.empty() && !do_synchronize()) ;
  83. return buffer.empty();
  84. }
  85. template<BOOST_DISTRIBUTED_QUEUE_PARMS>
  86. typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_type
  87. BOOST_DISTRIBUTED_QUEUE_TYPE::size() const
  88. {
  89. empty();
  90. return buffer.size();
  91. }
  92. template<BOOST_DISTRIBUTED_QUEUE_PARMS>
  93. void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers()
  94. {
  95. using boost::graph::parallel::simple_trigger;
  96. simple_trigger(process_group, msg_push, this,
  97. &distributed_queue::handle_push);
  98. simple_trigger(process_group, msg_multipush, this,
  99. &distributed_queue::handle_multipush);
  100. }
  101. template<BOOST_DISTRIBUTED_QUEUE_PARMS>
  102. void
  103. BOOST_DISTRIBUTED_QUEUE_TYPE::
  104. handle_push(int /*source*/, int /*tag*/, const value_type& value,
  105. trigger_receive_context)
  106. {
  107. if (pred(value)) buffer.push(value);
  108. }
  109. template<BOOST_DISTRIBUTED_QUEUE_PARMS>
  110. void
  111. BOOST_DISTRIBUTED_QUEUE_TYPE::
  112. handle_multipush(int /*source*/, int /*tag*/,
  113. const std::vector<value_type>& values,
  114. trigger_receive_context)
  115. {
  116. for (std::size_t i = 0; i < values.size(); ++i)
  117. if (pred(values[i])) buffer.push(values[i]);
  118. }
  119. template<BOOST_DISTRIBUTED_QUEUE_PARMS>
  120. bool
  121. BOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const
  122. {
  123. #ifdef PBGL_ACCOUNTING
  124. ++num_synchronizations;
  125. #endif
  126. using boost::parallel::all_reduce;
  127. using std::swap;
  128. typedef typename ProcessGroup::process_id_type process_id_type;
  129. if (outgoing_buffers) {
  130. // Transfer all of the push requests
  131. process_id_type id = process_id(process_group);
  132. process_id_type np = num_processes(process_group);
  133. for (process_id_type dest = 0; dest < np; ++dest) {
  134. outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);
  135. std::size_t size = outgoing.size();
  136. if (size != 0) {
  137. if (dest != id) {
  138. send(process_group, dest, msg_multipush, outgoing);
  139. } else {
  140. for (std::size_t i = 0; i < size; ++i)
  141. buffer.push(outgoing[i]);
  142. }
  143. outgoing.clear();
  144. }
  145. }
  146. }
  147. synchronize(process_group);
  148. unsigned local_size = buffer.size();
  149. unsigned global_size =
  150. all_reduce(process_group, local_size, std::plus<unsigned>());
  151. return global_size == 0;
  152. }
  153. } } } // end namespace boost::graph::distributed