byteme
C++ wrappers for buffered inputs
Loading...
Searching...
No Matches
PerByte.hpp
Go to the documentation of this file.
1#ifndef BYTEME_PERBYTE_HPP
2#define BYTEME_PERBYTE_HPP
3
9#include <thread>
10#include <condition_variable>
11#include <mutex>
12#include <vector>
13#include <algorithm>
14#include <exception>
15#include <type_traits>
16
17#include "Reader.hpp"
18
19namespace byteme {
20
/**
 * Keep calling `load()` on the reader until it either yields a non-empty buffer
 * or reports that nothing more can be loaded.
 *
 * @tparam Pointer_ Pointer-like type; `reader->load()` and `reader->available()` must be valid.
 * @param reader Pointer to the reader to load from.
 * @param[out] available Number of bytes in the most recently loaded buffer.
 * This is zero on return only if `load()` returned false, i.e., the source is exhausted.
 */
template<class Pointer_>
void skip_zero_buffers(Pointer_& reader, size_t& available) {
    // Reset first, so that a failed (or throwing) load never leaves a stale count behind.
    available = 0;

    do {
        if (!reader->load()) {
            // 'available' is still zero here: it is either the initial reset
            // or the empty buffer that made us loop.
            return;
        }
        // A successful load may still produce an empty buffer; try again in that case.
        available = reader->available();
    } while (available == 0);
}
50template<typename Type_ = char, class Pointer_ = Reader*>
51class PerByte {
52private:
53 const Type_* my_ptr = nullptr;
54 size_t my_available = 0;
55 size_t my_current = 0;
56 size_t my_overall = 0;
57
58 Pointer_ my_reader;
59
60 void refill() {
61 my_overall += my_available;
62 skip_zero_buffers(my_reader, my_available);
63 my_ptr = reinterpret_cast<const Type_*>(my_reader->buffer());
64 my_current = 0;
65 }
66
67public:
71 PerByte(Pointer_ reader) : my_reader(std::move(reader)) {
72 refill();
73 }
74
78 bool valid() const {
79 return my_current < my_available;
80 }
81
88 bool advance() {
89 ++my_current;
90 if (my_current < my_available) {
91 return true;
92 }
93
94 refill();
95 return my_available > 0; // Check that we haven't reached the end of the reader.
96 }
97
103 Type_ get() const {
104 return my_ptr[my_current];
105 }
106
110 size_t position() const {
111 return my_overall + my_current;
112 }
113
128 std::pair<size_t, bool> extract(size_t number, Type_* output) {
129 size_t original = number;
130 bool okay = true;
131
132 while (1) {
133 auto start = my_ptr + my_current;
134 auto leftover = my_available - my_current;
135
136 if (leftover > number) {
137 my_current += number;
138 number = 0;
139 std::copy(start, my_ptr + my_current, output);
140 break;
141
142 } else {
143 number -= leftover;
144 std::copy(start, my_ptr + my_available, output);
145 refill();
146
147 okay = (my_available > 0);
148 if (number == 0 || !okay) {
149 break;
150 }
151 output += leftover;
152 }
153 }
154
155 return std::make_pair(original - number, okay);
156 }
157};
158
168template<typename Type_ = char, class Pointer_ = Reader*>
170private:
171 size_t my_current = 0;
172 size_t my_available = 0;
173 size_t my_overall = 0;
174
175 Pointer_ my_reader;
176
177 std::vector<Type_> my_buffer;
178 size_t my_next_available = 0;
179 bool my_finished = false;
180
181private:
182 std::thread my_thread;
183 std::exception_ptr my_thread_err = nullptr;
184 std::mutex my_mut;
185 std::condition_variable my_cv;
186 bool my_ready_input, my_ready_output;
187
188 void thread_loop() {
189 while (!my_finished) {
190 std::unique_lock lck(my_mut);
191 my_cv.wait(lck, [&]() { return my_ready_input; });
192 my_ready_input = false;
193
194 if (my_finished) { // an explicit kill signal from the destructor, see below.
195 break;
196 }
197
198 try {
199 skip_zero_buffers(my_reader, my_next_available);
200 my_finished = my_next_available == 0; // see the definition of skip_zero_buffers().
201 } catch (...) {
202 my_thread_err = std::current_exception();
203 my_finished = true;
204 }
205
206 my_ready_output = true;
207 lck.unlock();
208 my_cv.notify_one();
209 }
210 }
211
212private:
213 void refill() {
214 std::unique_lock lck(my_mut);
215 my_cv.wait(lck, [&]() { return my_ready_output; });
216 my_ready_output = false;
217
218 if (my_thread_err) {
219 std::rethrow_exception(my_thread_err);
220 }
221
222 my_overall += my_available;
223
224 auto ptr = reinterpret_cast<const Type_*>(my_reader->buffer());
225 my_available = my_next_available;
226 my_buffer.resize(my_available);
227 my_current = 0;
228
229 std::copy(ptr, ptr + my_available, my_buffer.begin());
230 my_ready_input = true;
231
232 lck.unlock();
233 if (!my_finished) {
234 my_cv.notify_one();
235 }
236 }
237
238public:
242 PerByteParallel(Pointer_ reader) : my_reader(std::move(reader)) {
243 my_ready_input = false;
244 my_thread = std::thread([&]() { thread_loop(); });
245
246 skip_zero_buffers(my_reader, my_next_available);
247 my_ready_output = true; // run the first iteration of refill().
248 refill();
249 }
250
255 if (!my_finished) {
256 std::unique_lock lck(my_mut);
257 my_finished = true;
258 my_ready_input = true;
259 lck.unlock(); // releasing the lock so that the notified thread doesn't immediately block.
260 my_cv.notify_one();
261 }
262 my_thread.join();
263 }
271 bool valid() const {
272 return my_current < my_available;
273 }
274
278 bool advance() {
279 ++my_current;
280 if (my_current < my_available) {
281 return true;
282 }
283
284 if (my_finished) {
285 return false;
286 }
287 refill();
288
289 return my_available > 0; // confirm there's actually bytes to extract in the next round.
290 }
291
295 Type_ get() const {
296 return my_buffer[my_current];
297 }
298
302 size_t position() const {
303 return my_overall + my_current;
304 }
305
320 std::pair<size_t, bool> extract(size_t number, Type_* output) {
321 size_t original = number;
322 bool okay = true;
323
324 while (1) {
325 auto start = my_buffer.data() + my_current;
326 auto leftover = my_available - my_current;
327
328 if (leftover > number) {
329 my_current += number;
330 number = 0;
331 std::copy(start, my_buffer.data() + my_current, output);
332 break;
333
334 } else {
335 number -= leftover;
336 std::copy(start, my_buffer.data() + my_available, output);
337
338 if (my_finished) {
339 my_current += leftover;
340 okay = false;
341 break;
342 }
343 refill();
344
345 okay = (my_available > 0);
346 if (number == 0 || !okay) {
347 break;
348 }
349 output += leftover;
350 }
351 }
352
353 return std::make_pair(original - number, okay);
354 }
355};
356
357}
358
359#endif
Read an input source.
Perform parallelized byte-by-byte extraction from a Reader source.
Definition PerByte.hpp:169
Type_ get() const
Definition PerByte.hpp:295
bool advance()
Definition PerByte.hpp:278
size_t position() const
Definition PerByte.hpp:302
std::pair< size_t, bool > extract(size_t number, Type_ *output)
Definition PerByte.hpp:320
bool valid() const
Definition PerByte.hpp:271
PerByteParallel(Pointer_ reader)
Definition PerByte.hpp:242
Perform byte-by-byte extraction from a Reader source.
Definition PerByte.hpp:51
size_t position() const
Definition PerByte.hpp:110
PerByte(Pointer_ reader)
Definition PerByte.hpp:71
Type_ get() const
Definition PerByte.hpp:103
bool valid() const
Definition PerByte.hpp:78
std::pair< size_t, bool > extract(size_t number, Type_ *output)
Definition PerByte.hpp:128
bool advance()
Definition PerByte.hpp:88
Simple byte readers and writers.