byteme
C++ wrappers for buffered inputs
Loading...
Searching...
No Matches
PerByte.hpp
Go to the documentation of this file.
1#ifndef BYTEME_PERBYTE_HPP
2#define BYTEME_PERBYTE_HPP
3
9#include <thread>
10#include <condition_variable>
11#include <mutex>
12#include <vector>
13#include <algorithm>
14#include <exception>
15#include <type_traits>
16#include <memory>
17#include <cstddef>
18
19#include "Reader.hpp"
20
21namespace byteme {
22
/**
 * Keep calling `reader.load()` until it either yields a non-empty buffer or reports failure.
 *
 * @tparam Reader_ Type providing `load()` (convertible to bool) and `available()`.
 * @param reader Source to pull from.
 * @param[out] available Set to the size reported by `reader.available()` for the first
 * non-empty buffer, or to zero if `reader.load()` returned false before any bytes were seen.
 */
template<class Reader_>
void skip_zero_buffers(Reader_& reader, std::size_t& available) {
    available = 0;

    do {
        if (!reader.load()) {
            // 'available' is necessarily zero here: it is either the initial value
            // or the result of the previous (empty) load.
            return;
        }
        // A successful load may still hand back an empty buffer; if so, go around again.
        available = reader.available();
    } while (available == 0);
}
51template<typename Type_>
53public:
57 PerByteInterface() = default;
59 PerByteInterface(const PerByteInterface&) = delete; // not copyable.
60 PerByteInterface& operator=(PerByteInterface&&) = delete;
61 PerByteInterface& operator=(const PerByteInterface&) = delete; // not copyable.
62 virtual ~PerByteInterface() = default;
67protected:
71 const Type_* ptr = nullptr;
72
76 std::size_t available = 0;
77
81 virtual void refill() = 0;
82
83private:
84 std::size_t my_current = 0;
85
86 // Standard guarantees this to be at least 64 bits, which is more than that of size_t.
87 // We don't use uint64_t because that might not be defined by the implementation.
88 unsigned long long my_overall = 0;
89
90public:
97 bool advance() {
98 ++my_current;
99 if (my_current < available) {
100 return true;
101 }
102
103 my_current = 0;
104 my_overall += available;
105 refill();
106 return available > 0; // Check that we haven't reached the end of the reader.
107 }
108
114 Type_ get() const {
115 return ptr[my_current];
116 }
117
121 unsigned long long position() const {
122 return my_overall + my_current;
123 }
124
128 bool valid() const {
129 return my_current < available;
130 }
131
132public:
147 std::pair<std::size_t, bool> extract(std::size_t number, Type_* output) {
148 std::size_t original = number;
149 bool okay = true;
150
151 while (1) {
152 auto start = ptr + my_current;
153 auto leftover = available - my_current;
154
155 if (leftover > number) {
156 my_current += number;
157 number = 0;
158 std::copy(start, ptr + my_current, output);
159 break;
160
161 } else {
162 number -= leftover;
163 std::copy(start, ptr + available, output);
164
165 my_current = 0;
166 my_overall += available;
167 refill();
168
169 okay = (available > 0);
170 if (number == 0 || !okay) {
171 break;
172 }
173 output += leftover;
174 }
175 }
176
177 return std::make_pair(original - number, okay);
178 }
179};
180
181
193template<typename Type_, class Pointer_ = std::unique_ptr<Reader> >
194class PerByteSerial final : public PerByteInterface<Type_> {
195public:
199 PerByteSerial(Pointer_ reader) : my_reader(std::move(reader)) {
200 refill();
201 }
202
203private:
204 Pointer_ my_reader;
205
206protected:
207 void refill() {
208 skip_zero_buffers(*my_reader, this->available);
209 this->ptr = reinterpret_cast<const Type_*>(my_reader->buffer());
210 }
211};
212
224template<typename Type_, class Pointer_ = std::unique_ptr<Reader> >
225class PerByteParallel final : public PerByteInterface<Type_> {
226public:
230 PerByteParallel(Pointer_ reader) : my_reader(std::move(reader)) {
231 my_ready_input = false;
232 my_thread = std::thread([&]() { thread_loop(); });
233
234 skip_zero_buffers(*my_reader, my_next_available);
235 my_ready_output = true; // run the first iteration of refill().
236 refill();
237 }
238
243 if (!my_finished) {
244 std::unique_lock lck(my_mut);
245 my_finished = true;
246 my_ready_input = true;
247 lck.unlock(); // releasing the lock so that the notified thread doesn't immediately block.
248 my_cv.notify_one();
249 }
250 my_thread.join();
251 }
256private:
257 Pointer_ my_reader;
258 std::vector<Type_> my_buffer;
259 std::size_t my_next_available = 0;
260 bool my_finished = false;
261
262private:
263 std::thread my_thread;
264 std::exception_ptr my_thread_err = nullptr;
265 std::mutex my_mut;
266 std::condition_variable my_cv;
267 bool my_ready_input, my_ready_output;
268
269 void thread_loop() {
270 while (!my_finished) {
271 std::unique_lock lck(my_mut);
272 my_cv.wait(lck, [&]() { return my_ready_input; });
273 my_ready_input = false;
274
275 if (my_finished) { // an explicit kill signal from the destructor.
276 break;
277 }
278
279 try {
280 skip_zero_buffers(*my_reader, my_next_available);
281 my_finished = my_next_available == 0; // see the definition of skip_zero_buffers().
282 } catch (...) {
283 my_thread_err = std::current_exception();
284 my_finished = true;
285 }
286
287 my_ready_output = true;
288 lck.unlock();
289 my_cv.notify_one();
290 }
291 }
292
293protected:
294 void refill() {
295 if (my_finished) {
296 this->ptr = nullptr;
297 this->available = 0;
298 return;
299 }
300
301 std::unique_lock lck(my_mut);
302 my_cv.wait(lck, [&]() { return my_ready_output; });
303 my_ready_output = false;
304 if (my_thread_err) {
305 std::rethrow_exception(my_thread_err);
306 }
307
308 auto rptr = reinterpret_cast<const Type_*>(my_reader->buffer());
309 this->available = my_next_available;
310 my_buffer.resize(this->available);
311 std::copy_n(rptr, this->available, my_buffer.data());
312 this->ptr = my_buffer.data();
313
314 my_ready_input = true;
315 lck.unlock();
316 if (!my_finished) {
317 my_cv.notify_one();
318 }
319 }
320};
321
322}
323
324#endif
Read an input source.
Interface for byte-by-byte extraction from a Reader source.
Definition PerByte.hpp:52
unsigned long long position() const
Definition PerByte.hpp:121
const Type_ * ptr
Definition PerByte.hpp:71
bool valid() const
Definition PerByte.hpp:128
Type_ get() const
Definition PerByte.hpp:114
bool advance()
Definition PerByte.hpp:97
std::pair< std::size_t, bool > extract(std::size_t number, Type_ *output)
Definition PerByte.hpp:147
virtual void refill()=0
std::size_t available
Definition PerByte.hpp:76
Parallelized byte-by-byte extraction from a Reader source.
Definition PerByte.hpp:225
void refill()
Definition PerByte.hpp:294
PerByteParallel(Pointer_ reader)
Definition PerByte.hpp:230
Serial byte-by-byte extraction from a Reader source.
Definition PerByte.hpp:194
PerByteSerial(Pointer_ reader)
Definition PerByte.hpp:199
void refill()
Definition PerByte.hpp:207
Simple byte readers and writers.