123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- // Copyright (C) 2006 Douglas Gregor <doug.gregor@gmail.com>
- // Use, modification and distribution is subject to the Boost Software
- // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
- // http://www.boost.org/LICENSE_1_0.txt)
- // An example using Boost.MPI's split() operation on communicators to
- // create separate data-generating processes and data-collecting
- // processes using boost::optional for broadcasting.
- #include <boost/mpi.hpp>
- #include <iostream>
- #include <cstdlib>
- #include <boost/serialization/vector.hpp>
- #include <boost/serialization/optional.hpp>
- namespace mpi = boost::mpi;
- enum message_tags { msg_data_packet, msg_finished };
- void generate_data(mpi::communicator local, mpi::communicator world)
- {
- using std::srand;
- using std::rand;
- // The rank of the collector within the world communicator
- int master_collector = local.size();
- srand(time(0) + world.rank());
- // Send out several blocks of random data to the collectors.
- int num_data_blocks = rand() % 3 + 1;
- for (int block = 0; block < num_data_blocks; ++block) {
- // Generate some random dataa
- int num_samples = rand() % 1000;
- std::vector<int> data;
- for (int i = 0; i < num_samples; ++i) {
- data.push_back(rand());
- }
- // Send our data to the master collector process.
- std::cout << "Generator #" << local.rank() << " sends some data..."
- << std::endl;
- world.send(master_collector, msg_data_packet, data);
- }
- // Wait for all of the generators to complete
- (local.barrier)();
- // The first generator will send the message to the master collector
- // indicating that we're done.
- if (local.rank() == 0)
- world.send(master_collector, msg_finished);
- }
- void collect_data(mpi::communicator local, mpi::communicator world)
- {
- // The rank of the collector within the world communicator
- int master_collector = world.size() - local.size();
- if (world.rank() == master_collector) {
- while (true) {
- // Wait for a message
- mpi::status msg = world.probe();
- if (msg.tag() == msg_data_packet) {
- // Receive the packet of data into a boost::optional
- boost::optional<std::vector<int> > data;
- data = std::vector<int>();
- world.recv(msg.source(), msg.source(), *data);
- // Broadcast the actual data.
- broadcast(local, data, 0);
- } else if (msg.tag() == msg_finished) {
- // Receive the message
- world.recv(msg.source(), msg.tag());
- // Broadcast to each collector to tell them we've finished.
- boost::optional<std::vector<int> > data;
- broadcast(local, data, 0);
- break;
- }
- }
- } else {
- boost::optional<std::vector<int> > data;
- do {
- // Wait for a broadcast from the master collector
- broadcast(local, data, 0);
- if (data) {
- std::cout << "Collector #" << local.rank()
- << " is processing data." << std::endl;
- }
- } while (data);
- }
- }
- int main(int argc, char* argv[])
- {
- mpi::environment env(argc, argv);
- mpi::communicator world;
- if (world.size() < 4) {
- if (world.rank() == 0) {
- std::cerr << "Error: this example requires at least 4 processes."
- << std::endl;
- }
- env.abort(-1);
- }
- bool is_generator = world.rank() < 2 * world.size() / 3;
- mpi::communicator local = world.split(is_generator? 0 : 1);
- if (is_generator) generate_data(local, world);
- else collect_data(local, world);
- return 0;
- }
|