parallel_example.cpp

// Copyright (C) 2005-2006 Matthias Troyer

// Use, modification and distribution is subject to the Boost Software
// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

// An example of a parallel Monte Carlo simulation using some nodes to produce
// data and others to aggregate the data
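//
// Run with at least two MPI processes, e.g. (binary name is illustrative):
//   mpirun -np 4 ./parallel_example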

#include <iostream>
#include <vector>
#include <boost/assert.hpp>
#include <boost/mpi.hpp>
#include <boost/random/parallel.hpp>
#include <boost/random.hpp>
#include <boost/foreach.hpp>
#include <cstdlib>

namespace mpi = boost::mpi;
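
// message tags used to coordinate the calculation and accumulation ranks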
enum {sample_tag, sample_skeleton_tag, sample_broadcast_tag, quit_tag};

void calculate_samples(int sample_length)
{
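  // total number of samples to produce (split across the calculation ranks)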
  int num_samples = 100;
  std::vector<double> sample(sample_length);

  // set up the communicator of all calculation ranks by splitting
  mpi::communicator world;
  mpi::communicator calculate_communicator = world.split(0);

  unsigned int num_calculate_ranks = calculate_communicator.size();

  // the master of the accumulation ranks is the first of them, hence
  // with a rank just one after the last calculation rank
  int master_accumulate_rank = num_calculate_ranks;

  // the master of the calculation ranks sends the skeleton of the sample
  // to the master of the accumulation ranks
  if (world.rank()==0)
    world.send(master_accumulate_rank,sample_skeleton_tag,mpi::skeleton(sample));

  // next we extract the content of the sample vector, to be used in sending
  // the content later on
  mpi::content sample_content = mpi::get_content(sample);

  // now initialize the parallel random number generator
  boost::lcg64 engine(
      boost::random::stream_number = calculate_communicator.rank(),
      boost::random::total_streams = calculate_communicator.size()
    );
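
  // rng draws uniformly distributed values in [0,1) from this rank's
  // independent stream of the parallel generator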
  boost::variate_generator<boost::lcg64&,boost::uniform_real<> >
    rng(engine,boost::uniform_real<>());

  for (unsigned int i=0; i<num_samples/num_calculate_ranks+1;++i) {

    // calculate sample by filling the vector with random numbers
    // note that std::generate will not work since it takes the generator
    // by value, and boost::ref cannot be used as a generator.
    // boost::ref should be fixed so that it can be used as generator
    BOOST_FOREACH(double& x, sample)
      x = rng();

    // send sample to accumulation ranks
    // Ideally we want to do this as a broadcast with an inter-communicator
    // between the calculation and accumulation ranks. MPI2 should support
    // this, but here we present an MPI1 compatible solution.

    // send content of sample to first (master) accumulation process
    world.send(master_accumulate_rank,sample_tag,sample_content);

    // gather some results from all calculation ranks
    double local_result = sample[0];
    std::vector<double> gathered_results(calculate_communicator.size());
    mpi::all_gather(calculate_communicator,local_result,gathered_results);
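
    // (gathered_results is not used further here; the all_gather merely
    // demonstrates a collective over the calculation sub-communicator)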
  }

  // we are done: the master tells the accumulation ranks to quit
  if (world.rank()==0)
    world.send(master_accumulate_rank,quit_tag);
}

void accumulate_samples()
{
  std::vector<double> sample;

  // set up the communicator for all accumulation ranks by splitting
  mpi::communicator world;
  mpi::communicator accumulate_communicator = world.split(1);

  bool is_master_accumulate_rank = accumulate_communicator.rank()==0;

  // the master receives the sample skeleton
  if (is_master_accumulate_rank)
    world.recv(0,sample_skeleton_tag,mpi::skeleton(sample));

  // and broadcasts it to all accumulation ranks
  mpi::broadcast(accumulate_communicator,mpi::skeleton(sample),0);

  // next we extract the content of the sample vector, to be used in receiving
  // the content later on
  mpi::content sample_content = mpi::get_content(sample);

  // accumulate until quit is called
  double sum=0.;
  while (true) {

    // the accumulation master checks whether we should quit
    if (world.iprobe(0,quit_tag)) {
      world.recv(0,quit_tag);
      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }

    // the other accumulation ranks check whether we should quit
    if (accumulate_communicator.iprobe(0,quit_tag)) {
      accumulate_communicator.recv(0,quit_tag);
      std::cout << sum << "\n";
      break; // We're done
    }

    // check whether the master accumulation rank has received a sample
    if (world.iprobe(mpi::any_source,sample_tag)) {
      BOOST_ASSERT(is_master_accumulate_rank);

      // receive the content
      world.recv(mpi::any_source,sample_tag,sample_content);

      // now we need to broadcast
      // the problem is we do not have a non-blocking broadcast that we could
      // abort if we receive a quit message from the master. We thus need to
      // first tell all accumulation ranks to start a broadcast. If the sample
      // is small, we could just send the sample in this message, but here we
      // optimize the code for large samples, so that the overhead of these
      // sends can be ignored, and we count on an optimized broadcast
      // implementation with O(log N) complexity
      for (int i=1; i<accumulate_communicator.size();++i)
        accumulate_communicator.send(i,sample_broadcast_tag);

      // now broadcast the contents of the sample to all accumulate ranks
      mpi::broadcast(accumulate_communicator,sample_content,0);

      // and handle the sample by summing the appropriate value
      sum += sample[0];
    }

    // the other accumulation ranks wait for a message to start the broadcast
    if (accumulate_communicator.iprobe(0,sample_broadcast_tag)) {
      BOOST_ASSERT(!is_master_accumulate_rank);
      accumulate_communicator.recv(0,sample_broadcast_tag);

      // receive broadcast of the sample contents
      mpi::broadcast(accumulate_communicator,sample_content,0);

      // and handle the sample by summing the appropriate value
      sum += sample[accumulate_communicator.rank()];
    }
  }
}

int main(int argc, char** argv)
{
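  // initialize MPI and obtain the communicator spanning all processes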
  mpi::environment env(argc, argv);
  mpi::communicator world;

  // half of the processes generate, the others accumulate
  // the sample size is just the number of accumulation ranks
  if (world.rank() < world.size()/2)
    calculate_samples(world.size()-world.size()/2);
  else
    accumulate_samples();

  return 0;
}