irlba
A C++ library for IRLBA, the implicitly restarted Lanczos bidiagonalization algorithm for approximate SVD.
parallel.hpp
#ifndef IRLBA_PARALLEL_HPP
#define IRLBA_PARALLEL_HPP

#include "utils.hpp"

#include <algorithm>
#include <cstddef>
#include <type_traits>
#include <utility>
#include <vector>

#include "Eigen/Dense"

#ifndef IRLBA_CUSTOM_PARALLEL
#include "subpar/subpar.hpp"
#endif

/**
 * @file parallel.hpp
 * @brief Utilities for parallelization.
 */
namespace irlba {

/**
 * Run a set of tasks across multiple threads.
 *
 * @tparam Task_ Integer type for the number of tasks.
 * @tparam Run_ Function to execute each task, accepting the task index.
 *
 * @param num_tasks Number of tasks to run.
 * @param run_task Function to execute each task.
 *
 * By default, this dispatches to `subpar::parallelize_simple()`. If the
 * `IRLBA_CUSTOM_PARALLEL` function-like macro is defined, it is called
 * with the same arguments instead.
 */
template<typename Task_, class Run_>
void parallelize(Task_ num_tasks, Run_ run_task) {
#ifndef IRLBA_CUSTOM_PARALLEL
    // Use cases here don't allocate or throw, so nothrow_ = true is fine.
    subpar::parallelize_simple<true>(num_tasks, std::move(run_task));
#else
    IRLBA_CUSTOM_PARALLEL(num_tasks, std::move(run_task));
#endif
}
41
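// A minimal usage sketch (illustrative only, not part of the original header):
// each task index in [0, num_tasks) is passed to the callable exactly once,
// possibly from different threads, and the callable should not throw under
// the nothrow_ = true setting above.
//
//     std::vector<double> results(4);
//     irlba::parallelize(4, [&](int task) -> void {
//         results[task] = static_cast<double>(task) * 2; // any nothrow work
//     });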
/**
 * @brief Sparse matrix with customizable parallelization.
 *
 * Stores a compressed sparse matrix and multiplies it (or its adjoint) with
 * a vector, distributing the work across threads via `parallelize()`.
 *
 * @tparam ValueArray_ Array class containing the values of the non-zero elements.
 * @tparam IndexArray_ Array class containing the secondary-dimension indices of the non-zero elements.
 * @tparam PointerArray_ Array class containing offsets to the start of each primary dimension element.
 * @tparam EigenVector_ A floating-point `Eigen::Vector` class.
 */
template<
    class ValueArray_ = std::vector<double>,
    class IndexArray_ = std::vector<int>,
    class PointerArray_ = std::vector<size_t>,
    class EigenVector_ = Eigen::VectorXd
>
class ParallelSparseMatrix {
public:
    /**
     * Default constructor. The resulting object should not be used for any
     * operations until a valid matrix is assigned to it.
     */
    ParallelSparseMatrix() {}
    /**
     * @param nrow Number of rows.
     * @param ncol Number of columns.
     * @param x Values of the non-zero elements.
     * @param i Secondary-dimension indices of the non-zero elements, i.e., row
     * indices for compressed sparse column (CSC) storage, column indices for
     * compressed sparse row (CSR) storage.
     * @param p Offsets to the start of each primary dimension element in `x`
     * and `i`; this should have length equal to the primary dimension plus 1.
     * @param column_major Whether the matrix is stored in CSC format.
     * @param nthreads Number of threads to use for the multiplications.
     */
    ParallelSparseMatrix(Eigen::Index nrow, Eigen::Index ncol, ValueArray_ x, IndexArray_ i, PointerArray_ p, bool column_major, int nthreads) :
        my_primary_dim(column_major ? ncol : nrow),
        my_secondary_dim(column_major ? nrow : ncol),
        my_nthreads(nthreads),
        my_values(std::move(x)),
        my_indices(std::move(i)),
        my_ptrs(std::move(p)),
        my_column_major(column_major)
    {
        if (nthreads > 1) {
            fragment_threads();
        }
    }
111
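    // A construction sketch (illustrative, values chosen arbitrarily): a 3x2
    // CSC matrix with non-zeros at (0,0) = 1, (2,0) = 2 and (1,1) = 3, using
    // two threads:
    //
    //     std::vector<double> x{ 1.0, 2.0, 3.0 };
    //     std::vector<int> i{ 0, 2, 1 };
    //     std::vector<size_t> p{ 0, 2, 3 };
    //     irlba::ParallelSparseMatrix<> mat(3, 2, std::move(x), std::move(i), std::move(p), true, 2);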
public:
    /**
     * @return Number of rows in the matrix.
     */
    Eigen::Index rows() const {
        if (my_column_major) {
            return my_secondary_dim;
        } else {
            return my_primary_dim;
        }
    }

    /**
     * @return Number of columns in the matrix.
     */
    Eigen::Index cols() const {
        if (my_column_major) {
            return my_primary_dim;
        } else {
            return my_secondary_dim;
        }
    }
134
    /**
     * @return Values of the non-zero elements.
     */
    const ValueArray_& get_values() const {
        return my_values;
    }

    /**
     * @return Secondary-dimension indices of the non-zero elements.
     */
    const IndexArray_& get_indices() const {
        return my_indices;
    }

    /**
     * @return Offsets to the start of each primary dimension element.
     */
    const PointerArray_& get_pointers() const {
        return my_ptrs;
    }
157
public:
    /**
     * Type of the elements of `PointerArray_`.
     */
    typedef typename std::remove_const<typename std::remove_reference<decltype(std::declval<PointerArray_>()[0])>::type>::type PointerType;

private:
    Eigen::Index my_primary_dim, my_secondary_dim;
    int my_nthreads;
    ValueArray_ my_values;
    IndexArray_ my_indices;
    PointerArray_ my_ptrs;
    bool my_column_major;

    typedef typename std::remove_const<typename std::remove_reference<decltype(std::declval<IndexArray_>()[0])>::type>::type IndexType;

    std::vector<size_t> my_primary_starts, my_primary_ends;
    std::vector<IndexType> my_secondary_ends;
    std::vector<std::vector<PointerType> > my_secondary_nonzero_starts;
177
public:
    /**
     * @return Vector of length equal to the number of threads, containing the
     * index of the first primary dimension element to be processed by each
     * thread during multiplication.
     */
    const std::vector<size_t>& get_primary_starts() const {
        return my_primary_starts;
    }

    /**
     * @return Vector of length equal to the number of threads, containing the
     * index one-past-the-last primary dimension element to be processed by
     * each thread during multiplication.
     */
    const std::vector<size_t>& get_primary_ends() const {
        return my_primary_ends;
    }

    /**
     * @return Vector of vectors of offsets. Each inner vector has length equal
     * to the primary dimension; for each primary dimension element, successive
     * inner vectors bracket the run of non-zero elements to be processed by
     * each thread.
     */
    const std::vector<std::vector<PointerType> >& get_secondary_nonzero_starts() const {
        return my_secondary_nonzero_starts;
    }
211
private:
    void fragment_threads() {
        auto total_nzeros = my_ptrs[my_primary_dim]; // the last element; not using back() to avoid an extra requirement on PointerArray_.
        PointerType per_thread = (total_nzeros / my_nthreads) + (total_nzeros % my_nthreads > 0); // i.e., the ceiling.

        // Splitting primary dimension elements (e.g., columns) across threads
        // so that each thread processes roughly the same number of non-zero elements.
        my_primary_starts.resize(my_nthreads);
        my_primary_ends.resize(my_nthreads);
        {
            Eigen::Index primary_counter = 0;
            PointerType sofar = per_thread;
            for (int t = 0; t < my_nthreads; ++t) {
                my_primary_starts[t] = primary_counter;
                while (primary_counter < my_primary_dim && my_ptrs[primary_counter + 1] <= sofar) {
                    ++primary_counter;
                }
                my_primary_ends[t] = primary_counter;
                sofar += per_thread;
            }
        }
233
234 // Splitting rows across threads so each thread processes the same number of nonzero elements.
235 my_secondary_ends.resize(my_nthreads + 1);
236 my_secondary_nonzero_starts.resize(my_nthreads + 1, std::vector<PointerType>(my_primary_dim));
237 {
238 std::vector<PointerType> secondary_nonzeros(my_secondary_dim);
239 for (PointerType i = 0; i < total_nzeros; ++i) { // don't using range for loop to avoid an extra requirement on IndexArray.
240 ++(secondary_nonzeros[my_indices[i]]);
241 }
242
243 IndexType secondary_counter = 0;
246
247 for (int t = 0; t < my_nthreads; ++t) {
248 while (secondary_counter < my_secondary_dim && cum_rows <= sofar) {
251 }
252 my_secondary_ends[t + 1] = secondary_counter;
253 sofar += per_thread;
254 }
255
256 for (Eigen::Index c = 0; c < my_primary_dim; ++c) {
257 auto primary_start = my_ptrs[c], primary_end = my_ptrs[c + 1];
258 my_secondary_nonzero_starts[0][c] = primary_start;
259
260 auto s = primary_start;
261 for (int thread = 0; thread < my_nthreads; ++thread) {
262 auto limit = my_secondary_ends[thread + 1];
263 while (s < primary_end && my_indices[s] < limit) {
264 ++s;
265 }
266 my_secondary_nonzero_starts[thread + 1][c] = s;
267 }
268 }
269 }
270 }
271
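    // A worked sketch of fragment_threads() (illustrative): for the 3x2 CSC
    // matrix constructed above, total_nzeros = 3 and per_thread = 2 with two
    // threads, so the primary (column) split assigns column 0 (2 non-zeros)
    // to thread 0 and column 1 (1 non-zero) to thread 1. The secondary (row)
    // split is derived analogously from the per-row non-zero counts.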
private:
    void indirect_multiply(const EigenVector_& rhs, std::vector<std::vector<typename EigenVector_::Scalar> >& thread_buffers, EigenVector_& output) const {
        if (my_nthreads == 1) {
            output.setZero();
            for (Eigen::Index c = 0; c < my_primary_dim; ++c) {
                auto start = my_ptrs[c];
                auto end = my_ptrs[c + 1];
                auto val = rhs.coeff(c);
                for (PointerType s = start; s < end; ++s) {
                    output.coeffRef(my_indices[s]) += my_values[s] * val;
                }
            }
            return;
        }

        parallelize(my_nthreads, [&](int t) -> void {
            auto secondary_start = my_secondary_ends[t];
            auto secondary_end = my_secondary_ends[t + 1];
            auto secondary_len = secondary_end - secondary_start;

            // Using a separate buffer for each of the other threads to avoid
            // false sharing. On first use, each buffer is allocated within its
            // own thread to give malloc a chance of using thread-specific arenas.
            typename EigenVector_::Scalar* optr;
            if (t != 0) {
                auto& curbuffer = thread_buffers[t - 1];
                curbuffer.resize(secondary_len);
                optr = curbuffer.data();
            } else {
                optr = output.data() + secondary_start;
            }
            std::fill_n(optr, secondary_len, static_cast<typename EigenVector_::Scalar>(0));

            const auto& nz_starts = my_secondary_nonzero_starts[t];
            const auto& nz_ends = my_secondary_nonzero_starts[t + 1];
            for (Eigen::Index c = 0; c < my_primary_dim; ++c) {
                auto nz_start = nz_starts[c];
                auto nz_end = nz_ends[c];
                auto val = rhs.coeff(c);
                for (PointerType s = nz_start; s < nz_end; ++s) {
                    optr[my_indices[s] - secondary_start] += my_values[s] * val;
                }
            }

            if (t != 0) {
                std::copy_n(optr, secondary_len, output.data() + secondary_start);
            }
        });
    }
323
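    // Note on the "direct" strategy below: each thread computes plain dot
    // products over its own range of primary dimension elements, so threads
    // write to disjoint parts of the output and no per-thread buffers are
    // needed (unlike indirect_multiply() above).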
    void direct_multiply(const EigenVector_& rhs, EigenVector_& output) const {
        if (my_nthreads == 1) {
            for (Eigen::Index c = 0; c < my_primary_dim; ++c) {
                output.coeffRef(c) = column_dot_product<typename EigenVector_::Scalar>(c, rhs);
            }
            return;
        }

        parallelize(my_nthreads, [&](int t) -> void {
            auto curstart = my_primary_starts[t];
            auto curend = my_primary_ends[t];
            for (size_t c = curstart; c < curend; ++c) {
                output.coeffRef(c) = column_dot_product<typename EigenVector_::Scalar>(c, rhs);
            }
        });
    }
342
    template<typename Scalar_>
    Scalar_ column_dot_product(size_t c, const EigenVector_& rhs) const {
        PointerType primary_start = my_ptrs[c], primary_end = my_ptrs[c + 1];
        Scalar_ dot = 0;
        for (PointerType s = primary_start; s < primary_end; ++s) {
            dot += my_values[s] * rhs.coeff(my_indices[s]);
        }
        return dot;
    }
352
public:
    /**
     * @brief Workspace for multiply().
     */
    struct Workspace {
        /**
         * Buffer to hold a realized copy of the right-hand side vector.
         */
        EigenVector_ buffer;

        /**
         * Per-thread buffers for the intermediate products.
         */
        std::vector<std::vector<typename EigenVector_::Scalar> > thread_buffers;
    };

    /**
     * @return Workspace for use in multiply().
     */
    Workspace workspace() const {
        Workspace output;
        if (my_nthreads > 1 && my_column_major) {
            output.thread_buffers.resize(my_nthreads - 1);
        }
        return output;
    }

    /**
     * @brief Workspace for adjoint_multiply().
     */
    struct AdjointWorkspace {
        /**
         * Buffer to hold a realized copy of the right-hand side vector.
         */
        EigenVector_ buffer;

        /**
         * Per-thread buffers for the intermediate products.
         */
        std::vector<std::vector<typename EigenVector_::Scalar> > thread_buffers;
    };

    /**
     * @return Workspace for use in adjoint_multiply().
     */
    AdjointWorkspace adjoint_workspace() const {
        AdjointWorkspace output;
        if (my_nthreads > 1 && !my_column_major) {
            output.thread_buffers.resize(my_nthreads - 1);
        }
        return output;
    }
382
public:
    /**
     * Compute `output = matrix * rhs`.
     *
     * @tparam Right_ An Eigen vector or expression of length equal to `cols()`.
     * @param rhs Right-hand side of the product.
     * @param work Workspace created by workspace().
     * @param[out] output Vector of length `rows()`, filled with the product.
     */
    template<class Right_>
    void multiply(const Right_& rhs, Workspace& work, EigenVector_& output) const {
        const auto& realized_rhs = internal::realize_rhs(rhs, work.buffer);
        if (my_column_major) {
            indirect_multiply(realized_rhs, work.thread_buffers, output);
        } else {
            direct_multiply(realized_rhs, output);
        }
    }

    /**
     * Compute `output = matrix.adjoint() * rhs`.
     *
     * @tparam Right_ An Eigen vector or expression of length equal to `rows()`.
     * @param rhs Right-hand side of the product.
     * @param work Workspace created by adjoint_workspace().
     * @param[out] output Vector of length `cols()`, filled with the product.
     */
    template<class Right_>
    void adjoint_multiply(const Right_& rhs, AdjointWorkspace& work, EigenVector_& output) const {
        const auto& realized_rhs = internal::realize_rhs(rhs, work.buffer);
        if (my_column_major) {
            direct_multiply(realized_rhs, output);
        } else {
            indirect_multiply(realized_rhs, work.thread_buffers, output);
        }
    }
403
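    // A usage sketch (illustrative): multiply the matrix constructed earlier
    // by a vector, reusing the workspace across repeated products.
    //
    //     auto work = mat.workspace();
    //     Eigen::VectorXd in = Eigen::VectorXd::Ones(mat.cols());
    //     Eigen::VectorXd out(mat.rows());
    //     mat.multiply(in, work, out); // out = mat * in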
public:
    /**
     * @tparam EigenMatrix_ A floating-point `Eigen::Matrix` class.
     * @return A dense copy of the sparse matrix.
     */
    template<class EigenMatrix_>
    EigenMatrix_ realize() const {
        auto nr = rows(), nc = cols();
        EigenMatrix_ output(nr, nc);
        output.setZero();

        if (my_column_major) {
            for (Eigen::Index c = 0; c < nc; ++c) {
                PointerType col_start = my_ptrs[c], col_end = my_ptrs[c + 1];
                for (PointerType s = col_start; s < col_end; ++s) {
                    output.coeffRef(my_indices[s], c) = my_values[s];
                }
            }
        } else {
            for (Eigen::Index r = 0; r < nr; ++r) {
                PointerType row_start = my_ptrs[r], row_end = my_ptrs[r + 1];
                for (PointerType s = row_start; s < row_end; ++s) {
                    output.coeffRef(r, my_indices[s]) = my_values[s];
                }
            }
        }

        return output;
    }
};
433
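// A conversion sketch (illustrative): materialize the sparse matrix as a
// dense Eigen matrix, e.g., to check the multiply() results in tests.
//
//     Eigen::MatrixXd dense = mat.realize<Eigen::MatrixXd>();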
/**
 * @brief Restrict the number of available threads for Eigen.
 *
 * On construction, this sets the number of threads available to Eigen's own
 * parallelization; on destruction, the previous setting is restored. Eigen is
 * only allowed to use multiple threads if OpenMP is available and the chosen
 * parallelization scheme also uses OpenMP, avoiding oversubscription from
 * nested parallel sections.
 */
class EigenThreadScope {
#ifndef _OPENMP
public:
    EigenThreadScope([[maybe_unused]] int num_threads) {}

#else
public:
    EigenThreadScope([[maybe_unused]] int num_threads) : my_previous(Eigen::nbThreads()) {
#ifdef IRLBA_CUSTOM_PARALLEL
#ifdef IRLBA_CUSTOM_PARALLEL_USES_OPENMP
        Eigen::setNbThreads(num_threads);
#else
        Eigen::setNbThreads(1);
#endif
#else
#ifdef SUBPAR_USES_OPENMP_SIMPLE
        Eigen::setNbThreads(num_threads);
#else
        Eigen::setNbThreads(1);
#endif
#endif
    }

private:
    int my_previous;

public:
    ~EigenThreadScope() {
        Eigen::setNbThreads(my_previous);
    }
#endif

public:
    // Copying and moving are deleted, as this class manages a global Eigen setting.
    EigenThreadScope(const EigenThreadScope&) = delete;
    EigenThreadScope(EigenThreadScope&&) = delete;
    EigenThreadScope& operator=(const EigenThreadScope&) = delete;
    EigenThreadScope& operator=(EigenThreadScope&&) = delete;
};
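// A scoping sketch (illustrative): pin Eigen's thread count for the duration
// of a parallel section, restoring the previous value when the scope ends.
//
//     {
//         irlba::EigenThreadScope scope(num_threads);
//         irlba::parallelize(num_threads, [&](int t) -> void { /* ... */ });
//     } // Eigen's previous thread count is restored here (when OpenMP is enabled).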
496
}

#endif