subpar
Substitutable parallelization for C++ libraries
Loading...
Searching...
No Matches
range.hpp
Go to the documentation of this file.
1#ifndef SUBPAR_RANGE_HPP
2#define SUBPAR_RANGE_HPP
3
4#include <limits>
5#include <type_traits>
6
7#ifndef SUBPAR_CUSTOM_PARALLELIZE_RANGE
8#include <vector>
9#include <stdexcept>
10#include <thread>
11#endif
12
13#include "sanisizer/sanisizer.hpp"
14
20namespace subpar {
21
40template<typename Task_>
41int sanitize_num_workers(const int num_workers, const Task_ num_tasks) {
42 if (num_workers <= 0) {
43 return num_tasks > 0;
44 } else {
45 return sanisizer::min(num_workers, num_tasks);
46 }
47}
48
97template<bool nothrow_ = false, typename Task_, class Run_>
98void parallelize_range(int num_workers, const Task_ num_tasks, const Run_ run_task_range) {
99#ifdef SUBPAR_CUSTOM_PARALLELIZE_RANGE
100 if constexpr(nothrow_) {
101#ifdef SUBPAR_CUSTOM_PARALLELIZE_RANGE_NOTHROW
102 SUBPAR_CUSTOM_PARALLELIZE_RANGE_NOTHROW(num_workers, num_tasks, run_task_range);
103#else
104 SUBPAR_CUSTOM_PARALLELIZE_RANGE(num_workers, num_tasks, run_task_range);
105#endif
106 } else {
107 SUBPAR_CUSTOM_PARALLELIZE_RANGE(num_workers, num_tasks, run_task_range);
108 }
109
110#else
111 if (num_tasks == 0) {
112 return;
113 }
114
115 if (num_workers <= 1 || num_tasks == 1) {
116 run_task_range(0, 0, num_tasks);
117 return;
118 }
119
120 // All workers with indices below 'remainder' get an extra task to fill up the remainder.
121 Task_ tasks_per_worker = 1;
122 int remainder = 0;
123 if (sanisizer::is_greater_than_or_equal(num_workers, num_tasks)) {
124 num_workers = num_tasks;
125 } else {
126 tasks_per_worker = num_tasks / num_workers;
127 remainder = num_tasks % num_workers;
128 }
129
130 // Avoid instantiating a vector if it is known that the function can't throw.
131 auto errors = [&]{
132 if constexpr(nothrow_) {
133 return true;
134 } else {
135 return sanisizer::create<std::vector<std::exception_ptr> >(num_workers);
136 }
137 }();
138
139#if defined(_OPENMP) && !defined(SUBPAR_NO_OPENMP_RANGE) && !defined(SUBPAR_NO_OPENMP)
140#define SUBPAR_USES_OPENMP 1
141#define SUBPAR_USES_OPENMP_RANGE 1
142
143 // OpenMP doesn't guarantee that we'll actually start the specified number of workers,
144 // so we need to do a loop here to ensure that each task range is executed.
145 #pragma omp parallel for num_threads(num_workers)
146 for (int w = 0; w < num_workers; ++w) {
147 const Task_ start = w * tasks_per_worker + (w < remainder ? w : remainder); // need to shift the start by the number of previous 'w' that added a remainder.
148 const Task_ length = tasks_per_worker + (w < remainder);
149
150 if constexpr(nothrow_) {
151 run_task_range(w, start, length);
152 } else {
153 try {
154 run_task_range(w, start, length);
155 } catch (...) {
156 errors[w] = std::current_exception();
157 }
158 }
159 }
160
161#else
162// Wiping it out, just in case.
163#undef SUBPAR_USES_OPENMP
164#undef SUBPAR_USES_OPENMP_RANGE
165
166 Task_ start = 0;
167 std::vector<std::thread> workers;
168 workers.reserve(sanisizer::cast<decltype(workers.size())>(num_workers)); // preallocate to ensure we don't get alloc errors during emplace_back().
169
170 for (int w = 0; w < num_workers; ++w) {
171 const Task_ length = tasks_per_worker + (w < remainder);
172
173 if constexpr(nothrow_) {
174 workers.emplace_back(run_task_range, w, start, length);
175 } else {
176 workers.emplace_back([&run_task_range,&errors](int w, Task_ start, Task_ length) -> void {
177 try {
178 run_task_range(w, start, length);
179 } catch (...) {
180 errors[w] = std::current_exception();
181 }
182 }, w, start, length);
183 }
184
185 start += length;
186 }
187
188 for (auto& wrk : workers) {
189 wrk.join();
190 }
191#endif
192
193 if constexpr(!nothrow_) {
194 for (const auto& e : errors) {
195 if (e) {
196 std::rethrow_exception(e);
197 }
198 }
199 }
200#endif
201}
202
206// Back-compatibility only.
207template<typename Task_, class Run_>
208void parallelize(int num_workers, Task_ num_tasks, Run_ run_task_range) {
209 parallelize_range<false, Task_, Run_>(num_workers, num_tasks, std::move(run_task_range));
210}
215}
216
217#endif
Substitutable parallelization functions.
void parallelize_range(int num_workers, const Task_ num_tasks, const Run_ run_task_range)
Parallelize a range of tasks across multiple workers.
Definition range.hpp:98
int sanitize_num_workers(const int num_workers, const Task_ num_tasks)
Adjust the number of workers to the number of tasks in parallelize_range().
Definition range.hpp:41