subpar
Substitutable parallelization for C++ libraries
Loading...
Searching...
No Matches
range.hpp
Go to the documentation of this file.
1#ifndef SUBPAR_RANGE_HPP
2#define SUBPAR_RANGE_HPP
3
4#include <limits>
5
6#ifndef SUBPAR_CUSTOM_PARALLELIZE_RANGE
7#include <vector>
8#include <stdexcept>
9#include <thread>
10#include <type_traits>
11#endif
12
18namespace subpar {
19
23namespace internal {
24
/**
 * Check whether the number of workers is greater than or equal to the number of tasks,
 * performing the comparison in whichever of the two types can hold both values.
 *
 * Both arguments are assumed to be non-negative by the time this function is called.
 *
 * @tparam Task_ Integer type for the number of tasks.
 * @param num_workers Number of workers.
 * @param num_tasks Number of tasks.
 * @return Whether `num_workers >= num_tasks`.
 */
template<typename Task_>
bool ge(int num_workers, Task_ num_tasks) {
    // 'digits' counts the value (non-sign) bits of a type, so fewer digits means a smaller maximum.
    // Comparing digits avoids converting both maxima to a third type that may be narrower than Task_,
    // and only requires <limits>.
    constexpr bool fits_in_int = std::numeric_limits<Task_>::digits < std::numeric_limits<int>::digits;

    if constexpr(fits_in_int) {
        // Every possible Task_ value can be represented as an int.
        return num_workers >= static_cast<int>(num_tasks);
    } else {
        // Every non-negative int can be represented as a Task_.
        return static_cast<Task_>(num_workers) >= num_tasks;
    }
}
33
34}
57template<typename Task_>
58int sanitize_num_workers(int num_workers, Task_ num_tasks) {
59 if (num_workers <= 0) {
60 return (num_tasks > 0);
61 }
62
63 if (internal::ge(num_workers, num_tasks)) {
64 return num_tasks;
65 }
66
67 return num_workers;
68}
69
118template<bool nothrow_ = false, typename Task_, class Run_>
119void parallelize_range(int num_workers, Task_ num_tasks, Run_ run_task_range) {
120#ifdef SUBPAR_CUSTOM_PARALLELIZE_RANGE
121 if constexpr(nothrow_) {
122#ifdef SUBPAR_CUSTOM_PARALLELIZE_RANGE_NOTHROW
123 SUBPAR_CUSTOM_PARALLELIZE_RANGE_NOTHROW(num_workers, num_tasks, run_task_range);
124#else
125 SUBPAR_CUSTOM_PARALLELIZE_RANGE(num_workers, num_tasks, run_task_range);
126#endif
127 } else {
128 SUBPAR_CUSTOM_PARALLELIZE_RANGE(num_workers, num_tasks, run_task_range);
129 }
130
131#else
132 if (num_tasks == 0) {
133 return;
134 }
135
136 if (num_workers <= 1 || num_tasks == 1) {
137 run_task_range(0, 0, num_tasks);
138 return;
139 }
140
141 // All workers with indices below 'remainder' get an extra task to fill up the remainder.
142 Task_ tasks_per_worker;
143 int remainder;
144 if (internal::ge(num_workers, num_tasks)) {
145 num_workers = num_tasks;
146 tasks_per_worker = 1;
147 remainder = 0;
148 } else {
149 tasks_per_worker = num_tasks / num_workers;
150 remainder = num_tasks % num_workers;
151 }
152
153 // Avoid instantiating a vector if it is known that the function can't throw.
154 typename std::conditional<nothrow_, int, std::vector<std::exception_ptr> >::type errors(num_workers);
155
156#if defined(_OPENMP) && !defined(SUBPAR_NO_OPENMP_RANGE) && !defined(SUBPAR_NO_OPENMP)
157#define SUBPAR_USES_OPENMP 1
158#define SUBPAR_USES_OPENMP_RANGE 1
159
160 // OpenMP doesn't guarantee that we'll actually start 'num_workers' workers,
161 // so we need to do a loop here to ensure that each task range is executed.
162 #pragma omp parallel for num_threads(num_workers)
163 for (int w = 0; w < num_workers; ++w) {
164 Task_ start = w * tasks_per_worker + (w < remainder ? w : remainder); // need to shift the start by the number of previous 't' that added a remainder.
165 Task_ length = tasks_per_worker + (w < remainder);
166
167 if constexpr(nothrow_) {
168 run_task_range(w, start, length);
169 } else {
170 try {
171 run_task_range(w, start, length);
172 } catch (...) {
173 errors[w] = std::current_exception();
174 }
175 }
176 }
177
178#else
179// Wiping it out, just in case.
180#undef SUBPAR_USES_OPENMP
181#undef SUBPAR_USES_OPENMP_RANGE
182
183 Task_ start = 0;
184 std::vector<std::thread> workers;
185 workers.reserve(num_workers);
186
187 for (int w = 0; w < num_workers; ++w) {
188 Task_ length = tasks_per_worker + (w < remainder);
189
190 if constexpr(nothrow_) {
191 workers.emplace_back(run_task_range, w, start, length);
192 } else {
193 workers.emplace_back([&run_task_range,&errors](int w, Task_ start, Task_ length) -> void {
194 try {
195 run_task_range(w, start, length);
196 } catch (...) {
197 errors[w] = std::current_exception();
198 }
199 }, w, start, length);
200 }
201
202 start += length;
203 }
204
205 for (auto& wrk : workers) {
206 wrk.join();
207 }
208#endif
209
210 if constexpr(!nothrow_) {
211 for (const auto& e : errors) {
212 if (e) {
213 std::rethrow_exception(e);
214 }
215 }
216 }
217#endif
218}
219
223// Back-compatibility only.
224template<typename Task_, class Run_>
225void parallelize(int num_workers, Task_ num_tasks, Run_ run_task_range) {
226 parallelize_range<false, Task_, Run_>(num_workers, num_tasks, std::move(run_task_range));
227}
232}
233
234#endif
Substitutable parallelization functions.
int sanitize_num_workers(int num_workers, Task_ num_tasks)
Adjust the number of workers to the number of tasks in parallelize_range().
Definition range.hpp:58
void parallelize_range(int num_workers, Task_ num_tasks, Run_ run_task_range)
Parallelize a range of tasks across multiple workers.
Definition range.hpp:119