merge_blocks.hpp 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. //----------------------------------------------------------------------------
  2. /// @file merge_blocks.hpp
  3. /// @brief contains the class merge_blocks, which is part of the
  4. /// block_indirect_sort algorithm
  5. ///
  6. /// @author Copyright (c) 2016 Francisco Jose Tapia (fjtapia@gmail.com )\n
  7. /// Distributed under the Boost Software License, Version 1.0.\n
  8. /// ( See accompanying file LICENSE_1_0.txt or copy at
  9. /// http://www.boost.org/LICENSE_1_0.txt )
  10. /// @version 0.1
  11. ///
  12. /// @remarks
  13. //-----------------------------------------------------------------------------
  14. #ifndef __BOOST_SORT_PARALLEL_DETAIL_MERGE_BLOCKS_HPP
  15. #define __BOOST_SORT_PARALLEL_DETAIL_MERGE_BLOCKS_HPP
  16. #include <atomic>
  17. #include <boost/sort/block_indirect_sort/blk_detail/backbone.hpp>
  18. #include <boost/sort/common/range.hpp>
  19. #include <future>
  20. #include <iostream>
  21. #include <iterator>
  22. namespace boost
  23. {
  24. namespace sort
  25. {
  26. namespace blk_detail
  27. {
  28. //----------------------------------------------------------------------------
  29. // NAMESPACE ALIASES AND USING DECLARATIONS
  30. //----------------------------------------------------------------------------
  31. namespace bsc = boost::sort::common;
  32. namespace bscu = bsc::util;
  33. using bsc::range;
  34. using bsc::is_mergeable;
  35. using bsc::merge_uncontiguous;
  36. //
  37. ///---------------------------------------------------------------------------
  38. /// @struct merge_blocks
  39. /// @brief This class merge the blocks. The blocks to merge are defined by two
  40. /// ranges of positions in the index of the backbone
  41. //----------------------------------------------------------------------------
  42. template<uint32_t Block_size, uint32_t Group_size, class Iter_t, class Compare>
  43. struct merge_blocks
  44. {
  45. //-----------------------------------------------------------------------
  46. // D E F I N I T I O N S
  47. //-----------------------------------------------------------------------
  48. typedef typename std::iterator_traits<Iter_t>::value_type value_t;
  49. typedef std::atomic<uint32_t> atomic_t;
  50. typedef range<size_t> range_pos;
  51. typedef range<Iter_t> range_it;
  52. typedef range<value_t *> range_buf;
  53. typedef std::function<void(void)> function_t;
  54. typedef backbone<Block_size, Iter_t, Compare> backbone_t;
  55. typedef compare_block_pos<Block_size, Iter_t, Compare> compare_block_pos_t;
  56. //------------------------------------------------------------------------
  57. // V A R I A B L E S
  58. //------------------------------------------------------------------------
  59. // Object with the elements to sort and all internal data structures of the
  60. // algorithm
  61. backbone_t &bk;
  62. //
  63. //------------------------------------------------------------------------
  64. // F U N C T I O N S
  65. //------------------------------------------------------------------------
  66. merge_blocks(backbone_t &bkb, size_t pos_index1, size_t pos_index2,
  67. size_t pos_index3);
  68. void tail_process(std::vector<block_pos> &vblkpos1,
  69. std::vector<block_pos> &vblkpos2);
  70. void cut_range(range_pos rng);
  71. void merge_range_pos(range_pos rng);
  72. void extract_ranges(range_pos range_input);
  73. //
  74. //------------------------------------------------------------------------
  75. // function : function_merge_range_pos
  76. /// @brief create a function_t with a call to merge_range_pos, and insert
  77. /// in the stack of the backbone
  78. //
  79. /// @param rng_input : range of positions of blocks in the index to merge
  80. /// @param son_counter : atomic variable which is decremented when finish
  81. /// the function. This variable is used for to know
  82. /// when are finished all the function_t created
  83. /// inside an object
  84. /// @param error : global indicator of error.
  85. ///
  86. //------------------------------------------------------------------------
  87. void function_merge_range_pos(const range_pos &rng_input, atomic_t &counter,
  88. bool &error)
  89. {
  90. bscu::atomic_add(counter, 1);
  91. function_t f1 = [this, rng_input, &counter, &error]( ) -> void
  92. {
  93. if (not error)
  94. {
  95. try
  96. {
  97. this->merge_range_pos (rng_input);
  98. }
  99. catch (std::bad_alloc &ba)
  100. {
  101. error = true;
  102. };
  103. }
  104. bscu::atomic_sub (counter, 1);
  105. };
  106. bk.works.emplace_back(f1);
  107. }
  108. ;
  109. //
  110. //------------------------------------------------------------------------
  111. // function : function_cut_range
  112. /// @brief create a function_t with a call to cut_range, and inser in
  113. /// the stack of the backbone
  114. //
  115. /// @param rng_input : range of positions in the index to cut
  116. /// @param counter : atomic variable which is decremented when finish
  117. /// the function. This variable is used for to know
  118. /// when are finished all the function_t created
  119. /// inside an object
  120. /// @param error : global indicator of error.
  121. //------------------------------------------------------------------------
  122. void function_cut_range(const range_pos &rng_input, atomic_t &counter,
  123. bool &error)
  124. {
  125. bscu::atomic_add(counter, 1);
  126. function_t f1 = [this, rng_input, &counter, &error]( ) -> void
  127. {
  128. if (not error)
  129. {
  130. try
  131. {
  132. this->cut_range (rng_input);
  133. }
  134. catch (std::bad_alloc &)
  135. {
  136. error = true;
  137. };
  138. }
  139. bscu::atomic_sub (counter, 1);
  140. };
  141. bk.works.emplace_back(f1);
  142. }
  143. //----------------------------------------------------------------------------
  144. };
  145. // end struct merge_blocks
  146. //----------------------------------------------------------------------------
  147. //
  148. //############################################################################
  149. // ##
  150. // ##
  151. // N O N I N L I N E F U N C T I O N S ##
  152. // ##
  153. // ##
  154. //############################################################################
  155. //
  156. //-------------------------------------------------------------------------
  157. // function : merge_blocks
  158. /// @brief make the indirect merge of the two range_pos defined by their index
  159. /// position [pos_index1, pos_index2 ) and [ pos_index2, pos_index3 )
  160. //
  161. /// @param bkb : backbone with all the data to sort , and the internal data
  162. /// structures of the algorithm
  163. /// @param pos_index1 : first position of the first range in the index
  164. /// @param pos_index2 : last position of the first range and first position
  165. /// of the second range in the index
  166. /// @param pos_index3 : last position of the second range in the index
  167. //-------------------------------------------------------------------------
  168. template<uint32_t Block_size, uint32_t Group_size, class Iter_t, class Compare>
  169. merge_blocks<Block_size, Group_size, Iter_t, Compare>
  170. ::merge_blocks( backbone_t &bkb, size_t pos_index1, size_t pos_index2,
  171. size_t pos_index3) : bk(bkb)
  172. {
  173. size_t nblock1 = pos_index2 - pos_index1;
  174. size_t nblock2 = pos_index3 - pos_index2;
  175. if (nblock1 == 0 or nblock2 == 0) return;
  176. //-----------------------------------------------------------------------
  177. // Merging of the two intervals
  178. //-----------------------------------------------------------------------
  179. std::vector<block_pos> vpos1, vpos2;
  180. vpos1.reserve(nblock1 + 1);
  181. vpos2.reserve(nblock2 + 1);
  182. for (size_t i = pos_index1; i < pos_index2; ++i)
  183. {
  184. vpos1.emplace_back(bk.index[i].pos(), true);
  185. };
  186. for (size_t i = pos_index2; i < pos_index3; ++i)
  187. {
  188. vpos2.emplace_back(bk.index[i].pos(), false);
  189. };
  190. //-------------------------------------------------------------------
  191. // tail process
  192. //-------------------------------------------------------------------
  193. if (vpos2.back().pos() == (bk.nblock - 1)
  194. and bk.range_tail.first != bk.range_tail.last)
  195. {
  196. tail_process(vpos1, vpos2);
  197. nblock1 = vpos1.size();
  198. nblock2 = vpos2.size();
  199. };
  200. compare_block_pos_t cmp_blk(bk.global_range.first, bk.cmp);
  201. if (bk.error) return;
  202. bscu::merge(vpos1.begin(), vpos1.end(), vpos2.begin(), vpos2.end(),
  203. bk.index.begin() + pos_index1, cmp_blk);
  204. if (bk.error) return;
  205. // Extracting the ranges for to merge the elements
  206. extract_ranges(range_pos(pos_index1, pos_index1 + nblock1 + nblock2));
  207. }
  208. //
  209. //-------------------------------------------------------------------------
  210. // function : tail_process
  211. /// @brief make the process when the second vector of block_pos to merge is
  212. /// the last, and have an incomplete block ( tail)
  213. //
  214. /// @param vblkpos1 : first vector of block_pos elements to merge
  215. /// @param vblkpos2 : second vector of block_pos elements to merge
  216. //-------------------------------------------------------------------------
  217. template<uint32_t Block_size, uint32_t Group_size, class Iter_t, class Compare>
  218. void merge_blocks<Block_size, Group_size, Iter_t, Compare>
  219. ::tail_process( std::vector<block_pos> &vblkpos1,
  220. std::vector<block_pos> &vblkpos2 )
  221. {
  222. if (vblkpos1.size() == 0 or vblkpos2.size() == 0) return;
  223. vblkpos2.pop_back();
  224. size_t posback1 = vblkpos1.back().pos();
  225. range_it range_back1 = bk.get_range(posback1);
  226. if (bsc::is_mergeable(range_back1, bk.range_tail, bk.cmp))
  227. {
  228. bsc::merge_uncontiguous(range_back1, bk.range_tail, bk.get_range_buf(),
  229. bk.cmp);
  230. if (vblkpos1.size() > 1)
  231. {
  232. size_t pos_aux = vblkpos1[vblkpos1.size() - 2].pos();
  233. range_it range_aux = bk.get_range(pos_aux);
  234. if (bsc::is_mergeable(range_aux, range_back1, bk.cmp))
  235. {
  236. vblkpos2.emplace_back(posback1, false);
  237. vblkpos1.pop_back();
  238. };
  239. };
  240. };
  241. }
  242. //
  243. //-------------------------------------------------------------------------
  244. // function : cut_range
  245. /// @brief when the rng_input is greather than Group_size, this function divide
  246. /// it in several parts creating function_t elements, which are inserted
  247. /// in the concurrent stack of the backbone
  248. //
  249. /// @param rng_input : range to divide
  250. //-------------------------------------------------------------------------
  251. template<uint32_t Block_size, uint32_t Group_size, class Iter_t, class Compare>
  252. void merge_blocks<Block_size, Group_size, Iter_t, Compare>
  253. ::cut_range(range_pos rng_input)
  254. {
  255. if (rng_input.size() < Group_size)
  256. {
  257. merge_range_pos(rng_input);
  258. return;
  259. };
  260. atomic_t counter(0);
  261. size_t npart = (rng_input.size() + Group_size - 1) / Group_size;
  262. size_t size_part = rng_input.size() / npart;
  263. size_t pos_ini = rng_input.first;
  264. size_t pos_last = rng_input.last;
  265. while (pos_ini < pos_last)
  266. {
  267. size_t pos = pos_ini + size_part;
  268. while (pos < pos_last
  269. and bk.index[pos - 1].side() == bk.index[pos].side())
  270. {
  271. ++pos;
  272. };
  273. if (pos < pos_last)
  274. {
  275. merge_uncontiguous(bk.get_range(bk.index[pos - 1].pos()),
  276. bk.get_range(bk.index[pos].pos()),
  277. bk.get_range_buf(), bk.cmp);
  278. }
  279. else pos = pos_last;
  280. if ((pos - pos_ini) > 1)
  281. {
  282. range_pos rng_aux(pos_ini, pos);
  283. function_merge_range_pos(rng_aux, counter, bk.error);
  284. };
  285. pos_ini = pos;
  286. };
  287. bk.exec(counter); // wait until finish all the ranges
  288. }
  289. //
  290. //-------------------------------------------------------------------------
  291. // function : merge_range_pos
  292. /// @brief make the indirect merge of the blocks inside the rng_input
  293. //
  294. /// @param rng_input : range of positions of the blocks to merge
  295. //-------------------------------------------------------------------------
  296. template<uint32_t Block_size, uint32_t Group_size, class Iter_t, class Compare>
  297. void merge_blocks<Block_size, Group_size, Iter_t, Compare>
  298. ::merge_range_pos(range_pos rng_input)
  299. {
  300. if (rng_input.size() < 2) return;
  301. range_buf rbuf = bk.get_range_buf();
  302. range_it rng_prev = bk.get_range(bk.index[rng_input.first].pos());
  303. move_forward(rbuf, rng_prev);
  304. range_it rng_posx(rng_prev);
  305. for (size_t posx = rng_input.first + 1; posx != rng_input.last; ++posx)
  306. {
  307. rng_posx = bk.get_range(bk.index[posx].pos());
  308. bsc::merge_flow(rng_prev, rbuf, rng_posx, bk.cmp);
  309. rng_prev = rng_posx;
  310. };
  311. move_forward(rng_posx, rbuf);
  312. }
  313. //
  314. //-------------------------------------------------------------------------
  315. // function : extract_ranges
  316. /// @brief from a big range of positions of blocks in the index. Examine which
  317. /// are mergeable, and generate a couple of ranges for to be merged.
  318. /// With the ranges obtained generate function_t elements and are
  319. /// inserted in the concurrent stack.
  320. /// When the range obtained is smaller than Group_size, generate a
  321. /// function_t calling to merge_range_pos, when is greater, generate a
  322. /// function_t calling to cut_range
  323. //
  324. /// @param rpos range_input : range of the position in the index, where must
  325. /// extract the ranges to merge
  326. //-------------------------------------------------------------------------
  327. template<uint32_t Block_size, uint32_t Group_size, class Iter_t, class Compare>
  328. void merge_blocks<Block_size, Group_size, Iter_t, Compare>
  329. ::extract_ranges(range_pos range_input)
  330. {
  331. if (range_input.size() < 2) return;
  332. atomic_t counter(0);
  333. // The names with x are positions of the index
  334. size_t posx_ini = range_input.first;
  335. block_pos bp_posx_ini = bk.index[posx_ini];
  336. range_it rng_max = bk.get_range(bp_posx_ini.pos());
  337. bool side_max = bp_posx_ini.side();
  338. block_pos bp_posx;
  339. range_it rng_posx = rng_max;
  340. bool side_posx = side_max;
  341. for (size_t posx = posx_ini + 1; posx <= range_input.last; ++posx)
  342. {
  343. bool final = (posx == range_input.last);
  344. bool mergeable = false;
  345. if (not final)
  346. {
  347. bp_posx = bk.index[posx];
  348. rng_posx = bk.get_range(bp_posx.pos());
  349. side_posx = bp_posx.side();
  350. mergeable = (side_max != side_posx
  351. and is_mergeable(rng_max, rng_posx, bk.cmp));
  352. };
  353. if (bk.error) return;
  354. if (final or not mergeable)
  355. {
  356. range_pos rp_final(posx_ini, posx);
  357. if (rp_final.size() > 1)
  358. {
  359. if (rp_final.size() > Group_size)
  360. {
  361. function_cut_range(rp_final, counter, bk.error);
  362. }
  363. else
  364. {
  365. function_merge_range_pos(rp_final, counter, bk.error);
  366. };
  367. };
  368. posx_ini = posx;
  369. if (not final)
  370. {
  371. rng_max = rng_posx;
  372. side_max = side_posx;
  373. };
  374. }
  375. else
  376. {
  377. if (bk.cmp(*(rng_max.back()), *(rng_posx.back())))
  378. {
  379. rng_max = rng_posx;
  380. side_max = side_posx;
  381. };
  382. };
  383. };
  384. bk.exec(counter);
  385. }
  386. //
  387. //****************************************************************************
  388. }; // End namespace blk_detail
  389. }; // End namespace sort
  390. }; // End namespace boost
  391. //****************************************************************************
  392. //
  393. #endif