Skip to content

Commit

Permalink
Merge pull request hove-io#103 from AzHicham/multithread-streambuf
Browse files Browse the repository at this point in the history
New thread-safe pipe_streambuf
  • Loading branch information
HichamAz authored Aug 10, 2021
2 parents d85d6f6 + 55afa3b commit a380953
Showing 1 changed file with 177 additions and 0 deletions.
177 changes: 177 additions & 0 deletions threadbuf.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// From http://stackoverflow.com/a/12413298
// New Implementation from https://stackoverflow.com/questions/43588275/exceptions-in-stl-streams?noredirect=1&lq=1
// ----------------------------------------------------------------------------
// Copyright (C) 2013 Dietmar Kuehl http://www.dietmar-kuehl.de
//
Expand Down Expand Up @@ -29,6 +30,7 @@
#include <mutex>
#include <streambuf>
#include <string>
#include <vector>

// ----------------------------------------------------------------------------

Expand Down Expand Up @@ -115,3 +117,178 @@ class threadbuf : public std::streambuf {
return traits_type::eof();
}
};


// ----------------------------------------------------------------------------
// From https://stackoverflow.com/questions/43588275/exceptions-in-stl-streams?noredirect=1&lq=1
// basic_pipebuf is a different implementation of threadbuf
// basic_pipebuf resolves a bug (deadlock) found in threadbuf on debian10 platform

template<class _Elem, class _Traits = std::char_traits<_Elem>>
class basic_pipebuf : public std::basic_streambuf<_Elem, _Traits>
{
public:
using traits_type = std::streambuf::traits_type;
using buffer_type = typename std::vector<_Elem>;
using buffer_size_type = typename buffer_type::size_type;

std::mutex m_mutex;
std::condition_variable m_condition;
bool m_closed;

buffer_type m_buffer;
_Elem* m_begin;
_Elem* m_end;
buffer_size_type m_chunk_size;

public:
basic_pipebuf(buffer_size_type buffer_size = 16 * 1024, buffer_size_type chunk_size = 16):
m_closed(false),
m_buffer(buffer_size),
m_begin(&m_buffer[0]),
m_end(&m_buffer[0] + m_buffer.size()),
m_chunk_size(chunk_size)
{
this->setp(m_begin + m_chunk_size, m_begin + 2 * m_chunk_size);
this->setg(m_begin, m_begin + m_chunk_size, m_begin + m_chunk_size);
}

void close(){
std::unique_lock<std::mutex> lock(m_mutex);
m_closed = true;
m_condition.notify_all();
}

private:
traits_type::int_type overflow(traits_type::int_type c) override
{
std::unique_lock<std::mutex> lock(m_mutex);

if (m_closed)
{
return traits_type::eof();
}

if (traits_type::eq_int_type(traits_type::eof(), c))
return traits_type::not_eof(c);

traits_type::int_type ret = traits_type::eof();

if (this->epptr() < m_end)
{
while (this->epptr() == this->eback() && !m_closed)
m_condition.wait(lock);

if (this->epptr() != this->eback())
{
this->setp(this->pbase() + m_chunk_size, this->epptr() + m_chunk_size);
ret = this->sputc(c);
}
}
else
{
while (!(this->eback() > m_begin || m_closed))
m_condition.wait(lock);

if (this->eback() > m_begin)
{
this->setp(m_begin, m_begin + m_chunk_size);
ret = this->sputc(c);
}
}

m_condition.notify_one();
return ret;
}

traits_type::int_type underflow() override
{
std::unique_lock<std::mutex> lock(m_mutex);
traits_type::int_type ret = traits_type::eof();

if (this->eback() != this->pbase())
{
if (this->egptr() < m_end)
{
while (this->egptr() == this->pbase() && !m_closed)
m_condition.wait(lock);

if (this->egptr() != this->pbase())
{
this->setg(this->eback() + m_chunk_size,
this->eback() + m_chunk_size,
this->egptr() + m_chunk_size);
}
else // if m_closed
{
this->setg(this->pbase(), this->pbase(), this->pptr());
}

ret = traits_type::to_int_type(*this->gptr());
}
else
{
while (!(this->pbase() > m_begin || m_closed))
m_condition.wait(lock);

if (this->pbase() > m_begin)
{
this->setg(m_begin, m_begin, m_begin + m_chunk_size);
}
else // if m_closed
{
this->setg(this->pbase(), this->pbase(), this->pptr());
}

ret = traits_type::to_int_type(*this->gptr());
}
}

m_condition.notify_one();
return ret;
}
};

template<class _Elem, class _Traits = std::char_traits<_Elem>>
class basic_opipestream :
public std::basic_ostream<_Elem, _Traits>
{
using buffer_type = basic_pipebuf<_Elem, _Traits> ;
buffer_type* mPbuf;

public:
basic_opipestream(buffer_type* pBuf) :
std::basic_ostream<_Elem, _Traits>(pBuf),
mPbuf(pBuf)
{
}

void close()
{
mPbuf->close();
}
};

template<class _Elem, class _Traits = std::char_traits<_Elem>>
class basic_ipipestream :
public std::basic_istream<_Elem, _Traits>
{
using buffer_type = basic_pipebuf<_Elem, _Traits>;
buffer_type* mPbuf;

public:
basic_ipipestream(buffer_type* pBuf) :
std::basic_istream<_Elem, _Traits>(pBuf),
mPbuf(pBuf)
{
}

void close()
{
mPbuf->close();
}
};

using pipebuf = basic_pipebuf<char>;
using opipestream = basic_opipestream<char>;
using ipipestream = basic_ipipestream<char>;

0 comments on commit a380953

Please sign in to comment.