// Copyright 2004 The Trustees of Indiana University. // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) // Authors: Douglas Gregor // Andrew Lumsdaine #ifndef BOOST_GRAPH_PARALLEL_BRANDES_BETWEENNESS_CENTRALITY_HPP #define BOOST_GRAPH_PARALLEL_BRANDES_BETWEENNESS_CENTRALITY_HPP #ifndef BOOST_GRAPH_USE_MPI #error "Parallel BGL files should not be included unless has been included" #endif // #define COMPUTE_PATH_COUNTS_INLINE #include #include #include #include #include #include // For additive_reducer #include #include #include #include #include #include #include #include // NGE - Needed for minstd_rand at L807, should pass vertex list // or generator instead #include #include #include #include // Appending reducer template struct append_reducer { BOOST_STATIC_CONSTANT(bool, non_default_resolver = true); template T operator()(const K&) const { return T(); } template T operator()(const K&, const T& x, const T& y) const { T z(x.begin(), x.end()); for (typename T::const_iterator iter = y.begin(); iter != y.end(); ++iter) if (std::find(z.begin(), z.end(), *iter) == z.end()) z.push_back(*iter); return z; } }; namespace boost { namespace serialization { // TODO(nge): Write generalized serialization for tuples template void serialize(Archive & ar, boost::tuple& t, const unsigned int) { ar & boost::tuples::get<0>(t); ar & boost::tuples::get<1>(t); ar & boost::tuples::get<2>(t); ar & boost::tuples::get<3>(t); } } // serialization template class get_owner_of_first_tuple_element { public: typedef typename property_traits::value_type owner_type; get_owner_of_first_tuple_element(OwnerMap owner) : owner(owner) { } owner_type get_owner(Tuple t) { return get(owner, boost::tuples::get<0>(t)); } private: OwnerMap owner; }; template typename get_owner_of_first_tuple_element::owner_type get(get_owner_of_first_tuple_element o, Tuple t) { return o.get_owner(t); } template class get_owner_of_first_pair_element { public: typedef typename property_traits::value_type owner_type; get_owner_of_first_pair_element(OwnerMap owner) : owner(owner) { } template owner_type get_owner(std::pair p) { return get(owner, p.first); } private: OwnerMap owner; }; template typename get_owner_of_first_pair_element::owner_type get(get_owner_of_first_pair_element o, std::pair p) { return o.get_owner(p); } namespace graph { namespace parallel { namespace detail { template class betweenness_centrality_msg_value { typedef typename property_traits::value_type distance_type; typedef typename property_traits::value_type incoming_type; typedef typename incoming_type::value_type incoming_value_type; public: typedef std::pair type; static type create(distance_type dist, incoming_value_type source) { return std::make_pair(dist, source); } }; /************************************************************************/ /* Delta-stepping Betweenness Centrality */ /************************************************************************/ template class betweenness_centrality_delta_stepping_impl { // Could inherit from delta_stepping_impl to get run() method // but for the time being it's just reproduced here typedef typename graph_traits::vertex_descriptor Vertex; typedef typename graph_traits::degree_size_type Degree; typedef typename property_traits::value_type Dist; typedef typename property_traits::value_type IncomingType; typedef typename boost::graph::parallel::process_group_type::type ProcessGroup; typedef std::list Bucket; typedef typename Bucket::iterator BucketIterator; typedef typename std::vector::size_type BucketIndex; typedef betweenness_centrality_msg_value MessageValue; enum { // Relax a remote vertex. The message contains a pair, the first part of which is the vertex whose // tentative distance is being relaxed and the second part // contains either the new distance (if there is no predecessor // map) or a pair with the distance and predecessor. msg_relax }; public: // Must supply delta, ctor that guesses delta removed betweenness_centrality_delta_stepping_impl(const Graph& g, DistanceMap distance, IncomingMap incoming, EdgeWeightMap weight, PathCountMap path_count, #ifdef COMPUTE_PATH_COUNTS_INLINE IsSettledMap is_settled, VertexIndexMap vertex_index, #endif Dist delta); void run(Vertex s); private: // Relax the edge (u, v), creating a new best path of distance x. void relax(Vertex u, Vertex v, Dist x); // Synchronize all of the processes, by receiving all messages that // have not yet been received. void synchronize() { using boost::parallel::synchronize; synchronize(pg); } // Setup triggers for msg_relax messages void setup_triggers() { using boost::parallel::simple_trigger; simple_trigger(pg, msg_relax, this, &betweenness_centrality_delta_stepping_impl::handle_msg_relax); } void handle_msg_relax(int /*source*/, int /*tag*/, const std::pair& data, boost::parallel::trigger_receive_context) { relax(data.second.second, data.first, data.second.first); } const Graph& g; IncomingMap incoming; DistanceMap distance; EdgeWeightMap weight; PathCountMap path_count; #ifdef COMPUTE_PATH_COUNTS_INLINE IsSettledMap is_settled; VertexIndexMap vertex_index; #endif Dist delta; ProcessGroup pg; typename property_map::const_type owner; typename property_map::const_type local; // A "property map" that contains the position of each vertex in // whatever bucket it resides in. std::vector position_in_bucket; // Bucket data structure. The ith bucket contains all local vertices // with (tentative) distance in the range [i*delta, // (i+1)*delta). std::vector buckets; // This "dummy" list is used only so that we can initialize the // position_in_bucket property map with non-singular iterators. This // won't matter for most implementations of the C++ Standard // Library, but it avoids undefined behavior and allows us to run // with library "debug modes". std::list dummy_list; // A "property map" that states which vertices have been deleted // from the bucket in this iteration. std::vector vertex_was_deleted; }; template betweenness_centrality_delta_stepping_impl< Graph, DistanceMap, IncomingMap, EdgeWeightMap, PathCountMap #ifdef COMPUTE_PATH_COUNTS_INLINE , IsSettledMap, VertexIndexMap #endif >:: betweenness_centrality_delta_stepping_impl(const Graph& g, DistanceMap distance, IncomingMap incoming, EdgeWeightMap weight, PathCountMap path_count, #ifdef COMPUTE_PATH_COUNTS_INLINE IsSettledMap is_settled, VertexIndexMap vertex_index, #endif Dist delta) : g(g), incoming(incoming), distance(distance), weight(weight), path_count(path_count), #ifdef COMPUTE_PATH_COUNTS_INLINE is_settled(is_settled), vertex_index(vertex_index), #endif delta(delta), pg(boost::graph::parallel::process_group_adl(g), boost::parallel::attach_distributed_object()), owner(get(vertex_owner, g)), local(get(vertex_local, g)) { setup_triggers(); } template void betweenness_centrality_delta_stepping_impl< Graph, DistanceMap, IncomingMap, EdgeWeightMap, PathCountMap #ifdef COMPUTE_PATH_COUNTS_INLINE , IsSettledMap, VertexIndexMap #endif >:: run(Vertex s) { typedef typename boost::graph::parallel::process_group_type::type process_group_type; typename process_group_type::process_id_type id = process_id(pg); Dist inf = (std::numeric_limits::max)(); // None of the vertices are stored in the bucket. position_in_bucket.clear(); position_in_bucket.resize(num_vertices(g), dummy_list.end()); // None of the vertices have been deleted vertex_was_deleted.clear(); vertex_was_deleted.resize(num_vertices(g), false); // No path from s to any other vertex, yet BGL_FORALL_VERTICES_T(v, g, Graph) put(distance, v, inf); // The distance to the starting node is zero if (get(owner, s) == id) // Put "s" into its bucket (bucket 0) relax(s, s, 0); else // Note that we know the distance to s is zero cache(distance, s, 0); #ifdef COMPUTE_PATH_COUNTS_INLINE // Synchronize here to deliver initial relaxation since we don't // synchronize at the beginning of the inner loop any more synchronize(); // Incoming edge count map is an implementation detail and should // be freed as soon as possible so build it here typedef typename graph_traits::edges_size_type edges_size_type; std::vector incoming_edge_countS(num_vertices(g)); iterator_property_map::iterator, VertexIndexMap> incoming_edge_count(incoming_edge_countS.begin(), vertex_index); #endif BucketIndex max_bucket = (std::numeric_limits::max)(); BucketIndex current_bucket = 0; do { #ifdef COMPUTE_PATH_COUNTS_INLINE // We need to clear the outgoing map after every bucket so just build it here std::vector outgoingS(num_vertices(g)); IncomingMap outgoing(outgoingS.begin(), vertex_index); outgoing.set_reduce(append_reducer()); #else // Synchronize with all of the other processes. synchronize(); #endif // Find the next bucket that has something in it. while (current_bucket < buckets.size() && (!buckets[current_bucket] || buckets[current_bucket]->empty())) ++current_bucket; if (current_bucket >= buckets.size()) current_bucket = max_bucket; // Find the smallest bucket (over all processes) that has vertices // that need to be processed. using boost::parallel::all_reduce; using boost::parallel::minimum; current_bucket = all_reduce(pg, current_bucket, minimum()); if (current_bucket == max_bucket) // There are no non-empty buckets in any process; exit. break; // Contains the set of vertices that have been deleted in the // relaxation of "light" edges. Note that we keep track of which // vertices were deleted with the property map // "vertex_was_deleted". std::vector deleted_vertices; // Repeatedly relax light edges bool nonempty_bucket; do { // Someone has work to do in this bucket. if (current_bucket < buckets.size() && buckets[current_bucket]) { Bucket& bucket = *buckets[current_bucket]; // For each element in the bucket while (!bucket.empty()) { Vertex u = bucket.front(); // Remove u from the front of the bucket bucket.pop_front(); // Insert u into the set of deleted vertices, if it hasn't // been done already. if (!vertex_was_deleted[get(local, u)]) { vertex_was_deleted[get(local, u)] = true; deleted_vertices.push_back(u); } // Relax each light edge. Dist u_dist = get(distance, u); BGL_FORALL_OUTEDGES_T(u, e, g, Graph) if (get(weight, e) <= delta) // light edge relax(u, target(e, g), u_dist + get(weight, e)); } } // Synchronize with all of the other processes. synchronize(); // Is the bucket empty now? nonempty_bucket = (current_bucket < buckets.size() && buckets[current_bucket] && !buckets[current_bucket]->empty()); } while (all_reduce(pg, nonempty_bucket, std::logical_or())); // Relax heavy edges for each of the vertices that we previously // deleted. for (typename std::vector::iterator iter = deleted_vertices.begin(); iter != deleted_vertices.end(); ++iter) { // Relax each heavy edge. Vertex u = *iter; Dist u_dist = get(distance, u); BGL_FORALL_OUTEDGES_T(u, e, g, Graph) if (get(weight, e) > delta) // heavy edge relax(u, target(e, g), u_dist + get(weight, e)); #ifdef COMPUTE_PATH_COUNTS_INLINE // Set outgoing paths IncomingType in = get(incoming, u); for (typename IncomingType::iterator pred = in.begin(); pred != in.end(); ++pred) if (get(owner, *pred) == id) { IncomingType x = get(outgoing, *pred); if (std::find(x.begin(), x.end(), u) == x.end()) x.push_back(u); put(outgoing, *pred, x); } else { IncomingType in; in.push_back(u); put(outgoing, *pred, in); } // Set incoming edge counts put(incoming_edge_count, u, in.size()); #endif } #ifdef COMPUTE_PATH_COUNTS_INLINE synchronize(); // Deliver heavy edge relaxations and outgoing paths // Build Queue typedef typename property_traits::value_type PathCountType; typedef std::pair queue_value_type; typedef typename property_map::const_type OwnerMap; typedef typename get_owner_of_first_pair_element IndirectOwnerMap; typedef boost::queue local_queue_type; typedef boost::graph::distributed::distributed_queue dist_queue_type; IndirectOwnerMap indirect_owner(owner); dist_queue_type Q(pg, indirect_owner); // Find sources to initialize queue BGL_FORALL_VERTICES_T(v, g, Graph) { if (get(is_settled, v) && !(get(outgoing, v).empty())) { put(incoming_edge_count, v, 1); Q.push(std::make_pair(v, 0)); // Push this vertex with no additional path count } } // Set path counts for vertices in this bucket while (!Q.empty()) { queue_value_type t = Q.top(); Q.pop(); Vertex v = t.first; PathCountType p = t.second; put(path_count, v, get(path_count, v) + p); put(incoming_edge_count, v, get(incoming_edge_count, v) - 1); if (get(incoming_edge_count, v) == 0) { IncomingType out = get(outgoing, v); for (typename IncomingType::iterator iter = out.begin(); iter != out.end(); ++iter) Q.push(std::make_pair(*iter, get(path_count, v))); } } // Mark the vertices in this bucket settled for (typename std::vector::iterator iter = deleted_vertices.begin(); iter != deleted_vertices.end(); ++iter) put(is_settled, *iter, true); // No need to clear path count map as it is never read/written remotely // No need to clear outgoing map as it is re-alloced every bucket #endif // Go to the next bucket: the current bucket must already be empty. ++current_bucket; } while (true); // Delete all of the buckets. for (typename std::vector::iterator iter = buckets.begin(); iter != buckets.end(); ++iter) { if (*iter) { delete *iter; *iter = 0; } } } template void betweenness_centrality_delta_stepping_impl< Graph, DistanceMap, IncomingMap, EdgeWeightMap, PathCountMap #ifdef COMPUTE_PATH_COUNTS_INLINE , IsSettledMap, VertexIndexMap #endif >:: relax(Vertex u, Vertex v, Dist x) { if (x <= get(distance, v)) { // We're relaxing the edge to vertex v. if (get(owner, v) == process_id(pg)) { if (x < get(distance, v)) { // Compute the new bucket index for v BucketIndex new_index = static_cast(x / delta); // Make sure there is enough room in the buckets data structure. if (new_index >= buckets.size()) buckets.resize(new_index + 1, 0); // Make sure that we have allocated the bucket itself. if (!buckets[new_index]) buckets[new_index] = new Bucket; if (get(distance, v) != (std::numeric_limits::max)() && !vertex_was_deleted[get(local, v)]) { // We're moving v from an old bucket into a new one. Compute // the old index, then splice it in. BucketIndex old_index = static_cast(get(distance, v) / delta); buckets[new_index]->splice(buckets[new_index]->end(), *buckets[old_index], position_in_bucket[get(local, v)]); } else { // We're inserting v into a bucket for the first time. Put it // at the end. buckets[new_index]->push_back(v); } // v is now at the last position in the new bucket position_in_bucket[get(local, v)] = buckets[new_index]->end(); --position_in_bucket[get(local, v)]; // Update tentative distance information and incoming, path_count if (u != v) put(incoming, v, IncomingType(1, u)); put(distance, v, x); } // u != v covers initial source relaxation and self-loops else if (x == get(distance, v) && u != v) { // Add incoming edge if it's not already in the list IncomingType in = get(incoming, v); if (std::find(in.begin(), in.end(), u) == in.end()) { in.push_back(u); put(incoming, v, in); } } } else { // The vertex is remote: send a request to the vertex's owner send(pg, get(owner, v), msg_relax, std::make_pair(v, MessageValue::create(x, u))); // Cache tentative distance information cache(distance, v, x); } } } /************************************************************************/ /* Shortest Paths function object for betweenness centrality */ /************************************************************************/ template struct brandes_shortest_paths { typedef typename property_traits::value_type weight_type; brandes_shortest_paths() : weight(1), delta(0) { } brandes_shortest_paths(weight_type delta) : weight(1), delta(delta) { } brandes_shortest_paths(WeightMap w) : weight(w), delta(0) { } brandes_shortest_paths(WeightMap w, weight_type delta) : weight(w), delta(delta) { } template void operator()(Graph& g, typename graph_traits::vertex_descriptor s, IncomingMap incoming, DistanceMap distance, PathCountMap path_count #ifdef COMPUTE_PATH_COUNTS_INLINE , IsSettledMap is_settled, VertexIndexMap vertex_index #endif ) { // The "distance" map needs to act like one, retrieving the default // value of infinity. set_property_map_role(vertex_distance, distance); // Only calculate delta the first time operator() is called // This presumes g is the same every time, but so does the fact // that we're reusing the weight map if (delta == 0) set_delta(g); // TODO (NGE): Restructure the code so we don't have to construct // impl every time? betweenness_centrality_delta_stepping_impl< Graph, DistanceMap, IncomingMap, WeightMap, PathCountMap #ifdef COMPUTE_PATH_COUNTS_INLINE , IsSettledMap, VertexIndexMap #endif > impl(g, distance, incoming, weight, path_count, #ifdef COMPUTE_PATH_COUNTS_INLINE is_settled, vertex_index, #endif delta); impl.run(s); } private: template void set_delta(const Graph& g) { using boost::parallel::all_reduce; using boost::parallel::maximum; using std::max; typedef typename graph_traits::degree_size_type Degree; typedef weight_type Dist; // Compute the maximum edge weight and degree Dist max_edge_weight = 0; Degree max_degree = 0; BGL_FORALL_VERTICES_T(u, g, Graph) { max_degree = max BOOST_PREVENT_MACRO_SUBSTITUTION (max_degree, out_degree(u, g)); BGL_FORALL_OUTEDGES_T(u, e, g, Graph) max_edge_weight = max BOOST_PREVENT_MACRO_SUBSTITUTION (max_edge_weight, get(weight, e)); } max_edge_weight = all_reduce(process_group(g), max_edge_weight, maximum()); max_degree = all_reduce(process_group(g), max_degree, maximum()); // Take a guess at delta, based on what works well for random // graphs. delta = max_edge_weight / max_degree; if (delta == 0) delta = 1; } WeightMap weight; weight_type delta; }; // Perform a single SSSP from the specified vertex and update the centrality map(s) template void do_brandes_sssp(const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, IncomingMap incoming, DistanceMap distance, DependencyMap dependency, PathCountMap path_count, #ifdef COMPUTE_PATH_COUNTS_INLINE IsSettledMap is_settled, #endif VertexIndexMap vertex_index, ShortestPaths shortest_paths, typename graph_traits::vertex_descriptor s) { using boost::detail::graph::update_centrality; using boost::graph::parallel::process_group; typedef typename graph_traits::vertex_descriptor vertex_descriptor; typedef typename graph_traits::edges_size_type edges_size_type; typedef typename property_traits::value_type incoming_type; typedef typename property_traits::value_type distance_type; typedef typename property_traits::value_type dependency_type; typedef typename property_traits::value_type path_count_type; typedef typename incoming_type::iterator incoming_iterator; typedef typename property_map::const_type OwnerMap; OwnerMap owner = get(vertex_owner, g); typedef typename boost::graph::parallel::process_group_type::type process_group_type; process_group_type pg = process_group(g); typename process_group_type::process_id_type id = process_id(pg); // TODO: Is it faster not to clear some of these maps? // Initialize for this iteration distance.clear(); incoming.clear(); path_count.clear(); dependency.clear(); BGL_FORALL_VERTICES_T(v, g, Graph) { put(path_count, v, 0); put(dependency, v, 0); } if (get(owner, s) == id) { put(incoming, s, incoming_type()); #ifdef COMPUTE_PATH_COUNTS_INLINE put(path_count, s, 1); put(is_settled, s, true); #endif } // Execute the shortest paths algorithm. This will be either // a weighted or unweighted customized breadth-first search, shortest_paths(g, s, incoming, distance, path_count #ifdef COMPUTE_PATH_COUNTS_INLINE , is_settled, vertex_index #endif ); #ifndef COMPUTE_PATH_COUNTS_INLINE // // TODO: Optimize case where source has no out-edges // // Count of incoming edges to tell when all incoming edges have been relaxed in // the induced shortest paths DAG std::vector incoming_edge_countS(num_vertices(g)); iterator_property_map::iterator, VertexIndexMap> incoming_edge_count(incoming_edge_countS.begin(), vertex_index); BGL_FORALL_VERTICES_T(v, g, Graph) { put(incoming_edge_count, v, get(incoming, v).size()); } if (get(owner, s) == id) { put(incoming_edge_count, s, 1); put(incoming, s, incoming_type()); } std::vector outgoingS(num_vertices(g)); iterator_property_map::iterator, VertexIndexMap> outgoing(outgoingS.begin(), vertex_index); outgoing.set_reduce(append_reducer()); // Mark forward adjacencies in DAG of shortest paths // TODO: It's possible to do this using edge flags but it's not currently done this way // because during traversal of the DAG we would have to examine all out edges // which would lead to more memory accesses and a larger cache footprint. // // In the bidirectional graph case edge flags would be an excellent way of marking // edges in the DAG of shortest paths BGL_FORALL_VERTICES_T(v, g, Graph) { incoming_type i = get(incoming, v); for (typename incoming_type::iterator iter = i.begin(); iter != i.end(); ++iter) { if (get(owner, *iter) == id) { incoming_type x = get(outgoing, *iter); if (std::find(x.begin(), x.end(), v) == x.end()) x.push_back(v); put(outgoing, *iter, x); } else { incoming_type in; in.push_back(v); put(outgoing, *iter, in); } } } synchronize(pg); // Traverse DAG induced by forward edges in dependency order and compute path counts { typedef std::pair queue_value_type; typedef get_owner_of_first_pair_element IndirectOwnerMap; typedef boost::queue local_queue_type; typedef boost::graph::distributed::distributed_queue dist_queue_type; IndirectOwnerMap indirect_owner(owner); dist_queue_type Q(pg, indirect_owner); if (get(owner, s) == id) Q.push(std::make_pair(s, 1)); while (!Q.empty()) { queue_value_type t = Q.top(); Q.pop(); vertex_descriptor v = t.first; path_count_type p = t.second; put(path_count, v, get(path_count, v) + p); put(incoming_edge_count, v, get(incoming_edge_count, v) - 1); if (get(incoming_edge_count, v) == 0) { incoming_type out = get(outgoing, v); for (typename incoming_type::iterator iter = out.begin(); iter != out.end(); ++iter) Q.push(std::make_pair(*iter, get(path_count, v))); } } } #endif // COMPUTE_PATH_COUNTS_INLINE // // Compute dependencies // // Build the distributed_queue // Value type consists of 1) target of update 2) source of update // 3) dependency of source 4) path count of source typedef boost::tuple queue_value_type; typedef get_owner_of_first_tuple_element IndirectOwnerMap; typedef boost::queue local_queue_type; typedef boost::graph::distributed::distributed_queue dist_queue_type; IndirectOwnerMap indirect_owner(owner); dist_queue_type Q(pg, indirect_owner); // Calculate number of vertices each vertex depends on, when a vertex has been pushed // that number of times then we will update it // AND Request path counts of sources of incoming edges std::vector dependency_countS(num_vertices(g), 0); iterator_property_map::iterator, VertexIndexMap> dependency_count(dependency_countS.begin(), vertex_index); dependency_count.set_reduce(boost::graph::distributed::additive_reducer()); path_count.set_max_ghost_cells(0); BGL_FORALL_VERTICES_T(v, g, Graph) { if (get(distance, v) < (std::numeric_limits::max)()) { incoming_type el = get(incoming, v); for (incoming_iterator vw = el.begin(); vw != el.end(); ++vw) { if (get(owner, *vw) == id) put(dependency_count, *vw, get(dependency_count, *vw) + 1); else { put(dependency_count, *vw, 1); // Request path counts get(path_count, *vw); } // request() doesn't work here, perhaps because we don't have a copy of this // ghost cell already? } } } synchronize(pg); // Push vertices with non-zero distance/path count and zero dependency count BGL_FORALL_VERTICES_T(v, g, Graph) { if (get(distance, v) < (std::numeric_limits::max)() && get(dependency_count, v) == 0) Q.push(boost::make_tuple(v, v, get(dependency, v), get(path_count, v))); } dependency.set_max_ghost_cells(0); while(!Q.empty()) { queue_value_type x = Q.top(); Q.pop(); vertex_descriptor w = boost::tuples::get<0>(x); vertex_descriptor source = boost::tuples::get<1>(x); dependency_type dep = boost::tuples::get<2>(x); path_count_type pc = boost::tuples::get<3>(x); cache(dependency, source, dep); cache(path_count, source, pc); if (get(dependency_count, w) != 0) put(dependency_count, w, get(dependency_count, w) - 1); if (get(dependency_count, w) == 0) { // Update dependency and centrality of sources of incoming edges incoming_type el = get(incoming, w); for (incoming_iterator vw = el.begin(); vw != el.end(); ++vw) { vertex_descriptor v = *vw; BOOST_ASSERT(get(path_count, w) != 0); dependency_type factor = dependency_type(get(path_count, v)) / dependency_type(get(path_count, w)); factor *= (dependency_type(1) + get(dependency, w)); if (get(owner, v) == id) put(dependency, v, get(dependency, v) + factor); else put(dependency, v, factor); update_centrality(edge_centrality_map, v, factor); } if (w != s) update_centrality(centrality, w, get(dependency, w)); // Push sources of edges in incoming edge list for (incoming_iterator vw = el.begin(); vw != el.end(); ++vw) Q.push(boost::make_tuple(*vw, w, get(dependency, w), get(path_count, w))); } } } template void brandes_betweenness_centrality_impl(const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, IncomingMap incoming, DistanceMap distance, DependencyMap dependency, PathCountMap path_count, VertexIndexMap vertex_index, ShortestPaths shortest_paths, Buffer sources) { using boost::detail::graph::init_centrality_map; using boost::detail::graph::divide_centrality_by_two; using boost::graph::parallel::process_group; typedef typename graph_traits::vertex_descriptor vertex_descriptor; typedef typename property_traits::value_type distance_type; typedef typename property_traits::value_type dependency_type; // Initialize centrality init_centrality_map(vertices(g), centrality); init_centrality_map(edges(g), edge_centrality_map); // Set the reduction operation on the dependency map to be addition dependency.set_reduce(boost::graph::distributed::additive_reducer()); distance.set_reduce(boost::graph::distributed::choose_min_reducer()); // Don't allow remote procs to write incoming or path_count maps // updating them is handled inside the betweenness_centrality_queue incoming.set_consistency_model(0); path_count.set_consistency_model(0); typedef typename boost::graph::parallel::process_group_type::type process_group_type; process_group_type pg = process_group(g); #ifdef COMPUTE_PATH_COUNTS_INLINE // Build is_settled maps std::vector is_settledS(num_vertices(g)); typedef iterator_property_map::iterator, VertexIndexMap> IsSettledMap; IsSettledMap is_settled(is_settledS.begin(), vertex_index); #endif if (!sources.empty()) { // DO SSSPs while (!sources.empty()) { do_brandes_sssp(g, centrality, edge_centrality_map, incoming, distance, dependency, path_count, #ifdef COMPUTE_PATH_COUNTS_INLINE is_settled, #endif vertex_index, shortest_paths, sources.top()); sources.pop(); } } else { // Exact Betweenness Centrality typedef typename graph_traits::vertices_size_type vertices_size_type; vertices_size_type n = num_vertices(g); n = boost::parallel::all_reduce(pg, n, std::plus()); for (vertices_size_type i = 0; i < n; ++i) { vertex_descriptor v = vertex(i, g); do_brandes_sssp(g, centrality, edge_centrality_map, incoming, distance, dependency, path_count, #ifdef COMPUTE_PATH_COUNTS_INLINE is_settled, #endif vertex_index, shortest_paths, v); } } typedef typename graph_traits::directed_category directed_category; const bool is_undirected = is_convertible::value; if (is_undirected) { divide_centrality_by_two(vertices(g), centrality); divide_centrality_by_two(edges(g), edge_centrality_map); } } template void do_sequential_brandes_sssp(const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, IncomingMap incoming, DistanceMap distance, DependencyMap dependency, PathCountMap path_count, VertexIndexMap vertex_index, ShortestPaths shortest_paths, Stack& ordered_vertices, typename graph_traits::vertex_descriptor v) { using boost::detail::graph::update_centrality; typedef typename graph_traits::vertex_descriptor vertex_descriptor; // Initialize for this iteration BGL_FORALL_VERTICES_T(w, g, Graph) { // put(path_count, w, 0); incoming[w].clear(); put(dependency, w, 0); } put(path_count, v, 1); incoming[v].clear(); // Execute the shortest paths algorithm. This will be either // Dijkstra's algorithm or a customized breadth-first search, // depending on whether the graph is weighted or unweighted. shortest_paths(g, v, ordered_vertices, incoming, distance, path_count, vertex_index); while (!ordered_vertices.empty()) { vertex_descriptor w = ordered_vertices.top(); ordered_vertices.pop(); typedef typename property_traits::value_type incoming_type; typedef typename incoming_type::iterator incoming_iterator; typedef typename property_traits::value_type dependency_type; for (incoming_iterator vw = incoming[w].begin(); vw != incoming[w].end(); ++vw) { vertex_descriptor v = source(*vw, g); dependency_type factor = dependency_type(get(path_count, v)) / dependency_type(get(path_count, w)); factor *= (dependency_type(1) + get(dependency, w)); put(dependency, v, get(dependency, v) + factor); update_centrality(edge_centrality_map, *vw, factor); } if (w != v) { update_centrality(centrality, w, get(dependency, w)); } } } // Betweenness Centrality variant that duplicates graph across processors // and parallizes SSSPs // This function expects a non-distributed graph and property-maps template void non_distributed_brandes_betweenness_centrality_impl(const ProcessGroup& pg, const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, IncomingMap incoming, // P DistanceMap distance, // d DependencyMap dependency, // delta PathCountMap path_count, // sigma VertexIndexMap vertex_index, ShortestPaths shortest_paths, Buffer sources) { using boost::detail::graph::init_centrality_map; using boost::detail::graph::divide_centrality_by_two; using boost::graph::parallel::process_group; typedef typename graph_traits::vertex_descriptor vertex_descriptor; typedef ProcessGroup process_group_type; typename process_group_type::process_id_type id = process_id(pg); typename process_group_type::process_size_type p = num_processes(pg); // Initialize centrality init_centrality_map(vertices(g), centrality); init_centrality_map(edges(g), edge_centrality_map); std::stack ordered_vertices; if (!sources.empty()) { std::vector local_sources; for (int i = 0; i < id; ++i) if (!sources.empty()) sources.pop(); while (!sources.empty()) { local_sources.push_back(sources.top()); for (int i = 0; i < p; ++i) if (!sources.empty()) sources.pop(); } // DO SSSPs for(size_t i = 0; i < local_sources.size(); ++i) do_sequential_brandes_sssp(g, centrality, edge_centrality_map, incoming, distance, dependency, path_count, vertex_index, shortest_paths, ordered_vertices, local_sources[i]); } else { // Exact Betweenness Centrality typedef typename graph_traits::vertices_size_type vertices_size_type; vertices_size_type n = num_vertices(g); for (vertices_size_type i = id; i < n; i += p) { vertex_descriptor v = vertex(i, g); do_sequential_brandes_sssp(g, centrality, edge_centrality_map, incoming, distance, dependency, path_count, vertex_index, shortest_paths, ordered_vertices, v); } } typedef typename graph_traits::directed_category directed_category; const bool is_undirected = is_convertible::value; if (is_undirected) { divide_centrality_by_two(vertices(g), centrality); divide_centrality_by_two(edges(g), edge_centrality_map); } // Merge the centrality maps by summing the values at each vertex) // TODO(nge): this copy-out, reduce, copy-in is lame typedef typename property_traits::value_type centrality_type; typedef typename property_traits::value_type edge_centrality_type; std::vector centrality_v(num_vertices(g)); std::vector edge_centrality_v; edge_centrality_v.reserve(num_edges(g)); BGL_FORALL_VERTICES_T(v, g, Graph) { centrality_v[get(vertex_index, v)] = get(centrality, v); } // Skip when EdgeCentralityMap is a dummy_property_map if (!is_same::value) { BGL_FORALL_EDGES_T(e, g, Graph) { edge_centrality_v.push_back(get(edge_centrality_map, e)); } // NGE: If we trust that the order of elements in the vector isn't changed in the // all_reduce below then this method avoids the need for an edge index map } using boost::parallel::all_reduce; all_reduce(pg, ¢rality_v[0], ¢rality_v[centrality_v.size()], ¢rality_v[0], std::plus()); if (edge_centrality_v.size()) all_reduce(pg, &edge_centrality_v[0], &edge_centrality_v[edge_centrality_v.size()], &edge_centrality_v[0], std::plus()); BGL_FORALL_VERTICES_T(v, g, Graph) { put(centrality, v, centrality_v[get(vertex_index, v)]); } // Skip when EdgeCentralityMap is a dummy_property_map if (!is_same::value) { int i = 0; BGL_FORALL_EDGES_T(e, g, Graph) { put(edge_centrality_map, e, edge_centrality_v[i]); ++i; } } } } } } // end namespace graph::parallel::detail template void brandes_betweenness_centrality(const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, IncomingMap incoming, DistanceMap distance, DependencyMap dependency, PathCountMap path_count, VertexIndexMap vertex_index, Buffer sources, typename property_traits::value_type delta BOOST_GRAPH_ENABLE_IF_MODELS_PARM(Graph,distributed_graph_tag)) { typedef typename property_traits::value_type distance_type; typedef static_property_map WeightMap; graph::parallel::detail::brandes_shortest_paths shortest_paths(delta); graph::parallel::detail::brandes_betweenness_centrality_impl(g, centrality, edge_centrality_map, incoming, distance, dependency, path_count, vertex_index, shortest_paths, sources); } template void brandes_betweenness_centrality(const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, IncomingMap incoming, DistanceMap distance, DependencyMap dependency, PathCountMap path_count, VertexIndexMap vertex_index, Buffer sources, typename property_traits::value_type delta, WeightMap weight_map BOOST_GRAPH_ENABLE_IF_MODELS_PARM(Graph,distributed_graph_tag)) { graph::parallel::detail::brandes_shortest_paths shortest_paths(weight_map, delta); graph::parallel::detail::brandes_betweenness_centrality_impl(g, centrality, edge_centrality_map, incoming, distance, dependency, path_count, vertex_index, shortest_paths, sources); } namespace graph { namespace parallel { namespace detail { template void brandes_betweenness_centrality_dispatch2(const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, WeightMap weight_map, VertexIndexMap vertex_index, Buffer sources, typename property_traits::value_type delta) { typedef typename graph_traits::degree_size_type degree_size_type; typedef typename graph_traits::vertex_descriptor vertex_descriptor; typedef typename mpl::if_c<(is_same::value), EdgeCentralityMap, CentralityMap>::type a_centrality_map; typedef typename property_traits::value_type centrality_type; typename graph_traits::vertices_size_type V = num_vertices(g); std::vector > incoming(V); std::vector distance(V); std::vector dependency(V); std::vector path_count(V); brandes_betweenness_centrality( g, centrality, edge_centrality_map, make_iterator_property_map(incoming.begin(), vertex_index), make_iterator_property_map(distance.begin(), vertex_index), make_iterator_property_map(dependency.begin(), vertex_index), make_iterator_property_map(path_count.begin(), vertex_index), vertex_index, unwrap_ref(sources), delta, weight_map); } // TODO: Should the type of the distance and dependency map depend on the // value type of the centrality map? template void brandes_betweenness_centrality_dispatch2(const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, VertexIndexMap vertex_index, Buffer sources, typename graph_traits::edges_size_type delta) { typedef typename graph_traits::degree_size_type degree_size_type; typedef typename graph_traits::edges_size_type edges_size_type; typedef typename graph_traits::vertex_descriptor vertex_descriptor; typename graph_traits::vertices_size_type V = num_vertices(g); std::vector > incoming(V); std::vector distance(V); std::vector dependency(V); std::vector path_count(V); brandes_betweenness_centrality( g, centrality, edge_centrality_map, make_iterator_property_map(incoming.begin(), vertex_index), make_iterator_property_map(distance.begin(), vertex_index), make_iterator_property_map(dependency.begin(), vertex_index), make_iterator_property_map(path_count.begin(), vertex_index), vertex_index, unwrap_ref(sources), delta); } template struct brandes_betweenness_centrality_dispatch1 { template static void run(const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, VertexIndexMap vertex_index, Buffer sources, typename property_traits::value_type delta, WeightMap weight_map) { boost::graph::parallel::detail::brandes_betweenness_centrality_dispatch2( g, centrality, edge_centrality_map, weight_map, vertex_index, sources, delta); } }; template<> struct brandes_betweenness_centrality_dispatch1 { template static void run(const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, VertexIndexMap vertex_index, Buffer sources, typename graph_traits::edges_size_type delta, boost::param_not_found) { boost::graph::parallel::detail::brandes_betweenness_centrality_dispatch2( g, centrality, edge_centrality_map, vertex_index, sources, delta); } }; } } } // end namespace graph::parallel::detail template void brandes_betweenness_centrality(const Graph& g, const bgl_named_params& params BOOST_GRAPH_ENABLE_IF_MODELS_PARM(Graph,distributed_graph_tag)) { typedef bgl_named_params named_params; typedef queue::vertex_descriptor> queue_t; queue_t q; typedef typename get_param_type::type ew_param; typedef typename detail::choose_impl_result::type ew; graph::parallel::detail::brandes_betweenness_centrality_dispatch1::run( g, choose_param(get_param(params, vertex_centrality), dummy_property_map()), choose_param(get_param(params, edge_centrality), dummy_property_map()), choose_const_pmap(get_param(params, vertex_index), g, vertex_index), choose_param(get_param(params, buffer_param_t()), boost::ref(q)), choose_param(get_param(params, lookahead_t()), 0), choose_const_pmap(get_param(params, edge_weight), g, edge_weight)); } template void brandes_betweenness_centrality(const Graph& g, CentralityMap centrality BOOST_GRAPH_ENABLE_IF_MODELS_PARM(Graph,distributed_graph_tag)) { typedef queue::vertex_descriptor> queue_t; queue_t q; boost::graph::parallel::detail::brandes_betweenness_centrality_dispatch2( g, centrality, dummy_property_map(), get(vertex_index, g), boost::ref(q), 0); } template void brandes_betweenness_centrality(const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map BOOST_GRAPH_ENABLE_IF_MODELS_PARM(Graph,distributed_graph_tag)) { typedef queue queue_t; queue_t q; boost::graph::parallel::detail::brandes_betweenness_centrality_dispatch2( g, centrality, edge_centrality_map, get(vertex_index, g), boost::ref(q), 0); } template void non_distributed_brandes_betweenness_centrality(const ProcessGroup& pg, const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, IncomingMap incoming, DistanceMap distance, DependencyMap dependency, PathCountMap path_count, VertexIndexMap vertex_index, Buffer sources) { detail::graph::brandes_unweighted_shortest_paths shortest_paths; graph::parallel::detail::non_distributed_brandes_betweenness_centrality_impl(pg, g, centrality, edge_centrality_map, incoming, distance, dependency, path_count, vertex_index, shortest_paths, sources); } template void non_distributed_brandes_betweenness_centrality(const ProcessGroup& pg, const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, IncomingMap incoming, DistanceMap distance, DependencyMap dependency, PathCountMap path_count, VertexIndexMap vertex_index, WeightMap weight_map, Buffer sources) { detail::graph::brandes_dijkstra_shortest_paths shortest_paths(weight_map); graph::parallel::detail::non_distributed_brandes_betweenness_centrality_impl(pg, g, centrality, edge_centrality_map, incoming, distance, dependency, path_count, vertex_index, shortest_paths, sources); } namespace detail { namespace graph { template void non_distributed_brandes_betweenness_centrality_dispatch2(const ProcessGroup& pg, const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, WeightMap weight_map, VertexIndexMap vertex_index, Buffer sources) { typedef typename graph_traits::degree_size_type degree_size_type; typedef typename graph_traits::edge_descriptor edge_descriptor; typedef typename mpl::if_c<(is_same::value), EdgeCentralityMap, CentralityMap>::type a_centrality_map; typedef typename property_traits::value_type centrality_type; typename graph_traits::vertices_size_type V = num_vertices(g); std::vector > incoming(V); std::vector distance(V); std::vector dependency(V); std::vector path_count(V); non_distributed_brandes_betweenness_centrality( pg, g, centrality, edge_centrality_map, make_iterator_property_map(incoming.begin(), vertex_index), make_iterator_property_map(distance.begin(), vertex_index), make_iterator_property_map(dependency.begin(), vertex_index), make_iterator_property_map(path_count.begin(), vertex_index), vertex_index, weight_map, unwrap_ref(sources)); } template void non_distributed_brandes_betweenness_centrality_dispatch2(const ProcessGroup& pg, const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, VertexIndexMap vertex_index, Buffer sources) { typedef typename graph_traits::degree_size_type degree_size_type; typedef typename graph_traits::edge_descriptor edge_descriptor; typedef typename mpl::if_c<(is_same::value), EdgeCentralityMap, CentralityMap>::type a_centrality_map; typedef typename property_traits::value_type centrality_type; typename graph_traits::vertices_size_type V = num_vertices(g); std::vector > incoming(V); std::vector distance(V); std::vector dependency(V); std::vector path_count(V); non_distributed_brandes_betweenness_centrality( pg, g, centrality, edge_centrality_map, make_iterator_property_map(incoming.begin(), vertex_index), make_iterator_property_map(distance.begin(), vertex_index), make_iterator_property_map(dependency.begin(), vertex_index), make_iterator_property_map(path_count.begin(), vertex_index), vertex_index, unwrap_ref(sources)); } template struct non_distributed_brandes_betweenness_centrality_dispatch1 { template static void run(const ProcessGroup& pg, const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, VertexIndexMap vertex_index, Buffer sources, WeightMap weight_map) { non_distributed_brandes_betweenness_centrality_dispatch2(pg, g, centrality, edge_centrality_map, weight_map, vertex_index, sources); } }; template<> struct non_distributed_brandes_betweenness_centrality_dispatch1 { template static void run(const ProcessGroup& pg, const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, VertexIndexMap vertex_index, Buffer sources, param_not_found) { non_distributed_brandes_betweenness_centrality_dispatch2(pg, g, centrality, edge_centrality_map, vertex_index, sources); } }; } } // end namespace detail::graph template void non_distributed_brandes_betweenness_centrality(const ProcessGroup& pg, const Graph& g, const bgl_named_params& params) { typedef bgl_named_params named_params; typedef queue queue_t; queue_t q; typedef typename get_param_type::type ew_param; typedef typename detail::choose_impl_result::type ew; detail::graph::non_distributed_brandes_betweenness_centrality_dispatch1::run( pg, g, choose_param(get_param(params, vertex_centrality), dummy_property_map()), choose_param(get_param(params, edge_centrality), dummy_property_map()), choose_const_pmap(get_param(params, vertex_index), g, vertex_index), choose_param(get_param(params, buffer_param_t()), boost::ref(q)), choose_const_pmap(get_param(params, edge_weight), g, edge_weight)); } template void non_distributed_brandes_betweenness_centrality(const ProcessGroup& pg, const Graph& g, CentralityMap centrality) { typedef queue queue_t; queue_t q; detail::graph::non_distributed_brandes_betweenness_centrality_dispatch2( pg, g, centrality, dummy_property_map(), get(vertex_index, g), boost::ref(q)); } template void non_distributed_brandes_betweenness_centrality(const ProcessGroup& pg, const Graph& g, CentralityMap centrality, Buffer sources) { detail::graph::non_distributed_brandes_betweenness_centrality_dispatch2( pg, g, centrality, dummy_property_map(), get(vertex_index, g), sources); } template void non_distributed_brandes_betweenness_centrality(const ProcessGroup& pg, const Graph& g, CentralityMap centrality, EdgeCentralityMap edge_centrality_map, Buffer sources) { detail::graph::non_distributed_brandes_betweenness_centrality_dispatch2( pg, g, centrality, edge_centrality_map, get(vertex_index, g), sources); } // Compute the central point dominance of a graph. // TODO: Make sure central point dominance works in parallel case template typename property_traits::value_type central_point_dominance(const Graph& g, CentralityMap centrality BOOST_GRAPH_ENABLE_IF_MODELS_PARM(Graph,distributed_graph_tag)) { using std::max; typedef typename graph_traits::vertex_iterator vertex_iterator; typedef typename property_traits::value_type centrality_type; typedef typename graph_traits::vertices_size_type vertices_size_type; typedef typename boost::graph::parallel::process_group_type::type process_group_type; process_group_type pg = boost::graph::parallel::process_group(g); vertices_size_type n = num_vertices(g); using boost::parallel::all_reduce; n = all_reduce(pg, n, std::plus()); // Find max centrality centrality_type max_centrality(0); vertex_iterator v, v_end; for (boost::tie(v, v_end) = vertices(g); v != v_end; ++v) { max_centrality = (max)(max_centrality, get(centrality, *v)); } // All reduce to get global max centrality max_centrality = all_reduce(pg, max_centrality, boost::parallel::maximum()); // Compute central point dominance centrality_type sum(0); for (boost::tie(v, v_end) = vertices(g); v != v_end; ++v) { sum += (max_centrality - get(centrality, *v)); } sum = all_reduce(pg, sum, std::plus()); return sum/(n-1); } } // end namespace boost #endif // BOOST_GRAPH_PARALLEL_BRANDES_BETWEENNESS_CENTRALITY_HPP