byteme
C++ wrappers for buffered inputs
Loading...
Searching...
No Matches
PerByte.hpp
Go to the documentation of this file.
1#ifndef BYTEME_PERBYTE_HPP
2#define BYTEME_PERBYTE_HPP
3
9#include <thread>
10#include <condition_variable>
11#include <mutex>
12#include <vector>
13#include <algorithm>
14#include <exception>
15#include <type_traits>
16#include <memory>
17#include <cstddef>
18
19#include "Reader.hpp"
20
21namespace byteme {
22
/**
 * Keep calling `reader.load()` until it either yields a non-empty buffer or reports failure.
 *
 * @tparam Reader_ Any type providing `load()` (result usable as a boolean) and `available()`.
 * @param reader Source to load from.
 * @param[out] available Set to the size reported by `reader.available()` after the last successful load,
 * or to zero if `reader.load()` returned false before any non-empty buffer was seen.
 *
 * On return, `available == 0` implies that the most recent `reader.load()` call returned false.
 */
template<class Reader_>
void skip_zero_buffers(Reader_& reader, std::size_t& available) {
    available = 0;
    do {
        if (!reader.load()) {
            // 'available' is still zero here: it is either untouched or was zero after the previous load.
            return;
        }
        available = reader.available();
    } while (available == 0); // a successful load may still hand back an empty buffer, so try again.
}
51template<typename Type_>
53public:
57 PerByteInterface() = default;
59 PerByteInterface(const PerByteInterface&) = delete; // not copyable.
60 PerByteInterface& operator=(PerByteInterface&&) = delete;
61 PerByteInterface& operator=(const PerByteInterface&) = delete; // not copyable.
62 virtual ~PerByteInterface() = default;
67protected:
71 const Type_* ptr = nullptr;
72
76 std::size_t available = 0;
77
82 virtual void refill() = 0;
83
84private:
85 std::size_t my_current = 0;
86
    // The standard guarantees that unsigned long long is at least 64 bits wide, which is at least as wide as size_t on common platforms.
    // We don't use uint64_t because that type is optional and might not be defined by the implementation.
89 unsigned long long my_overall = 0;
90
91public:
98 bool advance() {
99 ++my_current;
100 if (my_current < available) {
101 return true;
102 }
103
104 my_current = 0;
105 my_overall += available;
106 refill();
107 return available > 0; // Check that we haven't reached the end of the reader.
108 }
109
115 Type_ get() const {
116 return ptr[my_current];
117 }
118
122 unsigned long long position() const {
123 return my_overall + my_current;
124 }
125
129 bool valid() const {
130 return my_current < available;
131 }
132
133public:
152 std::pair<std::size_t, bool> extract(std::size_t number, Type_* output) {
153 std::size_t original = number;
154 bool okay = true;
155
156 while (1) {
157 auto start = ptr + my_current;
158 auto leftover = available - my_current;
159
160 if (leftover > number) {
161 my_current += number;
162 number = 0;
163 std::copy(start, ptr + my_current, output);
164 break;
165
166 } else {
167 number -= leftover;
168 std::copy(start, ptr + available, output);
169
170 my_current = 0;
171 my_overall += available;
172 refill();
173
174 okay = (available > 0);
175 if (number == 0 || !okay) {
176 break;
177 }
178 output += leftover;
179 }
180 }
181
182 return std::make_pair(original - number, okay);
183 }
184
200 std::size_t advance_and_extract(std::size_t number, Type_* output) {
201 if (number == 0) {
202 return 0;
203 }
204 ++my_current;
205 std::size_t original = number;
206
207 while (1) {
208 auto start = ptr + my_current;
209 auto leftover = available - my_current;
210
211 if (leftover >= number) {
212 std::copy_n(start, number, output);
213 my_current += number - 1; // number must be positive at this point.
214 number = 0;
215 break;
216 } else {
217 number -= leftover; // number must be positive at this point.
218 std::copy(start, ptr + available, output);
219
220 my_current = 0;
221 my_overall += available;
222 refill();
223
224 if (available == 0) {
225 break;
226 }
227 output += leftover;
228 }
229 }
230
231 return original - number;
232 }
233};
234
235
247template<typename Type_, class Pointer_ = std::unique_ptr<Reader> >
248class PerByteSerial final : public PerByteInterface<Type_> {
249public:
254 PerByteSerial(Pointer_ reader) : my_reader(std::move(reader)) {
255 refill();
256 }
257
258private:
259 Pointer_ my_reader;
260
261protected:
262 void refill() {
263 skip_zero_buffers(*my_reader, this->available);
264 this->ptr = reinterpret_cast<const Type_*>(my_reader->buffer());
265 }
266};
267
279template<typename Type_, class Pointer_ = std::unique_ptr<Reader> >
280class PerByteParallel final : public PerByteInterface<Type_> {
281public:
286 PerByteParallel(Pointer_ reader) : my_reader(std::move(reader)) {
287 my_ready_input = false;
288 my_thread = std::thread([&]() { thread_loop(); });
289
290 skip_zero_buffers(*my_reader, my_next_available);
291 my_ready_output = true; // run the first iteration of refill().
292 refill();
293 }
294
299 if (!my_finished) {
300 std::unique_lock lck(my_mut);
301 my_finished = true;
302 my_ready_input = true;
303 lck.unlock(); // releasing the lock so that the notified thread doesn't immediately block.
304 my_cv.notify_one();
305 }
306 my_thread.join();
307 }
312private:
313 Pointer_ my_reader;
314 std::vector<Type_> my_buffer;
315 std::size_t my_next_available = 0;
316 bool my_finished = false;
317
318private:
319 std::thread my_thread;
320 std::exception_ptr my_thread_err = nullptr;
321 std::mutex my_mut;
322 std::condition_variable my_cv;
323 bool my_ready_input, my_ready_output;
324
325 void thread_loop() {
326 while (!my_finished) {
327 std::unique_lock lck(my_mut);
328 my_cv.wait(lck, [&]() { return my_ready_input; });
329 my_ready_input = false;
330
331 if (my_finished) { // an explicit kill signal from the destructor.
332 break;
333 }
334
335 try {
336 skip_zero_buffers(*my_reader, my_next_available);
337 my_finished = my_next_available == 0; // see the definition of skip_zero_buffers().
338 } catch (...) {
339 my_thread_err = std::current_exception();
340 my_finished = true;
341 }
342
343 my_ready_output = true;
344 lck.unlock();
345 my_cv.notify_one();
346 }
347 }
348
349protected:
350 void refill() {
351 if (my_finished) {
352 this->ptr = nullptr;
353 this->available = 0;
354 return;
355 }
356
357 std::unique_lock lck(my_mut);
358 my_cv.wait(lck, [&]() { return my_ready_output; });
359 my_ready_output = false;
360 if (my_thread_err) {
361 std::rethrow_exception(my_thread_err);
362 }
363
364 auto rptr = reinterpret_cast<const Type_*>(my_reader->buffer());
365 this->available = my_next_available;
366 my_buffer.resize(this->available);
367 std::copy_n(rptr, this->available, my_buffer.data());
368 this->ptr = my_buffer.data();
369
370 my_ready_input = true;
371 lck.unlock();
372 if (!my_finished) {
373 my_cv.notify_one();
374 }
375 }
376};
377
378}
379
380#endif
Read an input source.
Interface for byte-by-byte extraction from a Reader source.
Definition PerByte.hpp:52
unsigned long long position() const
Definition PerByte.hpp:122
const Type_ * ptr
Definition PerByte.hpp:71
bool valid() const
Definition PerByte.hpp:129
Type_ get() const
Definition PerByte.hpp:115
bool advance()
Definition PerByte.hpp:98
std::pair< std::size_t, bool > extract(std::size_t number, Type_ *output)
Definition PerByte.hpp:152
virtual void refill()=0
std::size_t available
Definition PerByte.hpp:76
std::size_t advance_and_extract(std::size_t number, Type_ *output)
Definition PerByte.hpp:200
Parallelized byte-by-byte extraction from a Reader source.
Definition PerByte.hpp:280
void refill()
Definition PerByte.hpp:350
PerByteParallel(Pointer_ reader)
Definition PerByte.hpp:286
Serial byte-by-byte extraction from a Reader source.
Definition PerByte.hpp:248
PerByteSerial(Pointer_ reader)
Definition PerByte.hpp:254
void refill()
Definition PerByte.hpp:262
Simple byte readers and writers.