11 #ifndef CUBBYFLOW_PARALLEL_IMPL_HPP    12 #define CUBBYFLOW_PARALLEL_IMPL_HPP    17 #if defined(CUBBYFLOW_TASKING_HPX)    18 #include <hpx/include/future.hpp>    19 #include <hpx/include/parallel_fill.hpp>    20 #include <hpx/include/parallel_for_each.hpp>    21 #include <hpx/include/parallel_for_loop.hpp>    22 #include <hpx/include/parallel_reduce.hpp>    23 #include <hpx/include/parallel_sort.hpp>    26 #if defined(CUBBYFLOW_TASKING_TBB)    27 #include <tbb/parallel_for.h>    28 #include <tbb/parallel_reduce.h>    29 #include <tbb/parallel_sort.h>    31 #elif defined(CUBBYFLOW_TASKING_CPP11THREAD)    47 #if defined(CUBBYFLOW_TASKING_HPX)    48 template <
typename Task>
    49 using future = hpx::future<Task>;
    51 template <
typename Task>
    55 template <
typename TASK>
    58 template <
typename TASK>
    61 #if defined(CUBBYFLOW_TASKING_HPX)    62     return hpx::async(std::forward<TASK>(fn));
    64 #elif defined(CUBBYFLOW_TASKING_TBB)    65     struct LocalTBBTask : 
public tbb::task
    69         LocalTBBTask(TASK&& f) : func(std::forward<TASK>(f))
    74         tbb::task* execute()
 override    81     using package_t = std::packaged_task<operator_return_t<TASK>()>;
    83     auto task = 
new package_t(std::forward<TASK>(fn));
    84     auto* tbbNode = 
new (tbb::task::allocate_root()) LocalTBBTask([=]() {
    89     tbb::task::enqueue(*tbbNode);
    90     return task.get_future();
    92 #elif defined(CUBBYFLOW_TASKING_CPP11THREAD)    93     return std::async(std::launch::async, fn);
    95     return std::async(std::launch::deferred, fn);
   105 template <
typename RandomIterator, 
typename RandomIterator2,
   106           typename CompareFunction>
   107 void Merge(RandomIterator a, 
size_t size, RandomIterator2 temp,
   108            CompareFunction compareFunction)
   111     size_t i2 = size / 2;
   114     while (i1 < size / 2 && i2 < size)
   116         if (compareFunction(a[i1], a[i2]))
   130     while (i1 < size / 2)
   148 template <
typename RandomIterator, 
typename RandomIterator2,
   149           typename CompareFunction>
   151                        unsigned int numThreads, CompareFunction compareFunction)
   155         std::sort(a, a + size, compareFunction);
   157     else if (numThreads > 1)
   159         std::vector<future<void>> pool;
   162         auto launchRange = [compareFunction](RandomIterator begin, 
size_t k2,
   163                                              RandomIterator2 temp,
   164                                              unsigned int numThreads) {
   169             [=]() { launchRange(a, size / 2, temp, numThreads / 2); }));
   172             launchRange(a + size / 2, size - size / 2, temp + size / 2,
   173                         numThreads - numThreads / 2);
   185         Merge(a, size, temp, compareFunction);
   190 template <
typename RandomIterator, 
typename T>
   191 void ParallelFill(
const RandomIterator& begin, 
const RandomIterator& end,
   194     auto diff = end - begin;
   200 #if defined(CUBBYFLOW_TASKING_HPX)   201     hpx::parallel::fill(hpx::parallel::execution::par, begin, end, value);
   203     size_t size = 
static_cast<size_t>(diff);
   205         ZERO_SIZE, size, [begin, value](
size_t i) { begin[i] = value; },
   211 template <
typename IndexType, 
typename Function>
   215     if (beginIndex > endIndex)
   222 #if defined(CUBBYFLOW_TASKING_TBB)   224         tbb::parallel_for(beginIndex, endIndex, 
function);
   225 #elif defined(CUBBYFLOW_TASKING_HPX)   227         hpx::parallel::for_loop(hpx::parallel::execution::par, beginIndex,
   229 #elif defined(CUBBYFLOW_TASKING_CPP11THREAD)   232         const unsigned int numThreads =
   233             (numThreadsHint == 0u) ? 8u : numThreadsHint;
   236         IndexType n = endIndex - beginIndex + 1;
   237         IndexType slice = 
static_cast<IndexType
>(
   238             std::round(n / static_cast<double>(numThreads)));
   239         slice = std::max(slice, IndexType(1));
   242         auto launchRange = [&
function](IndexType k1, IndexType k2) {
   243             for (IndexType k = k1; k < k2; ++k)
   250         std::vector<std::thread> pool;
   251         pool.reserve(numThreads);
   252         IndexType i1 = beginIndex;
   253         IndexType i2 = std::min(beginIndex + slice, endIndex);
   255         for (
unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
   257             pool.emplace_back(launchRange, i1, i2);
   259             i2 = std::min(i2 + slice, endIndex);
   264             pool.emplace_back(launchRange, i1, endIndex);
   268         for (std::thread& t : pool)
   278 #if defined(CUBBYFLOW_TASKING_OPENMP)   279 #pragma omp parallel for   280 #if defined(_MSC_VER) && !defined(__INTEL_COMPILER)   281         for (ssize_t i = beginIndex; i < static_cast<ssize_t>(endIndex); ++i)
   283 #else   // !MSVC || Intel   284         for (
auto i = beginIndex; i < endIndex; ++i)
   286 #endif  // MSVC && !Intel   289 #else   // CUBBYFLOW_TASKING_SERIAL   290         for (
auto i = beginIndex; i < endIndex; ++i)
   294 #endif  // CUBBYFLOW_TASKING_OPENMP   299         for (
auto i = beginIndex; i < endIndex; ++i)
   306 template <
typename IndexType, 
typename Function>
   310     if (beginIndex > endIndex)
   317 #if defined(CUBBYFLOW_TASKING_TBB)   319             tbb::blocked_range<IndexType>(beginIndex, endIndex),
   320             [&
function](
const tbb::blocked_range<IndexType>& range) {
   321                 function(range.begin(), range.end());
   326         const unsigned int numThreads =
   327             numThreadsHint == 0u ? 8u : numThreadsHint;
   330         IndexType n = endIndex - beginIndex + 1;
   331         IndexType slice = 
static_cast<IndexType
>(
   332             std::round(n / static_cast<double>(numThreads)));
   333         slice = std::max(slice, IndexType(1));
   336         std::vector<CubbyFlow::Internal::future<void>> pool;
   337         pool.reserve(numThreads);
   338         IndexType i1 = beginIndex;
   339         IndexType i2 = std::min(beginIndex + slice, endIndex);
   341         for (
unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
   345             i2 = std::min(i2 + slice, endIndex);
   366         function(beginIndex, endIndex);
   370 template <
typename IndexType, 
typename Function>
   372                  IndexType beginIndexY, IndexType endIndexY,
   376         beginIndexY, endIndexY,
   378             for (IndexType i = beginIndexX; i < endIndexX; ++i)
   386 template <
typename IndexType, 
typename Function>
   388                       IndexType beginIndexY, IndexType endIndexY,
   392         beginIndexY, endIndexY,
   393         [&](IndexType jBegin, IndexType jEnd) {
   394             function(beginIndexX, endIndexX, jBegin, jEnd);
   399 template <
typename IndexType, 
typename Function>
   401                  IndexType beginIndexY, IndexType endIndexY,
   402                  IndexType beginIndexZ, IndexType endIndexZ,
   406         beginIndexZ, endIndexZ,
   408             for (IndexType j = beginIndexY; j < endIndexY; ++j)
   410                 for (IndexType i = beginIndexX; i < endIndexX; ++i)
   419 template <
typename IndexType, 
typename Function>
   421                       IndexType beginIndexY, IndexType endIndexY,
   422                       IndexType beginIndexZ, IndexType endIndexZ,
   426         beginIndexZ, endIndexZ,
   427         [&](IndexType kBegin, IndexType kEnd) {
   428             function(beginIndexX, endIndexX, beginIndexY, endIndexY, kBegin,
   434 template <
typename IndexType, 
typename Value, 
typename Function,
   437                      const Value& identity, 
const Function& 
function,
   440     if (beginIndex > endIndex)
   447 #if defined(CUBBYFLOW_TASKING_TBB)   448         return tbb::parallel_reduce(
   449             tbb::blocked_range<IndexType>(beginIndex, endIndex), identity,
   450             [&
function](
const tbb::blocked_range<IndexType>& range,
   452                 return function(range.begin(), range.end(), init);
   458         const unsigned int numThreads =
   459             (numThreadsHint == 0u) ? 8u : numThreadsHint;
   462         IndexType n = endIndex - beginIndex + 1;
   463         IndexType slice = 
static_cast<IndexType
>(
   464             std::round(n / static_cast<double>(numThreads)));
   465         slice = std::max(slice, IndexType(1));
   468         std::vector<Value> results(numThreads, identity);
   471         auto launchRange = [&](IndexType k1, IndexType k2, 
unsigned int tid) {
   472             results[tid] = 
function(k1, k2, identity);
   476         std::vector<CubbyFlow::Internal::future<void>> pool;
   477         pool.reserve(numThreads);
   479         IndexType i1 = beginIndex;
   480         IndexType i2 = std::min(beginIndex + slice, endIndex);
   481         unsigned int threadID = 0;
   483         for (; threadID + 1 < numThreads && i1 < endIndex; ++threadID)
   489             i2 = std::min(i2 + slice, endIndex);
   495                 [=]() { launchRange(i1, endIndex, threadID); }));
   508         Value finalResult = identity;
   509         for (
const Value& val : results)
   511             finalResult = reduce(val, finalResult);
   519     return function(beginIndex, endIndex, identity);
   522 template <
typename RandomIterator>
   528         std::less<
typename std::iterator_traits<RandomIterator>::value_type>(),
   532 template <
typename RandomIterator, 
typename CompareFunction>
   543 #if defined(CUBBYFLOW_TASKING_HPX)   544         hpx::parallel::sort(hpx::parallel::execution::par, begin, end,
   546 #elif defined(CUBBYFLOW_TASKING_TBB)   547         tbb::parallel_sort(begin, end, compareFunction);
   550         size_t size = 
static_cast<size_t>(end - begin);
   553             typename std::iterator_traits<RandomIterator>::value_type;
   554         std::vector<value_type> temp(size);
   558         const unsigned int numThreads =
   559             (numThreadsHint == 0u) ? 8u : numThreadsHint;
   567         std::sort(begin, end, compareFunction);
 unsigned int GetMaxNumberOfThreads()
Returns the maximum number of threads to use. 
 
void ParallelRangeFor(IndexType beginIndex, IndexType endIndex, const Function &function, ExecutionPolicy policy)
Runs function over sub-ranges of the half-open interval [beginIndex, endIndex) in parallel. 
Definition: Parallel-Impl.hpp:307
 
typename std::invoke_result_t< TASK > operator_return_t
Definition: Parallel-Impl.hpp:56
 
Definition: pybind11Utils.hpp:20
 
void ParallelFor(IndexType beginIndex, IndexType endIndex, const Function &function, ExecutionPolicy policy)
Runs a for-loop over the half-open interval [beginIndex, endIndex) in parallel. 
Definition: Parallel-Impl.hpp:212
 
void Merge(RandomIterator a, size_t size, RandomIterator2 temp, CompareFunction compareFunction)
Definition: Parallel-Impl.hpp:107
 
void ParallelSort(RandomIterator begin, RandomIterator end, ExecutionPolicy policy)
Sorts the range [begin, end) in parallel. 
Definition: Parallel-Impl.hpp:523
 
auto Async(TASK &&fn) -> future< operator_return_t< TASK >>
Definition: Parallel-Impl.hpp:59
 
constexpr size_t ZERO_SIZE
Zero size_t. 
Definition: Constants.hpp:20
 
void ParallelMergeSort(RandomIterator a, size_t size, RandomIterator2 temp, unsigned int numThreads, CompareFunction compareFunction)
Definition: Parallel-Impl.hpp:150
 
ExecutionPolicy
Execution policy tag. 
Definition: Parallel.hpp:17
 
void ParallelFill(const RandomIterator &begin, const RandomIterator &end, const T &value, ExecutionPolicy policy)
Fills the range [begin, end) with value in parallel. 
Definition: Parallel-Impl.hpp:191
 
std::future< Task > future
Definition: Parallel-Impl.hpp:52
 
Value ParallelReduce(IndexType beginIndex, IndexType endIndex, const Value &identity, const Function &function, const Reduce &reduce, ExecutionPolicy policy)
Performs a reduce operation over the half-open interval [beginIndex, endIndex) in parallel. 
Definition: Parallel-Impl.hpp:436