subpar
Substitutable parallelization for C++ libraries
Loading...
Searching...
No Matches
range.hpp
Go to the documentation of this file.
1#ifndef SUBPAR_RANGE_HPP
2#define SUBPAR_RANGE_HPP
3
4#include <limits>
5#include <type_traits>
6
7#ifndef SUBPAR_CUSTOM_PARALLELIZE_RANGE
8#include <vector>
9#include <stdexcept>
10#include <thread>
11#endif
12
13#include "sanisizer/sanisizer.hpp"
14
20namespace subpar {
21
39template<typename Task_>
40int sanitize_num_workers(const int num_workers, const Task_ num_tasks) {
41 // This code mirrors the return logic in the default parallelize_range(), but would be an upper bound even with a custom SUBPAR_CUSTOM_PARALLELIZE_RANGE.
42 // Remember that run_task_range must be called with a non-empty range so if num_tasks = 0, there is no choice but to not perform any calls.
43 // Similarly, we can't perform more than one call if num_tasks = 1, and we can't perform more calls than there are workers.
44
45 if (num_tasks <= 0) {
46 return 0;
47 }
48
49 if (num_workers <= 1 || num_tasks == 1) {
50 return 1;
51 }
52
53 return sanisizer::min(num_workers, num_tasks);
54}
55
123template<bool nothrow_ = false, typename Task_, class Run_>
124int parallelize_range(int num_workers, const Task_ num_tasks, const Run_ run_task_range) {
125#ifdef SUBPAR_CUSTOM_PARALLELIZE_RANGE
126 if constexpr(nothrow_) {
127#ifdef SUBPAR_CUSTOM_PARALLELIZE_RANGE_NOTHROW
128 return SUBPAR_CUSTOM_PARALLELIZE_RANGE_NOTHROW(num_workers, num_tasks, run_task_range);
129#else
130 return SUBPAR_CUSTOM_PARALLELIZE_RANGE(num_workers, num_tasks, run_task_range);
131#endif
132 } else {
133 return SUBPAR_CUSTOM_PARALLELIZE_RANGE(num_workers, num_tasks, run_task_range);
134 }
135
136#else
137 if (num_tasks <= 0) {
138 return 0;
139 }
140
141 if (num_workers <= 1 || num_tasks == 1) {
142 run_task_range(0, 0, num_tasks);
143 return 1;
144 }
145
146 // All workers with indices below 'remainder' get an extra task to fill up the remainder.
147 Task_ tasks_per_worker = 1;
148 int remainder = 0;
149 if (sanisizer::is_greater_than_or_equal(num_workers, num_tasks)) {
150 num_workers = num_tasks;
151 } else {
152 tasks_per_worker = num_tasks / num_workers;
153 remainder = num_tasks % num_workers;
154 }
155
156 const auto get_start = [&tasks_per_worker,&remainder](const int w) -> Task_ {
157 // Need to shift the start by the number of previous 'w' that added a remainder.
158 return w * tasks_per_worker + (w < remainder ? w : remainder);
159 };
160
161 const auto get_length = [&tasks_per_worker,&remainder](const int w) -> Task_ {
162 return tasks_per_worker + (w < remainder);
163 };
164
165 // Avoid instantiating a vector if it is known that the function can't throw.
166 auto errors = [&]{
167 if constexpr(nothrow_) {
168 return true;
169 } else {
170 return sanisizer::create<std::vector<std::exception_ptr> >(num_workers);
171 }
172 }();
173
174#if defined(_OPENMP) && !defined(SUBPAR_NO_OPENMP_RANGE) && !defined(SUBPAR_NO_OPENMP)
175#define SUBPAR_USES_OPENMP 1
176#define SUBPAR_USES_OPENMP_RANGE 1
177
178 // OpenMP doesn't guarantee that we'll actually start the specified number of workers,
179 // so we need to do a loop here to ensure that each task range is executed.
180 #pragma omp parallel for num_threads(num_workers)
181 for (int w = 0; w < num_workers; ++w) {
182 const Task_ start = get_start(w);
183 const Task_ length = get_length(w);
184
185 if constexpr(nothrow_) {
186 run_task_range(w, start, length);
187 } else {
188 try {
189 run_task_range(w, start, length);
190 } catch (...) {
191 errors[w] = std::current_exception();
192 }
193 }
194 }
195
196#else
197// Wiping it out, just in case.
198#undef SUBPAR_USES_OPENMP
199#undef SUBPAR_USES_OPENMP_RANGE
200
201 // We run the first job on the current thread, to avoid having to spin up an unnecessary worker.
202 std::vector<std::thread> workers;
203 sanisizer::reserve(workers, num_workers - 1); // preallocate to ensure we don't get alloc errors during emplace_back().
204
205 for (int w = 1; w < num_workers; ++w) {
206 const Task_ start = get_start(w);
207 const Task_ length = get_length(w);
208
209 if constexpr(nothrow_) {
210 workers.emplace_back(run_task_range, w, start, length);
211 } else {
212 workers.emplace_back([&run_task_range,&errors](int w, Task_ start, Task_ length) -> void {
213 try {
214 run_task_range(w, start, length);
215 } catch (...) {
216 errors[w] = std::current_exception();
217 }
218 }, w, start, length);
219 }
220 }
221
222 {
223 const Task_ start = get_start(0);
224 const Task_ length = get_length(0);
225
226 if constexpr(nothrow_) {
227 run_task_range(0, start, length);
228 } else {
229 try {
230 run_task_range(0, start, length);
231 } catch (...) {
232 errors[0] = std::current_exception();
233 }
234 }
235 }
236
237 for (auto& wrk : workers) {
238 wrk.join();
239 }
240#endif
241
242 if constexpr(!nothrow_) {
243 for (const auto& e : errors) {
244 if (e) {
245 std::rethrow_exception(e);
246 }
247 }
248 }
249#endif
250
251 return num_workers;
252}
253
257// Back-compatibility only.
258template<typename Task_, class Run_>
259void parallelize(int num_workers, Task_ num_tasks, Run_ run_task_range) {
260 parallelize_range<false, Task_, Run_>(num_workers, num_tasks, std::move(run_task_range));
261}
266}
267
268#endif
Substitutable parallelization functions.
int sanitize_num_workers(const int num_workers, const Task_ num_tasks)
Adjust the number of workers to the number of tasks in parallelize_range().
Definition range.hpp:40
int parallelize_range(int num_workers, const Task_ num_tasks, const Run_ run_task_range)
Parallelize a range of tasks across multiple workers.
Definition range.hpp:124