//---------------------------------------------------------------------------- /// @file sample_sort.hpp /// @brief contains the class sample_sort /// /// @author Copyright (c) 2016 Francisco Jose Tapia (fjtapia@gmail.com )\n /// Distributed under the Boost Software License, Version 1.0.\n /// ( See accompanying file LICENSE_1_0.txt or copy at /// http://www.boost.org/LICENSE_1_0.txt ) /// @version 0.1 /// /// @remarks //----------------------------------------------------------------------------- #ifndef __BOOST_SORT_PARALLEL_DETAIL_SAMPLE_SORT_HPP #define __BOOST_SORT_PARALLEL_DETAIL_SAMPLE_SORT_HPP #include #include #include #include #include #include #include #include #include #include #include #include #include namespace boost { namespace sort { namespace sample_detail { //--------------------------------------------------------------------------- // USING SENTENCES //--------------------------------------------------------------------------- namespace bsc = boost::sort::common; namespace bss = boost::sort::spin_detail; namespace bscu = boost::sort::common::util; using bsc::range; using bscu::atomic_add; using bsc::merge_vector4; using bsc::uninit_merge_level4; using bsc::less_ptr_no_null; // ///--------------------------------------------------------------------------- /// @struct sample_sort /// @brief This a structure for to implement a sample sort, exception /// safe /// @tparam /// @remarks //---------------------------------------------------------------------------- template struct sample_sort { //------------------------------------------------------------------------ // DEFINITIONS //------------------------------------------------------------------------ typedef value_iter value_t; typedef range range_it; typedef range range_buf; typedef sample_sort this_t; //------------------------------------------------------------------------ // VARIABLES AND CONSTANTS //------------------------------------------------------------------------ // minimun numbers of elements for to be sortd in parallel mode static const uint32_t thread_min = (1 << 16); // Number of threads to use in the algorithm // Number of intervals for to do the internal division of the data uint32_t nthread, ninterval; // Bool variables indicating if the auxiliary memory is constructed // and indicating in the auxiliary memory had been obtained inside the /// algorithm or had been received as a parameter bool construct = false, owner = false; // Comparison object for to compare two elements Compare comp; // Range with all the elements to sort range_it global_range; // range with the auxiliary memory range_buf global_buf; // vector of futures std::vector> vfuture; // vector of vectors which contains the ranges to merge obtained in the // subdivision std::vector> vv_range_it; // each vector of ranges of the vv_range_it, need their corresponding buffer // for to do the merge std::vector> vv_range_buf; // Initial vector of ranges std::vector vrange_it_ini; // Initial vector of buffers std::vector vrange_buf_ini; // atomic counter for to know when are finished the function_t created // inside a function std::atomic njob; // Indicate if an error in the algorithm for to undo all bool error; //------------------------------------------------------------------------ // FUNCTIONS OF THE STRUCT //------------------------------------------------------------------------ void initial_configuration(void); sample_sort (Iter_t first, Iter_t last, Compare cmp, uint32_t num_thread, value_t *paux, size_t naux); sample_sort(Iter_t first, Iter_t last) : sample_sort (first, last, Compare(), std::thread::hardware_concurrency(), nullptr, 0) { }; sample_sort(Iter_t first, Iter_t last, Compare cmp) : sample_sort(first, last, cmp, std::thread::hardware_concurrency(), nullptr, 0) { }; sample_sort(Iter_t first, Iter_t last, uint32_t num_thread) : sample_sort(first, last, Compare(), num_thread, nullptr, 0) { }; sample_sort(Iter_t first, Iter_t last, Compare cmp, uint32_t num_thread) : sample_sort(first, last, cmp, num_thread, nullptr, 0) { }; sample_sort(Iter_t first, Iter_t last, Compare cmp, uint32_t num_thread, range_buf range_buf_initial) : sample_sort(first, last, cmp, num_thread, range_buf_initial.first, range_buf_initial.size()) { }; void destroy_all(void); // //----------------------------------------------------------------------------- // function :~sample_sort /// @brief destructor of the class. The utility is to destroy the temporary /// buffer used in the sorting process //----------------------------------------------------------------------------- ~sample_sort(void) { destroy_all(); }; // //----------------------------------------------------------------------- // function : execute first /// @brief this a function to assign to each thread in the first merge //----------------------------------------------------------------------- void execute_first(void) { uint32_t job = 0; while ((job = atomic_add(njob, 1)) < ninterval) { uninit_merge_level4(vrange_buf_ini[job], vv_range_it[job], vv_range_buf[job], comp); }; }; // //----------------------------------------------------------------------- // function : execute /// @brief this is a function to assignt each thread the final merge //----------------------------------------------------------------------- void execute(void) { uint32_t job = 0; while ((job = atomic_add(njob, 1)) < ninterval) { merge_vector4(vrange_buf_ini[job], vrange_it_ini[job], vv_range_buf[job], vv_range_it[job], comp); }; }; // //----------------------------------------------------------------------- // function : first merge /// @brief Implement the merge of the initially sparse ranges //----------------------------------------------------------------------- void first_merge(void) { //---------------------------------- begin -------------------------- njob = 0; for (uint32_t i = 0; i < nthread; ++i) { vfuture[i] = std::async(std::launch::async, &this_t::execute_first, this); }; for (uint32_t i = 0; i < nthread; ++i) vfuture[i].get(); }; // //----------------------------------------------------------------------- // function : final merge /// @brief Implement the final merge of the ranges //----------------------------------------------------------------------- void final_merge(void) { //---------------------------------- begin -------------------------- njob = 0; for (uint32_t i = 0; i < nthread; ++i) { vfuture[i] = std::async(std::launch::async, &this_t::execute, this); }; for (uint32_t i = 0; i < nthread; ++i) vfuture[i].get(); }; //---------------------------------------------------------------------------- }; // End class sample_sort //---------------------------------------------------------------------------- // //############################################################################ // ## // N O N I N L I N E F U N C T I O N S ## // ## // ## //############################################################################ // //----------------------------------------------------------------------------- // function : sample_sort /// @brief constructor of the class /// /// @param first : iterator to the first element of the range to sort /// @param last : iterator after the last element to the range to sort /// @param cmp : object for to compare two elements pointed by Iter_t iterators /// @param num_thread : Number of threads to use in the process. When this value /// is lower than 2, the sorting is done with 1 thread /// @param paux : pointer to the auxiliary memory. If nullptr, the memory is /// created inside the class /// @param naux : number of elements of the memory pointed by paux //----------------------------------------------------------------------------- template sample_sort ::sample_sort (Iter_t first, Iter_t last, Compare cmp, uint32_t num_thread, value_t *paux, size_t naux) : nthread(num_thread), owner(false), comp(cmp), global_range(first, last), global_buf(nullptr, nullptr), error(false) { assert((last - first) >= 0); size_t nelem = size_t(last - first); construct = false; njob = 0; vfuture.resize(nthread); // Adjust when have many threads and only a few elements while (nelem > thread_min and (nthread * nthread) > (nelem >> 3)) { nthread /= 2; }; ninterval = (nthread << 3); if (nthread < 2 or nelem <= (thread_min)) { bss::spinsort(first, last, comp); return; }; //------------------- check if sort -------------------------------------- bool sw = true; for (Iter_t it1 = first, it2 = first + 1; it2 != last and (sw = not comp(*it2, *it1)); it1 = it2++); if (sw) return; //------------------- check if reverse sort --------------------------- sw = true; for (Iter_t it1 = first, it2 = first + 1; it2 != last and (sw = comp(*it2, *it1)); it1 = it2++); if (sw) { size_t nelem2 = nelem >> 1; Iter_t it1 = first, it2 = last - 1; for (size_t i = 0; i < nelem2; ++i) std::swap(*(it1++), *(it2--)); return; }; if (paux != nullptr) { assert(naux != 0); global_buf.first = paux; global_buf.last = paux + naux; owner = false; } else { value_t *ptr = std::get_temporary_buffer(nelem).first; if (ptr == nullptr) throw std::bad_alloc(); owner = true; global_buf = range_buf(ptr, ptr + nelem); }; //------------------------------------------------------------------------ // PROCESS //------------------------------------------------------------------------ try { initial_configuration(); } catch (std::bad_alloc &) { error = true; }; if (not error) { first_merge(); construct = true; final_merge(); }; if (error) { destroy_all(); throw std::bad_alloc(); }; } ; // //----------------------------------------------------------------------------- // function : destroy_all /// @brief destructor of the class. The utility is to destroy the temporary /// buffer used in the sorting process //----------------------------------------------------------------------------- template void sample_sort::destroy_all(void) { if (construct) { destroy(global_buf); construct = false; } if (global_buf.first != nullptr and owner) std::return_temporary_buffer(global_buf.first); } // //----------------------------------------------------------------------------- // function : initial_configuration /// @brief Create the internal data structures, and obtain the inital set of /// ranges to merge //----------------------------------------------------------------------------- template void sample_sort::initial_configuration(void) { std::vector vmem_thread; std::vector vbuf_thread; size_t nelem = global_range.size(); //------------------------------------------------------------------------ size_t cupo = nelem / nthread; Iter_t it_first = global_range.first; value_t *buf_first = global_buf.first; vmem_thread.reserve(nthread + 1); vbuf_thread.reserve(nthread + 1); for (uint32_t i = 0; i < (nthread - 1); ++i, it_first += cupo, buf_first += cupo) { vmem_thread.emplace_back(it_first, it_first + cupo); vbuf_thread.emplace_back(buf_first, buf_first + cupo); }; vmem_thread.emplace_back(it_first, global_range.last); vbuf_thread.emplace_back(buf_first, global_buf.last); //------------------------------------------------------------------------ // Sorting of the ranges //------------------------------------------------------------------------ std::vector> vfuture(nthread); for (uint32_t i = 0; i < nthread; ++i) { auto func = [=]() { bss::spinsort (vmem_thread[i].first, vmem_thread[i].last, comp, vbuf_thread[i]); }; vfuture[i] = std::async(std::launch::async, func); }; for (uint32_t i = 0; i < nthread; ++i) vfuture[i].get(); //------------------------------------------------------------------------ // Obtain the vector of milestones //------------------------------------------------------------------------ std::vector vsample; vsample.reserve(nthread * (ninterval - 1)); for (uint32_t i = 0; i < nthread; ++i) { size_t distance = vmem_thread[i].size() / ninterval; for (size_t j = 1, pos = distance; j < ninterval; ++j, pos += distance) { vsample.push_back(vmem_thread[i].first + pos); }; }; typedef less_ptr_no_null compare_ptr; typedef typename std::vector::iterator it_to_it; bss::spinsort(vsample.begin(), vsample.end(), compare_ptr(comp)); //------------------------------------------------------------------------ // Create the final milestone vector //------------------------------------------------------------------------ std::vector vmilestone; vmilestone.reserve(ninterval); for (uint32_t pos = nthread >> 1; pos < vsample.size(); pos += nthread) { vmilestone.push_back(vsample[pos]); }; //------------------------------------------------------------------------ // Creation of the first vector of ranges //------------------------------------------------------------------------ std::vector>>vv_range_first (nthread); for (uint32_t i = 0; i < nthread; ++i) { Iter_t itaux = vmem_thread[i].first; for (uint32_t k = 0; k < (ninterval - 1); ++k) { Iter_t it2 = std::upper_bound(itaux, vmem_thread[i].last, *vmilestone[k], comp); vv_range_first[i].emplace_back(itaux, it2); itaux = it2; }; vv_range_first[i].emplace_back(itaux, vmem_thread[i].last); }; //------------------------------------------------------------------------ // Copy in buffer and creation of the final matrix of ranges //------------------------------------------------------------------------ vv_range_it.resize(ninterval); vv_range_buf.resize(ninterval); vrange_it_ini.reserve(ninterval); vrange_buf_ini.reserve(ninterval); for (uint32_t i = 0; i < ninterval; ++i) { vv_range_it[i].reserve(nthread); vv_range_buf[i].reserve(nthread); }; Iter_t it = global_range.first; value_t *it_buf = global_buf.first; for (uint32_t k = 0; k < ninterval; ++k) { size_t nelem_interval = 0; for (uint32_t i = 0; i < nthread; ++i) { size_t nelem_range = vv_range_first[i][k].size(); if (nelem_range != 0) { vv_range_it[k].push_back(vv_range_first[i][k]); }; nelem_interval += nelem_range; }; vrange_it_ini.emplace_back(it, it + nelem_interval); vrange_buf_ini.emplace_back(it_buf, it_buf + nelem_interval); it += nelem_interval; it_buf += nelem_interval; }; } ; // //**************************************************************************** } ; // End namespace sample_detail //**************************************************************************** // namespace bscu = boost::sort::common::util; // //############################################################################ // ## // ## // S A M P L E _ S O R T ## // ## // ## //############################################################################ // //----------------------------------------------------------------------------- // function : sample_sort /// @brief parallel sample sort algorithm (stable sort) /// /// @param first : iterator to the first element of the range to sort /// @param last : iterator after the last element to the range to sort //----------------------------------------------------------------------------- template void sample_sort(Iter_t first, Iter_t last) { typedef compare_iter Compare; sample_detail::sample_sort(first, last); }; // //----------------------------------------------------------------------------- // function : sample_sort /// @brief parallel sample sort algorithm (stable sort) /// /// @param first : iterator to the first element of the range to sort /// @param last : iterator after the last element to the range to sort /// @param nthread : Number of threads to use in the process. When this value /// is lower than 2, the sorting is done with 1 thread //----------------------------------------------------------------------------- template void sample_sort(Iter_t first, Iter_t last, uint32_t nthread) { typedef compare_iter Compare; sample_detail::sample_sort(first, last, nthread); }; // //----------------------------------------------------------------------------- // function : sample_sort /// @brief parallel sample sort algorithm (stable sort) /// /// @param first : iterator to the first element of the range to sort /// @param last : iterator after the last element to the range to sort /// @param comp : object for to compare two elements pointed by Iter_t /// iterators //----------------------------------------------------------------------------- template * = nullptr> void sample_sort(Iter_t first, Iter_t last, Compare comp) { sample_detail::sample_sort(first, last, comp); }; // //----------------------------------------------------------------------------- // function : sample_sort /// @brief parallel sample sort algorithm (stable sort) /// /// @param first : iterator to the first element of the range to sort /// @param last : iterator after the last element to the range to sort /// @param comp : object for to compare two elements pointed by Iter_t /// iterators /// @param nthread : Number of threads to use in the process. When this value /// is lower than 2, the sorting is done with 1 thread //----------------------------------------------------------------------------- template void sample_sort(Iter_t first, Iter_t last, Compare comp, uint32_t nthread) { sample_detail::sample_sort(first, last, comp, nthread); }; // //**************************************************************************** };// End namespace sort };// End namespace boost //**************************************************************************** // #endif