byteme
C++ wrappers for buffered inputs
Loading...
Searching...
No Matches
BufferedReader.hpp
Go to the documentation of this file.
1#ifndef BYTEME_BUFFERED_READER_HPP
2#define BYTEME_BUFFERED_READER_HPP
3
9#include <thread>
10#include <condition_variable>
11#include <mutex>
12#include <vector>
13#include <algorithm>
14#include <exception>
15#include <type_traits>
16#include <memory>
17#include <cstddef>
18
19#include "Reader.hpp"
20
21namespace byteme {
22
35template<typename Type_>
37public:
41 BufferedReader(std::size_t buffer_size) : my_buffer(sanisizer::cap<I<decltype(my_buffer.size())> >(buffer_size)) {
42 if (buffer_size == 0) {
43 throw std::runtime_error("buffer size must be positive");
44 }
45 };
46
48 BufferedReader(const BufferedReader&) = delete; // not copyable.
49 BufferedReader& operator=(BufferedReader&&) = delete;
50 BufferedReader& operator=(const BufferedReader&) = delete; // not copyable.
51
52 virtual ~BufferedReader() = default;
57protected:
61 virtual std::size_t refill() = 0; // refills the buffer, returning the number of bytes filled.
62
63 virtual std::size_t refill(Type_*) = 0; // refills in the user-supplied array.
64
65 void initialize() {
66 my_available = refill();
67 }
68
69 auto& get_buffer() {
70 return my_buffer;
71 }
72
73 auto get_buffer_size() const {
74 return my_buffer.size();
75 }
80private:
81 std::vector<Type_> my_buffer;
82 std::size_t my_available = 0;
83 std::size_t my_current = 0;
84
// The standard guarantees that unsigned long long is at least 64 bits wide, which is at least as wide as size_t on common platforms.
// We don't use uint64_t because it is an optional type that an implementation is not required to define.
87 unsigned long long my_overall = 0;
88
89public:
96 bool advance() {
97 ++my_current;
98 if (my_current < my_available) {
99 return true;
100 }
101
102 my_current = 0;
103 my_overall += my_available;
104
105 const auto buffer_size = get_buffer_size();
106 if (my_available == buffer_size) {
107 my_available = refill();
108 } else {
109 my_available = 0;
110 }
111
112 return my_available > 0; // Check that we haven't reached the end of the reader.
113 }
114
120 Type_ get() const {
121 return my_buffer[my_current];
122 }
123
127 unsigned long long position() const {
128 return my_overall + my_current;
129 }
130
134 bool valid() const {
135 return my_current < my_available;
136 }
137
138public:
159 std::pair<std::size_t, bool> extract(std::size_t number, Type_* output) {
160 const std::size_t original = number;
161
162 // Copying contents of the cached buffer.
163 const std::size_t remaining = my_available - my_current;
164 if (number < remaining) {
165 std::copy_n(my_buffer.data() + my_current, number, output);
166 my_current += number;
167 return std::make_pair(number, true);
168 }
169
170 std::copy_n(my_buffer.data() + my_current, remaining, output);
171 output += remaining;
172 number -= remaining;
173
174 // If the available number of bytes is less than the buffer size, the reader is already exhausted.
175 const auto buffer_size = get_buffer_size();
176 if (my_available < buffer_size) {
177 my_current = my_available;
178 return std::make_pair(original - number, false);
179 }
180
181 // Directly filling the output array, bypassing our buffer.
182 while (number >= buffer_size) {
183 my_overall += my_available;
184 my_available = refill(output);
185
186 output += my_available;
187 number -= my_available;
188 if (my_available < buffer_size) {
189 my_current = my_available;
190 return std::make_pair(original - number, false);
191 }
192 }
193
194 // Filling the cache and copying the remainder into the output array.
195 my_overall += my_available;
196 my_available = refill();
197
198 const auto to_use = std::min(my_available, number);
199 std::copy_n(my_buffer.data(), to_use, output);
200 number -= to_use;
201 my_current = to_use;
202
203 // We know that number < buffer_size from the loop condition, so if exhausted == true,
204 // this means that my_available <= number < buffer_size, i.e., the source is exhausted.
205 // Thus, any future advance() would definitely return false.
206 bool exhausted = (to_use == my_available);
207
208 return std::make_pair(original - number, !exhausted);
209 }
210};
211
222template<typename Type_, class Pointer_ = std::unique_ptr<Reader> >
223class SerialBufferedReader final : public BufferedReader<Type_> {
224public:
231 SerialBufferedReader(Pointer_ reader, std::size_t buffer_size) :
232 BufferedReader<Type_>(buffer_size),
233 my_reader(std::move(reader))
234 {
235 this->initialize();
236 }
237
238private:
239 Pointer_ my_reader;
240
241protected:
245 std::size_t refill() {
246 auto& buf = this->get_buffer();
247 return my_reader->read(reinterpret_cast<unsigned char*>(buf.data()), buf.size());
248 }
249
250 std::size_t refill(Type_* ptr) {
251 return my_reader->read(reinterpret_cast<unsigned char*>(ptr), this->get_buffer_size());
252 }
256};
257
269template<typename Type_, class Pointer_ = std::unique_ptr<Reader> >
270class ParallelBufferedReader final : public BufferedReader<Type_> {
271public:
278 ParallelBufferedReader(Pointer_ reader, std::size_t buffer_size) :
279 BufferedReader<Type_>(buffer_size),
280 my_reader(std::move(reader)),
281 my_buffer_worker(buffer_size)
282 {
283 my_thread = std::thread([&]() { thread_loop(); }); // set up thread before initializing.
284 this->initialize();
285 }
286
291 std::unique_lock lck(my_mut);
292 my_kill = true;
293 my_ready_input = true;
294 lck.unlock(); // releasing the lock so that the notified thread doesn't immediately block.
295 my_cv.notify_one();
296 my_thread.join();
297 }
302private:
303 Pointer_ my_reader;
304 std::vector<Type_> my_buffer_main, my_buffer_worker;
305 std::size_t my_next_available = 0;
306
307private:
308 std::thread my_thread;
309 std::exception_ptr my_thread_err = nullptr;
310 std::mutex my_mut;
311 std::condition_variable my_cv;
312
313 bool my_ready_input = false, my_ready_output = false;
314 bool my_worker_active = false;
315 bool my_kill = false;
316
317 void thread_loop() {
318 const auto bufsize = this->get_buffer_size();
319 while (1) {
320 std::unique_lock lck(my_mut);
321 my_cv.wait(lck, [&]() { return my_ready_input; });
322 my_ready_input = false;
323
324 if (my_kill) { // Handle an explicit kill signal from the destructor.
325 break;
326 }
327
328 try {
329 my_next_available = my_reader->read(reinterpret_cast<unsigned char*>(my_buffer_worker.data()), bufsize);
330 } catch (...) {
331 my_thread_err = std::current_exception();
332 }
333
334 my_ready_output = true;
335 lck.unlock();
336 my_cv.notify_one();
337 }
338 }
339
340protected:
344 std::size_t refill() {
345 auto& buffer_main = this->get_buffer();
346
347 if (my_worker_active) {
348 // If the worker is active, it's probably already gotten started so we just wait for it to finish.
349 // Then we swap the results with the main buffer and submit a new job to the worker.
350 std::unique_lock lck(my_mut);
351 my_cv.wait(lck, [&]() { return my_ready_output; });
352 my_ready_output = false;
353
354 if (my_thread_err) {
355 std::rethrow_exception(my_thread_err);
356 }
357
358 buffer_main.swap(my_buffer_worker);
359 const auto output = my_next_available;
360
361 my_ready_input = true;
362 lck.unlock();
363 my_cv.notify_one();
364
365 return output;
366
367 } else {
368 // If the worker is not active, we just do a read directly on the main thread, and then submit a new job to the worker.
369 // Obviously, if we did the read on the worker, we'd have to wait for it anyway, so we might as well cut out the communication overhead.
370 const auto available = my_reader->read(reinterpret_cast<unsigned char*>(buffer_main.data()), buffer_main.size());
371
372 std::unique_lock lck(my_mut);
373 my_ready_input = true;
374 lck.unlock();
375 my_cv.notify_one();
376 my_worker_active = true;
377
378 return available;
379 }
380 }
381
382 std::size_t refill(Type_* ptr) {
383 if (my_worker_active) {
384 // If the worker is active, we wait for it to finish, transfer the results to the supplied pointer.
385 // We do not submit a new job, based on the loop in BufferedReader::extract():
386 //
387 // - We'll probably want to call this refill() overload again.
388 // On subsequent calls, we'll want to directly read from the Reader into the supplied pointer to cut out a copy.
389 // - But, if we didn't call this overload again, we'd still probably call the other refill() overload immediately.
390 // So if we sent a new job to the worker, we'd end up immediately blocking on it to get the next read.
391 // So we might as well skip that communication overhead and read directly on the main thread.
392 std::unique_lock lck(my_mut);
393 my_cv.wait(lck, [&]() { return my_ready_output; });
394 my_ready_output = false;
395 if (my_thread_err) {
396 std::rethrow_exception(my_thread_err);
397 }
398
399 std::copy_n(my_buffer_worker.begin(), my_next_available, ptr);
400 my_worker_active = false;
401 return my_next_available;
402
403 } else {
404 // If the worker isn't active, we can directly read the into the supplied pointer, avoiding some communication overhead.
405 return my_reader->read(reinterpret_cast<unsigned char*>(ptr), this->get_buffer_size());
406 }
407 }
411};
412
413}
414
415#endif
Read an input source.
Buffered wrapper around a Reader.
Definition BufferedReader.hpp:36
bool valid() const
Definition BufferedReader.hpp:134
std::pair< std::size_t, bool > extract(std::size_t number, Type_ *output)
Definition BufferedReader.hpp:159
Type_ get() const
Definition BufferedReader.hpp:120
bool advance()
Definition BufferedReader.hpp:96
unsigned long long position() const
Definition BufferedReader.hpp:127
Parallelized buffering to wrap a Reader.
Definition BufferedReader.hpp:270
ParallelBufferedReader(Pointer_ reader, std::size_t buffer_size)
Definition BufferedReader.hpp:278
Serial buffering to wrap a Reader.
Definition BufferedReader.hpp:223
SerialBufferedReader(Pointer_ reader, std::size_t buffer_size)
Definition BufferedReader.hpp:231
Simple byte readers and writers.
Definition BufferedReader.hpp:21
constexpr Dest_ cap(Value_ x)