byteme
Read/write bytes from various sources
Loading...
Searching...
No Matches
BufferedWriter.hpp
Go to the documentation of this file.
1#ifndef BYTEME_BUFFERED_WRITER_HPP
2#define BYTEME_BUFFERED_WRITER_HPP
3
9#include <thread>
10#include <condition_variable>
11#include <mutex>
12#include <vector>
13#include <algorithm>
14#include <exception>
15#include <type_traits>
16#include <memory>
17#include <cstddef>
18#include <string>
19
21
22#include "Writer.hpp"
23#include "utils.hpp"
24
25namespace byteme {
26
39template<typename Type_, class WriterPointer_ = std::unique_ptr<Writer> >
41public:
45 BufferedWriter(std::size_t buffer_size) : my_buffer(sanisizer::cast<I<decltype(my_buffer.size())> >(buffer_size)) {
46 if (buffer_size == 0) {
47 throw std::runtime_error("buffer size must be positive");
48 }
49 }
50
52 BufferedWriter(const BufferedWriter&) = delete; // not copyable.
53 BufferedWriter& operator=(BufferedWriter&&) = delete;
54 BufferedWriter& operator=(const BufferedWriter&) = delete; // not copyable.
55
56 virtual ~BufferedWriter() = default;
61protected:
65 virtual void flush_async() = 0;
66
67 virtual void flush_sync(const Type_*, std::size_t) = 0;
68
69 std::vector<Type_>& get_buffer() {
70 return my_buffer;
71 }
76private:
77 std::vector<Type_> my_buffer;
78 std::size_t my_current = 0;
79
80 // Standard guarantees this to be at least 64 bits, which is more than that of size_t.
81 // We don't use uint64_t because that might not be defined by the implementation.
82 unsigned long long my_overall = 0;
83
84public:
88 unsigned long long number() {
89 return my_overall + my_current;
90 }
91
92public:
99 void write(Type_ input) {
100 my_buffer[my_current] = input;
101 ++my_current;
102
103 const auto buffer_size = my_buffer.size();
104 if (my_current == buffer_size) {
105 flush_async();
106 my_current = 0;
107 my_overall += buffer_size;
108 }
109 }
110
118 void write(const Type_* input, std::size_t number) {
119 const auto buffer_size = my_buffer.size();
120
121 // Seeing if we fill up the rest of our buffer or not.
122 const std::size_t remaining = buffer_size - my_current;
123 if (number < remaining) {
124 std::copy_n(input, number, my_buffer.data() + my_current);
125 my_current += number;
126 return;
127 }
128
129 std::copy_n(input, remaining, my_buffer.data() + my_current);
130 number -= remaining;
131 input += remaining;
132
133 if (number == 0) {
134 flush_async();
135 my_overall += buffer_size;
136 my_current = 0;
137 return;
138 }
139
140 flush_sync(my_buffer.data(), buffer_size);
141 my_overall += buffer_size;
142
143 // Directly filling the output array, bypassing our buffer.
144 while (number >= buffer_size) {
145 flush_sync(input, buffer_size);
146 my_overall += buffer_size;
147 input += buffer_size;
148 number -= buffer_size;
149 }
150
151 // Filling the cache and copying the remainder into the output array.
152 std::copy_n(input, number, my_buffer.data());
153 my_current = number;
154 }
155
162 void write(const char* string) {
163 write(reinterpret_cast<const Type_*>(string), std::char_traits<char>::length(string));
164 }
165
172 void write(const std::string& string) {
173 write(reinterpret_cast<const Type_*>(string.c_str()), string.size());
174 }
175
176public:
182 void flush() {
183 flush_sync(my_buffer.data(), my_current); // Flush whatever's left.
184 my_overall += my_current;
185 my_current = 0;
186 }
187
194 virtual void finish() = 0;
195};
196
207template<typename Type_, class Pointer_ = std::unique_ptr<Writer> >
208class SerialBufferedWriter final : public BufferedWriter<Type_> {
209public:
216 SerialBufferedWriter(Pointer_ writer, std::size_t buffer_size) : BufferedWriter<Type_>(buffer_size), my_writer(std::move(writer)) {}
217
222 this->flush();
223 }
228private:
229 Pointer_ my_writer;
230
231protected:
235 void flush_async() {
236 auto& buffer = this->get_buffer();
237 my_writer->write(reinterpret_cast<unsigned char*>(buffer.data()), buffer.size());
238 }
239
240 void flush_sync(const Type_* ptr, std::size_t num) {
241 my_writer->write(reinterpret_cast<const unsigned char*>(ptr), num);
242 }
247public:
248 void finish() {
249 this->flush();
250 my_writer->finish();
251 }
252};
253
265template<typename Type_, class Pointer_ = std::unique_ptr<Writer> >
266class ParallelBufferedWriter final : public BufferedWriter<Type_> {
267public:
274 ParallelBufferedWriter(Pointer_ writer, std::size_t buffer_size) :
275 BufferedWriter<Type_>(buffer_size),
276 my_writer(std::move(writer)),
277 my_buffer_worker(sanisizer::cast<I<decltype(my_buffer_worker.size())> >(buffer_size))
278 {
279 my_thread = std::thread([&]() { thread_loop(); }); // set up thread before initializing.
280 }
281
286 this->flush();
287
288 std::unique_lock lck(my_mut);
289 my_kill = true;
290 my_ready_input = true;
291 lck.unlock(); // releasing the lock so that the notified thread doesn't immediately block.
292 my_cv.notify_one();
293 my_thread.join();
294 }
299private:
300 Pointer_ my_writer;
301 std::vector<Type_> my_buffer_worker;
302 std::size_t my_to_write = 0;
303
304private:
305 std::thread my_thread;
306 std::exception_ptr my_thread_err = nullptr;
307 std::mutex my_mut;
308 std::condition_variable my_cv;
309
310 bool my_ready_input = false, my_ready_output = false;
311 bool my_worker_active = false;
312 bool my_kill = false;
313
314 void thread_loop() {
315 while (1) {
316 std::unique_lock lck(my_mut);
317 my_cv.wait(lck, [&]() { return my_ready_input; });
318 my_ready_input = false;
319
320 if (my_kill) { // Handle an explicit kill signal from the destructor.
321 break;
322 }
323
324 try {
325 my_writer->write(reinterpret_cast<const unsigned char*>(my_buffer_worker.data()), my_to_write);
326 } catch (...) {
327 my_thread_err = std::current_exception();
328 }
329
330 my_ready_output = true;
331 lck.unlock();
332 my_cv.notify_one();
333 }
334 }
335
336protected:
340 void flush_async() {
341 auto& buffer = this->get_buffer();
342 const auto num = buffer.size();
343
344 if (my_worker_active) {
345 // If the worker is active, it's probably already gotten started so we just wait for it to finish.
346 // Then we swap the results with the main buffer and submit a new job to the worker.
347 std::unique_lock lck(my_mut);
348 my_cv.wait(lck, [&]() { return my_ready_output; });
349 my_ready_output = false;
350
351 if (my_thread_err) {
352 std::rethrow_exception(my_thread_err);
353 }
354
355 buffer.swap(my_buffer_worker);
356 my_to_write = num;
357
358 my_ready_input = true;
359 lck.unlock();
360 my_cv.notify_one();
361
362 } else {
363 // If the worker is not active, we can just submit directly.
364 std::unique_lock lck(my_mut);
365 buffer.swap(my_buffer_worker);
366 my_to_write = num;
367
368 my_ready_input = true;
369 lck.unlock();
370 my_cv.notify_one();
371 my_worker_active = true;
372 }
373 }
374
375 void flush_sync(const Type_* ptr, std::size_t num) {
376 if (my_worker_active) {
377 // If the worker is active, we wait for it to finish and then write from the supplied pointer.
378 // We do not submit a new job, based on the loop in the BufferedWriter::write() overload:
379 //
380 // - We'll probably want to call this flush() overload again.
381 // On subsequent calls, we'll want to directly write from the supplied pointer to the Writer to cut out a copy.
382 // - If we didn't call this overload again, we'd probably call the other refill() overload immediately,
383 // So if we sent a new job to the worker, we'd end up immediately blocking on it to submit the next write.
384 // So we might as well skip that communication overhead and write directly on the main thread.
385 std::unique_lock lck(my_mut);
386 my_cv.wait(lck, [&]() { return my_ready_output; });
387 my_ready_output = false;
388 if (my_thread_err) {
389 std::rethrow_exception(my_thread_err);
390 }
391
392 my_writer->write(reinterpret_cast<const unsigned char*>(ptr), num);
393 my_worker_active = false;
394
395 } else {
396 // If the worker isn't active, we can directly write from the supplied pointer, avoiding some communication overhead.
397 my_writer->write(reinterpret_cast<const unsigned char*>(ptr), num);
398 }
399 }
404public:
405 void finish() {
406 this->flush();
407 my_writer->finish();
408 }
409};
410
411}
412
413#endif
Write to an output sink.
Buffered wrapper around a Writer.
Definition BufferedWriter.hpp:40
virtual void finish()=0
void flush()
Definition BufferedWriter.hpp:182
void write(const Type_ *input, std::size_t number)
Definition BufferedWriter.hpp:118
void write(const char *string)
Definition BufferedWriter.hpp:162
void write(Type_ input)
Definition BufferedWriter.hpp:99
void write(const std::string &string)
Definition BufferedWriter.hpp:172
unsigned long long number()
Definition BufferedWriter.hpp:88
Parallelized buffering to wrap a Writer.
Definition BufferedWriter.hpp:266
ParallelBufferedWriter(Pointer_ writer, std::size_t buffer_size)
Definition BufferedWriter.hpp:274
void finish()
Definition BufferedWriter.hpp:405
Serial buffering to wrap a Writer.
Definition BufferedWriter.hpp:208
void finish()
Definition BufferedWriter.hpp:248
SerialBufferedWriter(Pointer_ writer, std::size_t buffer_size)
Definition BufferedWriter.hpp:216
Simple byte readers and writers.
Definition BufferedReader.hpp:21
constexpr Dest_ cast(Value_ x)