subpar
Substitutable parallelization for C++ libraries
Loading...
Searching...
No Matches
range.hpp
Go to the documentation of this file.
1#ifndef SUBPAR_RANGE_HPP
2#define SUBPAR_RANGE_HPP
3
4#include <limits>
5#include <type_traits>
6
7#ifndef SUBPAR_CUSTOM_PARALLELIZE_RANGE
8#include <vector>
9#include <stdexcept>
10#include <thread>
11#endif
12
18namespace subpar {
19
23namespace internal {
24
/**
 * Check whether the number of workers is greater than or equal to the number of tasks.
 *
 * Both arguments are assumed to be non-negative by the time this is called.
 * Each one is converted to its unsigned counterpart before the comparison,
 * so the check never compares a signed value against an unsigned one.
 *
 * @tparam Task_ Integer type for the number of tasks.
 * @param num_workers Number of workers, assumed non-negative.
 * @param num_tasks Number of tasks, assumed non-negative.
 * @return Whether `num_workers >= num_tasks`.
 */
template<typename Task_>
bool ge(int num_workers, Task_ num_tasks) {
    using UnsignedWorkers = std::make_unsigned_t<int>;
    using UnsignedTasks = std::make_unsigned_t<Task_>;

    const auto workers = static_cast<UnsignedWorkers>(num_workers);
    const auto tasks = static_cast<UnsignedTasks>(num_tasks);
    return workers >= tasks;
}
29
/**
 * Create the per-worker storage for exceptions caught while running tasks.
 *
 * @tparam nothrow_ Whether the task function is known not to throw.
 * @tparam NumWorkers_ Integer type for the number of workers.
 * @param num_workers Number of workers, i.e., the number of slots to allocate.
 * @return If `nothrow_ = true`, a placeholder integer (zero), as no storage is needed.
 * Otherwise, a vector of `num_workers` empty `std::exception_ptr` objects.
 * @throws std::runtime_error if `num_workers` exceeds the maximum value of the vector's size type.
 */
template<bool nothrow_, typename NumWorkers_>
auto create_error_vector(NumWorkers_ num_workers) {
    if constexpr(!nothrow_) {
        using ErrorList = std::vector<std::exception_ptr>;
        using UnsignedWorkers = std::make_unsigned_t<NumWorkers_>;

        // Make sure that the requested length is representable by the vector's size type.
        constexpr auto max_size = std::numeric_limits<typename ErrorList::size_type>::max();
        if (static_cast<UnsignedWorkers>(num_workers) > max_size) {
            throw std::runtime_error("cannot allocate the 'errors' vector");
        }

        return ErrorList(num_workers);
    } else {
        // Nothing can be thrown, so we skip the vector and return a dummy value instead.
        return 0;
    }
}
42
43}
/**
 * Adjust the number of workers to the number of tasks.
 *
 * @tparam Task_ Integer type for the number of tasks.
 * @param num_workers Requested number of workers.
 * @param num_tasks Number of tasks, assumed non-negative.
 * @return The sanitized number of workers:
 * - If `num_workers` is not positive, 1 if there is at least one task and 0 otherwise.
 * - If `num_workers` is greater than or equal to `num_tasks`, the number of tasks.
 * - Otherwise, `num_workers` unchanged.
 */
template<typename Task_>
int sanitize_num_workers(int num_workers, Task_ num_tasks) {
    if (num_workers <= 0) {
        return num_tasks > 0 ? 1 : 0;
    }

    // Both values are non-negative here, so compare them as unsigned to avoid signed/unsigned mixing.
    using UnsignedWorkers = std::make_unsigned_t<int>;
    using UnsignedTasks = std::make_unsigned_t<Task_>;
    const bool workers_cover_tasks = static_cast<UnsignedWorkers>(num_workers) >= static_cast<UnsignedTasks>(num_tasks);

    // If the tasks are no more numerous than the workers, the task count is guaranteed to fit in an int.
    return workers_cover_tasks ? static_cast<int>(num_tasks) : num_workers;
}
78
127template<bool nothrow_ = false, typename Task_, class Run_>
128void parallelize_range(int num_workers, Task_ num_tasks, Run_ run_task_range) {
129#ifdef SUBPAR_CUSTOM_PARALLELIZE_RANGE
130 if constexpr(nothrow_) {
131#ifdef SUBPAR_CUSTOM_PARALLELIZE_RANGE_NOTHROW
132 SUBPAR_CUSTOM_PARALLELIZE_RANGE_NOTHROW(num_workers, num_tasks, run_task_range);
133#else
134 SUBPAR_CUSTOM_PARALLELIZE_RANGE(num_workers, num_tasks, run_task_range);
135#endif
136 } else {
137 SUBPAR_CUSTOM_PARALLELIZE_RANGE(num_workers, num_tasks, run_task_range);
138 }
139
140#else
141 if (num_tasks == 0) {
142 return;
143 }
144
145 if (num_workers <= 1 || num_tasks == 1) {
146 run_task_range(0, 0, num_tasks);
147 return;
148 }
149
150 // All workers with indices below 'remainder' get an extra task to fill up the remainder.
151 Task_ tasks_per_worker;
152 int remainder;
153 if (internal::ge(num_workers, num_tasks)) {
154 num_workers = num_tasks;
155 tasks_per_worker = 1;
156 remainder = 0;
157 } else {
158 tasks_per_worker = num_tasks / num_workers;
159 remainder = num_tasks % num_workers;
160 }
161
162 auto errors = internal::create_error_vector<nothrow_>(num_workers);
163
164#if defined(_OPENMP) && !defined(SUBPAR_NO_OPENMP_RANGE) && !defined(SUBPAR_NO_OPENMP)
165#define SUBPAR_USES_OPENMP 1
166#define SUBPAR_USES_OPENMP_RANGE 1
167
168 // OpenMP doesn't guarantee that we'll actually start 'num_workers' workers,
169 // so we need to do a loop here to ensure that each task range is executed.
170 #pragma omp parallel for num_threads(num_workers)
171 for (int w = 0; w < num_workers; ++w) {
172 Task_ start = w * tasks_per_worker + (w < remainder ? w : remainder); // need to shift the start by the number of previous 't' that added a remainder.
173 Task_ length = tasks_per_worker + (w < remainder);
174
175 if constexpr(nothrow_) {
176 run_task_range(w, start, length);
177 } else {
178 try {
179 run_task_range(w, start, length);
180 } catch (...) {
181 errors[w] = std::current_exception();
182 }
183 }
184 }
185
186#else
187// Wiping it out, just in case.
188#undef SUBPAR_USES_OPENMP
189#undef SUBPAR_USES_OPENMP_RANGE
190
191 Task_ start = 0;
192 std::vector<std::thread> workers;
193 workers.reserve(num_workers);
194
195 for (int w = 0; w < num_workers; ++w) {
196 Task_ length = tasks_per_worker + (w < remainder);
197
198 if constexpr(nothrow_) {
199 workers.emplace_back(run_task_range, w, start, length);
200 } else {
201 workers.emplace_back([&run_task_range,&errors](int w, Task_ start, Task_ length) -> void {
202 try {
203 run_task_range(w, start, length);
204 } catch (...) {
205 errors[w] = std::current_exception();
206 }
207 }, w, start, length);
208 }
209
210 start += length;
211 }
212
213 for (auto& wrk : workers) {
214 wrk.join();
215 }
216#endif
217
218 if constexpr(!nothrow_) {
219 for (const auto& e : errors) {
220 if (e) {
221 std::rethrow_exception(e);
222 }
223 }
224 }
225#endif
226}
227
231// Back-compatibility only.
232template<typename Task_, class Run_>
233void parallelize(int num_workers, Task_ num_tasks, Run_ run_task_range) {
234 parallelize_range<false, Task_, Run_>(num_workers, num_tasks, std::move(run_task_range));
235}
240}
241
242#endif
Substitutable parallelization functions.
int sanitize_num_workers(int num_workers, Task_ num_tasks)
Adjust the number of workers to the number of tasks in parallelize_range().
Definition range.hpp:67
void parallelize_range(int num_workers, Task_ num_tasks, Run_ run_task_range)
Parallelize a range of tasks across multiple workers.
Definition range.hpp:128