serialization.hpp 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988
  1. // Copyright Daniel Wallin 2007. Use, modification and distribution is
  2. // subject to the Boost Software License, Version 1.0. (See accompanying
  3. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  4. #ifndef BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP
  5. #define BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP
  6. #ifndef BOOST_GRAPH_USE_MPI
  7. #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
  8. #endif
  9. # include <boost/assert.hpp>
  10. # include <boost/lexical_cast.hpp>
  11. # include <boost/foreach.hpp>
  12. # include <boost/filesystem/path.hpp>
  13. # include <boost/filesystem/operations.hpp>
  14. # include <cctype>
  15. # include <fstream>
  16. namespace boost {
  17. namespace detail { namespace parallel
  18. {
  // Wraps a local descriptor, making it serializable.
  //
  // The wrapper converts implicitly from and to `Local`, compares by the
  // wrapped value, and serializes the value through unsafe_serialize().
  template <class Local>
  struct serializable_local_descriptor
  {
      // Value-initializes the wrapped descriptor so that a default
      // constructed wrapper never holds an indeterminate value.
      serializable_local_descriptor()
        : local()
      {}

      // Non-explicit on purpose: a plain descriptor can be stored directly
      // in a container of wrappers.
      serializable_local_descriptor(Local local)
        : local(local)
      {}

      // Gives read access to the wrapped descriptor.
      operator Local const&() const
      {
          return local;
      }

      bool operator==(serializable_local_descriptor const& other) const
      {
          return local == other.local;
      }

      bool operator<(serializable_local_descriptor const& other) const
      {
          return local < other.local;
      }

      // Boost.Serialization entry point; the version number is unused.
      template <class Archive>
      void serialize(Archive& ar, const unsigned int /*version*/)
      {
          ar & unsafe_serialize(local);
      }

      Local local;
  };
  // Record of an edge whose insertion has been postponed: both endpoints,
  // the edge properties, and the property pointer that was handed in
  // together with the edge.
  template <class Vertex, class Properties>
  struct pending_edge
  {
      pending_edge(Vertex source,
                   Vertex target,
                   Properties properties,
                   void* property_ptr)
        : source(source),
          target(target),
          properties(properties),
          property_ptr(property_ptr)
      {
      }

      Vertex source;
      Vertex target;
      Properties properties;
      void* property_ptr;
  };
  // Returns true when `c` is a decimal digit.
  //
  // std::isdigit() requires its argument to be representable as unsigned
  // char (or to be EOF); passing a negative plain char is undefined
  // behaviour, hence the cast.
  inline bool is_digit(char c)
  {
      return std::isdigit(static_cast<unsigned char>(c)) != 0;
  }
  68. inline std::vector<int>
  69. available_process_files(std::string const& filename)
  70. {
  71. if (!filesystem::exists(filename))
  72. return std::vector<int>();
  73. std::vector<int> result;
  74. for (filesystem::directory_iterator i(filename), end; i != end; ++i)
  75. {
  76. if (!filesystem::is_regular(*i))
  77. boost::throw_exception(std::runtime_error("directory contains non-regular entries"));
  78. std::string process_name = i->path().filename().string();
  79. for (std::string::size_type i = 0; i < process_name.size(); ++i)
  80. if (!is_digit(process_name[i]))
  81. boost::throw_exception(std::runtime_error("directory contains files with invalid names"));
  82. result.push_back(boost::lexical_cast<int>(process_name));
  83. }
  84. return result;
  85. }
  86. template <class Archive, class Tag, class T, class Base>
  87. void maybe_load_properties(
  88. Archive& ar, char const* name, property<Tag, T, Base>& properties)
  89. {
  90. ar >> serialization::make_nvp(name, get_property_value(properties, Tag()));
  91. maybe_load_properties(ar, name, static_cast<Base&>(properties));
  92. }
  93. template <class Archive>
  94. void maybe_load_properties(
  95. Archive&, char const*, no_property&)
  96. {}
  97. template <class Archive, typename Bundle>
  98. void maybe_load_properties(
  99. Archive& ar, char const* name, Bundle& bundle)
  100. {
  101. ar >> serialization::make_nvp(name, bundle);
  102. no_property prop;
  103. maybe_load_properties(ar, name, prop);
  104. }
  105. template <class Graph, class Archive, class VertexListS>
  106. struct graph_loader
  107. {
  108. typedef typename Graph::vertex_descriptor vertex_descriptor;
  109. typedef typename Graph::local_vertex_descriptor local_vertex_descriptor;
  110. typedef typename Graph::vertex_property_type vertex_property_type;
  111. typedef typename Graph::edge_descriptor edge_descriptor;
  112. typedef typename Graph::local_edge_descriptor local_edge_descriptor;
  113. typedef typename Graph::edge_property_type edge_property_type;
  114. typedef typename Graph::process_group_type process_group_type;
  115. typedef typename process_group_type::process_id_type process_id_type;
  116. typedef typename Graph::directed_selector directed_selector;
  117. typedef typename mpl::if_<
  118. is_same<VertexListS, defaultS>, vecS, VertexListS
  119. >::type vertex_list_selector;
  120. typedef pending_edge<vertex_descriptor, edge_property_type>
  121. pending_edge_type;
  122. typedef serializable_local_descriptor<local_vertex_descriptor>
  123. serializable_vertex_descriptor;
  124. graph_loader(Graph& g, Archive& ar)
  125. : m_g(g)
  126. , m_ar(ar)
  127. , m_pg(g.process_group())
  128. , m_requested_vertices(num_processes(m_pg))
  129. , m_remote_vertices(num_processes(m_pg))
  130. , m_property_ptrs(num_processes(m_pg))
  131. {
  132. g.clear();
  133. load_prefix();
  134. load_vertices();
  135. load_edges();
  136. ar >> make_nvp("distribution", m_g.distribution());
  137. }
  138. private:
  139. struct pending_in_edge
  140. {
  141. pending_in_edge(
  142. vertex_descriptor u, vertex_descriptor v, void* property_ptr
  143. )
  144. : u(u)
  145. , v(v)
  146. , property_ptr(property_ptr)
  147. {}
  148. vertex_descriptor u;
  149. vertex_descriptor v;
  150. void* property_ptr;
  151. };
  152. bool is_root() const
  153. {
  154. return process_id(m_pg) == 0;
  155. }
  156. template <class T>
  157. serialization::nvp<T> const make_nvp(char const* name, T& value) const
  158. {
  159. return serialization::nvp<T>(name, value);
  160. }
  161. void load_prefix();
  162. void load_vertices();
  163. template <class Anything>
  164. void maybe_load_and_store_local_vertex(Anything);
  165. void maybe_load_and_store_local_vertex(vecS);
  166. void load_edges();
  167. void load_in_edges(bidirectionalS);
  168. void load_in_edges(directedS);
  169. void add_pending_in_edge(
  170. vertex_descriptor u, vertex_descriptor v, void* property_ptr, vecS);
  171. template <class Anything>
  172. void add_pending_in_edge(
  173. vertex_descriptor u, vertex_descriptor v, void* property_ptr, Anything);
  174. template <class Anything>
  175. void add_edge(
  176. vertex_descriptor u, vertex_descriptor v
  177. , edge_property_type const& property, void* property_ptr, Anything);
  178. void add_edge(
  179. vertex_descriptor u, vertex_descriptor v
  180. , edge_property_type const& property, void* property_ptr, vecS);
  181. void add_remote_vertex_request(
  182. vertex_descriptor u, vertex_descriptor v, directedS);
  183. void add_remote_vertex_request(
  184. vertex_descriptor u, vertex_descriptor v, bidirectionalS);
  185. void add_in_edge(
  186. edge_descriptor const&, void*, directedS);
  187. void add_in_edge(
  188. edge_descriptor const& edge, void* old_property_ptr, bidirectionalS);
  189. void resolve_remote_vertices(directedS);
  190. void resolve_remote_vertices(bidirectionalS);
  191. vertex_descriptor resolve_remote_vertex(vertex_descriptor u) const;
  192. vertex_descriptor resolve_remote_vertex(vertex_descriptor u, vecS) const;
  193. template <class Anything>
  194. vertex_descriptor resolve_remote_vertex(vertex_descriptor u, Anything) const;
  195. void resolve_property_ptrs();
  196. void commit_pending_edges(vecS);
  197. template <class Anything>
  198. void commit_pending_edges(Anything);
  199. void commit_pending_in_edges(directedS);
  200. void commit_pending_in_edges(bidirectionalS);
  201. void* maybe_load_property_ptr(directedS) { return 0; }
  202. void* maybe_load_property_ptr(bidirectionalS);
  203. Graph& m_g;
  204. Archive& m_ar;
  205. process_group_type m_pg;
  206. std::vector<process_id_type> m_id_mapping;
  207. // Maps local vertices as loaded from the archive to
  208. // the ones actually added to the graph. Only used
  209. // when !vecS.
  210. std::map<local_vertex_descriptor, local_vertex_descriptor> m_local_vertices;
  211. // This is the list of remote vertex descriptors that we
  212. // are going to receive from other processes. This is
  213. // kept sorted so that we can determine the position of
  214. // the matching vertex descriptor in m_remote_vertices.
  215. std::vector<std::vector<serializable_vertex_descriptor> > m_requested_vertices;
  216. // This is the list of remote vertex descriptors that
  217. // we send and receive from other processes.
  218. std::vector<std::vector<serializable_vertex_descriptor> > m_remote_vertices;
  219. // ...
  220. std::vector<pending_edge_type> m_pending_edges;
  221. // The pending in-edges that will be added in the commit step, after
  222. // the remote vertex descriptors has been resolved. Only used
  223. // when bidirectionalS and !vecS.
  224. std::vector<pending_in_edge> m_pending_in_edges;
  225. std::vector<std::vector<unsafe_pair<void*,void*> > > m_property_ptrs;
  226. };
  227. template <class Graph, class Archive, class VertexListS>
  228. void graph_loader<Graph, Archive, VertexListS>::load_prefix()
  229. {
  230. typename process_group_type::process_size_type num_processes_;
  231. m_ar >> make_nvp("num_processes", num_processes_);
  232. if (num_processes_ != num_processes(m_pg))
  233. boost::throw_exception(std::runtime_error("number of processes mismatch"));
  234. process_id_type old_id;
  235. m_ar >> make_nvp("id", old_id);
  236. std::vector<typename Graph::distribution_type::size_type> mapping;
  237. m_ar >> make_nvp("mapping", mapping);
  238. // Fetch all the old id's from the other processes.
  239. std::vector<process_id_type> old_ids;
  240. all_gather(m_pg, &old_id, &old_id+1, old_ids);
  241. m_id_mapping.resize(num_processes(m_pg), -1);
  242. for (process_id_type i = 0; i < num_processes(m_pg); ++i)
  243. {
  244. # ifdef PBGL_SERIALIZE_DEBUG
  245. if (is_root())
  246. std::cout << i << " used to be " << old_ids[i] << "\n";
  247. # endif
  248. BOOST_ASSERT(m_id_mapping[old_ids[i]] == -1);
  249. m_id_mapping[old_ids[i]] = i;
  250. }
  251. std::vector<typename Graph::distribution_type::size_type> new_mapping(
  252. mapping.size());
  253. for (int i = 0; i < num_processes(m_pg); ++i)
  254. {
  255. new_mapping[mapping[old_ids[i]]] = i;
  256. }
  257. m_g.distribution().assign_mapping(
  258. new_mapping.begin(), new_mapping.end());
  259. }
  260. template <class Graph, class Archive, class VertexListS>
  261. void graph_loader<Graph, Archive, VertexListS>::load_vertices()
  262. {
  263. int V;
  264. m_ar >> BOOST_SERIALIZATION_NVP(V);
  265. # ifdef PBGL_SERIALIZE_DEBUG
  266. if (is_root())
  267. std::cout << "Loading vertices\n";
  268. # endif
  269. for (int i = 0; i < V; ++i)
  270. {
  271. maybe_load_and_store_local_vertex(vertex_list_selector());
  272. }
  273. }
  274. template <class Graph, class Archive, class VertexListS>
  275. template <class Anything>
  276. void graph_loader<Graph, Archive, VertexListS>::maybe_load_and_store_local_vertex(Anything)
  277. {
  278. // Load the original vertex descriptor
  279. local_vertex_descriptor local;
  280. m_ar >> make_nvp("local", unsafe_serialize(local));
  281. // Load the properties
  282. vertex_property_type property;
  283. detail::parallel::maybe_load_properties(m_ar, "vertex_property",
  284. property);
  285. // Add the vertex
  286. vertex_descriptor v(process_id(m_pg), add_vertex(property, m_g.base()));
  287. if (m_g.on_add_vertex)
  288. m_g.on_add_vertex(v, m_g);
  289. // Create the mapping from the "old" local descriptor to the new
  290. // local descriptor.
  291. m_local_vertices[local] = v.local;
  292. }
  293. template <class Graph, class Archive, class VertexListS>
  294. void graph_loader<Graph, Archive, VertexListS>::maybe_load_and_store_local_vertex(vecS)
  295. {
  296. // Load the properties
  297. vertex_property_type property;
  298. detail::parallel::maybe_load_properties(m_ar, "vertex_property",
  299. property);
  300. // Add the vertex
  301. vertex_descriptor v(process_id(m_pg),
  302. add_vertex(m_g.build_vertex_property(property),
  303. m_g.base()));
  304. if (m_g.on_add_vertex)
  305. m_g.on_add_vertex(v, m_g);
  306. }
  307. template <class Graph, class Archive, class VertexListS>
  308. void graph_loader<Graph, Archive, VertexListS>::load_edges()
  309. {
  310. int E;
  311. m_ar >> BOOST_SERIALIZATION_NVP(E);
  312. # ifdef PBGL_SERIALIZE_DEBUG
  313. if (is_root())
  314. std::cout << "Loading edges\n";
  315. # endif
  316. for (int i = 0; i < E; ++i)
  317. {
  318. local_vertex_descriptor local_src;
  319. process_id_type target_owner;
  320. local_vertex_descriptor local_tgt;
  321. m_ar >> make_nvp("source", unsafe_serialize(local_src));
  322. m_ar >> make_nvp("target_owner", target_owner);
  323. m_ar >> make_nvp("target", unsafe_serialize(local_tgt));
  324. process_id_type new_src_owner = process_id(m_pg);
  325. process_id_type new_tgt_owner = m_id_mapping[target_owner];
  326. vertex_descriptor source(new_src_owner, local_src);
  327. vertex_descriptor target(new_tgt_owner, local_tgt);
  328. edge_property_type properties;
  329. detail::parallel::maybe_load_properties(m_ar, "edge_property", properties);
  330. void* property_ptr = maybe_load_property_ptr(directed_selector());
  331. add_edge(source, target, properties, property_ptr, vertex_list_selector());
  332. }
  333. load_in_edges(directed_selector());
  334. commit_pending_edges(vertex_list_selector());
  335. }
  336. template <class Graph, class Archive, class VertexListS>
  337. void graph_loader<Graph, Archive, VertexListS>::load_in_edges(bidirectionalS)
  338. {
  339. std::size_t I;
  340. m_ar >> BOOST_SERIALIZATION_NVP(I);
  341. # ifdef PBGL_SERIALIZE_DEBUG
  342. if (is_root())
  343. std::cout << "Loading in-edges\n";
  344. # endif
  345. for (int i = 0; i < I; ++i)
  346. {
  347. process_id_type src_owner;
  348. local_vertex_descriptor local_src;
  349. local_vertex_descriptor local_target;
  350. void* property_ptr;
  351. m_ar >> make_nvp("src_owner", src_owner);
  352. m_ar >> make_nvp("source", unsafe_serialize(local_src));
  353. m_ar >> make_nvp("target", unsafe_serialize(local_target));
  354. m_ar >> make_nvp("property_ptr", unsafe_serialize(property_ptr));
  355. src_owner = m_id_mapping[src_owner];
  356. vertex_descriptor u(src_owner, local_src);
  357. vertex_descriptor v(process_id(m_pg), local_target);
  358. add_pending_in_edge(u, v, property_ptr, vertex_list_selector());
  359. }
  360. }
  361. template <class Graph, class Archive, class VertexListS>
  362. void graph_loader<Graph, Archive, VertexListS>::load_in_edges(directedS)
  363. {}
  364. template <class Graph, class Archive, class VertexListS>
  365. void graph_loader<Graph, Archive, VertexListS>::add_pending_in_edge(
  366. vertex_descriptor u, vertex_descriptor v, void* property_ptr, vecS)
  367. {
  368. m_pending_in_edges.push_back(pending_in_edge(u,v,property_ptr));
  369. }
  370. template <class Graph, class Archive, class VertexListS>
  371. template <class Anything>
  372. void graph_loader<Graph, Archive, VertexListS>::add_pending_in_edge(
  373. vertex_descriptor u, vertex_descriptor v, void* property_ptr, Anything)
  374. {
  375. // u and v represent the out-edge here, meaning v is local
  376. // to us, and u is always remote.
  377. m_pending_in_edges.push_back(pending_in_edge(u,v,property_ptr));
  378. add_remote_vertex_request(v, u, bidirectionalS());
  379. }
  380. template <class Graph, class Archive, class VertexListS>
  381. template <class Anything>
  382. void graph_loader<Graph, Archive, VertexListS>::add_edge(
  383. vertex_descriptor u, vertex_descriptor v
  384. , edge_property_type const& property, void* property_ptr, Anything)
  385. {
  386. m_pending_edges.push_back(pending_edge_type(u, v, property, property_ptr));
  387. add_remote_vertex_request(u, v, directed_selector());
  388. }
  389. template <class Graph, class Archive, class VertexListS>
  390. void graph_loader<Graph, Archive, VertexListS>::add_remote_vertex_request(
  391. vertex_descriptor u, vertex_descriptor v, directedS)
  392. {
  393. // We have to request the remote vertex.
  394. m_requested_vertices[owner(v)].push_back(local(v));
  395. }
  396. template <class Graph, class Archive, class VertexListS>
  397. void graph_loader<Graph, Archive, VertexListS>::add_remote_vertex_request(
  398. vertex_descriptor u, vertex_descriptor v, bidirectionalS)
  399. {
  400. // If the edge spans to another process, we know
  401. // that that process has a matching in-edge, so
  402. // we can just send our vertex. No requests
  403. // necessary.
  404. if (owner(v) != m_g.processor())
  405. {
  406. m_remote_vertices[owner(v)].push_back(local(u));
  407. m_requested_vertices[owner(v)].push_back(local(v));
  408. }
  409. }
  410. template <class Graph, class Archive, class VertexListS>
  411. void graph_loader<Graph, Archive, VertexListS>::add_edge(
  412. vertex_descriptor u, vertex_descriptor v
  413. , edge_property_type const& property, void* property_ptr, vecS)
  414. {
  415. std::pair<local_edge_descriptor, bool> inserted =
  416. detail::parallel::add_local_edge(
  417. local(u), local(v)
  418. , m_g.build_edge_property(property), m_g.base());
  419. BOOST_ASSERT(inserted.second);
  420. put(edge_target_processor_id, m_g.base(), inserted.first, owner(v));
  421. edge_descriptor e(owner(u), owner(v), true, inserted.first);
  422. if (inserted.second && m_g.on_add_edge)
  423. m_g.on_add_edge(e, m_g);
  424. add_in_edge(e, property_ptr, directed_selector());
  425. }
  426. template <class Graph, class Archive, class VertexListS>
  427. void graph_loader<Graph, Archive, VertexListS>::add_in_edge(
  428. edge_descriptor const&, void*, directedS)
  429. {}
  430. template <class Graph, class Archive, class VertexListS>
  431. void graph_loader<Graph, Archive, VertexListS>::add_in_edge(
  432. edge_descriptor const& edge, void* old_property_ptr, bidirectionalS)
  433. {
  434. if (owner(target(edge, m_g)) == m_g.processor())
  435. {
  436. detail::parallel::stored_in_edge<local_edge_descriptor>
  437. e(m_g.processor(), local(edge));
  438. boost::graph_detail::push(get(
  439. vertex_in_edges, m_g.base())[local(target(edge, m_g))], e);
  440. }
  441. else
  442. {
  443. // We send the (old,new) property pointer pair to
  444. // the remote process. This could be optimized to
  445. // only send the new one -- the ordering can be
  446. // made implicit because the old pointer value is
  447. // stored on the remote process.
  448. //
  449. // Doing that is a little bit more complicated, but
  450. // in case it turns out it's important we can do it.
  451. void* property_ptr = local(edge).get_property();
  452. m_property_ptrs[owner(target(edge, m_g))].push_back(
  453. unsafe_pair<void*,void*>(old_property_ptr, property_ptr));
  454. }
  455. }
  456. template <class Graph, class Archive, class VertexListS>
  457. void graph_loader<Graph, Archive, VertexListS>::resolve_property_ptrs()
  458. {
  459. # ifdef PBGL_SERIALIZE_DEBUG
  460. if (is_root())
  461. std::cout << "Resolving property pointers\n";
  462. # endif
  463. for (int i = 0; i < num_processes(m_pg); ++i)
  464. {
  465. std::sort(
  466. m_property_ptrs[i].begin(), m_property_ptrs[i].end());
  467. }
  468. boost::parallel::inplace_all_to_all(m_pg, m_property_ptrs);
  469. }
  470. template <class Graph, class Archive, class VertexListS>
  471. void graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertices(directedS)
  472. {
  473. for (int i = 0; i < num_processes(m_pg); ++i)
  474. {
  475. std::sort(m_requested_vertices[i].begin(), m_requested_vertices[i].end());
  476. }
  477. boost::parallel::inplace_all_to_all(
  478. m_pg, m_requested_vertices, m_remote_vertices);
  479. for (int i = 0; i < num_processes(m_pg); ++i)
  480. {
  481. BOOST_FOREACH(serializable_vertex_descriptor& u, m_remote_vertices[i])
  482. {
  483. u = m_local_vertices[u];
  484. }
  485. }
  486. boost::parallel::inplace_all_to_all(m_pg, m_remote_vertices);
  487. }
  488. template <class Graph, class Archive, class VertexListS>
  489. void graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertices(bidirectionalS)
  490. {
  491. # ifdef PBGL_SERIALIZE_DEBUG
  492. if (is_root())
  493. std::cout << "Resolving remote vertices\n";
  494. # endif
  495. for (int i = 0; i < num_processes(m_pg); ++i)
  496. {
  497. std::sort(m_requested_vertices[i].begin(), m_requested_vertices[i].end());
  498. std::sort(m_remote_vertices[i].begin(), m_remote_vertices[i].end());
  499. BOOST_FOREACH(serializable_vertex_descriptor& u, m_remote_vertices[i])
  500. {
  501. u = m_local_vertices[u];
  502. }
  503. }
  504. boost::parallel::inplace_all_to_all(m_pg, m_remote_vertices);
  505. for (int i = 0; i < num_processes(m_pg); ++i)
  506. BOOST_ASSERT(m_remote_vertices[i].size() == m_requested_vertices[i].size());
  507. }
  508. template <class Graph, class Archive, class VertexListS>
  509. void graph_loader<Graph, Archive, VertexListS>::commit_pending_edges(vecS)
  510. {
  511. commit_pending_in_edges(directed_selector());
  512. }
  513. template <class Graph, class Archive, class VertexListS>
  514. template <class Anything>
  515. void graph_loader<Graph, Archive, VertexListS>::commit_pending_edges(Anything)
  516. {
  517. resolve_remote_vertices(directed_selector());
  518. BOOST_FOREACH(pending_edge_type const& e, m_pending_edges)
  519. {
  520. vertex_descriptor u = resolve_remote_vertex(e.source);
  521. vertex_descriptor v = resolve_remote_vertex(e.target);
  522. add_edge(u, v, e.properties, e.property_ptr, vecS());
  523. }
  524. commit_pending_in_edges(directed_selector());
  525. }
  526. template <class Graph, class Archive, class VertexListS>
  527. void graph_loader<Graph, Archive, VertexListS>::commit_pending_in_edges(directedS)
  528. {}
  529. template <class Graph, class Archive, class VertexListS>
  530. void graph_loader<Graph, Archive, VertexListS>::commit_pending_in_edges(bidirectionalS)
  531. {
  532. resolve_property_ptrs();
  533. BOOST_FOREACH(pending_in_edge const& e, m_pending_in_edges)
  534. {
  535. vertex_descriptor u = resolve_remote_vertex(e.u, vertex_list_selector());
  536. vertex_descriptor v = resolve_remote_vertex(e.v, vertex_list_selector());
  537. typedef detail::parallel::stored_in_edge<local_edge_descriptor> stored_edge;
  538. std::vector<unsafe_pair<void*,void*> >::iterator i = std::lower_bound(
  539. m_property_ptrs[owner(u)].begin()
  540. , m_property_ptrs[owner(u)].end()
  541. , unsafe_pair<void*,void*>(e.property_ptr, 0)
  542. );
  543. if (i == m_property_ptrs[owner(u)].end()
  544. || i->first != e.property_ptr)
  545. {
  546. BOOST_ASSERT(false);
  547. }
  548. local_edge_descriptor local_edge(local(u), local(v), i->second);
  549. stored_edge edge(owner(u), local_edge);
  550. boost::graph_detail::push(
  551. get(vertex_in_edges, m_g.base())[local(v)], edge);
  552. }
  553. }
  554. template <class Graph, class Archive, class VertexListS>
  555. typename graph_loader<Graph, Archive, VertexListS>::vertex_descriptor
  556. graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertex(
  557. vertex_descriptor u) const
  558. {
  559. if (owner(u) == process_id(m_pg))
  560. {
  561. return vertex_descriptor(
  562. process_id(m_pg), m_local_vertices.find(local(u))->second);
  563. }
  564. typename std::vector<serializable_vertex_descriptor>::const_iterator
  565. i = std::lower_bound(
  566. m_requested_vertices[owner(u)].begin()
  567. , m_requested_vertices[owner(u)].end()
  568. , serializable_vertex_descriptor(local(u))
  569. );
  570. if (i == m_requested_vertices[owner(u)].end()
  571. || *i != local(u))
  572. {
  573. BOOST_ASSERT(false);
  574. }
  575. local_vertex_descriptor local =
  576. m_remote_vertices[owner(u)][m_requested_vertices[owner(u)].end() - i];
  577. return vertex_descriptor(owner(u), local);
  578. }
  579. template <class Graph, class Archive, class VertexListS>
  580. typename graph_loader<Graph, Archive, VertexListS>::vertex_descriptor
  581. graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertex(
  582. vertex_descriptor u, vecS) const
  583. {
  584. return u;
  585. }
  586. template <class Graph, class Archive, class VertexListS>
  587. template <class Anything>
  588. typename graph_loader<Graph, Archive, VertexListS>::vertex_descriptor
  589. graph_loader<Graph, Archive, VertexListS>::resolve_remote_vertex(
  590. vertex_descriptor u, Anything) const
  591. {
  592. return resolve_remote_vertex(u);
  593. }
  594. template <class Graph, class Archive, class VertexListS>
  595. void*
  596. graph_loader<Graph, Archive, VertexListS>::maybe_load_property_ptr(bidirectionalS)
  597. {
  598. void* ptr;
  599. m_ar >> make_nvp("property_ptr", unsafe_serialize(ptr));
  600. return ptr;
  601. }
  602. template <class Archive, class D>
  603. void maybe_save_local_descriptor(Archive& ar, D const&, vecS)
  604. {}
  605. template <class Archive, class D, class NotVecS>
  606. void maybe_save_local_descriptor(Archive& ar, D const& d, NotVecS)
  607. {
  608. ar << serialization::make_nvp(
  609. "local", unsafe_serialize(const_cast<D&>(d)));
  610. }
  611. template <class Archive>
  612. void maybe_save_properties(
  613. Archive&, char const*, no_property const&)
  614. {}
  615. template <class Archive, class Tag, class T, class Base>
  616. void maybe_save_properties(
  617. Archive& ar, char const* name, property<Tag, T, Base> const& properties)
  618. {
  619. ar & serialization::make_nvp(name, get_property_value(properties, Tag()));
  620. maybe_save_properties(ar, name, static_cast<Base const&>(properties));
  621. }
  622. template <class Archive, class Graph>
  623. void save_in_edges(Archive& ar, Graph const& g, directedS)
  624. {}
  625. // We need to save the edges in the base edge
  626. // list, and the in_edges that are stored in the
  627. // vertex_in_edges vertex property.
  628. template <class Archive, class Graph>
  629. void save_in_edges(Archive& ar, Graph const& g, bidirectionalS)
  630. {
  631. typedef typename Graph::process_group_type
  632. process_group_type;
  633. typedef typename process_group_type::process_id_type
  634. process_id_type;
  635. typedef typename graph_traits<
  636. Graph>::vertex_descriptor vertex_descriptor;
  637. typedef typename vertex_descriptor::local_descriptor_type
  638. local_vertex_descriptor;
  639. typedef typename graph_traits<
  640. Graph>::edge_descriptor edge_descriptor;
  641. process_id_type id = g.processor();
  642. std::vector<edge_descriptor> saved_in_edges;
  643. BGL_FORALL_VERTICES_T(v, g, Graph)
  644. {
  645. BOOST_FOREACH(edge_descriptor const& e, in_edges(v, g))
  646. {
  647. // Only save the in_edges that isn't owned by this process.
  648. if (owner(e) == id)
  649. continue;
  650. saved_in_edges.push_back(e);
  651. }
  652. }
  653. std::size_t I = saved_in_edges.size();
  654. ar << BOOST_SERIALIZATION_NVP(I);
  655. BOOST_FOREACH(edge_descriptor const& e, saved_in_edges)
  656. {
  657. process_id_type src_owner = owner(source(e,g));
  658. local_vertex_descriptor local_src = local(source(e,g));
  659. local_vertex_descriptor local_target = local(target(e,g));
  660. void* property_ptr = local(e).get_property();
  661. using serialization::make_nvp;
  662. ar << make_nvp("src_owner", src_owner);
  663. ar << make_nvp("source", unsafe_serialize(local_src));
  664. ar << make_nvp("target", unsafe_serialize(local_target));
  665. ar << make_nvp("property_ptr", unsafe_serialize(property_ptr));
  666. }
  667. }
  668. template <class Archive, class Edge>
  669. void maybe_save_property_ptr(Archive&, Edge const&, directedS)
  670. {}
  671. template <class Archive, class Edge>
  672. void maybe_save_property_ptr(Archive& ar, Edge const& e, bidirectionalS)
  673. {
  674. void* ptr = local(e).get_property();
  675. ar << serialization::make_nvp("property_ptr", unsafe_serialize(ptr));
  676. }
  677. template <class Archive, class Graph, class DirectedS>
  678. void save_edges(Archive& ar, Graph const& g, DirectedS)
  679. {
  680. typedef typename Graph::process_group_type
  681. process_group_type;
  682. typedef typename process_group_type::process_id_type
  683. process_id_type;
  684. typedef typename graph_traits<
  685. Graph>::vertex_descriptor vertex_descriptor;
  686. typedef typename Graph::edge_property_type edge_property_type;
  687. int E = num_edges(g);
  688. ar << BOOST_SERIALIZATION_NVP(E);
  689. // For *directed* graphs, we can just save
  690. // the edge list and be done.
  691. //
  692. // For *bidirectional* graphs, we need to also
  693. // save the "vertex_in_edges" property map,
  694. // because it might contain in-edges that
  695. // are not locally owned.
  696. BGL_FORALL_EDGES_T(e, g, Graph)
  697. {
  698. vertex_descriptor src(source(e, g));
  699. vertex_descriptor tgt(target(e, g));
  700. typename vertex_descriptor::local_descriptor_type
  701. local_u(local(src));
  702. typename vertex_descriptor::local_descriptor_type
  703. local_v(local(tgt));
  704. process_id_type target_owner = owner(tgt);
  705. using serialization::make_nvp;
  706. ar << make_nvp("source", unsafe_serialize(local_u));
  707. ar << make_nvp("target_owner", target_owner);
  708. ar << make_nvp("target", unsafe_serialize(local_v));
  709. maybe_save_properties(
  710. ar, "edge_property"
  711. , static_cast<edge_property_type const&>(get(edge_all_t(), g, e))
  712. );
  713. maybe_save_property_ptr(ar, e, DirectedS());
  714. }
  715. save_in_edges(ar, g, DirectedS());
  716. }
  717. }} // namespace detail::parallel
  718. template <PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
  719. template <class IStreamConstructibleArchive>
  720. void PBGL_DISTRIB_ADJLIST_TYPE::load(std::string const& filename)
  721. {
  722. process_group_type pg = process_group();
  723. process_id_type id = process_id(pg);
  724. synchronize(pg);
  725. std::vector<int> disk_files = detail::parallel::available_process_files(filename);
  726. std::sort(disk_files.begin(), disk_files.end());
  727. // Negotiate which process gets which file. Serialized.
  728. std::vector<int> consumed_files;
  729. int picked_file = -1;
  730. if (id > 0)
  731. receive_oob(pg, id-1, 0, consumed_files);
  732. std::sort(consumed_files.begin(), consumed_files.end());
  733. std::vector<int> available_files;
  734. std::set_difference(
  735. disk_files.begin(), disk_files.end()
  736. , consumed_files.begin(), consumed_files.end()
  737. , std::back_inserter(available_files)
  738. );
  739. if (available_files.empty())
  740. boost::throw_exception(std::runtime_error("no file available"));
  741. // back() used for debug purposes. Making sure the
  742. // ranks are shuffled.
  743. picked_file = available_files.back();
  744. # ifdef PBGL_SERIALIZE_DEBUG
  745. std::cout << id << " picked " << picked_file << "\n";
  746. # endif
  747. consumed_files.push_back(picked_file);
  748. if (id < num_processes(pg) - 1)
  749. send_oob(pg, id+1, 0, consumed_files);
  750. std::string local_filename = filename + "/" +
  751. lexical_cast<std::string>(picked_file);
  752. std::ifstream in(local_filename.c_str(), std::ios_base::binary);
  753. IStreamConstructibleArchive ar(in);
  754. detail::parallel::graph_loader<
  755. graph_type, IStreamConstructibleArchive, InVertexListS
  756. > loader(*this, ar);
  757. # ifdef PBGL_SERIALIZE_DEBUG
  758. std::cout << "Process " << id << " done loading.\n";
  759. # endif
  760. synchronize(pg);
  761. }
  762. template <PBGL_DISTRIB_ADJLIST_TEMPLATE_PARMS>
  763. template <class OStreamConstructibleArchive>
  764. void PBGL_DISTRIB_ADJLIST_TYPE::save(std::string const& filename) const
  765. {
  766. typedef typename config_type::VertexListS vertex_list_selector;
  767. process_group_type pg = process_group();
  768. process_id_type id = process_id(pg);
  769. if (filesystem::exists(filename) && !filesystem::is_directory(filename))
  770. boost::throw_exception(std::runtime_error("entry exists, but is not a directory"));
  771. filesystem::remove_all(filename);
  772. filesystem::create_directory(filename);
  773. synchronize(pg);
  774. std::string local_filename = filename + "/" +
  775. lexical_cast<std::string>(id);
  776. std::ofstream out(local_filename.c_str(), std::ios_base::binary);
  777. OStreamConstructibleArchive ar(out);
  778. using serialization::make_nvp;
  779. typename process_group_type::process_size_type num_processes_ = num_processes(pg);
  780. ar << make_nvp("num_processes", num_processes_);
  781. ar << BOOST_SERIALIZATION_NVP(id);
  782. ar << make_nvp("mapping", this->distribution().mapping());
  783. int V = num_vertices(*this);
  784. ar << BOOST_SERIALIZATION_NVP(V);
  785. BGL_FORALL_VERTICES_T(v, *this, graph_type)
  786. {
  787. local_vertex_descriptor local_descriptor(local(v));
  788. detail::parallel::maybe_save_local_descriptor(
  789. ar, local_descriptor, vertex_list_selector());
  790. detail::parallel::maybe_save_properties(
  791. ar, "vertex_property"
  792. , static_cast<vertex_property_type const&>(get(vertex_all_t(), *this, v))
  793. );
  794. }
  795. detail::parallel::save_edges(ar, *this, directed_selector());
  796. ar << make_nvp("distribution", this->distribution());
  797. }
  798. } // namespace boost
  799. #endif // BOOST_GRAPH_DISTRIBUTED_ADJLIST_SERIALIZATION_070925_HPP