Parallel-Impl.hpp
// This code is based on the Jet framework.
// Copyright (c) 2018 Doyub Kim
// CubbyFlow is a voxel-based fluid simulation engine for computer games.
// Copyright (c) 2020 CubbyFlow Team
// Core Part: Chris Ohk, Junwoo Hwang, Jihong Sin, Seungwoo Yoo
// AI Part: Dongheon Cho, Minseo Kim
// We are making our contributions/submissions to this project solely in our
// personal capacity and are not conveying any rights to any intellectual
// property of any third parties.

#ifndef CUBBYFLOW_PARALLEL_IMPL_HPP
#define CUBBYFLOW_PARALLEL_IMPL_HPP

#include <Core/Utils/Constants.hpp>
#include <Core/Utils/Parallel.hpp>

#if defined(CUBBYFLOW_TASKING_HPX)
#include <hpx/include/future.hpp>
#include <hpx/include/parallel_fill.hpp>
#include <hpx/include/parallel_for_each.hpp>
#include <hpx/include/parallel_for_loop.hpp>
#include <hpx/include/parallel_reduce.hpp>
#include <hpx/include/parallel_sort.hpp>
#endif

#if defined(CUBBYFLOW_TASKING_TBB)
#include <tbb/parallel_for.h>
#include <tbb/parallel_reduce.h>
#include <tbb/parallel_sort.h>
#include <tbb/task.h>
#elif defined(CUBBYFLOW_TASKING_CPP11THREAD)
#include <thread>
#endif

#include <algorithm>
#include <cmath>
#include <future>
#include <vector>

#undef max
#undef min

namespace CubbyFlow
{
namespace Internal
{
#if defined(CUBBYFLOW_TASKING_HPX)
template <typename Task>
using future = hpx::future<Task>;
#else
template <typename Task>
using future = std::future<Task>;
#endif

template <typename TASK>
using operator_return_t = typename std::invoke_result_t<TASK>;

template <typename TASK>
inline auto Async(TASK&& fn) -> future<operator_return_t<TASK>>
{
#if defined(CUBBYFLOW_TASKING_HPX)
    return hpx::async(std::forward<TASK>(fn));

#elif defined(CUBBYFLOW_TASKING_TBB)
    using package_t = std::packaged_task<operator_return_t<TASK>()>;

    struct LocalTBBTask : public tbb::task
    {
        package_t* pTask;

        LocalTBBTask(package_t* t) : pTask(t)
        {
            // Do nothing
        }

        tbb::task* execute() override
        {
            (*pTask)();
            delete pTask;
            return nullptr;
        }
    };

    auto* task = new package_t(std::forward<TASK>(fn));

    // Fetch the future before the node is enqueued; the node deletes the
    // packaged task as soon as it has run.
    auto result = task->get_future();

    auto* tbbNode = new (tbb::task::allocate_root()) LocalTBBTask(task);
    tbb::task::enqueue(*tbbNode);

    return result;

#elif defined(CUBBYFLOW_TASKING_CPP11THREAD)
    return std::async(std::launch::async, std::forward<TASK>(fn));
#else
    return std::async(std::launch::deferred, std::forward<TASK>(fn));
#endif
}
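
// Illustrative usage sketch (not part of the library); the lambda and
// variable names below are hypothetical:
//
//     auto answer = Internal::Async([] { return 6 * 7; });
//     int value = answer.get();  // Blocks until the task has completed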

// Adapted from:
// Radenski, A.
// Shared Memory, Message Passing, and Hybrid Merge Sorts for Standalone and
// Clustered SMPs. Proc PDPTA'11, the 2011 International Conference on Parallel
// and Distributed Processing Techniques and Applications, CSREA Press
// (H. Arabnia, Ed.), 2011, pp. 367 - 373.
// Merges the two sorted halves [0, size / 2) and [size / 2, size) of a into
// temp, then copies the result back into a.
template <typename RandomIterator, typename RandomIterator2,
          typename CompareFunction>
void Merge(RandomIterator a, size_t size, RandomIterator2 temp,
           CompareFunction compareFunction)
{
    size_t i1 = 0;
    size_t i2 = size / 2;
    size_t tempi = 0;

    while (i1 < size / 2 && i2 < size)
    {
        if (compareFunction(a[i1], a[i2]))
        {
            temp[tempi] = a[i1];
            i1++;
        }
        else
        {
            temp[tempi] = a[i2];
            i2++;
        }

        tempi++;
    }

    while (i1 < size / 2)
    {
        temp[tempi] = a[i1];
        i1++;
        tempi++;
    }

    while (i2 < size)
    {
        temp[tempi] = a[i2];
        i2++;
        tempi++;
    }

    // Copy sorted temp array into main array, a
    ParallelFor(ZERO_SIZE, size, [&](size_t i) { a[i] = temp[i]; });
}

template <typename RandomIterator, typename RandomIterator2,
          typename CompareFunction>
void ParallelMergeSort(RandomIterator a, size_t size, RandomIterator2 temp,
                       unsigned int numThreads, CompareFunction compareFunction)
{
    if (numThreads == 1)
    {
        std::sort(a, a + size, compareFunction);
    }
    else if (numThreads > 1)
    {
        std::vector<future<void>> pool;
        pool.reserve(2);

        auto launchRange = [compareFunction](RandomIterator begin, size_t k2,
                                             RandomIterator2 temp,
                                             unsigned int numThreads) {
            ParallelMergeSort(begin, k2, temp, numThreads, compareFunction);
        };

        pool.emplace_back(Internal::Async(
            [=]() { launchRange(a, size / 2, temp, numThreads / 2); }));

        pool.emplace_back(Internal::Async([=]() {
            launchRange(a + size / 2, size - size / 2, temp + size / 2,
                        numThreads - numThreads / 2);
        }));

        // Wait for jobs to finish
        for (auto& f : pool)
        {
            if (f.valid())
            {
                f.wait();
            }
        }

        Merge(a, size, temp, compareFunction);
    }
}
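
// Illustrative note (not part of the library): with numThreads == 4 the
// recursion above launches two tasks with 2 threads each, which in turn
// launch tasks with 1 thread each; those bottom out in std::sort, and Merge()
// combines the sorted halves on the way back up.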
} // namespace Internal

template <typename RandomIterator, typename T>
void ParallelFill(const RandomIterator& begin, const RandomIterator& end,
                  const T& value, ExecutionPolicy policy)
{
    auto diff = end - begin;
    if (diff <= 0)
    {
        return;
    }

#if defined(CUBBYFLOW_TASKING_HPX)
    hpx::parallel::fill(hpx::parallel::execution::par, begin, end, value);
#else
    size_t size = static_cast<size_t>(diff);
    ParallelFor(
        ZERO_SIZE, size, [begin, value](size_t i) { begin[i] = value; },
        policy);
#endif
}
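
// Illustrative usage sketch (not part of the library); the buffer below is
// hypothetical:
//
//     std::vector<double> data(1024);
//     ParallelFill(data.begin(), data.end(), 0.0, ExecutionPolicy::Parallel);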

// Adapted from http://ideone.com/Z7zldb
template <typename IndexType, typename Function>
void ParallelFor(IndexType beginIndex, IndexType endIndex,
                 const Function& function, ExecutionPolicy policy)
{
    if (beginIndex > endIndex)
    {
        return;
    }

    if (policy == ExecutionPolicy::Parallel)
    {
#if defined(CUBBYFLOW_TASKING_TBB)
        (void)policy;
        tbb::parallel_for(beginIndex, endIndex, function);
#elif defined(CUBBYFLOW_TASKING_HPX)
        (void)policy;
        hpx::parallel::for_loop(hpx::parallel::execution::par, beginIndex,
                                endIndex, function);
#elif defined(CUBBYFLOW_TASKING_CPP11THREAD)
        // Estimate number of threads in the pool
        const unsigned int numThreadsHint = GetMaxNumberOfThreads();
        const unsigned int numThreads =
            (numThreadsHint == 0u) ? 8u : numThreadsHint;

        // Size of a slice for the range functions
        IndexType n = endIndex - beginIndex + 1;
        IndexType slice = static_cast<IndexType>(
            std::round(n / static_cast<double>(numThreads)));
        slice = std::max(slice, IndexType(1));

        // [Helper] Inner loop
        auto launchRange = [&function](IndexType k1, IndexType k2) {
            for (IndexType k = k1; k < k2; ++k)
            {
                function(k);
            }
        };

        // Create pool and launch jobs
        std::vector<std::thread> pool;
        pool.reserve(numThreads);
        IndexType i1 = beginIndex;
        IndexType i2 = std::min(beginIndex + slice, endIndex);

        for (unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
        {
            pool.emplace_back(launchRange, i1, i2);
            i1 = i2;
            i2 = std::min(i2 + slice, endIndex);
        }

        if (i1 < endIndex)
        {
            pool.emplace_back(launchRange, i1, endIndex);
        }

        // Wait for jobs to finish
        for (std::thread& t : pool)
        {
            if (t.joinable())
            {
                t.join();
            }
        }
#else
        (void)policy;

#if defined(CUBBYFLOW_TASKING_OPENMP)
#pragma omp parallel for
#if defined(_MSC_VER) && !defined(__INTEL_COMPILER)
        for (ssize_t i = beginIndex; i < static_cast<ssize_t>(endIndex); ++i)
        {
#else // !MSVC || Intel
        for (auto i = beginIndex; i < endIndex; ++i)
        {
#endif // MSVC && !Intel
            function(i);
        }
#else // CUBBYFLOW_TASKING_SERIAL
        for (auto i = beginIndex; i < endIndex; ++i)
        {
            function(i);
        }
#endif // CUBBYFLOW_TASKING_OPENMP
#endif
    }
    else
    {
        for (auto i = beginIndex; i < endIndex; ++i)
        {
            function(i);
        }
    }
}
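
// Illustrative usage sketch (not part of the library); the buffers below are
// hypothetical:
//
//     std::vector<double> x(n), y(n);
//     ParallelFor(
//         ZERO_SIZE, x.size(), [&](size_t i) { y[i] += 2.0 * x[i]; },
//         ExecutionPolicy::Parallel);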

template <typename IndexType, typename Function>
void ParallelRangeFor(IndexType beginIndex, IndexType endIndex,
                      const Function& function, ExecutionPolicy policy)
{
    if (beginIndex > endIndex)
    {
        return;
    }

    if (policy == ExecutionPolicy::Parallel)
    {
#if defined(CUBBYFLOW_TASKING_TBB)
        tbb::parallel_for(
            tbb::blocked_range<IndexType>(beginIndex, endIndex),
            [&function](const tbb::blocked_range<IndexType>& range) {
                function(range.begin(), range.end());
            });
#else
        // Estimate number of threads in the pool
        const unsigned int numThreadsHint = GetMaxNumberOfThreads();
        const unsigned int numThreads =
            numThreadsHint == 0u ? 8u : numThreadsHint;

        // Size of a slice for the range functions
        IndexType n = endIndex - beginIndex + 1;
        IndexType slice = static_cast<IndexType>(
            std::round(n / static_cast<double>(numThreads)));
        slice = std::max(slice, IndexType(1));

        // Create pool and launch jobs
        std::vector<CubbyFlow::Internal::future<void>> pool;
        pool.reserve(numThreads);
        IndexType i1 = beginIndex;
        IndexType i2 = std::min(beginIndex + slice, endIndex);

        for (unsigned int i = 0; i + 1 < numThreads && i1 < endIndex; ++i)
        {
            pool.emplace_back(Internal::Async([=]() { function(i1, i2); }));
            i1 = i2;
            i2 = std::min(i2 + slice, endIndex);
        }

        if (i1 < endIndex)
        {
            pool.emplace_back(
                Internal::Async([=]() { function(i1, endIndex); }));
        }

        // Wait for jobs to finish
        for (auto& f : pool)
        {
            if (f.valid())
            {
                f.wait();
            }
        }
#endif
    }
    else
    {
        function(beginIndex, endIndex);
    }
}
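
// Illustrative usage sketch (not part of the library): unlike ParallelFor,
// the callback receives a contiguous [begin, end) chunk, so per-chunk state
// (here a hypothetical local accumulator) can live outside the inner loop:
//
//     ParallelRangeFor(
//         ZERO_SIZE, data.size(),
//         [&](size_t begin, size_t end) {
//             double localSum = 0.0;
//             for (size_t i = begin; i < end; ++i)
//             {
//                 localSum += data[i];
//             }
//             // Combine localSum into a shared result under a mutex, etc.
//         },
//         ExecutionPolicy::Parallel);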

template <typename IndexType, typename Function>
void ParallelFor(IndexType beginIndexX, IndexType endIndexX,
                 IndexType beginIndexY, IndexType endIndexY,
                 const Function& function, ExecutionPolicy policy)
{
    ParallelFor(
        beginIndexY, endIndexY,
        [&](IndexType j) {
            for (IndexType i = beginIndexX; i < endIndexX; ++i)
            {
                function(i, j);
            }
        },
        policy);
}

template <typename IndexType, typename Function>
void ParallelRangeFor(IndexType beginIndexX, IndexType endIndexX,
                      IndexType beginIndexY, IndexType endIndexY,
                      const Function& function, ExecutionPolicy policy)
{
    ParallelRangeFor(
        beginIndexY, endIndexY,
        [&](IndexType jBegin, IndexType jEnd) {
            function(beginIndexX, endIndexX, jBegin, jEnd);
        },
        policy);
}

template <typename IndexType, typename Function>
void ParallelFor(IndexType beginIndexX, IndexType endIndexX,
                 IndexType beginIndexY, IndexType endIndexY,
                 IndexType beginIndexZ, IndexType endIndexZ,
                 const Function& function, ExecutionPolicy policy)
{
    ParallelFor(
        beginIndexZ, endIndexZ,
        [&](IndexType k) {
            for (IndexType j = beginIndexY; j < endIndexY; ++j)
            {
                for (IndexType i = beginIndexX; i < endIndexX; ++i)
                {
                    function(i, j, k);
                }
            }
        },
        policy);
}
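
// Illustrative usage sketch (not part of the library): the 3D overload
// parallelizes over the outermost (k) axis and runs the j/i loops serially
// inside each task. The grid dimensions and accessor below are hypothetical:
//
//     ParallelFor(
//         ZERO_SIZE, nx, ZERO_SIZE, ny, ZERO_SIZE, nz,
//         [&](size_t i, size_t j, size_t k) { grid(i, j, k) = 0.0; },
//         ExecutionPolicy::Parallel);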

template <typename IndexType, typename Function>
void ParallelRangeFor(IndexType beginIndexX, IndexType endIndexX,
                      IndexType beginIndexY, IndexType endIndexY,
                      IndexType beginIndexZ, IndexType endIndexZ,
                      const Function& function, ExecutionPolicy policy)
{
    ParallelRangeFor(
        beginIndexZ, endIndexZ,
        [&](IndexType kBegin, IndexType kEnd) {
            function(beginIndexX, endIndexX, beginIndexY, endIndexY, kBegin,
                     kEnd);
        },
        policy);
}

template <typename IndexType, typename Value, typename Function,
          typename Reduce>
Value ParallelReduce(IndexType beginIndex, IndexType endIndex,
                     const Value& identity, const Function& function,
                     const Reduce& reduce, ExecutionPolicy policy)
{
    if (beginIndex > endIndex)
    {
        return identity;
    }

    if (policy == ExecutionPolicy::Parallel)
    {
#if defined(CUBBYFLOW_TASKING_TBB)
        return tbb::parallel_reduce(
            tbb::blocked_range<IndexType>(beginIndex, endIndex), identity,
            [&function](const tbb::blocked_range<IndexType>& range,
                        const Value& init) {
                return function(range.begin(), range.end(), init);
            },
            reduce);
#else
        // Estimate number of threads in the pool
        const unsigned int numThreadsHint = GetMaxNumberOfThreads();
        const unsigned int numThreads =
            (numThreadsHint == 0u) ? 8u : numThreadsHint;

        // Size of a slice for the range functions
        IndexType n = endIndex - beginIndex + 1;
        IndexType slice = static_cast<IndexType>(
            std::round(n / static_cast<double>(numThreads)));
        slice = std::max(slice, IndexType(1));

        // Results
        std::vector<Value> results(numThreads, identity);

        // [Helper] Inner loop
        auto launchRange = [&](IndexType k1, IndexType k2, unsigned int tid) {
            results[tid] = function(k1, k2, identity);
        };

        // Create pool and launch jobs
        std::vector<CubbyFlow::Internal::future<void>> pool;
        pool.reserve(numThreads);

        IndexType i1 = beginIndex;
        IndexType i2 = std::min(beginIndex + slice, endIndex);
        unsigned int threadID = 0;

        for (; threadID + 1 < numThreads && i1 < endIndex; ++threadID)
        {
            pool.emplace_back(
                Internal::Async([=]() { launchRange(i1, i2, threadID); }));

            i1 = i2;
            i2 = std::min(i2 + slice, endIndex);
        }

        if (i1 < endIndex)
        {
            pool.emplace_back(Internal::Async(
                [=]() { launchRange(i1, endIndex, threadID); }));
        }

        // Wait for jobs to finish
        for (auto& f : pool)
        {
            if (f.valid())
            {
                f.wait();
            }
        }

        // Gather
        Value finalResult = identity;
        for (const Value& val : results)
        {
            finalResult = reduce(val, finalResult);
        }

        return finalResult;
#endif
    }

    (void)reduce;
    return function(beginIndex, endIndex, identity);
}
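
// Illustrative usage sketch (not part of the library): a parallel sum. Each
// task folds its [begin, end) sub-range starting from the identity value, and
// the partial results are combined with the reduce operator. The buffer below
// is hypothetical:
//
//     double total = ParallelReduce(
//         ZERO_SIZE, data.size(), 0.0,
//         [&](size_t begin, size_t end, double init) {
//             for (size_t i = begin; i < end; ++i)
//             {
//                 init += data[i];
//             }
//             return init;
//         },
//         std::plus<double>(), ExecutionPolicy::Parallel);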

template <typename RandomIterator>
void ParallelSort(RandomIterator begin, RandomIterator end,
                  ExecutionPolicy policy)
{
    ParallelSort(
        begin, end,
        std::less<typename std::iterator_traits<RandomIterator>::value_type>(),
        policy);
}

template <typename RandomIterator, typename CompareFunction>
void ParallelSort(RandomIterator begin, RandomIterator end,
                  CompareFunction compareFunction, ExecutionPolicy policy)
{
    if (begin > end)
    {
        return;
    }

    if (policy == ExecutionPolicy::Parallel)
    {
#if defined(CUBBYFLOW_TASKING_HPX)
        hpx::parallel::sort(hpx::parallel::execution::par, begin, end,
                            compareFunction);
#elif defined(CUBBYFLOW_TASKING_TBB)
        tbb::parallel_sort(begin, end, compareFunction);
#else
        size_t size = static_cast<size_t>(end - begin);

        using value_type =
            typename std::iterator_traits<RandomIterator>::value_type;
        std::vector<value_type> temp(size);

        // Estimate number of threads in the pool
        const unsigned int numThreadsHint = GetMaxNumberOfThreads();
        const unsigned int numThreads =
            (numThreadsHint == 0u) ? 8u : numThreadsHint;

        Internal::ParallelMergeSort(begin, size, temp.begin(), numThreads,
                                    compareFunction);
#endif
    }
    else
    {
        std::sort(begin, end, compareFunction);
    }
}
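
// Illustrative usage sketch (not part of the library); the container below is
// hypothetical:
//
//     std::vector<float> keys = { 3.0f, 1.0f, 2.0f };
//     ParallelSort(keys.begin(), keys.end(), std::greater<float>(),
//                  ExecutionPolicy::Parallel);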
} // namespace CubbyFlow

#endif