// Copyright (C) 2004-2006 The Trustees of Indiana University. // Use, modification and distribution is subject to the Boost Software // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) // Authors: Nick Edmonds // Douglas Gregor // Andrew Lumsdaine #ifndef BOOST_GRAPH_PARALLEL_CC_HPP #define BOOST_GRAPH_PARALLEL_CC_HPP #ifndef BOOST_GRAPH_USE_MPI #error "Parallel BGL files should not be included unless has been included" #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #define PBGL_IN_PLACE_MERGE /* In place merge instead of sorting */ //#define PBGL_SORT_ASSERT /* Assert sorted for in place merge */ /* Explicit sychronization in pointer doubling step? */ #define PBGL_EXPLICIT_SYNCH //#define PBGL_CONSTRUCT_METAGRAPH #ifdef PBGL_CONSTRUCT_METAGRAPH # define MAX_VERTICES_IN_METAGRAPH 10000 #endif namespace boost { namespace graph { namespace distributed { namespace cc_detail { enum connected_components_message { edges_msg, req_parents_msg, parents_msg, root_adj_msg }; template struct metaVertex { metaVertex() {} metaVertex(const Vertex& v) : name(v) {} template void serialize(Archiver& ar, const unsigned int /*version*/) { ar & name; } Vertex name; }; #ifdef PBGL_CONSTRUCT_METAGRAPH // Build meta-graph on result of local connected components template void build_local_metagraph(const Graph& g, ParentMap p, RootIterator r, RootIterator r_end, AdjacencyMap& adj) { // TODO: Static assert that AdjacencyMap::value_type is std::vector typedef typename boost::graph::parallel::process_group_type::type process_group_type; typedef typename process_group_type::process_id_type process_id_type; typedef typename graph_traits::vertex_descriptor vertex_descriptor; BOOST_STATIC_ASSERT((is_same >::value)); using boost::graph::parallel::process_group; process_group_type pg = process_group(g); process_id_type id = process_id(pg); if (id != 0) { // Send component roots and their associated edges to P0 for ( ; r != r_end; ++r ) { std::vector adjs(1, *r); // Root adjs.reserve(adjs.size() + adj[*r].size()); for (typename std::vector::iterator iter = adj[*r].begin(); iter != adj[*r].end(); ++iter) adjs.push_back(get(p, *iter)); // Adjacencies send(pg, 0, root_adj_msg, adjs); } } synchronize(pg); if (id == 0) { typedef metaVertex VertexProperties; typedef boost::adjacency_list metaGraph; typedef typename graph_traits::vertex_descriptor meta_vertex_descriptor; std::map vertex_map; std::vector > edges; // Receive remote roots and edges while (optional > m = probe(pg)) { BOOST_ASSERT(m->second == root_adj_msg); std::vector adjs; receive(pg, m->first, m->second, adjs); vertex_map[adjs[0]] = graph_traits::null_vertex(); for (typename std::vector::iterator iter = ++adjs.begin(); iter != adjs.end(); ++iter) edges.push_back(std::make_pair(adjs[0], *iter)); } // Add local roots and edges for ( ; r != r_end; ++r ) { vertex_map[*r] = graph_traits::null_vertex(); edges.reserve(edges.size() + adj[*r].size()); for (typename std::vector::iterator iter = adj[*r].begin(); iter != adj[*r].end(); ++iter) edges.push_back(std::make_pair(*r, get(p, *iter))); } // Build local meta-graph metaGraph mg; // Add vertices with property to map back to distributed graph vertex for (typename std::map::iterator iter = vertex_map.begin(); iter != vertex_map.end(); ++iter) vertex_map[iter->first] = add_vertex(metaVertex(iter->first), mg); // Build meta-vertex map typename property_map::type metaVertexMap = get(&VertexProperties::name, mg); typename std::vector > ::iterator edge_iter = edges.begin(); for ( ; edge_iter != edges.end(); ++edge_iter) add_edge(vertex_map[edge_iter->first], vertex_map[edge_iter->second], mg); edges.clear(); // Call connected_components on it typedef typename property_map::type meta_index_map_type; meta_index_map_type meta_index = get(vertex_index, mg); std::vector mg_component_vec(num_vertices(mg)); typedef iterator_property_map::iterator, meta_index_map_type> meta_components_map_type; meta_components_map_type mg_component(mg_component_vec.begin(), meta_index); std::size_t num_comp = connected_components(mg, mg_component); // Update Parent pointers std::vector roots(num_comp, graph_traits::null_vertex()); BGL_FORALL_VERTICES_T(v, mg, metaGraph) { size_t component = get(mg_component, v); if (roots[component] == graph_traits::null_vertex() || get(meta_index, v) < get(meta_index, roots[component])) roots[component] = v; } // Set all the local parent pointers BGL_FORALL_VERTICES_T(v, mg, metaGraph) { // Problem in value being put (3rd parameter) put(p, get(metaVertexMap, v), get(metaVertexMap, roots[get(mg_component, v)])); } } synchronize(p); } #endif /* Function object used to remove internal vertices and vertices > the current vertex from the adjacent vertex lists at each root */ template class cull_adjacency_list { public: cull_adjacency_list(const Vertex v, const ParentMap p) : v(v), p(p) {} bool operator() (const Vertex x) { return (get(p, x) == v || x == v); } private: const Vertex v; const ParentMap p; }; /* Comparison operator used to choose targets for hooking s.t. vertices that are hooked to are evenly distributed across processors */ template class hashed_vertex_compare { public: hashed_vertex_compare (const OwnerMap& o, const LocalMap& l) : owner(o), local(l) { } template bool operator() (const Vertex x, const Vertex y) { if (get(local, x) < get(local, y)) return true; else if (get(local, x) == get(local, y)) return (get(owner, x) < get(owner, y)); return false; } private: OwnerMap owner; LocalMap local; }; #ifdef PBGL_EXPLICIT_SYNCH template void request_parent_map_entries(const Graph& g, ParentMap p, std::vector& parent_requests) { typedef typename boost::graph::parallel::process_group_type ::type process_group_type; typedef typename process_group_type::process_id_type process_id_type; typedef typename graph_traits::vertex_descriptor vertex_descriptor; process_group_type pg = process_group(g); /* This should probably be send_oob_with_reply, especially when Dave finishes prefetch-batching */ // Send root requests for (process_id_type i = 0; i < num_processes(pg); ++i) { if (!parent_requests[i].empty()) { std::vector reqs(parent_requests[i].begin(), parent_requests[i].end()); send(pg, i, req_parents_msg, reqs); } } synchronize(pg); // Receive root requests and reply to them while (optional > m = probe(pg)) { std::vector requests; receive(pg, m->first, m->second, requests); for (std::size_t i = 0; i < requests.size(); ++i) requests[i] = get(p, requests[i]); send(pg, m->first, parents_msg, requests); } synchronize(pg); // Receive requested parents std::vector responses; for (process_id_type i = 0; i < num_processes(pg); ++i) { if (!parent_requests[i].empty()) { receive(pg, i, parents_msg, responses); std::size_t parent_idx = 0; for (typename VertexList::iterator v = parent_requests[i].begin(); v != parent_requests[i].end(); ++v, ++parent_idx) put(p, *v, responses[parent_idx]); } } } #endif template void parallel_connected_components(DistributedGraph& g, ParentMap p) { using boost::connected_components; typedef typename graph_traits::adjacency_iterator adjacency_iterator; typedef typename graph_traits::vertex_descriptor vertex_descriptor; typedef typename boost::graph::parallel::process_group_type ::type process_group_type; typedef typename process_group_type::process_id_type process_id_type; using boost::graph::parallel::process_group; process_group_type pg = process_group(g); process_id_type id = process_id(pg); // TODO (NGE): Should old_roots, roots, and completed_roots be std::list adjacency_iterator av1, av2; std::vector old_roots; typename std::vector::iterator liter; typename std::vector::iterator aliter; typename std::map > adj; typedef typename property_map::const_type OwnerMap; OwnerMap owner = get(vertex_owner, g); typedef typename property_map::const_type LocalMap; LocalMap local = get(vertex_local, g); // We need to hold on to all of the parent pointers p.set_max_ghost_cells(0); // // STAGE 1 : Compute local components // local_subgraph ls(g); typedef typename property_map, vertex_index_t>::type local_index_map_type; local_index_map_type local_index = get(vertex_index, ls); // Compute local connected components std::vector ls_components_vec(num_vertices(ls)); typedef iterator_property_map::iterator, local_index_map_type> ls_components_map_type; ls_components_map_type ls_component(ls_components_vec.begin(), local_index); std::size_t num_comp = connected_components(ls, ls_component); std::vector roots(num_comp, graph_traits::null_vertex()); BGL_FORALL_VERTICES_T(v, g, DistributedGraph) { size_t component = get(ls_component, v); if (roots[component] == graph_traits::null_vertex() || get(local_index, v) < get(local_index, roots[component])) roots[component] = v; } // Set all the local parent pointers BGL_FORALL_VERTICES_T(v, g, DistributedGraph) { put(p, v, roots[get(ls_component, v)]); } if (num_processes(pg) == 1) return; // Build adjacency list for all roots BGL_FORALL_VERTICES_T(v, g, DistributedGraph) { std::vector& my_adj = adj[get(p, v)]; for (boost::tie(av1, av2) = adjacent_vertices(v, g); av1 != av2; ++av1) { if (get(owner, *av1) != id) my_adj.push_back(*av1); } } // For all vertices adjacent to a local vertex get p(v) for ( liter = roots.begin(); liter != roots.end(); ++liter ) { std::vector& my_adj = adj[*liter]; for ( aliter = my_adj.begin(); aliter != my_adj.end(); ++aliter ) request(p, *aliter); } synchronize(p); // Update adjacency list at root to make sure all adjacent // vertices are roots of remote components for ( liter = roots.begin(); liter != roots.end(); ++liter ) { std::vector& my_adj = adj[*liter]; for ( aliter = my_adj.begin(); aliter != my_adj.end(); ++aliter ) *aliter = get(p, *aliter); my_adj.erase (std::remove_if(my_adj.begin(), my_adj.end(), cull_adjacency_list(*liter, p) ), my_adj.end()); // This sort needs to be here to make sure the initial // adjacency list is sorted std::sort(my_adj.begin(), my_adj.end(), std::less()); my_adj.erase(std::unique(my_adj.begin(), my_adj.end()), my_adj.end()); } // Get p(v) for the new adjacent roots p.clear(); for ( liter = roots.begin(); liter != roots.end(); ++liter ) { std::vector& my_adj = adj[*liter]; for ( aliter = my_adj.begin(); aliter != my_adj.end(); ++aliter ) request(p, *aliter); } #ifdef PBGL_EXPLICIT_SYNCH synchronize(p); #endif // Lastly, remove roots with no adjacent vertices, this is // unnecessary but will speed up sparse graphs for ( liter = roots.begin(); liter != roots.end(); /*in loop*/) { if ( adj[*liter].empty() ) liter = roots.erase(liter); else ++liter; } #ifdef PBGL_CONSTRUCT_METAGRAPH /* TODO: If the number of roots is sufficiently small, we can use a 'problem folding' approach like we do in MST to gather all the roots and their adjacencies on one proc and solve for the connected components of the meta-graph */ using boost::parallel::all_reduce; std::size_t num_roots = all_reduce(pg, roots.size(), std::plus()); if (num_roots < MAX_VERTICES_IN_METAGRAPH) { build_local_metagraph(g, p, roots.begin(), roots.end(), adj); // For each vertex in g, p(v) = p(p(v)), assign parent of leaf // vertices from first step to final parent BGL_FORALL_VERTICES_T(v, g, DistributedGraph) { put(p, v, get(p, get(p, v))); } synchronize(p); return; } #endif // // Parallel Phase // std::vector completed_roots; hashed_vertex_compare v_compare(owner, local); bool any_hooked; vertex_descriptor new_root; std::size_t steps = 0; do { ++steps; // Pull in new parents for hooking phase synchronize(p); // // Hooking // bool hooked = false; completed_roots.clear(); for ( liter = roots.begin(); liter != roots.end(); ) { new_root = graph_traits::null_vertex(); std::vector& my_adj = adj[*liter]; for ( aliter = my_adj.begin(); aliter != my_adj.end(); ++aliter ) // try to hook to better adjacent vertex if ( v_compare( get(p, *aliter), *liter ) ) new_root = get(p, *aliter); if ( new_root != graph_traits::null_vertex() ) { hooked = true; put(p, *liter, new_root); old_roots.push_back(*liter); completed_roots.push_back(*liter); liter = roots.erase(liter); } else ++liter; } // // Pointer jumping, perform until new roots determined // // TODO: Implement cycle reduction rules to reduce this from // O(n) to O(log n) [n = cycle length] bool all_done; std::size_t parent_root_count; std::size_t double_steps = 0; do { ++double_steps; #ifndef PBGL_EXPLICIT_SYNCH // Get p(p(v)) for all old roots, and p(v) for all current roots for ( liter = old_roots.begin(); liter != old_roots.end(); ++liter ) request(p, get(p, *liter)); synchronize(p); #else // Build root requests typedef std::set VertexSet; std::vector parent_requests(num_processes(pg)); for ( liter = old_roots.begin(); liter != old_roots.end(); ++liter ) { vertex_descriptor p1 = *liter; if (get(owner, p1) != id) parent_requests[get(owner, p1)].insert(p1); vertex_descriptor p2 = get(p, p1); if (get(owner, p2) != id) parent_requests[get(owner, p2)].insert(p2); } request_parent_map_entries(g, p, parent_requests); #endif // Perform a pointer jumping step on all old roots for ( liter = old_roots.begin(); liter != old_roots.end(); ++liter ) put(p, *liter, get(p, get(p, *liter))); // make sure the parent of all old roots is itself a root parent_root_count = 0; for ( liter = old_roots.begin(); liter != old_roots.end(); ++liter ) if ( get(p, *liter) == get(p, get(p, *liter)) ) parent_root_count++; bool done = parent_root_count == old_roots.size(); all_reduce(pg, &done, &done+1, &all_done, std::logical_and()); } while ( !all_done ); #ifdef PARALLEL_BGL_DEBUG if (id == 0) std::cerr << double_steps << " doubling steps.\n"; #endif // // Add adjacent vertices of just completed roots to adjacent // vertex list at new parent // typename std::vector outgoing_edges; for ( liter = completed_roots.begin(); liter != completed_roots.end(); ++liter ) { vertex_descriptor new_parent = get(p, *liter); if ( get(owner, new_parent) == id ) { std::vector& my_adj = adj[new_parent]; my_adj.reserve(my_adj.size() + adj[*liter].size()); my_adj.insert( my_adj.end(), adj[*liter].begin(), adj[*liter].end() ); #ifdef PBGL_IN_PLACE_MERGE #ifdef PBGL_SORT_ASSERT BOOST_ASSERT(::boost::detail::is_sorted(my_adj.begin(), my_adj.end() - adj[*liter].size(), std::less())); BOOST_ASSERT(::boost::detail::is_sorted(my_adj.end() - adj[*liter].size(), my_adj.end(), std::less())); #endif std::inplace_merge(my_adj.begin(), my_adj.end() - adj[*liter].size(), my_adj.end(), std::less()); #endif } else if ( adj[*liter].begin() != adj[*liter].end() ) { outgoing_edges.clear(); outgoing_edges.reserve(adj[*liter].size() + 1); // First element is the destination of the adjacency list outgoing_edges.push_back(new_parent); outgoing_edges.insert(outgoing_edges.end(), adj[*liter].begin(), adj[*liter].end() ); send(pg, get(owner, new_parent), edges_msg, outgoing_edges); adj[*liter].clear(); } } synchronize(pg); // Receive edges sent by remote nodes and add them to the // indicated vertex's adjacency list while (optional > m = probe(pg)) { std::vector incoming_edges; receive(pg, m->first, edges_msg, incoming_edges); typename std::vector::iterator aviter = incoming_edges.begin(); ++aviter; std::vector& my_adj = adj[incoming_edges[0]]; my_adj.reserve(my_adj.size() + incoming_edges.size() - 1); my_adj.insert( my_adj.end(), aviter, incoming_edges.end() ); #ifdef PBGL_IN_PLACE_MERGE std::size_t num_incoming_edges = incoming_edges.size(); #ifdef PBGL_SORT_ASSERT BOOST_ASSERT(::boost::detail::is_sorted(my_adj.begin(), my_adj.end() - (num_incoming_edges-1), std::less())); BOOST_ASSERT(::boost::detail::is_sorted(my_adj.end() - (num_incoming_edges-1), my_adj.end(), std::less())); #endif std::inplace_merge(my_adj.begin(), my_adj.end() - (num_incoming_edges - 1), my_adj.end(), std::less()); #endif } // Remove any adjacent vertices that are in the same component // as a root from that root's list for ( liter = roots.begin(); liter != roots.end(); ++liter ) { // We can probably get away without sorting and removing // duplicates Though sorting *may* cause root // determination to occur faster by choosing the root with // the most potential to hook to at each step std::vector& my_adj = adj[*liter]; my_adj.erase (std::remove_if(my_adj.begin(), my_adj.end(), cull_adjacency_list(*liter, p) ), my_adj.end()); #ifndef PBGL_IN_PLACE_MERGE std::sort(my_adj.begin(), my_adj.end(), std::less() ); #endif my_adj.erase(std::unique(my_adj.begin(), my_adj.end()), my_adj.end()); } // Reduce result of empty root list test all_reduce(pg, &hooked, &hooked+1, &any_hooked, std::logical_or()); } while ( any_hooked ); #ifdef PARALLEL_BGL_DEBUG if (id == 0) std::cerr << steps << " iterations.\n"; #endif // // Finalize // // For each vertex in g, p(v) = p(p(v)), assign parent of leaf // vertices from first step to final parent BGL_FORALL_VERTICES_T(v, g, DistributedGraph) { put(p, v, get(p, get(p, v))); } synchronize(p); } } // end namespace cc_detail template typename property_traits::value_type number_components_from_parents(const Graph& g, ParentMap p, ComponentMap c) { typedef typename graph_traits::vertex_descriptor vertex_descriptor; typedef typename boost::graph::parallel::process_group_type::type process_group_type; typedef typename property_traits::value_type ComponentMapType; process_group_type pg = process_group(g); /* Build list of roots */ std::vector my_roots, all_roots; BGL_FORALL_VERTICES_T(v, g, Graph) { if( std::find( my_roots.begin(), my_roots.end(), get(p, v) ) == my_roots.end() ) my_roots.push_back( get(p, v) ); } all_gather(pg, my_roots.begin(), my_roots.end(), all_roots); /* Number components */ std::map comp_numbers; ComponentMapType c_num = 0; // Compute component numbers for (std::size_t i = 0; i < all_roots.size(); i++ ) if ( comp_numbers.count(all_roots[i]) == 0 ) comp_numbers[all_roots[i]] = c_num++; // Broadcast component numbers BGL_FORALL_VERTICES_T(v, g, Graph) { put( c, v, comp_numbers[get(p, v)] ); } // Broadcast number of components if (process_id(pg) == 0) { typedef typename process_group_type::process_size_type process_size_type; for (process_size_type dest = 1, n = num_processes(pg); dest != n; ++dest) send(pg, dest, 0, c_num); } synchronize(pg); if (process_id(pg) != 0) receive(pg, 0, 0, c_num); synchronize(c); return c_num; } template int number_components_from_parents(const Graph& g, ParentMap p, dummy_property_map) { using boost::parallel::all_reduce; // Count local roots. int num_roots = 0; BGL_FORALL_VERTICES_T(v, g, Graph) if (get(p, v) == v) ++num_roots; return all_reduce(g.process_group(), num_roots, std::plus()); } template typename property_traits::value_type connected_components (const Graph& g, ComponentMap c, ParentMap p BOOST_GRAPH_ENABLE_IF_MODELS_PARM(Graph, distributed_graph_tag)) { cc_detail::parallel_connected_components(g, p); return number_components_from_parents(g, p, c); } /* Construct ParentMap by default */ template typename property_traits::value_type connected_components ( const Graph& g, ComponentMap c BOOST_GRAPH_ENABLE_IF_MODELS_PARM(Graph, distributed_graph_tag) ) { typedef typename graph_traits::vertex_descriptor vertex_descriptor; std::vector x(num_vertices(g)); return connected_components (g, c, make_iterator_property_map(x.begin(), get(vertex_index, g))); } } // end namespace distributed using distributed::connected_components; } // end namespace graph using graph::distributed::connected_components; } // end namespace boost #endif // BOOST_GRAPH_PARALLEL_CC_HPP