generate_collect.cpp 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. // Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com>
  2. // Use, modification and distribution is subject to the Boost Software
  3. // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. // An example using Boost.MPI's split() operation on communicators to
  6. // create separate data-generating processes and data-collecting
  7. // processes.
  8. #include <boost/mpi.hpp>
  9. #include <iostream>
  10. #include <cstdlib>
  11. #include <boost/serialization/vector.hpp>
  12. namespace mpi = boost::mpi;
  13. enum message_tags { msg_data_packet, msg_broadcast_data, msg_finished };
  14. void generate_data(mpi::communicator local, mpi::communicator world)
  15. {
  16. using std::srand;
  17. using std::rand;
  18. // The rank of the collector within the world communicator
  19. int master_collector = local.size();
  20. srand(time(0) + world.rank());
  21. // Send out several blocks of random data to the collectors.
  22. int num_data_blocks = rand() % 3 + 1;
  23. for (int block = 0; block < num_data_blocks; ++block) {
  24. // Generate some random data
  25. int num_samples = rand() % 1000;
  26. std::vector<int> data;
  27. for (int i = 0; i < num_samples; ++i) {
  28. data.push_back(rand());
  29. }
  30. // Send our data to the master collector process.
  31. std::cout << "Generator #" << local.rank() << " sends some data..."
  32. << std::endl;
  33. world.send(master_collector, msg_data_packet, data);
  34. }
  35. // Wait for all of the generators to complete
  36. (local.barrier)();
  37. // The first generator will send the message to the master collector
  38. // indicating that we're done.
  39. if (local.rank() == 0)
  40. world.send(master_collector, msg_finished);
  41. }
  42. void collect_data(mpi::communicator local, mpi::communicator world)
  43. {
  44. // The rank of the collector within the world communicator
  45. int master_collector = world.size() - local.size();
  46. if (world.rank() == master_collector) {
  47. while (true) {
  48. // Wait for a message
  49. mpi::status msg = world.probe();
  50. if (msg.tag() == msg_data_packet) {
  51. // Receive the packet of data
  52. std::vector<int> data;
  53. world.recv(msg.source(), msg.tag(), data);
  54. // Tell each of the collectors that we'll be broadcasting some data
  55. for (int dest = 1; dest < local.size(); ++dest)
  56. local.send(dest, msg_broadcast_data, msg.source());
  57. // Broadcast the actual data.
  58. broadcast(local, data, 0);
  59. } else if (msg.tag() == msg_finished) {
  60. // Receive the message
  61. world.recv(msg.source(), msg.tag());
  62. // Tell each of the collectors that we're finished
  63. for (int dest = 1; dest < local.size(); ++dest)
  64. local.send(dest, msg_finished);
  65. break;
  66. }
  67. }
  68. } else {
  69. while (true) {
  70. // Wait for a message from the master collector
  71. mpi::status msg = local.probe();
  72. if (msg.tag() == msg_broadcast_data) {
  73. // Receive the broadcast message
  74. int originator;
  75. local.recv(msg.source(), msg.tag(), originator);
  76. // Receive the data broadcasted from the master collector
  77. std::vector<int> data;
  78. broadcast(local, data, 0);
  79. std::cout << "Collector #" << local.rank()
  80. << " is processing data from generator #" << originator
  81. << "." << std::endl;
  82. } else if (msg.tag() == msg_finished) {
  83. // Receive the message
  84. local.recv(msg.source(), msg.tag());
  85. break;
  86. }
  87. }
  88. }
  89. }
  90. int main(int argc, char* argv[])
  91. {
  92. mpi::environment env(argc, argv);
  93. mpi::communicator world;
  94. if (world.size() < 3) {
  95. if (world.rank() == 0) {
  96. std::cerr << "Error: this example requires at least 3 processes."
  97. << std::endl;
  98. }
  99. env.abort(-1);
  100. }
  101. bool is_generator = world.rank() < 2 * world.size() / 3;
  102. mpi::communicator local = world.split(is_generator? 0 : 1);
  103. if (is_generator) generate_data(local, world);
  104. else collect_data(local, world);
  105. return 0;
  106. }