From fd328f0bb1fc8e363a4a4400f0e3afca7c358d77 Mon Sep 17 00:00:00 2001 From: Hicham Azimani Date: Tue, 10 Aug 2021 14:05:30 +0200 Subject: [PATCH 1/2] New thread-safe pipe_streambuf --- threadbuf.h | 170 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 170 insertions(+) diff --git a/threadbuf.h b/threadbuf.h index 1e2b63e..9961a0c 100644 --- a/threadbuf.h +++ b/threadbuf.h @@ -29,6 +29,7 @@ #include #include #include +#include // ---------------------------------------------------------------------------- @@ -115,3 +116,172 @@ class threadbuf : public std::streambuf { return traits_type::eof(); } }; + +template> +class basic_pipebuf : public std::basic_streambuf<_Elem, _Traits> +{ +public: + using traits_type = std::streambuf::traits_type; + using buffer_type = typename std::vector<_Elem>; + using buffer_size_type = typename buffer_type::size_type; + + std::mutex m_mutex; + std::condition_variable m_condition; + bool m_closed; + + buffer_type m_buffer; + _Elem* m_begin; + _Elem* m_end; + buffer_size_type m_chunk_size; + +public: + basic_pipebuf(buffer_size_type buffer_size = 16 * 1024, buffer_size_type chunk_size = 16): + m_closed(false), + m_buffer(buffer_size), + m_begin(&m_buffer[0]), + m_end(&m_buffer[0] + m_buffer.size()), + m_chunk_size(chunk_size) + { + this->setp(m_begin + m_chunk_size, m_begin + 2 * m_chunk_size); + this->setg(m_begin, m_begin + m_chunk_size, m_begin + m_chunk_size); + } + + void close(){ + std::unique_lock lock(m_mutex); + m_closed = true; + m_condition.notify_all(); + } + +private: + traits_type::int_type overflow(traits_type::int_type c) override + { + std::unique_lock lock(m_mutex); + + if (m_closed) + { + return traits_type::eof(); + } + + if (traits_type::eq_int_type(traits_type::eof(), c)) + return traits_type::not_eof(c); + + traits_type::int_type ret = traits_type::eof(); + + if (this->epptr() < m_end) + { + while (this->epptr() == this->eback() && !m_closed) + m_condition.wait(lock); + + if (this->epptr() != this->eback()) + { + this->setp(this->pbase() + m_chunk_size, this->epptr() + m_chunk_size); + ret = this->sputc(c); + } + } + else + { + while (!(this->eback() > m_begin || m_closed)) + m_condition.wait(lock); + + if (this->eback() > m_begin) + { + this->setp(m_begin, m_begin + m_chunk_size); + ret = this->sputc(c); + } + } + + m_condition.notify_one(); + return ret; + } + + traits_type::int_type underflow() override + { + std::unique_lock lock(m_mutex); + traits_type::int_type ret = traits_type::eof(); + + if (this->eback() != this->pbase()) + { + if (this->egptr() < m_end) + { + while (this->egptr() == this->pbase() && !m_closed) + m_condition.wait(lock); + + if (this->egptr() != this->pbase()) + { + this->setg(this->eback() + m_chunk_size, + this->eback() + m_chunk_size, + this->egptr() + m_chunk_size); + } + else // if m_closed + { + this->setg(this->pbase(), this->pbase(), this->pptr()); + } + + ret = traits_type::to_int_type(*this->gptr()); + } + else + { + while (!(this->pbase() > m_begin || m_closed)) + m_condition.wait(lock); + + if (this->pbase() > m_begin) + { + this->setg(m_begin, m_begin, m_begin + m_chunk_size); + } + else // if m_closed + { + this->setg(this->pbase(), this->pbase(), this->pptr()); + } + + ret = traits_type::to_int_type(*this->gptr()); + } + } + + m_condition.notify_one(); + return ret; + } +}; + +template> +class basic_opipestream : + public std::basic_ostream<_Elem, _Traits> +{ + using buffer_type = basic_pipebuf<_Elem, _Traits> ; + buffer_type* mPbuf; + +public: + basic_opipestream(buffer_type* pBuf) : + std::basic_ostream<_Elem, _Traits>(pBuf), + mPbuf(pBuf) + { + } + + void close() + { + mPbuf->close(); + } +}; + +template> +class basic_ipipestream : + public std::basic_istream<_Elem, _Traits> +{ + using buffer_type = basic_pipebuf<_Elem, _Traits>; + buffer_type* mPbuf; + +public: + basic_ipipestream(buffer_type* pBuf) : + std::basic_istream<_Elem, _Traits>(pBuf), + mPbuf(pBuf) + { + } + + void close() + { + mPbuf->close(); + } +}; + +using pipebuf = basic_pipebuf; +using opipestream = basic_opipestream; +using ipipestream = basic_ipipestream; From 55afa3b7f79d65ee76a83914a80deb0c509a8302 Mon Sep 17 00:00:00 2001 From: Hicham Azimani Date: Tue, 10 Aug 2021 18:18:05 +0200 Subject: [PATCH 2/2] Added comment --- threadbuf.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/threadbuf.h b/threadbuf.h index 9961a0c..2f4ecc7 100644 --- a/threadbuf.h +++ b/threadbuf.h @@ -1,4 +1,5 @@ // From http://stackoverflow.com/a/12413298 +// New Implementation from https://stackoverflow.com/questions/43588275/exceptions-in-stl-streams?noredirect=1&lq=1 // ---------------------------------------------------------------------------- // Copyright (C) 2013 Dietmar Kuehl http://www.dietmar-kuehl.de // @@ -117,6 +118,12 @@ class threadbuf : public std::streambuf { } }; + +// ---------------------------------------------------------------------------- +// From https://stackoverflow.com/questions/43588275/exceptions-in-stl-streams?noredirect=1&lq=1 +// basic_pipebuf is a different implementation of threadbuf +// basic_pipebuf resolves a bug (deadlock) found in threadbuf on debian10 platform + template> class basic_pipebuf : public std::basic_streambuf<_Elem, _Traits> {