mpi_process_group.hpp 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810
  1. // Copyright (C) 2004-2008 The Trustees of Indiana University.
  2. // Copyright (C) 2007 Douglas Gregor
  3. // Copyright (C) 2007 Matthias Troyer <troyer@boost-consulting.com>
  4. // Use, modification and distribution is subject to the Boost Software
  5. // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
  6. // http://www.boost.org/LICENSE_1_0.txt)
  7. // Authors: Douglas Gregor
  8. // Matthias Troyer
  9. // Andrew Lumsdaine
  10. #ifndef BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
  11. #define BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP
  12. #ifndef BOOST_GRAPH_USE_MPI
  13. #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
  14. #endif
  15. //#define NO_SPLIT_BATCHES
  16. #define SEND_OOB_BSEND
  17. #include <boost/optional.hpp>
  18. #include <boost/shared_ptr.hpp>
  19. #include <boost/weak_ptr.hpp>
  20. #include <utility>
  21. #include <memory>
  22. #include <boost/function/function1.hpp>
  23. #include <boost/function/function2.hpp>
  24. #include <boost/function/function0.hpp>
  25. #include <boost/mpi.hpp>
  26. #include <boost/property_map/parallel/process_group.hpp>
  27. #include <boost/serialization/vector.hpp>
  28. #include <boost/utility/enable_if.hpp>
  29. namespace boost { namespace graph { namespace distributed {
  30. // Process group tags
  31. struct mpi_process_group_tag : virtual boost::parallel::linear_process_group_tag { };
  32. class mpi_process_group
  33. {
  34. struct impl;
  35. public:
  36. /// Number of tags available to each data structure.
  37. static const int max_tags = 256;
  38. /**
  39. * The type of a "receive" handler, that will be provided with
  40. * (source, tag) pairs when a message is received. Users can provide a
  41. * receive handler for a distributed data structure, for example, to
  42. * automatically pick up and respond to messages as needed.
  43. */
  44. typedef function<void(int source, int tag)> receiver_type;
  45. /**
  46. * The type of a handler for the on-synchronize event, which will be
  47. * executed at the beginning of synchronize().
  48. */
  49. typedef function0<void> on_synchronize_event_type;
  50. /// Used as a tag to help create an "empty" process group.
  51. struct create_empty {};
  52. /// The type used to buffer message data
  53. typedef boost::mpi::packed_oprimitive::buffer_type buffer_type;
  54. /// The type used to identify a process
  55. typedef int process_id_type;
  56. /// The type used to count the number of processes
  57. typedef int process_size_type;
  58. /// The type of communicator used to transmit data via MPI
  59. typedef boost::mpi::communicator communicator_type;
  60. /// Classification of the capabilities of this process group
  61. struct communication_category
  62. : virtual boost::parallel::bsp_process_group_tag,
  63. virtual mpi_process_group_tag { };
  64. // TBD: We can eliminate the "source" field and possibly the
  65. // "offset" field.
  66. struct message_header {
  67. /// The process that sent the message
  68. process_id_type source;
  69. /// The message tag
  70. int tag;
  71. /// The offset of the message into the buffer
  72. std::size_t offset;
  73. /// The length of the message in the buffer, in bytes
  74. std::size_t bytes;
  75. template <class Archive>
  76. void serialize(Archive& ar, int)
  77. {
  78. ar & source & tag & offset & bytes;
  79. }
  80. };
  81. /**
  82. * Stores the outgoing messages for a particular processor.
  83. *
  84. * @todo Evaluate whether we should use a deque instance, which
  85. * could reduce the cost of "sending" messages but
  86. * increases the time spent in the synchronization step.
  87. */
  88. struct outgoing_messages {
  89. outgoing_messages() {}
  90. ~outgoing_messages() {}
  91. std::vector<message_header> headers;
  92. buffer_type buffer;
  93. template <class Archive>
  94. void serialize(Archive& ar, int)
  95. {
  96. ar & headers & buffer;
  97. }
  98. void swap(outgoing_messages& x)
  99. {
  100. headers.swap(x.headers);
  101. buffer.swap(x.buffer);
  102. }
  103. };
  104. private:
  105. /**
  106. * Virtual base from which every trigger will be launched. See @c
  107. * trigger_launcher for more information.
  108. */
  109. class trigger_base : boost::noncopyable
  110. {
  111. public:
  112. explicit trigger_base(int tag) : tag_(tag) { }
  113. /// Retrieve the tag associated with this trigger
  114. int tag() const { return tag_; }
  115. virtual ~trigger_base() { }
  116. /**
  117. * Invoked to receive a message that matches a particular trigger.
  118. *
  119. * @param source the source of the message
  120. * @param tag the (local) tag of the message
  121. * @param context the context under which the trigger is being
  122. * invoked
  123. */
  124. virtual void
  125. receive(mpi_process_group const& pg, int source, int tag,
  126. trigger_receive_context context, int block=-1) const = 0;
  127. protected:
  128. // The message tag associated with this trigger
  129. int tag_;
  130. };
  131. /**
  132. * Launches a specific handler in response to a trigger. This
  133. * function object wraps up the handler function object and a buffer
  134. * for incoming data.
  135. */
  136. template<typename Type, typename Handler>
  137. class trigger_launcher : public trigger_base
  138. {
  139. public:
  140. explicit trigger_launcher(mpi_process_group& self, int tag,
  141. const Handler& handler)
  142. : trigger_base(tag), self(self), handler(handler)
  143. {}
  144. void
  145. receive(mpi_process_group const& pg, int source, int tag,
  146. trigger_receive_context context, int block=-1) const;
  147. private:
  148. mpi_process_group& self;
  149. mutable Handler handler;
  150. };
  151. /**
  152. * Launches a specific handler with a message reply in response to a
  153. * trigger. This function object wraps up the handler function
  154. * object and a buffer for incoming data.
  155. */
  156. template<typename Type, typename Handler>
  157. class reply_trigger_launcher : public trigger_base
  158. {
  159. public:
  160. explicit reply_trigger_launcher(mpi_process_group& self, int tag,
  161. const Handler& handler)
  162. : trigger_base(tag), self(self), handler(handler)
  163. {}
  164. void
  165. receive(mpi_process_group const& pg, int source, int tag,
  166. trigger_receive_context context, int block=-1) const;
  167. private:
  168. mpi_process_group& self;
  169. mutable Handler handler;
  170. };
  171. template<typename Type, typename Handler>
  172. class global_trigger_launcher : public trigger_base
  173. {
  174. public:
  175. explicit global_trigger_launcher(mpi_process_group& self, int tag,
  176. const Handler& handler)
  177. : trigger_base(tag), handler(handler)
  178. {
  179. }
  180. void
  181. receive(mpi_process_group const& pg, int source, int tag,
  182. trigger_receive_context context, int block=-1) const;
  183. private:
  184. mutable Handler handler;
  185. // TBD: do not forget to cancel any outstanding Irecv when deleted,
  186. // if we decide to use Irecv
  187. };
  188. template<typename Type, typename Handler>
  189. class global_irecv_trigger_launcher : public trigger_base
  190. {
  191. public:
  192. explicit global_irecv_trigger_launcher(mpi_process_group& self, int tag,
  193. const Handler& handler, int sz)
  194. : trigger_base(tag), handler(handler), buffer_size(sz)
  195. {
  196. prepare_receive(self,tag);
  197. }
  198. void
  199. receive(mpi_process_group const& pg, int source, int tag,
  200. trigger_receive_context context, int block=-1) const;
  201. private:
  202. void prepare_receive(mpi_process_group const& pg, int tag, bool force=false) const;
  203. Handler handler;
  204. int buffer_size;
  205. // TBD: do not forget to cancel any outstanding Irecv when deleted,
  206. // if we decide to use Irecv
  207. };
  208. public:
  209. /**
  210. * Construct a new BSP process group from an MPI communicator. The
  211. * MPI communicator will be duplicated to create a new communicator
  212. * for this process group to use.
  213. */
  214. mpi_process_group(communicator_type parent_comm = communicator_type());
  215. /**
  216. * Construct a new BSP process group from an MPI communicator. The
  217. * MPI communicator will be duplicated to create a new communicator
  218. * for this process group to use. This constructor allows tuning the
  219. * size of message batches.
  220. *
  221. * @param num_headers The maximum number of headers in a message batch
  222. *
  223. * @param buffer_size The maximum size of the message buffer in a batch.
  224. *
  225. */
  226. mpi_process_group( std::size_t num_headers, std::size_t buffer_size,
  227. communicator_type parent_comm = communicator_type());
  228. /**
  229. * Construct a copy of the BSP process group for a new distributed
  230. * data structure. This data structure will synchronize with all
  231. * other members of the process group's equivalence class (including
  232. * @p other), but will have its own set of tags.
  233. *
  234. * @param other The process group that this new process group will
  235. * be based on, using a different set of tags within the same
  236. * communication and synchronization space.
  237. *
  238. * @param handler A message handler that will be passed (source,
  239. * tag) pairs for each message received by this data
  240. * structure. The handler is expected to receive the messages
  241. * immediately. The handler can be changed after-the-fact by
  242. * calling @c replace_handler.
  243. *
  244. * @param out_of_band_receive An anachronism. TODO: remove this.
  245. */
  246. mpi_process_group(const mpi_process_group& other,
  247. const receiver_type& handler,
  248. bool out_of_band_receive = false);
  249. /**
  250. * Construct a copy of the BSP process group for a new distributed
  251. * data structure. This data structure will synchronize with all
  252. * other members of the process group's equivalence class (including
  253. * @p other), but will have its own set of tags.
  254. */
  255. mpi_process_group(const mpi_process_group& other,
  256. attach_distributed_object,
  257. bool out_of_band_receive = false);
  258. /**
  259. * Create an "empty" process group, with no information. This is an
  260. * internal routine that users should never need.
  261. */
  262. explicit mpi_process_group(create_empty) {}
  263. /**
  264. * Destroys this copy of the process group.
  265. */
  266. ~mpi_process_group();
  267. /**
  268. * Replace the current message handler with a new message handler.
  269. *
  270. * @param handler The new message handler.
  271. * @param out_of_band_receive An anachronism: remove this
  272. */
  273. void replace_handler(const receiver_type& handler,
  274. bool out_of_band_receive = false);
  275. /**
  276. * Turns this process group into the process group for a new
  277. * distributed data structure or object, allocating its own tag
  278. * block.
  279. */
  280. void make_distributed_object();
  281. /**
  282. * Replace the handler to be invoked at the beginning of synchronize.
  283. */
  284. void
  285. replace_on_synchronize_handler(const on_synchronize_event_type& handler = 0);
  286. /**
  287. * Return the block number of the current data structure. A value of
  288. * 0 indicates that this particular instance of the process group is
  289. * not associated with any distributed data structure.
  290. */
  291. int my_block_number() const { return block_num? *block_num : 0; }
  292. /**
  293. * Encode a block number/tag pair into a single encoded tag for
  294. * transmission.
  295. */
  296. int encode_tag(int block_num, int tag) const
  297. { return block_num * max_tags + tag; }
  298. /**
  299. * Decode an encoded tag into a block number/tag pair.
  300. */
  301. std::pair<int, int> decode_tag(int encoded_tag) const
  302. { return std::make_pair(encoded_tag / max_tags, encoded_tag % max_tags); }
  303. // @todo Actually write up the friend declarations so these could be
  304. // private.
  305. // private:
  306. /** Allocate a block of tags for this instance. The block should not
  307. * have been allocated already, e.g., my_block_number() ==
  308. * 0. Returns the newly-allocated block number.
  309. */
  310. int allocate_block(bool out_of_band_receive = false);
  311. /** Potentially emit a receive event out of band. Returns true if an event
  312. * was actually sent, false otherwise.
  313. */
  314. bool maybe_emit_receive(int process, int encoded_tag) const;
  315. /** Emit a receive event. Returns true if an event was actually
  316. * sent, false otherwise.
  317. */
  318. bool emit_receive(int process, int encoded_tag) const;
  319. /** Emit an on-synchronize event to all block handlers. */
  320. void emit_on_synchronize() const;
  321. /** Retrieve a reference to the stored receiver in this block. */
  322. template<typename Receiver>
  323. Receiver* get_receiver();
  324. template<typename T>
  325. void
  326. send_impl(int dest, int tag, const T& value,
  327. mpl::true_ /*is_mpi_datatype*/) const;
  328. template<typename T>
  329. void
  330. send_impl(int dest, int tag, const T& value,
  331. mpl::false_ /*is_mpi_datatype*/) const;
  332. template<typename T>
  333. typename disable_if<boost::mpi::is_mpi_datatype<T>, void>::type
  334. array_send_impl(int dest, int tag, const T values[], std::size_t n) const;
  335. template<typename T>
  336. bool
  337. receive_impl(int source, int tag, T& value,
  338. mpl::true_ /*is_mpi_datatype*/) const;
  339. template<typename T>
  340. bool
  341. receive_impl(int source, int tag, T& value,
  342. mpl::false_ /*is_mpi_datatype*/) const;
  343. // Receive an array of values
  344. template<typename T>
  345. typename disable_if<boost::mpi::is_mpi_datatype<T>, bool>::type
  346. array_receive_impl(int source, int tag, T* values, std::size_t& n) const;
  347. optional<std::pair<mpi_process_group::process_id_type, int> > probe() const;
  348. void synchronize() const;
  349. operator bool() { return bool(impl_); }
  350. mpi_process_group base() const;
  351. /**
  352. * Create a new trigger for a specific message tag. Triggers handle
  353. * out-of-band messaging, and the handler itself will be called
  354. * whenever a message is available. The handler itself accepts four
  355. * arguments: the source of the message, the message tag (which will
  356. * be the same as @p tag), the message data (of type @c Type), and a
  357. * boolean flag that states whether the message was received
  358. * out-of-band. The last will be @c true for out-of-band receives,
  359. * or @c false for receives at the end of a synchronization step.
  360. */
  361. template<typename Type, typename Handler>
  362. void trigger(int tag, const Handler& handler);
  363. /**
  364. * Create a new trigger for a specific message tag, along with a way
  365. * to send a reply with data back to the sender. Triggers handle
  366. * out-of-band messaging, and the handler itself will be called
  367. * whenever a message is available. The handler itself accepts four
  368. * arguments: the source of the message, the message tag (which will
  369. * be the same as @p tag), the message data (of type @c Type), and a
  370. * boolean flag that states whether the message was received
  371. * out-of-band. The last will be @c true for out-of-band receives,
  372. * or @c false for receives at the end of a synchronization
  373. * step. The handler also returns a value, which will be routed back
  374. * to the sender.
  375. */
  376. template<typename Type, typename Handler>
  377. void trigger_with_reply(int tag, const Handler& handler);
  378. template<typename Type, typename Handler>
  379. void global_trigger(int tag, const Handler& handler, std::size_t buffer_size=0);
  380. /**
  381. * Poll for any out-of-band messages. This routine will check if any
  382. * out-of-band messages are available. Those that are available will
  383. * be handled immediately, if possible.
  384. *
  385. * @returns if an out-of-band message has been received, but we are
  386. * unable to actually receive the message, a (source, tag) pair will
  387. * be returned. Otherwise, returns an empty optional.
  388. *
  389. * @param wait When true, we should block until a message comes in.
  390. *
  391. * @param synchronizing whether we are currently synchronizing the
  392. * process group
  393. */
  394. optional<std::pair<int, int> >
  395. poll(bool wait = false, int block = -1, bool synchronizing = false) const;
  396. /**
  397. * Determines the context of the trigger currently executing. If
  398. * multiple triggers are executing (recursively), then the context
  399. * for the most deeply nested trigger will be returned. If no
  400. * triggers are executing, returns @c trc_none. This might be used,
  401. * for example, to determine whether a reply to a message should
  402. * itself be sent out-of-band or whether it can go via the normal,
  403. * slower communication route.
  404. */
  405. trigger_receive_context trigger_context() const;
  406. /// INTERNAL ONLY
  407. void receive_batch(process_id_type source, outgoing_messages& batch) const;
  408. /// INTERNAL ONLY
  409. ///
  410. /// Determine the actual communicator and tag that will be used for a
  411. /// transmission with the given tag.
  412. std::pair<boost::mpi::communicator, int>
  413. actual_communicator_and_tag(int tag, int block) const;
  414. /// set the size of the message buffer used for buffered oob sends
  415. static void set_message_buffer_size(std::size_t s);
  416. /// get the size of the message buffer used for buffered oob sends
  417. static std::size_t message_buffer_size();
  418. static int old_buffer_size;
  419. static void* old_buffer;
  420. private:
  421. void install_trigger(int tag, int block,
  422. shared_ptr<trigger_base> const& launcher);
  423. void poll_requests(int block=-1) const;
  424. // send a batch if the buffer is full now or would get full
  425. void maybe_send_batch(process_id_type dest) const;
  426. // actually send a batch
  427. void send_batch(process_id_type dest, outgoing_messages& batch) const;
  428. void send_batch(process_id_type dest) const;
  429. void pack_headers() const;
  430. /**
  431. * Process a batch of incoming messages immediately.
  432. *
  433. * @param source the source of these messages
  434. */
  435. void process_batch(process_id_type source) const;
  436. void receive_batch(boost::mpi::status& status) const;
  437. //void free_finished_sends() const;
  /// Status messages used internally by the process group. The
  /// reserved range is [msg_reserved_first, msg_reserved_last],
  /// i.e. 126 through 128.
  enum status_messages {
    /// The first of the reserved message tags
    msg_reserved_first = 126,

    /// Sent from a processor when sending batched messages
    msg_batch = msg_reserved_first,

    /// Sent from a processor when sending large batched messages, larger
    /// than the maximum buffer size for messages to be received by
    /// MPI_Irecv
    msg_large_batch = 127,

    /// Sent from a source processor to everyone else when that
    /// processor has entered the synchronize() function.
    msg_synchronizing = 128,

    /// The last of the reserved message tags
    msg_reserved_last = msg_synchronizing
  };
  453. /**
  454. * Description of a block of tags associated to a particular
  455. * distributed data structure. This structure will live as long as
  456. * the distributed data structure is around, and will be used to
  457. * help send messages to the data structure.
  458. */
  459. struct block_type
  460. {
  461. block_type() { }
  462. /// Handler for receive events
  463. receiver_type on_receive;
  464. /// Handler executed at the start of synchronization
  465. on_synchronize_event_type on_synchronize;
  466. /// Individual message triggers. Note: at present, this vector is
  467. /// indexed by the (local) tag of the trigger. Any tags that
  468. /// don't have triggers will have NULL pointers in that spot.
  469. std::vector<shared_ptr<trigger_base> > triggers;
  470. };
  471. /**
  472. * Data structure containing all of the blocks for the distributed
  473. * data structures attached to a process group.
  474. */
  475. typedef std::vector<block_type*> blocks_type;
  476. /// Iterator into @c blocks_type.
  477. typedef blocks_type::iterator block_iterator;
  478. /**
  479. * Deleter used to deallocate a block when its distributed data
  480. * structure is destroyed. This type will be used as the deleter for
  481. * @c block_num.
  482. */
  483. struct deallocate_block;
  484. static std::vector<char> message_buffer;
  485. public:
  486. /**
  487. * Data associated with the process group and all of its attached
  488. * distributed data structures.
  489. */
  490. shared_ptr<impl> impl_;
  491. /**
  492. * When non-null, indicates that this copy of the process group is
  493. * associated with a particular distributed data structure. The
  494. * integer value contains the block number (a value > 0) associated
  495. * with that data structure. The deleter for this @c shared_ptr is a
  496. * @c deallocate_block object that will deallocate the associated
  497. * block in @c impl_->blocks.
  498. */
  499. shared_ptr<int> block_num;
  500. /**
  501. * Rank of this process, to avoid having to call rank() repeatedly.
  502. */
  503. int rank;
  504. /**
  505. * Number of processes in this process group, to avoid having to
  506. * call communicator::size() repeatedly.
  507. */
  508. int size;
  509. };
  510. inline mpi_process_group::process_id_type
  511. process_id(const mpi_process_group& pg)
  512. { return pg.rank; }
  513. inline mpi_process_group::process_size_type
  514. num_processes(const mpi_process_group& pg)
  515. { return pg.size; }
  516. mpi_process_group::communicator_type communicator(const mpi_process_group& pg);
  517. template<typename T>
  518. void
  519. send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
  520. int tag, const T& value);
  521. template<typename InputIterator>
  522. void
  523. send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
  524. int tag, InputIterator first, InputIterator last);
  525. template<typename T>
  526. inline void
  527. send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
  528. int tag, T* first, T* last)
  529. { send(pg, dest, tag, first, last - first); }
  530. template<typename T>
  531. inline void
  532. send(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
  533. int tag, const T* first, const T* last)
  534. { send(pg, dest, tag, first, last - first); }
  535. template<typename T>
  536. mpi_process_group::process_id_type
  537. receive(const mpi_process_group& pg, int tag, T& value);
  538. template<typename T>
  539. mpi_process_group::process_id_type
  540. receive(const mpi_process_group& pg,
  541. mpi_process_group::process_id_type source, int tag, T& value);
  542. optional<std::pair<mpi_process_group::process_id_type, int> >
  543. probe(const mpi_process_group& pg);
  544. void synchronize(const mpi_process_group& pg);
  545. template<typename T, typename BinaryOperation>
  546. T*
  547. all_reduce(const mpi_process_group& pg, T* first, T* last, T* out,
  548. BinaryOperation bin_op);
  549. template<typename T, typename BinaryOperation>
  550. T*
  551. scan(const mpi_process_group& pg, T* first, T* last, T* out,
  552. BinaryOperation bin_op);
  553. template<typename InputIterator, typename T>
  554. void
  555. all_gather(const mpi_process_group& pg,
  556. InputIterator first, InputIterator last, std::vector<T>& out);
  557. template<typename InputIterator>
  558. mpi_process_group
  559. process_subgroup(const mpi_process_group& pg,
  560. InputIterator first, InputIterator last);
  561. template<typename T>
  562. void
  563. broadcast(const mpi_process_group& pg, T& val,
  564. mpi_process_group::process_id_type root);
  565. /*******************************************************************
  566. * Out-of-band communication *
  567. *******************************************************************/
  568. template<typename T>
  569. typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
  570. send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
  571. int tag, const T& value, int block=-1)
  572. {
  573. using boost::mpi::get_mpi_datatype;
  574. // Determine the actual message tag we will use for the send, and which
  575. // communicator we will use.
  576. std::pair<boost::mpi::communicator, int> actual
  577. = pg.actual_communicator_and_tag(tag, block);
  578. #ifdef SEND_OOB_BSEND
  579. if (mpi_process_group::message_buffer_size()) {
  580. MPI_Bsend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
  581. actual.second, actual.first);
  582. return;
  583. }
  584. #endif
  585. MPI_Request request;
  586. MPI_Isend(const_cast<T*>(&value), 1, get_mpi_datatype<T>(value), dest,
  587. actual.second, actual.first, &request);
  588. int done=0;
  589. do {
  590. pg.poll();
  591. MPI_Test(&request,&done,MPI_STATUS_IGNORE);
  592. } while (!done);
  593. }
  594. template<typename T>
  595. typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
  596. send_oob(const mpi_process_group& pg, mpi_process_group::process_id_type dest,
  597. int tag, const T& value, int block=-1)
  598. {
  599. using boost::mpi::packed_oarchive;
  600. // Determine the actual message tag we will use for the send, and which
  601. // communicator we will use.
  602. std::pair<boost::mpi::communicator, int> actual
  603. = pg.actual_communicator_and_tag(tag, block);
  604. // Serialize the data into a buffer
  605. packed_oarchive out(actual.first);
  606. out << value;
  607. std::size_t size = out.size();
  608. // Send the actual message data
  609. #ifdef SEND_OOB_BSEND
  610. if (mpi_process_group::message_buffer_size()) {
  611. MPI_Bsend(const_cast<void*>(out.address()), size, MPI_PACKED,
  612. dest, actual.second, actual.first);
  613. return;
  614. }
  615. #endif
  616. MPI_Request request;
  617. MPI_Isend(const_cast<void*>(out.address()), size, MPI_PACKED,
  618. dest, actual.second, actual.first, &request);
  619. int done=0;
  620. do {
  621. pg.poll();
  622. MPI_Test(&request,&done,MPI_STATUS_IGNORE);
  623. } while (!done);
  624. }
  625. template<typename T>
  626. typename enable_if<boost::mpi::is_mpi_datatype<T> >::type
  627. receive_oob(const mpi_process_group& pg,
  628. mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
  629. template<typename T>
  630. typename disable_if<boost::mpi::is_mpi_datatype<T> >::type
  631. receive_oob(const mpi_process_group& pg,
  632. mpi_process_group::process_id_type source, int tag, T& value, int block=-1);
  633. template<typename SendT, typename ReplyT>
  634. typename enable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
  635. send_oob_with_reply(const mpi_process_group& pg,
  636. mpi_process_group::process_id_type dest,
  637. int tag, const SendT& send_value, ReplyT& reply_value,
  638. int block = -1);
  639. template<typename SendT, typename ReplyT>
  640. typename disable_if<boost::mpi::is_mpi_datatype<ReplyT> >::type
  641. send_oob_with_reply(const mpi_process_group& pg,
  642. mpi_process_group::process_id_type dest,
  643. int tag, const SendT& send_value, ReplyT& reply_value,
  644. int block = -1);
  645. } } } // end namespace boost::graph::distributed
  646. BOOST_IS_BITWISE_SERIALIZABLE(boost::graph::distributed::mpi_process_group::message_header)
  647. namespace boost { namespace mpi {
  648. template<>
  649. struct is_mpi_datatype<boost::graph::distributed::mpi_process_group::message_header> : mpl::true_ { };
  650. } } // end namespace boost::mpi
  651. namespace std {
  652. /// optimized swap for outgoing messages
  653. inline void
  654. swap(boost::graph::distributed::mpi_process_group::outgoing_messages& x,
  655. boost::graph::distributed::mpi_process_group::outgoing_messages& y)
  656. {
  657. x.swap(y);
  658. }
  659. }
  660. BOOST_CLASS_IMPLEMENTATION(boost::graph::distributed::mpi_process_group::outgoing_messages,object_serializable)
  661. BOOST_CLASS_TRACKING(boost::graph::distributed::mpi_process_group::outgoing_messages,track_never)
  662. #include <boost/graph/distributed/detail/mpi_process_group.ipp>
  663. #endif // BOOST_GRAPH_DISTRIBUTED_MPI_PROCESS_GROUP