irlba
A C++ library for IRLBA
Loading...
Searching...
No Matches
parallel.hpp
Go to the documentation of this file.
1#ifndef IRLBA_PARALLEL_HPP
2#define IRLBA_PARALLEL_HPP
3
4#include "utils.hpp"
5#include <vector>
6#include "Eigen/Dense"
7
8#ifndef IRLBA_CUSTOM_PARALLEL
9#include "subpar/subpar.hpp"
10#endif
11
18namespace irlba {
19
32template<typename Task_, class Run_>
34#ifndef IRLBA_CUSTOM_PARALLEL
35 // Use cases here don't allocate or throw, so nothrow_ = true is fine.
36 subpar::parallelize_simple<true>(num_tasks, std::move(run_task));
37#else
39#endif
40}
41
68template<
69 class ValueArray_ = std::vector<double>,
70 class IndexArray_ = std::vector<int>,
71 class PointerArray_ = std::vector<size_t>,
72 class EigenVector_ = Eigen::VectorXd
73>
75public:
81
99 my_primary_dim(column_major ? ncol : nrow),
100 my_secondary_dim(column_major ? nrow : ncol),
101 my_nthreads(nthreads),
102 my_values(std::move(x)),
103 my_indices(std::move(i)),
104 my_ptrs(std::move(p)),
105 my_column_major(column_major)
106 {
107 if (nthreads > 1) {
108 fragment_threads();
109 }
110 }
111
112public:
116 Eigen::Index rows() const {
117 if (my_column_major) {
118 return my_secondary_dim;
119 } else {
120 return my_primary_dim;
121 }
122 }
123
127 Eigen::Index cols() const {
128 if (my_column_major) {
129 return my_primary_dim;
130 } else {
131 return my_secondary_dim;
132 }
133 }
134
139 const ValueArray_& get_values() const {
140 return my_values;
141 }
142
147 const IndexArray_& get_indices() const {
148 return my_indices;
149 }
150
155 return my_ptrs;
156 }
157
public:
    /**
     * Element type of the pointer array, stripped of const/reference qualifiers.
     */
    typedef typename std::remove_const<typename std::remove_reference<decltype(std::declval<PointerArray_>()[0])>::type>::type PointerType;

private:
    // "Primary" is the dimension along which nonzeros are grouped by my_ptrs
    // (columns if column-major, rows otherwise); "secondary" is the other one.
    Eigen::Index my_primary_dim, my_secondary_dim;
    int my_nthreads; // number of threads used in the multiply methods.
    ValueArray_ my_values; // nonzero values.
    IndexArray_ my_indices; // secondary-dimension index of each nonzero.
    PointerArray_ my_ptrs; // per-primary-element offsets into my_values/my_indices.
    bool my_column_major; // true for CSC-style layout, false for CSR-style.

    // Element type of the index array, stripped of const/reference qualifiers.
    typedef typename std::remove_const<typename std::remove_reference<decltype(std::declval<IndexArray_>()[0])>::type>::type IndexType;

    // Per-thread work boundaries, filled by fragment_threads() when my_nthreads > 1.
    std::vector<size_t> my_primary_starts, my_primary_ends;
    std::vector<std::vector<PointerType> > my_secondary_nonzero_starts;
176
177public:
185 const std::vector<size_t>& get_primary_starts() const {
186 return my_primary_starts;
187 }
188
195 const std::vector<size_t>& get_primary_ends() const {
196 return my_primary_ends;
197 }
198
207 const std::vector<std::vector<PointerType> >& get_secondary_nonzero_starts() const {
208 return my_secondary_nonzero_starts;
209 }
210
211private:
212
213 void fragment_threads() {
214 auto total_nzeros = my_ptrs[my_primary_dim]; // last element - not using back() to avoid an extra requirement on PointerArray.
215 PointerType per_thread = (total_nzeros / my_nthreads) + (total_nzeros % my_nthreads > 0); // i.e., ceiling.
216
217 // Splitting columns across threads so each thread processes the same number of nonzero elements.
218 my_primary_starts.resize(my_nthreads);
219 my_primary_ends.resize(my_nthreads);
220 {
221 Eigen::Index primary_counter = 0;
223 for (int t = 0; t < my_nthreads; ++t) {
224 my_primary_starts[t] = primary_counter;
225 while (primary_counter < my_primary_dim && my_ptrs[primary_counter + 1] <= sofar) {
227 }
228 my_primary_ends[t] = primary_counter;
229 sofar += per_thread;
230 }
231 }
232
233 // Splitting rows across threads so each thread processes the same number of nonzero elements.
234 my_secondary_nonzero_starts.resize(my_nthreads + 1, std::vector<PointerType>(my_primary_dim));
235 {
236 std::vector<PointerType> secondary_nonzeros(my_secondary_dim);
237 for (PointerType i = 0; i < total_nzeros; ++i) { // don't using range for loop to avoid an extra requirement on IndexArray.
238 ++(secondary_nonzeros[my_indices[i]]);
239 }
240
241 std::vector<IndexType> secondary_ends(my_nthreads);
242 IndexType secondary_counter = 0;
245
246 for (int t = 0; t < my_nthreads; ++t) {
247 while (secondary_counter < my_secondary_dim && cum_rows <= sofar) {
250 }
252 sofar += per_thread;
253 }
254
255 for (Eigen::Index c = 0; c < my_primary_dim; ++c) {
256 auto primary_start = my_ptrs[c], primary_end = my_ptrs[c + 1];
257 my_secondary_nonzero_starts[0][c] = primary_start;
258
259 auto s = primary_start;
260 for (int thread = 0; thread < my_nthreads; ++thread) {
261 while (s < primary_end && my_indices[s] < secondary_ends[thread]) {
262 ++s;
263 }
264 my_secondary_nonzero_starts[thread + 1][c] = s;
265 }
266 }
267 }
268 }
269
270private:
271 void indirect_multiply(const EigenVector_& rhs, EigenVector_& output) const {
272 output.setZero();
273
274 if (my_nthreads == 1) {
275 for (Eigen::Index c = 0; c < my_primary_dim; ++c) {
276 auto start = my_ptrs[c];
277 auto end = my_ptrs[c + 1];
278 auto val = rhs.coeff(c);
279 for (PointerType s = start; s < end; ++s) {
280 output.coeffRef(my_indices[s]) += my_values[s] * val;
281 }
282 }
283 return;
284 }
285
286 parallelize(my_nthreads, [&](int t) -> void {
287 const auto& starts = my_secondary_nonzero_starts[t];
288 const auto& ends = my_secondary_nonzero_starts[t + 1];
289 for (Eigen::Index c = 0; c < my_primary_dim; ++c) {
290 auto start = starts[c];
291 auto end = ends[c];
292 auto val = rhs.coeff(c);
293 for (PointerType s = start; s < end; ++s) {
294 output.coeffRef(my_indices[s]) += my_values[s] * val;
295 }
296 }
297 });
298
299 return;
300 }
301
302 void direct_multiply(const EigenVector_& rhs, EigenVector_& output) const {
303 if (my_nthreads == 1) {
304 for (Eigen::Index c = 0; c < my_primary_dim; ++c) {
306 }
307 return;
308 }
309
310 parallelize(my_nthreads, [&](int t) -> void {
311 auto curstart = my_primary_starts[t];
312 auto curend = my_primary_ends[t];
313 for (size_t c = curstart; c < curend; ++c) {
315 }
316 });
317
318 return;
319 }
320
321 template<typename Scalar_>
322 Scalar_ column_dot_product(size_t c, const EigenVector_& rhs) const {
323 PointerType primary_start = my_ptrs[c], primary_end = my_ptrs[c + 1];
324 Scalar_ dot = 0;
325 for (PointerType s = primary_start; s < primary_end; ++s) {
326 dot += my_values[s] * rhs.coeff(my_indices[s]);
327 }
328 return dot;
329 }
330
    // All MockMatrix interface methods, we can ignore this.
public:
    /**
     * @brief Workspace for multiply().
     */
    struct Workspace {
        // Scratch buffer used by internal::realize_rhs() to hold materialized inputs.
        EigenVector_ buffer;
    };
339
340 Workspace workspace() const {
341 return Workspace();
342 }
343
    /**
     * @brief Workspace for adjoint_multiply().
     */
    struct AdjointWorkspace {
        // Scratch buffer used by internal::realize_rhs() to hold materialized inputs.
        EigenVector_ buffer;
    };
347
348 AdjointWorkspace adjoint_workspace() const {
349 return AdjointWorkspace();
350 }
351
352public:
353 template<class Right_>
354 void multiply(const Right_& rhs, Workspace& work, EigenVector_& output) const {
355 const auto& realized_rhs = internal::realize_rhs(rhs, work.buffer);
356 if (my_column_major) {
357 indirect_multiply(realized_rhs, output);
358 } else {
359 direct_multiply(realized_rhs, output);
360 }
361 }
362
363 template<class Right_>
364 void adjoint_multiply(const Right_& rhs, AdjointWorkspace& work, EigenVector_& output) const {
365 const auto& realized_rhs = internal::realize_rhs(rhs, work.buffer);
366 if (my_column_major) {
367 direct_multiply(realized_rhs, output);
368 } else {
369 indirect_multiply(realized_rhs, output);
370 }
371 }
372
373public:
374 template<class EigenMatrix_>
375 EigenMatrix_ realize() const {
376 auto nr = rows(), nc = cols();
378 output.setZero();
379
380 if (my_column_major) {
381 for (Eigen::Index c = 0; c < nc; ++c) {
382 PointerType col_start = my_ptrs[c], col_end = my_ptrs[c + 1];
383 for (PointerType s = col_start; s < col_end; ++s) {
384 output.coeffRef(my_indices[s], c) = my_values[s];
385 }
386 }
387 } else {
388 for (Eigen::Index r = 0; r < nr; ++r) {
389 PointerType row_start = my_ptrs[r], row_end = my_ptrs[r + 1];
390 for (PointerType s = row_start; s < row_end; ++s) {
391 output.coeffRef(r, my_indices[s]) = my_values[s];
392 }
393 }
394 }
395
396 return output;
397 }
401};
402
/**
 * @brief Restrict the number of available threads for Eigen.
 *
 * RAII-style scope: on construction, caps Eigen's thread count according to
 * the active parallelization scheme; on destruction, restores the previous
 * setting. Non-copyable and non-movable.
 *
 * NOTE(review): the class declaration line, the non-OpenMP constructor, the
 * destructor signature and the remaining deleted special members were lost in
 * extraction and have been reconstructed — confirm against the upstream header.
 */
class EigenThreadScope {
#ifndef _OPENMP
public:
    // Eigen only parallelizes via OpenMP, so there is nothing to restrict here.
    EigenThreadScope([[maybe_unused]] int num_threads) {}

#else
public:
    EigenThreadScope([[maybe_unused]] int num_threads) : my_previous(Eigen::nbThreads()) {
        // Only let Eigen spawn threads when the outer parallelization scheme
        // is itself OpenMP-based; otherwise force single-threaded Eigen to
        // avoid nested oversubscription.
#ifdef IRLBA_CUSTOM_PARALLEL
#ifdef IRLBA_CUSTOM_PARALLEL_USES_OPENMP
        Eigen::setNbThreads(num_threads);
#else
        Eigen::setNbThreads(1);
#endif
#else
#ifdef SUBPAR_USES_OPENMP_SIMPLE
        Eigen::setNbThreads(num_threads);
#else
        Eigen::setNbThreads(1);
#endif
#endif
    }

private:
    int my_previous; // Eigen's thread count before this scope was entered.

public:
    ~EigenThreadScope() {
        Eigen::setNbThreads(my_previous);
    }
#endif

public:
    EigenThreadScope(const EigenThreadScope&) = delete;
    EigenThreadScope(EigenThreadScope&&) = delete;
    EigenThreadScope& operator=(const EigenThreadScope&) = delete;
    EigenThreadScope& operator=(EigenThreadScope&&) = delete;
};
465
466}
467
468#endif
Restrict the number of available threads for Eigen.
Definition parallel.hpp:418
Sparse matrix with customizable parallelization.
Definition parallel.hpp:74
std::remove_const< typename std::remove_reference< decltype(std::declval< PointerArray_ >()[0])>::type >::type PointerType
Definition parallel.hpp:162
const IndexArray_ & get_indices() const
Definition parallel.hpp:147
const ValueArray_ & get_values() const
Definition parallel.hpp:139
const std::vector< size_t > & get_primary_ends() const
Definition parallel.hpp:195
Eigen::Index rows() const
Definition parallel.hpp:116
const std::vector< size_t > & get_primary_starts() const
Definition parallel.hpp:185
const PointerArray_ & get_pointers() const
Definition parallel.hpp:154
const std::vector< std::vector< PointerType > > & get_secondary_nonzero_starts() const
Definition parallel.hpp:207
ParallelSparseMatrix()
Definition parallel.hpp:80
Eigen::Index cols() const
Definition parallel.hpp:127
ParallelSparseMatrix(Eigen::Index nrow, Eigen::Index ncol, ValueArray_ x, IndexArray_ i, PointerArray_ p, bool column_major, int nthreads)
Definition parallel.hpp:98
Implements IRLBA for approximate SVD.
Definition compute.hpp:18
void parallelize(Task_ num_tasks, Run_ run_task)
Definition parallel.hpp:33
EigenMatrix_ wrapped_realize(const Matrix_ &matrix)
Definition wrappers.hpp:185