byteme
C++ wrappers for buffered inputs
Loading...
Searching...
No Matches
PerByte.hpp
Go to the documentation of this file.
1#ifndef BYTEME_PERBYTE_HPP
2#define BYTEME_PERBYTE_HPP
3
9#include <thread>
10#include <condition_variable>
11#include <mutex>
12#include <vector>
13#include <algorithm>
14#include <exception>
15#include <type_traits>
16#include <memory>
17#include <cstddef>
18
19#include "Reader.hpp"
20
21namespace byteme {
22
26template<class Reader_>
27void skip_zero_buffers(Reader_& reader, std::size_t& available) {
28 available = 0;
29 while (reader.load()) {
30 available = reader.available(); // continue collecting bytes if a zero-length buffer is returned without load() returning false.
31 if (available) {
32 break;
33 }
34 }
35
36 // If available == 0 on return, then reader->load() must be false,
37 // and there are no more bytes left in the source.
38}
51template<typename Type_>
53public:
57 PerByteInterface() = default;
59 PerByteInterface(const PerByteInterface&) = delete; // not copyable.
60 PerByteInterface& operator=(PerByteInterface&&) = delete;
61 PerByteInterface& operator=(const PerByteInterface&) = delete; // not copyable.
62 virtual ~PerByteInterface() = default;
67protected:
71 const Type_* ptr = nullptr;
72
76 std::size_t available = 0;
77
81 virtual void refill() = 0;
82
83private:
84 std::size_t my_current = 0;
85
86 // Standard guarantees this to be at least 64 bits, which is more than that of size_t.
87 // We don't use uint64_t because that might not be defined by the implementation.
88 unsigned long long my_overall = 0;
89
90public:
97 bool advance() {
98 ++my_current;
99 if (my_current < available) {
100 return true;
101 }
102
103 my_current = 0;
104 my_overall += available;
105 refill();
106 return available > 0; // Check that we haven't reached the end of the reader.
107 }
108
114 Type_ get() const {
115 return ptr[my_current];
116 }
117
121 unsigned long long position() const {
122 return my_overall + my_current;
123 }
124
128 bool valid() const {
129 return my_current < available;
130 }
131
132public:
148 std::pair<std::size_t, bool> extract(std::size_t number, Type_* output) {
149 std::size_t original = number;
150 bool okay = true;
151
152 while (1) {
153 auto start = ptr + my_current;
154 auto leftover = available - my_current;
155
156 if (leftover > number) {
157 my_current += number;
158 number = 0;
159 std::copy(start, ptr + my_current, output);
160 break;
161
162 } else {
163 number -= leftover;
164 std::copy(start, ptr + available, output);
165
166 my_current = 0;
167 my_overall += available;
168 refill();
169
170 okay = (available > 0);
171 if (number == 0 || !okay) {
172 break;
173 }
174 output += leftover;
175 }
176 }
177
178 return std::make_pair(original - number, okay);
179 }
180};
181
182
194template<typename Type_, class Pointer_ = std::unique_ptr<Reader> >
195class PerByteSerial final : public PerByteInterface<Type_> {
196public:
200 PerByteSerial(Pointer_ reader) : my_reader(std::move(reader)) {
201 refill();
202 }
203
204private:
205 Pointer_ my_reader;
206
207protected:
208 void refill() {
209 skip_zero_buffers(*my_reader, this->available);
210 this->ptr = reinterpret_cast<const Type_*>(my_reader->buffer());
211 }
212};
213
225template<typename Type_, class Pointer_ = std::unique_ptr<Reader> >
226class PerByteParallel final : public PerByteInterface<Type_> {
227public:
231 PerByteParallel(Pointer_ reader) : my_reader(std::move(reader)) {
232 my_ready_input = false;
233 my_thread = std::thread([&]() { thread_loop(); });
234
235 skip_zero_buffers(*my_reader, my_next_available);
236 my_ready_output = true; // run the first iteration of refill().
237 refill();
238 }
239
244 if (!my_finished) {
245 std::unique_lock lck(my_mut);
246 my_finished = true;
247 my_ready_input = true;
248 lck.unlock(); // releasing the lock so that the notified thread doesn't immediately block.
249 my_cv.notify_one();
250 }
251 my_thread.join();
252 }
257private:
258 Pointer_ my_reader;
259 std::vector<Type_> my_buffer;
260 std::size_t my_next_available = 0;
261 bool my_finished = false;
262
263private:
264 std::thread my_thread;
265 std::exception_ptr my_thread_err = nullptr;
266 std::mutex my_mut;
267 std::condition_variable my_cv;
268 bool my_ready_input, my_ready_output;
269
270 void thread_loop() {
271 while (!my_finished) {
272 std::unique_lock lck(my_mut);
273 my_cv.wait(lck, [&]() { return my_ready_input; });
274 my_ready_input = false;
275
276 if (my_finished) { // an explicit kill signal from the destructor.
277 break;
278 }
279
280 try {
281 skip_zero_buffers(*my_reader, my_next_available);
282 my_finished = my_next_available == 0; // see the definition of skip_zero_buffers().
283 } catch (...) {
284 my_thread_err = std::current_exception();
285 my_finished = true;
286 }
287
288 my_ready_output = true;
289 lck.unlock();
290 my_cv.notify_one();
291 }
292 }
293
294protected:
295 void refill() {
296 if (my_finished) {
297 this->ptr = nullptr;
298 this->available = 0;
299 return;
300 }
301
302 std::unique_lock lck(my_mut);
303 my_cv.wait(lck, [&]() { return my_ready_output; });
304 my_ready_output = false;
305 if (my_thread_err) {
306 std::rethrow_exception(my_thread_err);
307 }
308
309 auto rptr = reinterpret_cast<const Type_*>(my_reader->buffer());
310 this->available = my_next_available;
311 my_buffer.resize(this->available);
312 std::copy_n(rptr, this->available, my_buffer.data());
313 this->ptr = my_buffer.data();
314
315 my_ready_input = true;
316 lck.unlock();
317 if (!my_finished) {
318 my_cv.notify_one();
319 }
320 }
321};
322
323}
324
325#endif
Read an input source.
Interface for byte-by-byte extraction from a Reader source.
Definition PerByte.hpp:52
unsigned long long position() const
Definition PerByte.hpp:121
const Type_ * ptr
Definition PerByte.hpp:71
bool valid() const
Definition PerByte.hpp:128
Type_ get() const
Definition PerByte.hpp:114
bool advance()
Definition PerByte.hpp:97
std::pair< std::size_t, bool > extract(std::size_t number, Type_ *output)
Definition PerByte.hpp:148
virtual void refill()=0
std::size_t available
Definition PerByte.hpp:76
Parallelized byte-by-byte extraction from a Reader source.
Definition PerByte.hpp:226
void refill()
Definition PerByte.hpp:295
PerByteParallel(Pointer_ reader)
Definition PerByte.hpp:231
Serial byte-by-byte extraction from a Reader source.
Definition PerByte.hpp:195
PerByteSerial(Pointer_ reader)
Definition PerByte.hpp:200
void refill()
Definition PerByte.hpp:208
Simple byte readers and writers.