1#ifndef BYTEME_BUFFERED_WRITER_HPP
2#define BYTEME_BUFFERED_WRITER_HPP
10#include <condition_variable>
39template<
typename Type_,
class WriterPo
inter_ = std::unique_ptr<Writer> >
46 if (buffer_size == 0) {
47 throw std::runtime_error(
"buffer size must be positive");
65 virtual void flush_async() = 0;
67 virtual void flush_sync(
const Type_*, std::size_t) = 0;
69 std::vector<Type_>& get_buffer() {
77 std::vector<Type_> my_buffer;
78 std::size_t my_current = 0;
82 unsigned long long my_overall = 0;
89 return my_overall + my_current;
100 my_buffer[my_current] = input;
103 const auto buffer_size = my_buffer.size();
104 if (my_current == buffer_size) {
107 my_overall += buffer_size;
119 const auto buffer_size = my_buffer.size();
122 const std::size_t remaining = buffer_size - my_current;
124 std::copy_n(input,
number, my_buffer.data() + my_current);
129 std::copy_n(input, remaining, my_buffer.data() + my_current);
135 my_overall += buffer_size;
140 flush_sync(my_buffer.data(), buffer_size);
141 my_overall += buffer_size;
144 while (
number >= buffer_size) {
145 flush_sync(input, buffer_size);
146 my_overall += buffer_size;
147 input += buffer_size;
152 std::copy_n(input,
number, my_buffer.data());
163 write(
reinterpret_cast<const Type_*
>(
string), std::char_traits<char>::length(
string));
172 void write(
const std::string&
string) {
173 write(
reinterpret_cast<const Type_*
>(
string.c_str()),
string.size());
183 flush_sync(my_buffer.data(), my_current);
184 my_overall += my_current;
207template<
typename Type_,
class Po
inter_ = std::unique_ptr<Writer> >
236 auto& buffer = this->get_buffer();
237 my_writer->write(
reinterpret_cast<unsigned char*
>(buffer.data()), buffer.size());
240 void flush_sync(
const Type_* ptr, std::size_t num) {
241 my_writer->write(
reinterpret_cast<const unsigned char*
>(ptr), num);
265template<
typename Type_,
class Po
inter_ = std::unique_ptr<Writer> >
276 my_writer(std::move(writer)),
277 my_buffer_worker(
sanisizer::cast<I<decltype(my_buffer_worker.size())> >(buffer_size))
279 my_thread = std::thread([&]() { thread_loop(); });
288 std::unique_lock lck(my_mut);
290 my_ready_input =
true;
301 std::vector<Type_> my_buffer_worker;
302 std::size_t my_to_write = 0;
305 std::thread my_thread;
306 std::exception_ptr my_thread_err =
nullptr;
308 std::condition_variable my_cv;
310 bool my_ready_input =
false, my_ready_output =
false;
311 bool my_worker_active =
false;
312 bool my_kill =
false;
316 std::unique_lock lck(my_mut);
317 my_cv.wait(lck, [&]() {
return my_ready_input; });
318 my_ready_input =
false;
325 my_writer->write(
reinterpret_cast<const unsigned char*
>(my_buffer_worker.data()), my_to_write);
327 my_thread_err = std::current_exception();
330 my_ready_output =
true;
341 auto& buffer = this->get_buffer();
342 const auto num = buffer.size();
344 if (my_worker_active) {
347 std::unique_lock lck(my_mut);
348 my_cv.wait(lck, [&]() {
return my_ready_output; });
349 my_ready_output =
false;
352 std::rethrow_exception(my_thread_err);
355 buffer.swap(my_buffer_worker);
358 my_ready_input =
true;
364 std::unique_lock lck(my_mut);
365 buffer.swap(my_buffer_worker);
368 my_ready_input =
true;
371 my_worker_active =
true;
375 void flush_sync(
const Type_* ptr, std::size_t num) {
376 if (my_worker_active) {
385 std::unique_lock lck(my_mut);
386 my_cv.wait(lck, [&]() {
return my_ready_output; });
387 my_ready_output =
false;
389 std::rethrow_exception(my_thread_err);
392 my_writer->write(
reinterpret_cast<const unsigned char*
>(ptr), num);
393 my_worker_active =
false;
397 my_writer->write(
reinterpret_cast<const unsigned char*
>(ptr), num);
Buffered wrapper around a Writer.
Definition BufferedWriter.hpp:40
void flush()
Definition BufferedWriter.hpp:182
void write(const Type_ *input, std::size_t number)
Definition BufferedWriter.hpp:118
void write(const char *string)
Definition BufferedWriter.hpp:162
void write(Type_ input)
Definition BufferedWriter.hpp:99
void write(const std::string &string)
Definition BufferedWriter.hpp:172
unsigned long long number()
Definition BufferedWriter.hpp:88
Parallelized buffering to wrap a Writer.
Definition BufferedWriter.hpp:266
ParallelBufferedWriter(Pointer_ writer, std::size_t buffer_size)
Definition BufferedWriter.hpp:274
void finish()
Definition BufferedWriter.hpp:405
Serial buffering to wrap a Writer.
Definition BufferedWriter.hpp:208
void finish()
Definition BufferedWriter.hpp:248
SerialBufferedWriter(Pointer_ writer, std::size_t buffer_size)
Definition BufferedWriter.hpp:216
Simple byte readers and writers.
Definition BufferedReader.hpp:21
constexpr Dest_ cast(Value_ x)