1#ifndef BYTEME_BUFFERED_WRITER_HPP
2#define BYTEME_BUFFERED_WRITER_HPP
10#include <condition_variable>
38template<
typename Type_,
class WriterPo
inter_ = std::unique_ptr<Writer> >
45 if (buffer_size == 0) {
46 throw std::runtime_error(
"buffer size must be positive");
64 virtual void flush_async() = 0;
66 virtual void flush_sync(
const Type_*, std::size_t) = 0;
68 std::vector<Type_>& get_buffer() {
76 std::vector<Type_> my_buffer;
77 std::size_t my_current = 0;
81 unsigned long long my_overall = 0;
88 return my_overall + my_current;
99 my_buffer[my_current] = input;
102 const auto buffer_size = my_buffer.size();
103 if (my_current == buffer_size) {
106 my_overall += buffer_size;
118 const auto buffer_size = my_buffer.size();
121 const std::size_t remaining = buffer_size - my_current;
123 std::copy_n(input,
number, my_buffer.data() + my_current);
128 std::copy_n(input, remaining, my_buffer.data() + my_current);
134 my_overall += buffer_size;
139 flush_sync(my_buffer.data(), buffer_size);
140 my_overall += buffer_size;
143 while (
number >= buffer_size) {
144 flush_sync(input, buffer_size);
145 my_overall += buffer_size;
146 input += buffer_size;
151 std::copy_n(input,
number, my_buffer.data());
162 flush_sync(my_buffer.data(), my_current);
163 my_overall += my_current;
186template<
typename Type_,
class Po
inter_ = std::unique_ptr<Writer> >
215 auto& buffer = this->get_buffer();
216 my_writer->write(
reinterpret_cast<unsigned char*
>(buffer.data()), buffer.size());
219 void flush_sync(
const Type_* ptr, std::size_t num) {
220 my_writer->write(
reinterpret_cast<const unsigned char*
>(ptr), num);
243template<
typename Type_,
class Po
inter_ = std::unique_ptr<Writer> >
254 my_writer(std::move(writer)),
255 my_buffer_worker(
sanisizer::cast<I<decltype(my_buffer_worker.size())> >(buffer_size))
257 my_thread = std::thread([&]() { thread_loop(); });
266 std::unique_lock lck(my_mut);
268 my_ready_input =
true;
279 std::vector<Type_> my_buffer_worker;
280 std::size_t my_to_write = 0;
283 std::thread my_thread;
284 std::exception_ptr my_thread_err =
nullptr;
286 std::condition_variable my_cv;
288 bool my_ready_input =
false, my_ready_output =
false;
289 bool my_worker_active =
false;
290 bool my_kill =
false;
294 std::unique_lock lck(my_mut);
295 my_cv.wait(lck, [&]() {
return my_ready_input; });
296 my_ready_input =
false;
303 my_writer->write(
reinterpret_cast<const unsigned char*
>(my_buffer_worker.data()), my_to_write);
305 my_thread_err = std::current_exception();
308 my_ready_output =
true;
319 auto& buffer = this->get_buffer();
320 const auto num = buffer.size();
322 if (my_worker_active) {
325 std::unique_lock lck(my_mut);
326 my_cv.wait(lck, [&]() {
return my_ready_output; });
327 my_ready_output =
false;
330 std::rethrow_exception(my_thread_err);
333 buffer.swap(my_buffer_worker);
336 my_ready_input =
true;
342 std::unique_lock lck(my_mut);
343 buffer.swap(my_buffer_worker);
346 my_ready_input =
true;
349 my_worker_active =
true;
353 void flush_sync(
const Type_* ptr, std::size_t num) {
354 if (my_worker_active) {
363 std::unique_lock lck(my_mut);
364 my_cv.wait(lck, [&]() {
return my_ready_output; });
365 my_ready_output =
false;
367 std::rethrow_exception(my_thread_err);
370 my_writer->write(
reinterpret_cast<const unsigned char*
>(ptr), num);
371 my_worker_active =
false;
375 my_writer->write(
reinterpret_cast<const unsigned char*
>(ptr), num);
Buffered wrapper around a Writer.
Definition BufferedWriter.hpp:39
void flush()
Definition BufferedWriter.hpp:161
void write(const Type_ *input, std::size_t number)
Definition BufferedWriter.hpp:117
void write(Type_ input)
Definition BufferedWriter.hpp:98
unsigned long long number()
Definition BufferedWriter.hpp:87
Parallelized buffering to wrap a Writer.
Definition BufferedWriter.hpp:244
ParallelBufferedWriter(Pointer_ writer, std::size_t buffer_size)
Definition BufferedWriter.hpp:252
Serial buffering to wrap a Writer.
Definition BufferedWriter.hpp:187
SerialBufferedWriter(Pointer_ writer, std::size_t buffer_size)
Definition BufferedWriter.hpp:195
Simple byte readers and writers.
Definition BufferedReader.hpp:21
constexpr Dest_ cast(Value_ x)