byteme
C++ wrappers for buffered inputs
Loading...
Searching...
No Matches
BufferedWriter.hpp
Go to the documentation of this file.
1#ifndef BYTEME_BUFFERED_WRITER_HPP
2#define BYTEME_BUFFERED_WRITER_HPP
3
#include <thread>
#include <condition_variable>
#include <mutex>
#include <vector>
#include <algorithm>
#include <exception>
#include <stdexcept>
#include <type_traits>
#include <memory>
#include <cstddef>

#include "Writer.hpp"
#include "utils.hpp"
23
24namespace byteme {
25
38template<typename Type_, class WriterPointer_ = std::unique_ptr<Writer> >
40public:
44 BufferedWriter(std::size_t buffer_size) : my_buffer(sanisizer::cast<I<decltype(my_buffer.size())> >(buffer_size)) {
45 if (buffer_size == 0) {
46 throw std::runtime_error("buffer size must be positive");
47 }
48 }
49
51 BufferedWriter(const BufferedWriter&) = delete; // not copyable.
52 BufferedWriter& operator=(BufferedWriter&&) = delete;
53 BufferedWriter& operator=(const BufferedWriter&) = delete; // not copyable.
54
55 virtual ~BufferedWriter() = default;
60protected:
64 virtual void flush_async() = 0;
65
66 virtual void flush_sync(const Type_*, std::size_t) = 0;
67
68 std::vector<Type_>& get_buffer() {
69 return my_buffer;
70 }
75private:
76 std::vector<Type_> my_buffer;
77 std::size_t my_current = 0;
78
79 // Standard guarantees this to be at least 64 bits, which is more than that of size_t.
80 // We don't use uint64_t because that might not be defined by the implementation.
81 unsigned long long my_overall = 0;
82
83public:
87 unsigned long long number() {
88 return my_overall + my_current;
89 }
90
91public:
98 void write(Type_ input) {
99 my_buffer[my_current] = input;
100 ++my_current;
101
102 const auto buffer_size = my_buffer.size();
103 if (my_current == buffer_size) {
104 flush_async();
105 my_current = 0;
106 my_overall += buffer_size;
107 }
108 }
109
117 void write(const Type_* input, std::size_t number) {
118 const auto buffer_size = my_buffer.size();
119
120 // Seeing if we fill up the rest of our buffer or not.
121 const std::size_t remaining = buffer_size - my_current;
122 if (number < remaining) {
123 std::copy_n(input, number, my_buffer.data() + my_current);
124 my_current += number;
125 return;
126 }
127
128 std::copy_n(input, remaining, my_buffer.data() + my_current);
129 number -= remaining;
130 input += remaining;
131
132 if (number == 0) {
133 flush_async();
134 my_overall += buffer_size;
135 my_current = 0;
136 return;
137 }
138
139 flush_sync(my_buffer.data(), buffer_size);
140 my_overall += buffer_size;
141
142 // Directly filling the output array, bypassing our buffer.
143 while (number >= buffer_size) {
144 flush_sync(input, buffer_size);
145 my_overall += buffer_size;
146 input += buffer_size;
147 number -= buffer_size;
148 }
149
150 // Filling the cache and copying the remainder into the output array.
151 std::copy_n(input, number, my_buffer.data());
152 my_current = number;
153 }
154
155public:
161 void flush() {
162 flush_sync(my_buffer.data(), my_current); // Flush whatever's left.
163 my_overall += my_current;
164 my_current = 0;
165 }
166
173 virtual void finish() = 0;
174};
175
186template<typename Type_, class Pointer_ = std::unique_ptr<Writer> >
187class SerialBufferedWriter final : public BufferedWriter<Type_> {
188public:
195 SerialBufferedWriter(Pointer_ writer, std::size_t buffer_size) : BufferedWriter<Type_>(buffer_size), my_writer(std::move(writer)) {}
196
201 this->flush();
202 }
207private:
208 Pointer_ my_writer;
209
210protected:
214 void flush_async() {
215 auto& buffer = this->get_buffer();
216 my_writer->write(reinterpret_cast<unsigned char*>(buffer.data()), buffer.size());
217 }
218
219 void flush_sync(const Type_* ptr, std::size_t num) {
220 my_writer->write(reinterpret_cast<const unsigned char*>(ptr), num);
221 }
222
223 void finish() {
224 this->flush();
225 my_writer->finish();
226 }
230};
231
243template<typename Type_, class Pointer_ = std::unique_ptr<Writer> >
244class ParallelBufferedWriter final : public BufferedWriter<Type_> {
245public:
252 ParallelBufferedWriter(Pointer_ writer, std::size_t buffer_size) :
253 BufferedWriter<Type_>(buffer_size),
254 my_writer(std::move(writer)),
255 my_buffer_worker(sanisizer::cast<I<decltype(my_buffer_worker.size())> >(buffer_size))
256 {
257 my_thread = std::thread([&]() { thread_loop(); }); // set up thread before initializing.
258 }
259
264 this->flush();
265
266 std::unique_lock lck(my_mut);
267 my_kill = true;
268 my_ready_input = true;
269 lck.unlock(); // releasing the lock so that the notified thread doesn't immediately block.
270 my_cv.notify_one();
271 my_thread.join();
272 }
277private:
278 Pointer_ my_writer;
279 std::vector<Type_> my_buffer_worker;
280 std::size_t my_to_write = 0;
281
282private:
283 std::thread my_thread;
284 std::exception_ptr my_thread_err = nullptr;
285 std::mutex my_mut;
286 std::condition_variable my_cv;
287
288 bool my_ready_input = false, my_ready_output = false;
289 bool my_worker_active = false;
290 bool my_kill = false;
291
292 void thread_loop() {
293 while (1) {
294 std::unique_lock lck(my_mut);
295 my_cv.wait(lck, [&]() { return my_ready_input; });
296 my_ready_input = false;
297
298 if (my_kill) { // Handle an explicit kill signal from the destructor.
299 break;
300 }
301
302 try {
303 my_writer->write(reinterpret_cast<const unsigned char*>(my_buffer_worker.data()), my_to_write);
304 } catch (...) {
305 my_thread_err = std::current_exception();
306 }
307
308 my_ready_output = true;
309 lck.unlock();
310 my_cv.notify_one();
311 }
312 }
313
314protected:
318 void flush_async() {
319 auto& buffer = this->get_buffer();
320 const auto num = buffer.size();
321
322 if (my_worker_active) {
323 // If the worker is active, it's probably already gotten started so we just wait for it to finish.
324 // Then we swap the results with the main buffer and submit a new job to the worker.
325 std::unique_lock lck(my_mut);
326 my_cv.wait(lck, [&]() { return my_ready_output; });
327 my_ready_output = false;
328
329 if (my_thread_err) {
330 std::rethrow_exception(my_thread_err);
331 }
332
333 buffer.swap(my_buffer_worker);
334 my_to_write = num;
335
336 my_ready_input = true;
337 lck.unlock();
338 my_cv.notify_one();
339
340 } else {
341 // If the worker is not active, we can just submit directly.
342 std::unique_lock lck(my_mut);
343 buffer.swap(my_buffer_worker);
344 my_to_write = num;
345
346 my_ready_input = true;
347 lck.unlock();
348 my_cv.notify_one();
349 my_worker_active = true;
350 }
351 }
352
353 void flush_sync(const Type_* ptr, std::size_t num) {
354 if (my_worker_active) {
355 // If the worker is active, we wait for it to finish and then write from the supplied pointer.
356 // We do not submit a new job, based on the loop in the BufferedWriter::write() overload:
357 //
358 // - We'll probably want to call this flush() overload again.
359 // On subsequent calls, we'll want to directly write from the supplied pointer to the Writer to cut out a copy.
360 // - If we didn't call this overload again, we'd probably call the other refill() overload immediately,
361 // So if we sent a new job to the worker, we'd end up immediately blocking on it to submit the next write.
362 // So we might as well skip that communication overhead and write directly on the main thread.
363 std::unique_lock lck(my_mut);
364 my_cv.wait(lck, [&]() { return my_ready_output; });
365 my_ready_output = false;
366 if (my_thread_err) {
367 std::rethrow_exception(my_thread_err);
368 }
369
370 my_writer->write(reinterpret_cast<const unsigned char*>(ptr), num);
371 my_worker_active = false;
372
373 } else {
374 // If the worker isn't active, we can directly write from the supplied pointer, avoiding some communication overhead.
375 my_writer->write(reinterpret_cast<const unsigned char*>(ptr), num);
376 }
377 }
378
379 void finish() {
380 this->flush();
381 my_writer->finish();
382 }
386};
387
388}
389
390#endif
Write to an output sink.
Buffered wrapper around a Writer.
Definition BufferedWriter.hpp:39
virtual void finish()=0
void flush()
Definition BufferedWriter.hpp:161
void write(const Type_ *input, std::size_t number)
Definition BufferedWriter.hpp:117
void write(Type_ input)
Definition BufferedWriter.hpp:98
unsigned long long number()
Definition BufferedWriter.hpp:87
Parallelized buffering to wrap a Writer.
Definition BufferedWriter.hpp:244
ParallelBufferedWriter(Pointer_ writer, std::size_t buffer_size)
Definition BufferedWriter.hpp:252
Serial buffering to wrap a Writer.
Definition BufferedWriter.hpp:187
SerialBufferedWriter(Pointer_ writer, std::size_t buffer_size)
Definition BufferedWriter.hpp:195
Simple byte readers and writers.
Definition BufferedReader.hpp:21
constexpr Dest_ cast(Value_ x)