123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- // Copyright (C) 2005-2006 The Trustees of Indiana University.
- // Use, modification and distribution is subject to the Boost Software
- // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
- // http://www.boost.org/LICENSE_1_0.txt)
- // Authors: Douglas Gregor
- // Andrew Lumsdaine
- #ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
- #define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
- #ifndef BOOST_GRAPH_USE_MPI
- #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
- #endif
- #include <boost/graph/parallel/process_group.hpp>
- #include <boost/type_traits/is_convertible.hpp>
- #include <vector>
- #include <boost/assert.hpp>
- #include <boost/optional.hpp>
- #include <queue>
- namespace boost { namespace graph { namespace detail {
- template<typename ProcessGroup>
- void do_synchronize(ProcessGroup& pg)
- {
- using boost::parallel::synchronize;
- synchronize(pg);
- }
/** Tag type selecting the remote update set that queues its messages
 *  until synchronization. */
struct remote_set_queued { };

/** Tag type selecting the remote update set that sends its messages
 *  immediately. */
struct remote_set_immediate { };
- template<typename ProcessGroup>
- class remote_set_semantics
- {
- BOOST_STATIC_CONSTANT
- (bool,
- queued = (is_convertible<
- typename ProcessGroup::communication_category,
- boost::parallel::bsp_process_group_tag>::value));
- public:
- typedef typename mpl::if_c<queued,
- remote_set_queued,
- remote_set_immediate>::type type;
- };
- template<typename Derived, typename ProcessGroup, typename Value,
- typename OwnerMap,
- typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
- class remote_update_set;
- /**********************************************************************
- * Remote updating set that queues messages until synchronization *
- **********************************************************************/
- template<typename Derived, typename ProcessGroup, typename Value,
- typename OwnerMap>
- class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
- remote_set_queued>
- {
- typedef typename property_traits<OwnerMap>::key_type Key;
- typedef std::vector<std::pair<Key, Value> > Updates;
- typedef typename Updates::size_type updates_size_type;
- typedef typename Updates::value_type updates_pair_type;
- public:
- private:
- typedef typename ProcessGroup::process_id_type process_id_type;
- enum message_kind {
- /** Message containing the number of updates that will be sent in
- * a msg_updates message that will immediately follow. This
- * message will contain a single value of type
- * updates_size_type.
- */
- msg_num_updates,
- /** Contains (key, value) pairs with all of the updates from a
- * particular source. The number of updates is variable, but will
- * be provided in a msg_num_updates message that immediately
- * preceeds this message.
- *
- */
- msg_updates
- };
- struct handle_messages
- {
- explicit
- handle_messages(remote_update_set* self, const ProcessGroup& pg)
- : self(self), update_sizes(num_processes(pg), 0) { }
- void operator()(process_id_type source, int tag)
- {
- switch(tag) {
- case msg_num_updates:
- {
- // Receive the # of updates
- updates_size_type num_updates;
- receive(self->process_group, source, tag, num_updates);
- update_sizes[source] = num_updates;
- }
- break;
- case msg_updates:
- {
- updates_size_type num_updates = update_sizes[source];
- BOOST_ASSERT(num_updates);
- // Receive the actual updates
- std::vector<updates_pair_type> updates(num_updates);
- receive(self->process_group, source, msg_updates, &updates[0],
- num_updates);
-
- // Send updates to derived "receive_update" member
- Derived* derived = static_cast<Derived*>(self);
- for (updates_size_type u = 0; u < num_updates; ++u)
- derived->receive_update(source, updates[u].first, updates[u].second);
- update_sizes[source] = 0;
- }
- break;
- };
- }
- private:
- remote_update_set* self;
- std::vector<updates_size_type> update_sizes;
- };
- friend struct handle_messages;
- protected:
- remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
- : process_group(pg, handle_messages(this, pg)),
- updates(num_processes(pg)), owner(owner) {
- }
- void update(const Key& key, const Value& value)
- {
- if (get(owner, key) == process_id(process_group)) {
- Derived* derived = static_cast<Derived*>(this);
- derived->receive_update(get(owner, key), key, value);
- }
- else {
- updates[get(owner, key)].push_back(std::make_pair(key, value));
- }
- }
- void collect() { }
- void synchronize()
- {
- // Emit all updates and then remove them
- process_id_type num_processes = updates.size();
- for (process_id_type p = 0; p < num_processes; ++p) {
- if (!updates[p].empty()) {
- send(process_group, p, msg_num_updates, updates[p].size());
- send(process_group, p, msg_updates,
- &updates[p].front(), updates[p].size());
- updates[p].clear();
- }
- }
-
- do_synchronize(process_group);
- }
- ProcessGroup process_group;
- private:
- std::vector<Updates> updates;
- OwnerMap owner;
- };
- /**********************************************************************
- * Remote updating set that sends messages immediately *
- **********************************************************************/
- template<typename Derived, typename ProcessGroup, typename Value,
- typename OwnerMap>
- class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
- remote_set_immediate>
- {
- typedef typename property_traits<OwnerMap>::key_type Key;
- typedef std::pair<Key, Value> update_pair_type;
- typedef typename std::vector<update_pair_type>::size_type updates_size_type;
- public:
- typedef typename ProcessGroup::process_id_type process_id_type;
- private:
- enum message_kind {
- /** Contains a (key, value) pair that will be updated. */
- msg_update
- };
- struct handle_messages
- {
- explicit handle_messages(remote_update_set* self, const ProcessGroup& pg)
- : self(self)
- { update_sizes.resize(num_processes(pg), 0); }
- void operator()(process_id_type source, int tag)
- {
- // Receive the # of updates
- BOOST_ASSERT(tag == msg_update);
- update_pair_type update;
- receive(self->process_group, source, tag, update);
-
- // Send update to derived "receive_update" member
- Derived* derived = static_cast<Derived*>(self);
- derived->receive_update(source, update.first, update.second);
- }
- private:
- std::vector<updates_size_type> update_sizes;
- remote_update_set* self;
- };
- friend struct handle_messages;
- protected:
- remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
- : process_group(pg, handle_messages(this, pg)), owner(owner) { }
- void update(const Key& key, const Value& value)
- {
- if (get(owner, key) == process_id(process_group)) {
- Derived* derived = static_cast<Derived*>(this);
- derived->receive_update(get(owner, key), key, value);
- }
- else
- send(process_group, get(owner, key), msg_update,
- update_pair_type(key, value));
- }
- void collect()
- {
- typedef std::pair<process_id_type, int> probe_type;
- handle_messages handler(this, process_group);
- while (optional<probe_type> stp = probe(process_group))
- if (stp->second == msg_update) handler(stp->first, stp->second);
- }
- void synchronize()
- {
- do_synchronize(process_group);
- }
- ProcessGroup process_group;
- OwnerMap owner;
- };
- } } } // end namespace boost::graph::detail
- #endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
|