byteme
Read/write bytes from various sources
Loading...
Searching...
No Matches
BufferedReader.hpp
Go to the documentation of this file.
1#ifndef BYTEME_BUFFERED_READER_HPP
2#define BYTEME_BUFFERED_READER_HPP
3
9#include <thread>
10#include <condition_variable>
11#include <mutex>
12#include <vector>
13#include <algorithm>
14#include <exception>
15#include <type_traits>
16#include <memory>
17#include <cstddef>
18
19#include "Reader.hpp"
20
21namespace byteme {
22
35template<typename Type_>
37public:
41 BufferedReader(std::size_t buffer_size) : my_buffer(sanisizer::cap<I<decltype(my_buffer.size())> >(buffer_size)) {
42 if (buffer_size == 0) {
43 throw std::runtime_error("buffer size must be positive");
44 }
45 };
46
48 BufferedReader(const BufferedReader&) = delete; // not copyable.
49 BufferedReader& operator=(BufferedReader&&) = delete;
50 BufferedReader& operator=(const BufferedReader&) = delete; // not copyable.
51
52 virtual ~BufferedReader() = default;
57protected:
61 virtual std::size_t refill() = 0; // refills the buffer, returning the number of bytes filled.
62
63 virtual std::size_t refill(Type_*) = 0; // refills in the user-supplied array.
64
65 void initialize() {
66 my_available = refill();
67 }
68
69 auto& get_buffer() {
70 return my_buffer;
71 }
72
73 auto get_buffer_size() const {
74 return my_buffer.size();
75 }
80private:
81 std::vector<Type_> my_buffer;
82 std::size_t my_available = 0;
83 std::size_t my_current = 0;
84
85 // Standard guarantees this to be at least 64 bits, which is more than that of size_t.
86 // We don't use uint64_t because that might not be defined by the implementation.
87 unsigned long long my_overall = 0;
88
89public:
96 bool advance() {
97 ++my_current;
98 if (my_current < my_available) {
99 return true;
100 }
101
102 my_current = 0;
103 my_overall += my_available;
104
105 const auto buffer_size = get_buffer_size();
106 if (my_available == buffer_size) {
107 my_available = refill();
108 } else {
109 my_available = 0;
110 }
111
112 return my_available > 0; // Check that we haven't reached the end of the reader.
113 }
114
120 Type_ get() const {
121 return my_buffer[my_current];
122 }
123
127 unsigned long long position() const {
128 return my_overall + my_current;
129 }
130
134 bool valid() const {
135 return my_current < my_available;
136 }
137
138public:
159 std::pair<std::size_t, bool> extract(std::size_t number, Type_* output) {
160 const std::size_t original = number;
161
162 // Copying contents of the cached buffer.
163 const std::size_t remaining = my_available - my_current;
164 if (number < remaining) {
165 std::copy_n(my_buffer.data() + my_current, number, output);
166 my_current += number;
167 //std::cout << "case A" << std::endl;
168 return std::make_pair(number, true);
169 }
170
171 std::copy_n(my_buffer.data() + my_current, remaining, output);
172 output += remaining;
173 number -= remaining;
174
175 // If the available number of bytes is less than the buffer size, the reader is already exhausted.
176 const auto buffer_size = get_buffer_size();
177 if (my_available < buffer_size) {
178 my_current = my_available;
179 //std::cout << "case B" << std::endl;
180 return std::make_pair(original - number, false);
181 }
182
183 // Directly filling the output array, bypassing our buffer.
184 while (number >= buffer_size) {
185 my_overall += my_available;
186 my_available = refill(output);
187
188 output += my_available;
189 number -= my_available;
190 if (my_available < buffer_size) {
191 my_current = my_available;
192 //std::cout << "case C" << std::endl;
193 return std::make_pair(original - number, false);
194 }
195 }
196
197 // Filling the cache and copying the remainder into the output array.
198 my_overall += my_available;
199 my_available = refill();
200
201 const auto to_use = std::min(my_available, number);
202 std::copy_n(my_buffer.data(), to_use, output);
203 number -= to_use;
204 my_current = to_use;
205
206 // We know that number < buffer_size from the loop condition, so if exhausted == true,
207 // this means that my_available <= number < buffer_size, i.e., the source is exhausted.
208 // Thus, any future advance() would definitely return false.
209 bool exhausted = (to_use == my_available);
210
211 //std::cout << "D" << std::endl;
212 return std::make_pair(original - number, !exhausted);
213 }
214
215public:
235 std::size_t extract_until(std::size_t number, Type_* output) {
236 const auto original = number;
237 assert(number > 0);
238
239 // Copying contents of the cached buffer.
240 // Unlike extract(), we allow equality here because we can leave ourselves on the last byte.
241 const std::size_t remaining = my_available - my_current;
242 if (number <= remaining) {
243 std::copy_n(my_buffer.data() + my_current, number, output);
244 my_current += number - 1;
245 //std::cout << "case A" << std::endl;
246 return number;
247 }
248
249 std::copy_n(my_buffer.data() + my_current, remaining, output);
250 output += remaining;
251 number -= remaining;
252
253 // If the available number of bytes is less than the buffer size, the reader is already exhausted.
254 const auto buffer_size = get_buffer_size();
255 if (my_available < buffer_size) {
256 assert(my_available > 0); // my_available > 0 otherwise valid() would be false.
257 my_current = my_available - 1;
258 //std::cout << "case B" << std::endl;
259 return original - number;
260 }
261
262 // Directly filling the output array, bypassing our buffer.
263 // Unlike extract(), we don't allow equality here because we don't want to skip past the last byte.
264 while (number > buffer_size) {
265 my_overall += my_available;
266 my_available = refill(output);
267
268 output += my_available;
269 number -= my_available;
270
271 if (my_available < buffer_size) {
272 // We want to make the last byte available for the next get(), so we just stick it in the buffer.
273 // We know that output must have advanced past its input value as remaining > 0 (otherwise valid() would be false), so subtraction is valid.
274 assert(remaining > 0);
275 my_buffer[0] = *(output - 1);
276 my_overall += my_available;
277 my_overall -= 1;
278 my_available = 1;
279 my_current = 0;
280 //std::cout << "case C" << std::endl;
281 return original - number;
282 }
283 }
284
285 // Filling the cache and copying the remainder into the output array.
286 my_overall += my_available;
287 my_available = refill();
288
289 if (my_available) {
290 // If we didn't take the loop, we know that original > remaining to get to this point, and thus number > 0.
291 // If we did take the loop, the body ensures that number > buffer_size >= my_available, so a non-premature exit would leave number > 0.
292 assert(number > 0);
293
294 const auto to_use = std::min(my_available, number); // both values are positive so to_use > 0 and we can safely subtract 1 from it.
295 std::copy_n(my_buffer.data(), to_use, output);
296 number -= to_use;
297 my_current = to_use - 1;
298 //std::cout << "case D" << std::endl;
299 } else {
300 // Again, we want to make the last byte available, so we just stick it in the buffer, see logic above.
301 assert(remaining > 0);
302 my_buffer[0] = *(output - 1);
303 my_overall -= 1;
304 my_available = 1;
305 my_current = 0;
306 //std::cout << "case E" << std::endl;
307 }
308
309 return original - number;
310 }
311};
312
323template<typename Type_, class Pointer_ = std::unique_ptr<Reader> >
324class SerialBufferedReader final : public BufferedReader<Type_> {
325public:
332 SerialBufferedReader(Pointer_ reader, std::size_t buffer_size) :
333 BufferedReader<Type_>(buffer_size),
334 my_reader(std::move(reader))
335 {
336 this->initialize();
337 }
338
339private:
340 Pointer_ my_reader;
341
342protected:
346 std::size_t refill() {
347 auto& buf = this->get_buffer();
348 return my_reader->read(reinterpret_cast<unsigned char*>(buf.data()), buf.size());
349 }
350
351 std::size_t refill(Type_* ptr) {
352 return my_reader->read(reinterpret_cast<unsigned char*>(ptr), this->get_buffer_size());
353 }
357};
358
370template<typename Type_, class Pointer_ = std::unique_ptr<Reader> >
371class ParallelBufferedReader final : public BufferedReader<Type_> {
372public:
379 ParallelBufferedReader(Pointer_ reader, std::size_t buffer_size) :
380 BufferedReader<Type_>(buffer_size),
381 my_reader(std::move(reader)),
382 my_buffer_worker(buffer_size)
383 {
384 my_thread = std::thread([&]() { thread_loop(); }); // set up thread before initializing.
385
386 try {
387 this->initialize();
388 } catch (std::exception& e) {
389 // Killing thread as destructor won't be called if the constructor didn't finish.
390 kill_thread();
391 throw;
392 }
393 }
394
399 kill_thread();
400 }
405private:
406 Pointer_ my_reader;
407 std::vector<Type_> my_buffer_main, my_buffer_worker;
408 std::size_t my_next_available = 0;
409
410private:
411 std::thread my_thread;
412 std::exception_ptr my_thread_err = nullptr;
413 std::mutex my_mut;
414 std::condition_variable my_cv;
415
416 bool my_ready_input = false, my_ready_output = false;
417 bool my_worker_active = false;
418 bool my_kill = false;
419
420 void thread_loop() {
421 const auto bufsize = this->get_buffer_size();
422 while (1) {
423 std::unique_lock lck(my_mut);
424 my_cv.wait(lck, [&]() { return my_ready_input; });
425 my_ready_input = false;
426
427 if (my_kill) { // Handle an explicit kill signal from the destructor.
428 break;
429 }
430
431 try {
432 my_next_available = my_reader->read(reinterpret_cast<unsigned char*>(my_buffer_worker.data()), bufsize);
433 } catch (...) {
434 my_thread_err = std::current_exception();
435 }
436
437 my_ready_output = true;
438 lck.unlock();
439 my_cv.notify_one();
440 }
441 }
442
443 void kill_thread() {
444 std::unique_lock lck(my_mut);
445 my_kill = true;
446 my_ready_input = true;
447 lck.unlock(); // releasing the lock so that the notified thread doesn't immediately block.
448 my_cv.notify_one();
449 my_thread.join();
450 }
451
452protected:
456 std::size_t refill() {
457 auto& buffer_main = this->get_buffer();
458
459 if (my_worker_active) {
460 // If the worker is active, it's probably already gotten started so we just wait for it to finish.
461 // Then we swap the results with the main buffer and submit a new job to the worker.
462 std::unique_lock lck(my_mut);
463 my_cv.wait(lck, [&]() { return my_ready_output; });
464 my_ready_output = false;
465
466 if (my_thread_err) {
467 std::rethrow_exception(my_thread_err);
468 }
469
470 buffer_main.swap(my_buffer_worker);
471 const auto output = my_next_available;
472
473 my_ready_input = true;
474 lck.unlock();
475 my_cv.notify_one();
476
477 return output;
478
479 } else {
480 // If the worker is not active, we just do a read directly on the main thread, and then submit a new job to the worker.
481 // Obviously, if we did the read on the worker, we'd have to wait for it anyway, so we might as well cut out the communication overhead.
482 const auto available = my_reader->read(reinterpret_cast<unsigned char*>(buffer_main.data()), buffer_main.size());
483
484 std::unique_lock lck(my_mut);
485 my_ready_input = true;
486 lck.unlock();
487 my_cv.notify_one();
488 my_worker_active = true;
489
490 return available;
491 }
492 }
493
494 std::size_t refill(Type_* ptr) {
495 if (my_worker_active) {
496 // If the worker is active, we wait for it to finish, transfer the results to the supplied pointer.
497 // We do not submit a new job, based on the loop in BufferedReader::extract():
498 //
499 // - We'll probably want to call this refill() overload again.
500 // On subsequent calls, we'll want to directly read from the Reader into the supplied pointer to cut out a copy.
501 // - But, if we didn't call this overload again, we'd still probably call the other refill() overload immediately.
502 // So if we sent a new job to the worker, we'd end up immediately blocking on it to get the next read.
503 // So we might as well skip that communication overhead and read directly on the main thread.
504 std::unique_lock lck(my_mut);
505 my_cv.wait(lck, [&]() { return my_ready_output; });
506 my_ready_output = false;
507 if (my_thread_err) {
508 std::rethrow_exception(my_thread_err);
509 }
510
511 std::copy_n(my_buffer_worker.begin(), my_next_available, ptr);
512 my_worker_active = false;
513 return my_next_available;
514
515 } else {
516 // If the worker isn't active, we can directly read the into the supplied pointer, avoiding some communication overhead.
517 return my_reader->read(reinterpret_cast<unsigned char*>(ptr), this->get_buffer_size());
518 }
519 }
523};
524
525}
526
527#endif
Read an input source.
Buffered wrapper around a Reader.
Definition BufferedReader.hpp:36
bool valid() const
Definition BufferedReader.hpp:134
std::pair< std::size_t, bool > extract(std::size_t number, Type_ *output)
Definition BufferedReader.hpp:159
std::size_t extract_until(std::size_t number, Type_ *output)
Definition BufferedReader.hpp:235
Type_ get() const
Definition BufferedReader.hpp:120
bool advance()
Definition BufferedReader.hpp:96
unsigned long long position() const
Definition BufferedReader.hpp:127
Parallelized buffering to wrap a Reader.
Definition BufferedReader.hpp:371
ParallelBufferedReader(Pointer_ reader, std::size_t buffer_size)
Definition BufferedReader.hpp:379
Serial buffering to wrap a Reader.
Definition BufferedReader.hpp:324
SerialBufferedReader(Pointer_ reader, std::size_t buffer_size)
Definition BufferedReader.hpp:332
Simple byte readers and writers.
Definition BufferedReader.hpp:21
constexpr Dest_ cap(Value_ x)