11 #ifndef CUBBYFLOW_PARALLEL_IMPL_HPP 12 #define CUBBYFLOW_PARALLEL_IMPL_HPP 17 #if defined(CUBBYFLOW_TASKING_HPX) 18 #include <hpx/include/future.hpp> 19 #include <hpx/include/parallel_fill.hpp> 20 #include <hpx/include/parallel_for_each.hpp> 21 #include <hpx/include/parallel_for_loop.hpp> 22 #include <hpx/include/parallel_reduce.hpp> 23 #include <hpx/include/parallel_sort.hpp> 26 #if defined(CUBBYFLOW_TASKING_TBB) 27 #include <tbb/parallel_for.h> 28 #include <tbb/parallel_reduce.h> 29 #include <tbb/parallel_sort.h> 31 #elif defined(CUBBYFLOW_TASKING_CPP11THREAD) 47 #if defined(CUBBYFLOW_TASKING_HPX) 48 template <
typename Task>
49 using future = hpx::future<Task>;
51 template <
typename Task>
55 template <
typename TASK>
58 template <
typename TASK>
61 #if defined(CUBBYFLOW_TASKING_HPX) 62 return hpx::async(std::forward<TASK>(fn));
64 #elif defined(CUBBYFLOW_TASKING_TBB) 65 struct LocalTBBTask :
public tbb::task
69 LocalTBBTask(TASK&& f) : func(std::forward<TASK>(f))
74 tbb::task* execute()
override 81 using package_t = std::packaged_task<operator_return_t<TASK>()>;
83 auto task =
new package_t(std::forward<TASK>(fn));
84 auto* tbbNode =
new (tbb::task::allocate_root()) LocalTBBTask([=]() {
89 tbb::task::enqueue(*tbbNode);
90 return task.get_future();
92 #elif defined(CUBBYFLOW_TASKING_CPP11THREAD) 93 return std::async(std::launch::async, fn);
95 return std::async(std::launch::deferred, fn);
105 template <
typename RandomIterator,
typename RandomIterator2,
106 typename CompareFunction>
107 void Merge(RandomIterator a,
size_t size, RandomIterator2 temp,
108 CompareFunction compareFunction)
111 size_t i2 = size / 2;
114 while (i1 < size / 2 && i2 < size)
116 if (compareFunction(a[i1], a[i2]))
130 while (i1 < size / 2)
148 template <
typename RandomIterator,
typename RandomIterator2,
149 typename CompareFunction>
151 unsigned int numThreads, CompareFunction compareFunction)
155 std::sort(a, a + size, compareFunction);
157 else if (numThreads > 1)
159 std::vector<future<void>> pool;
162 auto launchRange = [compareFunction](RandomIterator begin,
size_t k2,
163 RandomIterator2 temp,
164 unsigned int numThreads) {
169 [=]() { launchRange(a, size / 2, temp, numThreads / 2); }));
172 launchRange(a + size / 2, size - size / 2, temp + size / 2,
173 numThreads - numThreads / 2);
185 Merge(a, size, temp, compareFunction);
190 template <
typename RandomIterator,
typename T>
191 void ParallelFill(
const RandomIterator& begin,
const RandomIterator& end,
194 auto diff = end - begin;
200 #if defined(CUBBYFLOW_TASKING_HPX) 201 hpx::parallel::fill(hpx::parallel::execution::par, begin, end, value);
203 size_t size =
static_cast<size_t>(diff);
205 ZERO_SIZE, size, [begin, value](
size_t i) { begin[i] = value; },
211 template <
typename IndexType,
typename Function>
215 if (beginIndex > endIndex)
222 #if defined(CUBBYFLOW_TASKING_TBB) 224 tbb::parallel_for(beginIndex, endIndex,
function);
225 #elif defined(CUBBYFLOW_TASKING_HPX) 227 hpx::parallel::for_loop(hpx::parallel::execution::par, beginIndex,
229 #elif defined(CUBBYFLOW_TASKING_CPP11THREAD) 232 const unsigned int numThreads =
233 (numThreadsHint == 0u) ? 8u : numThreadsHint;
236 IndexType n = endIndex - beginIndex + 1;
237 IndexType slice =
static_cast<IndexType
>(
238 std::round(n / static_cast<double>(numThreads)));
239 slice = std::max(slice, IndexType(1));
242 auto launchRange = [&
function](IndexType k1, IndexType k2) {
243 for (IndexType k = k1; k < k2; ++k)
250 std::vector<std::thread> pool;
251 pool.reserve(numThreads);
252 IndexType i1 = beginIndex;
253 IndexType i2 = std::min(beginIndex + slice, endIndex);
255 for (
unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
257 pool.emplace_back(launchRange, i1, i2);
259 i2 = std::min(i2 + slice, endIndex);
264 pool.emplace_back(launchRange, i1, endIndex);
268 for (std::thread& t : pool)
278 #if defined(CUBBYFLOW_TASKING_OPENMP) 279 #pragma omp parallel for 280 #if defined(_MSC_VER) && !defined(__INTEL_COMPILER) 281 for (ssize_t i = beginIndex; i < static_cast<ssize_t>(endIndex); ++i)
283 #else // !MSVC || Intel 284 for (
auto i = beginIndex; i < endIndex; ++i)
286 #endif // MSVC && !Intel 289 #else // CUBBYFLOW_TASKING_SERIAL 290 for (
auto i = beginIndex; i < endIndex; ++i)
294 #endif // CUBBYFLOW_TASKING_OPENMP 299 for (
auto i = beginIndex; i < endIndex; ++i)
306 template <
typename IndexType,
typename Function>
310 if (beginIndex > endIndex)
317 #if defined(CUBBYFLOW_TASKING_TBB) 319 tbb::blocked_range<IndexType>(beginIndex, endIndex),
320 [&
function](
const tbb::blocked_range<IndexType>& range) {
321 function(range.begin(), range.end());
326 const unsigned int numThreads =
327 numThreadsHint == 0u ? 8u : numThreadsHint;
330 IndexType n = endIndex - beginIndex + 1;
331 IndexType slice =
static_cast<IndexType
>(
332 std::round(n / static_cast<double>(numThreads)));
333 slice = std::max(slice, IndexType(1));
336 std::vector<CubbyFlow::Internal::future<void>> pool;
337 pool.reserve(numThreads);
338 IndexType i1 = beginIndex;
339 IndexType i2 = std::min(beginIndex + slice, endIndex);
341 for (
unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
345 i2 = std::min(i2 + slice, endIndex);
366 function(beginIndex, endIndex);
370 template <
typename IndexType,
typename Function>
372 IndexType beginIndexY, IndexType endIndexY,
376 beginIndexY, endIndexY,
378 for (IndexType i = beginIndexX; i < endIndexX; ++i)
386 template <
typename IndexType,
typename Function>
388 IndexType beginIndexY, IndexType endIndexY,
392 beginIndexY, endIndexY,
393 [&](IndexType jBegin, IndexType jEnd) {
394 function(beginIndexX, endIndexX, jBegin, jEnd);
399 template <
typename IndexType,
typename Function>
401 IndexType beginIndexY, IndexType endIndexY,
402 IndexType beginIndexZ, IndexType endIndexZ,
406 beginIndexZ, endIndexZ,
408 for (IndexType j = beginIndexY; j < endIndexY; ++j)
410 for (IndexType i = beginIndexX; i < endIndexX; ++i)
419 template <
typename IndexType,
typename Function>
421 IndexType beginIndexY, IndexType endIndexY,
422 IndexType beginIndexZ, IndexType endIndexZ,
426 beginIndexZ, endIndexZ,
427 [&](IndexType kBegin, IndexType kEnd) {
428 function(beginIndexX, endIndexX, beginIndexY, endIndexY, kBegin,
434 template <
typename IndexType,
typename Value,
typename Function,
437 const Value& identity,
const Function&
function,
440 if (beginIndex > endIndex)
447 #if defined(CUBBYFLOW_TASKING_TBB) 448 return tbb::parallel_reduce(
449 tbb::blocked_range<IndexType>(beginIndex, endIndex), identity,
450 [&
function](
const tbb::blocked_range<IndexType>& range,
452 return function(range.begin(), range.end(), init);
458 const unsigned int numThreads =
459 (numThreadsHint == 0u) ? 8u : numThreadsHint;
462 IndexType n = endIndex - beginIndex + 1;
463 IndexType slice =
static_cast<IndexType
>(
464 std::round(n / static_cast<double>(numThreads)));
465 slice = std::max(slice, IndexType(1));
468 std::vector<Value> results(numThreads, identity);
471 auto launchRange = [&](IndexType k1, IndexType k2,
unsigned int tid) {
472 results[tid] =
function(k1, k2, identity);
476 std::vector<CubbyFlow::Internal::future<void>> pool;
477 pool.reserve(numThreads);
479 IndexType i1 = beginIndex;
480 IndexType i2 = std::min(beginIndex + slice, endIndex);
481 unsigned int threadID = 0;
483 for (; threadID + 1 < numThreads && i1 < endIndex; ++threadID)
489 i2 = std::min(i2 + slice, endIndex);
495 [=]() { launchRange(i1, endIndex, threadID); }));
508 Value finalResult = identity;
509 for (
const Value& val : results)
511 finalResult = reduce(val, finalResult);
519 return function(beginIndex, endIndex, identity);
522 template <
typename RandomIterator>
528 std::less<
typename std::iterator_traits<RandomIterator>::value_type>(),
532 template <
typename RandomIterator,
typename CompareFunction>
543 #if defined(CUBBYFLOW_TASKING_HPX) 544 hpx::parallel::sort(hpx::parallel::execution::par, begin, end,
546 #elif defined(CUBBYFLOW_TASKING_TBB) 547 tbb::parallel_sort(begin, end, compareFunction);
550 size_t size =
static_cast<size_t>(end - begin);
553 typename std::iterator_traits<RandomIterator>::value_type;
554 std::vector<value_type> temp(size);
558 const unsigned int numThreads =
559 (numThreadsHint == 0u) ? 8u : numThreadsHint;
567 std::sort(begin, end, compareFunction);
unsigned int GetMaxNumberOfThreads()
Returns the maximum number of threads to use.
void ParallelRangeFor(IndexType beginIndex, IndexType endIndex, const Function &function, ExecutionPolicy policy)
Runs a loop over the index range from beginIndex to endIndex in parallel, passing the begin and end indices of each sub-range to function.
Definition: Parallel-Impl.hpp:307
typename std::invoke_result_t< TASK > operator_return_t
Definition: Parallel-Impl.hpp:56
Definition: pybind11Utils.hpp:20
void ParallelFor(IndexType beginIndex, IndexType endIndex, const Function &function, ExecutionPolicy policy)
Runs a for-loop over the indices from beginIndex up to, but not including, endIndex in parallel.
Definition: Parallel-Impl.hpp:212
void Merge(RandomIterator a, size_t size, RandomIterator2 temp, CompareFunction compareFunction)
Definition: Parallel-Impl.hpp:107
void ParallelSort(RandomIterator begin, RandomIterator end, ExecutionPolicy policy)
Sorts the elements in the range from begin to end in parallel, comparing them with std::less.
Definition: Parallel-Impl.hpp:523
auto Async(TASK &&fn) -> future< operator_return_t< TASK >>
Definition: Parallel-Impl.hpp:59
constexpr size_t ZERO_SIZE
Constant of type size_t holding the value zero.
Definition: Constants.hpp:20
void ParallelMergeSort(RandomIterator a, size_t size, RandomIterator2 temp, unsigned int numThreads, CompareFunction compareFunction)
Definition: Parallel-Impl.hpp:150
ExecutionPolicy
Execution policy tag.
Definition: Parallel.hpp:17
void ParallelFill(const RandomIterator &begin, const RandomIterator &end, const T &value, ExecutionPolicy policy)
Fills from begin to end with value in parallel.
Definition: Parallel-Impl.hpp:191
std::future< Task > future
Definition: Parallel-Impl.hpp:52
Value ParallelReduce(IndexType beginIndex, IndexType endIndex, const Value &identity, const Function &function, const Reduce &reduce, ExecutionPolicy policy)
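Performs a reduce operation over the index range in parallel: function computes a partial result for each sub-range starting from identity, and reduce combines the partial results into the final value.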
Definition: Parallel-Impl.hpp:436