subpar
Substitutable parallelization for C++ libraries
Loading...
Searching...
No Matches
range.hpp
Go to the documentation of this file.
1#ifndef SUBPAR_RANGE_HPP
2#define SUBPAR_RANGE_HPP
3
4#include <limits>
5
6#ifndef SUBPAR_CUSTOM_PARALLELIZE_RANGE
7#include <vector>
8#include <stdexcept>
9#include <thread>
10#include <type_traits>
11#endif
12
18namespace subpar {
19
namespace internal {

/*
 * Check whether 'num_workers >= num_tasks' without signed/unsigned or narrowing surprises.
 *
 * Both arguments are assumed to be non-negative by the time this is called.
 * The comparison is carried out in whichever of 'int' and 'Task_' can represent the
 * wider range of values, so neither cast can lose information.
 *
 * The widths are compared via 'std::numeric_limits<...>::digits' rather than by casting
 * both maxima to 'size_t'. This avoids depending on '::size_t' being declared, since only
 * <limits> is guaranteed to be included when SUBPAR_CUSTOM_PARALLELIZE_RANGE is defined.
 */
template<typename Task_>
bool ge(int num_workers, Task_ num_tasks) {
    if constexpr(std::numeric_limits<Task_>::digits < std::numeric_limits<int>::digits) {
        // Every value of Task_ is representable as an int.
        return num_workers >= static_cast<int>(num_tasks);
    } else {
        // Every non-negative int is representable as a Task_.
        return static_cast<Task_>(num_workers) >= num_tasks;
    }
}

}
57template<typename Task_>
58int sanitize_num_workers(int num_workers, Task_ num_tasks) {
59 if (num_workers <= 0) {
60 return 1;
61 }
62
63 if (internal::ge(num_workers, num_tasks)) {
64 return num_tasks;
65 }
66
67 return num_workers;
68}
69
125template<bool nothrow_ = false, typename Task_, class Run_>
126void parallelize_range(int num_workers, Task_ num_tasks, Run_ run_task_range) {
127#ifdef SUBPAR_CUSTOM_PARALLELIZE_RANGE
128 if constexpr(nothrow_) {
129#ifdef SUBPAR_CUSTOM_PARALLELIZE_RANGE_NOTHROW
130 SUBPAR_CUSTOM_PARALLELIZE_RANGE_NOTHROW(num_workers, num_tasks, run_task_range);
131#else
132 SUBPAR_CUSTOM_PARALLELIZE_RANGE(num_workers, num_tasks, run_task_range);
133#endif
134 } else {
135 SUBPAR_CUSTOM_PARALLELIZE_RANGE(num_workers, num_tasks, run_task_range);
136 }
137
138#else
139 if (num_tasks == 0) {
140 return;
141 }
142
143 if (num_workers <= 1 || num_tasks == 1) {
144 run_task_range(0, 0, num_tasks);
145 return;
146 }
147
148 // All workers with indices below 'remainder' get an extra task to fill up the remainder.
149 Task_ tasks_per_worker;
150 int remainder;
151 if (internal::ge(num_workers, num_tasks)) {
152 num_workers = num_tasks;
153 tasks_per_worker = 1;
154 remainder = 0;
155 } else {
156 tasks_per_worker = num_tasks / num_workers;
157 remainder = num_tasks % num_workers;
158 }
159
160 // Avoid instantiating a vector if it is known that the function can't throw.
161 typename std::conditional<nothrow_, int, std::vector<std::exception_ptr> >::type errors(num_workers);
162
163#if defined(_OPENMP) && !defined(SUBPAR_NO_OPENMP_RANGE) && !defined(SUBPAR_NO_OPENMP)
164#define SUBPAR_USES_OPENMP 1
165#define SUBPAR_USES_OPENMP_RANGE 1
166
167 // OpenMP doesn't guarantee that we'll actually start 'num_workers' workers,
168 // so we need to do a loop here to ensure that each task range is executed.
169 #pragma omp parallel for num_threads(num_workers)
170 for (int w = 0; w < num_workers; ++w) {
171 Task_ start = w * tasks_per_worker + (w < remainder ? w : remainder); // need to shift the start by the number of previous 't' that added a remainder.
172 Task_ length = tasks_per_worker + (w < remainder);
173
174 if constexpr(nothrow_) {
175 run_task_range(w, start, length);
176 } else {
177 try {
178 run_task_range(w, start, length);
179 } catch (...) {
180 errors[w] = std::current_exception();
181 }
182 }
183 }
184
185#else
186// Wiping it out, just in case.
187#undef SUBPAR_USES_OPENMP
188#undef SUBPAR_USES_OPENMP_RANGE
189
190 Task_ start = 0;
191 std::vector<std::thread> workers;
192 workers.reserve(num_workers);
193
194 for (int w = 0; w < num_workers; ++w) {
195 Task_ length = tasks_per_worker + (w < remainder);
196
197 if constexpr(nothrow_) {
198 workers.emplace_back(run_task_range, w, start, length);
199 } else {
200 workers.emplace_back([&run_task_range,&errors](int w, Task_ start, Task_ length) -> void {
201 try {
202 run_task_range(w, start, length);
203 } catch (...) {
204 errors[w] = std::current_exception();
205 }
206 }, w, start, length);
207 }
208
209 start += length;
210 }
211
212 for (auto& wrk : workers) {
213 wrk.join();
214 }
215#endif
216
217 if constexpr(!nothrow_) {
218 for (const auto& e : errors) {
219 if (e) {
220 std::rethrow_exception(e);
221 }
222 }
223 }
224#endif
225}
226
/*
 * Back-compatibility alias for parallelize_range() with the default 'nothrow_ = false'.
 *
 * @tparam Task_ Integer type of the task count.
 * @tparam Run_ Callable invocable as 'run_task_range(int w, Task_ start, Task_ length)'.
 *
 * @param num_workers Number of workers.
 * @param num_tasks Number of tasks.
 * @param run_task_range Function to run on each worker's contiguous range of tasks.
 */
template<typename Task_, class Run_>
void parallelize(int num_workers, Task_ num_tasks, Run_ run_task_range) {
    // Equivalent to std::move(), without requiring <utility> (which is not included when
    // SUBPAR_CUSTOM_PARALLELIZE_RANGE is defined); this also allows move-only callables.
    parallelize_range(num_workers, num_tasks, static_cast<Run_&&>(run_task_range));
}
239}
240
241#endif
Substitutable parallelization functions.
int sanitize_num_workers(int num_workers, Task_ num_tasks)
Adjust the number of workers to the number of tasks in parallelize_range().
Definition range.hpp:58
void parallelize_range(int num_workers, Task_ num_tasks, Run_ run_task_range)
Parallelize a range of tasks across multiple workers.
Definition range.hpp:126