generate_collect_optional.cpp 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. // Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com>
  2. // Use, modification and distribution is subject to the Boost Software
  3. // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. // An example using Boost.MPI's split() operation on communicators to
  6. // create separate data-generating processes and data-collecting
  7. // processes using boost::optional for broadcasting.
  8. #include <boost/mpi.hpp>
  9. #include <iostream>
  10. #include <cstdlib>
  11. #include <boost/serialization/vector.hpp>
  12. #include <boost/serialization/optional.hpp>
  13. namespace mpi = boost::mpi;
  14. enum message_tags { msg_data_packet, msg_finished };
  15. void generate_data(mpi::communicator local, mpi::communicator world)
  16. {
  17. using std::srand;
  18. using std::rand;
  19. // The rank of the collector within the world communicator
  20. int master_collector = local.size();
  21. srand(time(0) + world.rank());
  22. // Send out several blocks of random data to the collectors.
  23. int num_data_blocks = rand() % 3 + 1;
  24. for (int block = 0; block < num_data_blocks; ++block) {
  25. // Generate some random dataa
  26. int num_samples = rand() % 1000;
  27. std::vector<int> data;
  28. for (int i = 0; i < num_samples; ++i) {
  29. data.push_back(rand());
  30. }
  31. // Send our data to the master collector process.
  32. std::cout << "Generator #" << local.rank() << " sends some data..."
  33. << std::endl;
  34. world.send(master_collector, msg_data_packet, data);
  35. }
  36. // Wait for all of the generators to complete
  37. (local.barrier)();
  38. // The first generator will send the message to the master collector
  39. // indicating that we're done.
  40. if (local.rank() == 0)
  41. world.send(master_collector, msg_finished);
  42. }
  43. void collect_data(mpi::communicator local, mpi::communicator world)
  44. {
  45. // The rank of the collector within the world communicator
  46. int master_collector = world.size() - local.size();
  47. if (world.rank() == master_collector) {
  48. while (true) {
  49. // Wait for a message
  50. mpi::status msg = world.probe();
  51. if (msg.tag() == msg_data_packet) {
  52. // Receive the packet of data into a boost::optional
  53. boost::optional<std::vector<int> > data;
  54. data = std::vector<int>();
  55. world.recv(msg.source(), msg.source(), *data);
  56. // Broadcast the actual data.
  57. broadcast(local, data, 0);
  58. } else if (msg.tag() == msg_finished) {
  59. // Receive the message
  60. world.recv(msg.source(), msg.tag());
  61. // Broadcast to each collector to tell them we've finished.
  62. boost::optional<std::vector<int> > data;
  63. broadcast(local, data, 0);
  64. break;
  65. }
  66. }
  67. } else {
  68. boost::optional<std::vector<int> > data;
  69. do {
  70. // Wait for a broadcast from the master collector
  71. broadcast(local, data, 0);
  72. if (data) {
  73. std::cout << "Collector #" << local.rank()
  74. << " is processing data." << std::endl;
  75. }
  76. } while (data);
  77. }
  78. }
  79. int main(int argc, char* argv[])
  80. {
  81. mpi::environment env(argc, argv);
  82. mpi::communicator world;
  83. if (world.size() < 4) {
  84. if (world.rank() == 0) {
  85. std::cerr << "Error: this example requires at least 4 processes."
  86. << std::endl;
  87. }
  88. env.abort(-1);
  89. }
  90. bool is_generator = world.rank() < 2 * world.size() / 3;
  91. mpi::communicator local = world.split(is_generator? 0 : 1);
  92. if (is_generator) generate_data(local, world);
  93. else collect_data(local, world);
  94. return 0;
  95. }