From 3dac8730b0d956b10a8bfcf914b50397d19cb37d Mon Sep 17 00:00:00 2001 From: Damiano Enerli Date: Fri, 23 Feb 2018 15:01:52 +0100 Subject: [PATCH] StreamConnectionReader - move implementation to cpp file --- .../yarp/os/impl/StreamConnectionReader.h | 342 ++---------------- src/libYARP_OS/src/StreamConnectionReader.cpp | 333 ++++++++++++++++- 2 files changed, 345 insertions(+), 330 deletions(-) diff --git a/src/libYARP_OS/include/yarp/os/impl/StreamConnectionReader.h b/src/libYARP_OS/include/yarp/os/impl/StreamConnectionReader.h index 82bf8d3b99b..6b14a3b842f 100644 --- a/src/libYARP_OS/include/yarp/os/impl/StreamConnectionReader.h +++ b/src/libYARP_OS/include/yarp/os/impl/StreamConnectionReader.h @@ -38,326 +38,48 @@ namespace yarp { class YARP_OS_impl_API yarp::os::impl::StreamConnectionReader : public ConnectionReader { public: - StreamConnectionReader() : - ConnectionReader(), - writer(nullptr), - in(nullptr), - str(nullptr), - protocol(nullptr), - messageLen(0), - textMode(false), - bareMode(false), - valid(false), - err(false), - shouldDrop(false), - writePending(false), - ref(nullptr), - convertedTextMode(false), - pushedIntFlag(false), - pushedInt(-1), - parentConnectionReader(nullptr) - { - } - + StreamConnectionReader(); virtual ~StreamConnectionReader(); void reset(yarp::os::InputStream& in, TwoWayStream *str, const Route& route, - size_t len, bool textMode, bool bareMode = false) - { - this->in = ∈ - this->str = str; - this->route = route; - this->messageLen = len; - this->textMode = textMode; - this->bareMode = bareMode; - this->valid = true; - ref = nullptr; - err = false; - convertedTextMode = false; - pushedIntFlag = false; - } - - virtual bool setSize(size_t len) override - { - reset(*in, str, route, len, textMode, bareMode); - return true; - } - - void setProtocol(Protocol *protocol) - { - this->protocol = protocol; - } - - virtual bool expectBlock(const yarp::os::Bytes& b) - { - if (!isGood()) { - return false; - } - yAssert(in!=nullptr); - size_t len = b.length(); - if (len==0) { - return true; - } - //if (len<0) len = messageLen; - if (len>0) { - YARP_SSIZE_T rlen = in->readFull(b); - if (rlen>=0) { - messageLen -= len; - return true; - } - } - err = true; - return false; - } - - virtual bool pushInt(int x) override - { - if (pushedIntFlag) { - return false; - } - pushedIntFlag = true; - pushedInt = x; - return true; - } - - virtual int expectInt() override - { - if (pushedIntFlag) { - pushedIntFlag = false; - return pushedInt; - } - if (!isGood()) { - return 0; - } - NetInt32 x = 0; - yarp::os::Bytes b((char*)(&x), sizeof(x)); - yAssert(in!=nullptr); - YARP_SSIZE_T r = in->read(b); - if (r<0 || (size_t)rread(b); - if (r<0 || (size_t)rread(b); - if (r<0 || (size_t)rread(b); - if (r<0 || (size_t)rreadLine('\n', &success); - if (!success) { - err = true; - return ""; - } - messageLen -= result.length()+1; - return result; - } - - virtual bool isTextMode() override - { - return textMode; - } - - virtual bool isBareMode() override - { - return bareMode; - } - + size_t len, bool textMode, bool bareMode = false); + void setProtocol(Protocol *protocol); + void suppressReply(); + bool dropRequested(); + + virtual bool expectBlock(const yarp::os::Bytes& b); + virtual ConstString expectString(int len); + virtual ConstString expectLine(); + virtual void flushWriter(); + virtual void setReference(yarp::os::Portable *obj); + + /**** OVERRIDES ****/ + virtual bool setSize(size_t len) override; + virtual size_t getSize() override; + virtual bool pushInt(int x) override; + virtual int expectInt() override; + virtual YARP_INT64 expectInt64() override; + virtual double expectDouble() override; + virtual bool expectBlock(const char *data, size_t len) override; + virtual ::yarp::os::ConstString expectText(int terminatingChar) override; + virtual bool isTextMode() override; + virtual bool isBareMode() override; virtual bool convertTextMode() override; - - virtual size_t getSize() override - { - return messageLen + (pushedIntFlag?sizeof(yarp::os::NetInt32):0); - } - - /* - virtual OutputStream *getReplyStream() override - { - if (str==nullptr) { - return nullptr; - } - return &(str->getOutputStream()); - } - */ - virtual yarp::os::ConnectionWriter *getWriter() override; - - void suppressReply() - { - str = nullptr; - } - - virtual void flushWriter(); - - // virtual TwoWayStream *getStreams() override - // { - // return str; - // } - - virtual yarp::os::Contact getRemoteContact() override - { - if (str!=nullptr) { - Contact remote = str->getRemoteAddress(); - remote.setName(route.getFromName()); - return remote; - } - Contact remote = yarp::os::Contact(route.getFromName(), route.getCarrierName()); - return remote; - } - - virtual yarp::os::Contact getLocalContact() override - { - if (str!=nullptr) { - Contact local = str->getLocalAddress(); - local.setName(route.getToName()); - return local; - } - return yarp::os::Contact(); - } - - - - virtual bool expectBlock(const char *data, size_t len) override - { - return expectBlock(yarp::os::Bytes((char*)data, len)); - } - - virtual ::yarp::os::ConstString expectText(int terminatingChar) override - { - if (!isGood()) { - return ""; - } - yAssert(in!=nullptr); - bool lsuccess = false; - ConstString result = in->readLine(terminatingChar, &lsuccess); - if (lsuccess) { - messageLen -= result.length()+1; - } - return ::yarp::os::ConstString(result.c_str()); - } - - virtual bool isValid() override - { - return valid; - } - - virtual bool isError() override - { - if (err) { - return true; - } - return !isActive(); - } - - virtual bool isActive() override - { - if (shouldDrop) { - return false; - } - if (!isValid()) { - return false; - } - if (in!=nullptr) { - if (in->isOk()) { - return true; - } - } - return false; - } - - virtual yarp::os::Portable *getReference() override - { - return ref; - } - - virtual void setReference(yarp::os::Portable *obj) - { - ref = obj; - } - + virtual yarp::os::Contact getRemoteContact() override; + virtual yarp::os::Contact getLocalContact() override; + virtual bool isValid() override; + virtual bool isError() override; + virtual bool isActive() override; + virtual yarp::os::Portable *getReference() override; virtual yarp::os::Bytes readEnvelope() override; - - virtual void requestDrop() override - { - shouldDrop = true; - } - - bool dropRequested() - { - return shouldDrop; - } - + virtual void requestDrop() override; virtual yarp::os::Searchable& getConnectionModifiers() override; - - virtual void setParentConnectionReader(ConnectionReader *parentConnectionReader) override - { - this->parentConnectionReader = parentConnectionReader; - } + virtual void setParentConnectionReader(ConnectionReader *parentConnectionReader) override; private: - bool isGood() - { - return isActive()&&isValid()&&!isError(); - } + bool isGood(); BufferedConnectionWriter *writer; StringInputStream altStream; diff --git a/src/libYARP_OS/src/StreamConnectionReader.cpp b/src/libYARP_OS/src/StreamConnectionReader.cpp index 28c3e1dfab0..3e3b2e7a11e 100644 --- a/src/libYARP_OS/src/StreamConnectionReader.cpp +++ b/src/libYARP_OS/src/StreamConnectionReader.cpp @@ -16,24 +16,129 @@ using namespace yarp::os::impl; using namespace yarp::os; -yarp::os::ConnectionWriter *StreamConnectionReader::getWriter() { - if (str==nullptr) { - return nullptr; +StreamConnectionReader::StreamConnectionReader() : ConnectionReader(), + writer(nullptr), + in(nullptr), + str(nullptr), + protocol(nullptr), + messageLen(0), + textMode(false), + bareMode(false), + valid(false), + err(false), + shouldDrop(false), + writePending(false), + ref(nullptr), + convertedTextMode(false), + pushedIntFlag(false), + pushedInt(-1), + parentConnectionReader(nullptr) +{ } + +StreamConnectionReader::~StreamConnectionReader() +{ + if (writer!=nullptr) + { + delete writer; + writer = nullptr; } - if (writer==nullptr) { - writer = new BufferedConnectionWriter(isTextMode(), isBareMode()); - yAssert(writer!=nullptr); +} + +void StreamConnectionReader::reset(InputStream &in, + TwoWayStream *str, + const Route &route, + size_t len, + bool textMode, + bool bareMode) +{ + this->in = ∈ + this->str = str; + this->route = route; + this->messageLen = len; + this->textMode = textMode; + this->bareMode = bareMode; + this->valid = true; + ref = nullptr; + err = false; + convertedTextMode = false; + pushedIntFlag = false; +} + +void StreamConnectionReader::setProtocol(Protocol *protocol) +{ + this->protocol = protocol; +} + +void StreamConnectionReader::suppressReply() +{ + str = nullptr; +} + +bool StreamConnectionReader::dropRequested() +{ + return shouldDrop; +} + +bool StreamConnectionReader::expectBlock(const Bytes &b) +{ + if (!isGood()) { + return false; } - writer->clear(); - writePending = true; - if (protocol!=nullptr) { - protocol->willReply(); + yAssert(in!=nullptr); + size_t len = b.length(); + if (len==0) { + return true; } - return writer; + //if (len<0) len = messageLen; + if (len>0) { + YARP_SSIZE_T rlen = in->readFull(b); + if (rlen>=0) { + messageLen -= len; + return true; + } + } + err = true; + return false; +} + +ConstString StreamConnectionReader::expectString(int len) +{ + if (!isGood()) { + return ""; + } + char *buf = new char[len]; + yarp::os::Bytes b(buf, len); + yAssert(in!=nullptr); + YARP_SSIZE_T r = in->read(b); + if (r<0 || (size_t)rreadLine('\n', &success); + if (!success) { + err = true; + return ""; + } + messageLen -= result.length()+1; + return result; +} -void StreamConnectionReader::flushWriter() { +void StreamConnectionReader::flushWriter() +{ if (writer!=nullptr) { if (writePending) { if (str!=nullptr) { @@ -49,16 +154,118 @@ void StreamConnectionReader::flushWriter() { writePending = false; } +void StreamConnectionReader::setReference(yarp::os::Portable *obj) +{ + ref = obj; +} -StreamConnectionReader::~StreamConnectionReader() { - if (writer!=nullptr) { - delete writer; - writer = nullptr; +bool StreamConnectionReader::setSize(size_t len) +{ + reset(*in, str, route, len, textMode, bareMode); + return true; +} + +size_t StreamConnectionReader::getSize() +{ + return messageLen + (pushedIntFlag?sizeof(yarp::os::NetInt32):0); +} + +bool StreamConnectionReader::pushInt(int x) +{ + if (pushedIntFlag) { + return false; } + pushedIntFlag = true; + pushedInt = x; + return true; } +int StreamConnectionReader::expectInt() +{ + if (pushedIntFlag) { + pushedIntFlag = false; + return pushedInt; + } + if (!isGood()) { + return 0; + } + NetInt32 x = 0; + yarp::os::Bytes b((char*)(&x), sizeof(x)); + yAssert(in!=nullptr); + YARP_SSIZE_T r = in->read(b); + if (r<0 || (size_t)rread(b); + if (r<0 || (size_t)rread(b); + if (r<0 || (size_t)rreadLine(terminatingChar, &lsuccess); + if (lsuccess) { + messageLen -= result.length()+1; + } + return ::yarp::os::ConstString(result.c_str()); +} + +bool StreamConnectionReader::isTextMode() +{ + return textMode; +} + +bool StreamConnectionReader::isBareMode() +{ + return bareMode; +} + +bool StreamConnectionReader::convertTextMode() +{ if (isTextMode()) { if (!convertedTextMode) { Bottle bot; @@ -75,8 +282,80 @@ bool StreamConnectionReader::convertTextMode() { return true; } +yarp::os::ConnectionWriter* StreamConnectionReader::getWriter() +{ + if (str==nullptr) { + return nullptr; + } + if (writer==nullptr) { + writer = new BufferedConnectionWriter(isTextMode(), isBareMode()); + yAssert(writer!=nullptr); + } + writer->clear(); + writePending = true; + if (protocol!=nullptr) { + protocol->willReply(); + } + return writer; +} -Bytes StreamConnectionReader::readEnvelope() { +yarp::os::Contact StreamConnectionReader::getRemoteContact() +{ + if (str!=nullptr) { + Contact remote = str->getRemoteAddress(); + remote.setName(route.getFromName()); + return remote; + } + Contact remote = yarp::os::Contact(route.getFromName(), route.getCarrierName()); + return remote; +} + +yarp::os::Contact StreamConnectionReader::getLocalContact() +{ + if (str!=nullptr) { + Contact local = str->getLocalAddress(); + local.setName(route.getToName()); + return local; + } + return yarp::os::Contact(); +} + +bool StreamConnectionReader::isValid() +{ + return valid; +} + +bool StreamConnectionReader::isError() +{ + if (err) { + return true; + } + return !isActive(); +} + +bool StreamConnectionReader::isActive() +{ + if (shouldDrop) { + return false; + } + if (!isValid()) { + return false; + } + if (in!=nullptr) { + if (in->isOk()) { + return true; + } + } + return false; +} + +yarp::os::Portable* StreamConnectionReader::getReference() +{ + return ref; +} + +Bytes StreamConnectionReader::readEnvelope() +{ if (protocol != nullptr) { const ConstString& env = protocol->getEnvelope(); return Bytes((char*)env.c_str(), env.length()); @@ -87,9 +366,13 @@ Bytes StreamConnectionReader::readEnvelope() { return Bytes(nullptr, 0); } +void StreamConnectionReader::requestDrop() +{ + shouldDrop = true; +} - -Searchable& StreamConnectionReader::getConnectionModifiers() { +Searchable& StreamConnectionReader::getConnectionModifiers() +{ if (config.size()==0) { if (protocol) { config.fromString(protocol->getSenderSpecifier().c_str()); @@ -97,3 +380,13 @@ Searchable& StreamConnectionReader::getConnectionModifiers() { } return config; } + +void StreamConnectionReader::setParentConnectionReader(ConnectionReader *parentConnectionReader) +{ + this->parentConnectionReader = parentConnectionReader; +} + +bool StreamConnectionReader::isGood() +{ + return isActive()&&isValid()&&!isError(); +}