From 0a9eb9340b8bab7cc61f37d0bfd849a2ec6ea79e Mon Sep 17 00:00:00 2001 From: Mateusz Daniluk <121170681+VeithMetro@users.noreply.github.com> Date: Tue, 3 Oct 2023 12:30:04 +0200 Subject: [PATCH 1/2] [Messaging] Adding a redirection of standard out and error to messaging (#1415) * Formatting changes * Adding two new message types to Core::Messaging::IStore without additional members for now * Making one new message type instead of two called NonfunctionalStream * Changing the name to OperationalStream and adding a module name * Changing the message type enum - adding the new message types and removing the unnecessary ones * Creating and announcing new categories of operational stream message type * Changing the enum conversion to the new message type, adding the new module name to Deserialize() * Including the operational cateogires header files, filling Output method of StandardOut class * Adding the cpp file to the source in cmake, making some fixes to fix the new controls * Operational stream category is serialized as only 1 byte * Use the new singletons to open and redirect standard out and error filedescriptors * Adding questions and remarks in comments * Adding classes with singletons of ConsoleStreamRedirectTypes for both standard out and error * Make sure it is merged properly * Removing ConsoleStreamRedirect.cpp from cmake * [SYNC] Fixes.. * Moving the TextStreamRedirect templated class to core * Fixing the Serialize()/Deserialize() in Metadata so we don't miss anything * Changing the name of the class to indicate it's templated * [TEST] Add an example app to test redirections.. * Adding the new header to core.h * Removing the templated class from ConsoleStreamRedirect since its in core * [TEST] Fixes to get it compiling. * [FIXES] New API requires different setup ;-) * [TEST] Updated the template to use the same constructions (Pipe) as we use in Windows. * Making sure the that Opening the redirect after Closing it works properly * Disabling the default line buffering of stdout if its redirected * Adding the TextStreamRedirect header to core * Removing the testing printf and cleaning everything up * Removing a TRACE_L1 as it was causing a never-ending loop with redirected stderr * Make sure that the redirect test app is up-to-date with the newest changes * Removing unnecessary spaces * Adding setvbuf to WPEProcess since it does not propagate like file descriptor changes * Adding an env variable when Thunder is running in the backgroud * Reading THUNDER_BACKGROUND env variable in WPEProcess and disabling line-buffering of stdout accordingly * Remove unnecessary comments * Replacing STDOUT/STDERR_FILENO with ::fileno() to work on both Linux and Windows * Fixing the problem with ::fileno() being deprecated on Windows and replacing it with _fileno() * More fixes for Windows to be alignet with recent Linux changes * Removing a typo * Making sue Open and Close returns properly for both Linux and Windows * HANDLE on Windows cannot be compared with IResource::handle::INVALID as it's a void* * Fixing one more wrong comparison between void* and singed int --------- Co-authored-by: Pierre Wielders --- Source/WPEFramework/PluginHost.cpp | 11 + Source/WPEProcess/Process.cpp | 5 + Source/core/CMakeLists.txt | 1 + Source/core/MessageStore.cpp | 21 +- Source/core/MessageStore.h | 32 +- Source/core/TextStreamRedirectType.h | 498 +++++++++++++++++++ Source/core/core.h | 1 + Source/messaging/CMakeLists.txt | 6 +- Source/messaging/ConsoleStreamRedirect.h | 530 +++----------------- Source/messaging/MessageDispatcher.h | 3 - Source/messaging/OperationalCategories.cpp | 31 ++ Source/messaging/OperationalCategories.h | 88 ++++ Source/messaging/messaging.h | 1 + Tests/CMakeLists.txt | 5 + Tests/redirect/CMakeLists.txt | 33 ++ Tests/redirect/main.cpp | 553 +++++++++++++++++++++ 16 files changed, 1337 insertions(+), 482 deletions(-) create mode 100644 Source/core/TextStreamRedirectType.h create mode 100644 Source/messaging/OperationalCategories.cpp create mode 100644 Source/messaging/OperationalCategories.h create mode 100644 Tests/redirect/CMakeLists.txt create mode 100644 Tests/redirect/main.cpp diff --git a/Source/WPEFramework/PluginHost.cpp b/Source/WPEFramework/PluginHost.cpp index a6ef238c6..36aef0f0a 100644 --- a/Source/WPEFramework/PluginHost.cpp +++ b/Source/WPEFramework/PluginHost.cpp @@ -68,6 +68,7 @@ namespace PluginHost { #ifndef __WINDOWS__ case 'b': _background = true; + Core::SystemInfo::SetEnvironment(_T("THUNDER_BACKGROUND"), _T("true")); break; #endif case 'h': @@ -595,6 +596,16 @@ POP_WARNING() fprintf(stdout, "Could not enable messaging/tracing functionality!\n"); } } + + // Redirect the standard error to the messaging engine and the MessageControl plugin + // And if Thunder is running in the background, do the same for standard output + Messaging::ConsoleStandardError::Instance().Open(); + if (_background == true) { + // Line-buffering on text streams can still lead to messages not being displayed even if they end with a new line (only \n) + // So we disable buffering for stdout (line-buffered by default), as we do it in ProcessBuffer() before outputting the message anyway + ::setvbuf(stdout, NULL, _IONBF, 0); + Messaging::ConsoleStandardOut::Instance().Open(); + } #ifdef __CORE_WARNING_REPORTING__ class GlobalConfig : public Core::JSON::Container { diff --git a/Source/WPEProcess/Process.cpp b/Source/WPEProcess/Process.cpp index fd8cf40e2..a25605b36 100644 --- a/Source/WPEProcess/Process.cpp +++ b/Source/WPEProcess/Process.cpp @@ -10,6 +10,7 @@ MODULE_NAME_DECLARATION(BUILD_REFERENCE) namespace WPEFramework { + static string _background = _T("false"); namespace Process { @@ -547,6 +548,10 @@ int main(int argc, char** argv) Process::ConsoleOptions options(argc, argv); + if ((Core::SystemInfo::GetEnvironment(_T("THUNDER_BACKGROUND"), _background) == true) && (_background == "true")) { + ::setvbuf(stdout, NULL, _IONBF, 0); + } + if ((options.RequestUsage() == true) || (options.Locator == nullptr) || (options.ClassName == nullptr) || (options.RemoteChannel == nullptr) || (options.Exchange == 0)) { printf("Process [-h] \n"); printf(" -l \n"); diff --git a/Source/core/CMakeLists.txt b/Source/core/CMakeLists.txt index 724d5f552..871f5e46e 100644 --- a/Source/core/CMakeLists.txt +++ b/Source/core/CMakeLists.txt @@ -128,6 +128,7 @@ set(PUBLIC_HEADERS SystemInfo.h TextFragment.h TextReader.h + TextStreamRedirectType.h Thread.h ThreadPool.h Time.h diff --git a/Source/core/MessageStore.cpp b/Source/core/MessageStore.cpp index 2152e1f4d..2f331c223 100644 --- a/Source/core/MessageStore.cpp +++ b/Source/core/MessageStore.cpp @@ -30,8 +30,7 @@ ENUM_CONVERSION_BEGIN(Core::Messaging::Metadata::type) { Core::Messaging::Metadata::type::TRACING, _TXT("Tracing") }, { Core::Messaging::Metadata::type::LOGGING, _TXT("Logging") }, { Core::Messaging::Metadata::type::REPORTING, _TXT("Reporting") }, - { Core::Messaging::Metadata::type::STANDARD_OUT, _TXT("StandardOut") }, - { Core::Messaging::Metadata::type::STANDARD_ERROR, _TXT("StandardError") }, + { Core::Messaging::Metadata::type::OPERATIONAL_STREAM, _TXT("OperationalStream") }, ENUM_CONVERSION_END(Core::Messaging::Metadata::type) namespace { @@ -119,11 +118,12 @@ namespace Core { const char* MODULE_LOGGING = _T("SysLog"); const char* MODULE_REPORTING = _T("Reporting"); + const char* MODULE_OPERATIONAL_STREAM = _T("Operational Stream"); uint16_t Metadata::Serialize(uint8_t buffer[], const uint16_t bufferSize) const { uint16_t length = static_cast(sizeof(_type) + (_category.size() + 1)); - + if (_type == TRACING) { length += static_cast(_module.size() + 1); } @@ -159,7 +159,10 @@ namespace Core { _type = frameReader.Number(); _category = frameReader.NullTerminatedText(); + length = (static_cast(sizeof(_type) + (static_cast(_category.size()) + 1))); + if (_type == TRACING) { + length += (static_cast(_module.size()) + 1); _module = frameReader.NullTerminatedText(); } else if (_type == LOGGING) { @@ -168,16 +171,14 @@ namespace Core { else if (_type == REPORTING) { _module = MODULE_REPORTING; } - else { - ASSERT(_type != Metadata::type::INVALID); - } - - if (_type == TRACING) { - length = std::min(bufferSize, static_cast(sizeof(_type) + (static_cast(_category.size()) + 1) + (static_cast(_module.size()) + 1))); + else if (_type == OPERATIONAL_STREAM) { + _module = MODULE_OPERATIONAL_STREAM; } else { - length = std::min(bufferSize, static_cast(sizeof(_type) + (static_cast(_category.size()) + 1))); + ASSERT(_type != Metadata::type::INVALID); } + + length = std::min(bufferSize, length); } return (length); diff --git a/Source/core/MessageStore.h b/Source/core/MessageStore.h index 74e9de0ef..2e1f58191 100644 --- a/Source/core/MessageStore.h +++ b/Source/core/MessageStore.h @@ -29,6 +29,7 @@ namespace Core { extern EXTERNAL const char* MODULE_LOGGING; extern EXTERNAL const char* MODULE_REPORTING; + extern EXTERNAL const char* MODULE_OPERATIONAL_STREAM; struct EXTERNAL IEvent { virtual ~IEvent() = default; @@ -44,12 +45,11 @@ namespace Core { class EXTERNAL Metadata { public: enum type : uint8_t { - INVALID = 0, - TRACING = 1, - LOGGING = 2, - REPORTING = 3, - STANDARD_OUT = 4, - STANDARD_ERROR = 5 + INVALID = 0, + TRACING = 1, + LOGGING = 2, + REPORTING = 3, + OPERATIONAL_STREAM = 4 }; // @stop @@ -283,6 +283,26 @@ namespace Core { string _callsign; }; + /** + * @brief Data-Carrier, extended information about the operational-stream-type message. + * No additional info for now, used for function overloading. + */ + class EXTERNAL OperationalStream : public MessageInfo { + public: + OperationalStream(const OperationalStream&) = default; + OperationalStream& operator=(const OperationalStream&) = default; + + OperationalStream() + : MessageInfo() + { + } + OperationalStream(const MessageInfo& messageInfo) + : MessageInfo(messageInfo) + { + } + ~OperationalStream() = default; + }; + public: virtual ~IStore() = default; static IStore* Instance(); diff --git a/Source/core/TextStreamRedirectType.h b/Source/core/TextStreamRedirectType.h new file mode 100644 index 000000000..6023bc396 --- /dev/null +++ b/Source/core/TextStreamRedirectType.h @@ -0,0 +1,498 @@ +/* + * If not stated otherwise in this file or this component's LICENSE file the + * following copyright and licenses apply: + * + * Copyright 2022 Metrological + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace WPEFramework { +namespace Core { + template + class TextStreamRedirectType { + private: + using ParentClass = TextStreamRedirectType; + + class Reader { + private: + enum mode : uint8_t { + IDLE = 0x00, + SINGLE = 0x01, + DOUBLE = 0x02, + PENDING_RETURN = 0x11, + PENDING_LINEFEED = 0x21, + DROP_AND_RESET = 0x81 + }; + + public: + Reader() = delete; + Reader(Reader&&) = delete; + Reader(const Reader&) = delete; + Reader& operator=(Reader&&) = delete; + Reader& operator=(const Reader&) = delete; + + Reader(ParentClass& parent) + : _handler(parent) + , _offset(0) + , _delimiter(mode::IDLE) { + } + ~Reader() = default; + + public: + mode IsTermination(const mode lastMode, const int length, const TCHAR buffer[]) const { + mode result = mode::IDLE; + + ASSERT(length > 0); + + if (((lastMode == PENDING_RETURN) && (buffer[0] == '\r')) || + ((lastMode == PENDING_LINEFEED) && (buffer[0] == '\n'))) { + result = mode::DROP_AND_RESET; + } + else if ((lastMode & 0x30) == 0x00) { + // Nothing pending from the last time, start from scratch.. + if (buffer[0] == '\n') { + if (length == 1) { + result = mode::PENDING_RETURN; + } + else if (buffer[1] == '\r') { + result = mode::DOUBLE; + } + else { + result = mode::SINGLE; + } + } + else if (buffer[0] == '\r') { + if (length == 1) { + result = mode::PENDING_LINEFEED; + } + else if (buffer[1] == '\n') { + result = mode::DOUBLE; + } + else { + result = mode::SINGLE; + } + } + } + return (result); + } + void ProcessBuffer(const int readBytes) { + int index = 0; + int count = readBytes; + + ASSERT(readBytes > 0); + + // See if we have text termination in the new data.. + while (index < count) { + + _delimiter = IsTermination(_delimiter, (readBytes - index), &(_buffer[_offset])); + + // See if we need to skip characters... + if ((_delimiter & 0x03) == 0) { + index++; + _offset++; + } + else if (_delimiter == mode::DROP_AND_RESET) { + ASSERT(index == 0); + ASSERT(_offset == 0); + // This is a left over from the last check, just drop 1 character + ::memmove(_buffer, &(_buffer[1]), (readBytes - 1)); + count -= 1; + } + else { + // Time to send out a newline.. + _handler.Output(_offset, _buffer); + count -= (index + (_delimiter & 0x03)); + ::memmove(_buffer, &_buffer[_offset + (_delimiter & 0x03)], count); + _offset = 0; + index = 0; + } + } + + if (_offset == sizeof(_buffer)) { + _offset -= 32; + } + } + void Flush() { + if (_offset != 0) { + _handler.Output(_offset, _buffer); + _offset = 0; + } + } + TCHAR* Buffer() { + return (&(_buffer[_offset])); + } + uint16_t Length() { + return (sizeof(_buffer) - _offset); + } + + private: + ParentClass& _handler; + uint16_t _offset; + mode _delimiter; + TCHAR _buffer[1024]; + }; + +#ifdef __WINDOWS__ + class ReaderImplementation : public Reader { + private: + class ResourceMonitor : public Core::Thread { + private: + using Implementations = std::vector; + friend class Core::SingletonType; + + ResourceMonitor() + : Core::Thread(Core::Thread::DefaultStackSize(), _T("FileResourceMonitor")) + , _adminLock() + , _resources() { + ::memset(&_event, 0, sizeof(OVERLAPPED)); + _event.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + } + public: + ResourceMonitor(ResourceMonitor&&) = delete; + ResourceMonitor(const ResourceMonitor&) = delete; + ResourceMonitor& operator= (ResourceMonitor&&) = delete; + ResourceMonitor& operator= (const ResourceMonitor&) = delete; + + static ResourceMonitor& Instance() { + static ResourceMonitor instance; + return (instance); + } + ~ResourceMonitor() override { + Core::Thread::Stop(); + Core::Thread::Wait(Core::Thread::BLOCKED | Core::Thread::STOPPED, Core::infinite); + } + + public: + void Register(ReaderImplementation& source) { + _adminLock.Lock(); + ASSERT(std::find(_resources.begin(), _resources.end(), &source) == _resources.end()); + _resources.push_back(&source); + source.Read(_event); + if (_resources.size() == 1) { + Core::Thread::Run(); + } + _adminLock.Unlock(); + } + void Unregister(ReaderImplementation& source) { + _adminLock.Lock(); + typename Implementations::iterator entry(std::find(_resources.begin(), _resources.end(), &source)); + ASSERT(entry != _resources.end()); + if (entry != _resources.end()) { + source.Cancel(_event); + _resources.erase(entry); + } + if (_resources.empty() == true) { + Core::Thread::Block(); + ::SetEvent(_event.hEvent); + } + _adminLock.Unlock(); + } + + uint32_t Worker() override { + if (::WaitForSingleObject(_event.hEvent, Core::infinite) == WAIT_OBJECT_0) { + ::ResetEvent(_event.hEvent); + _adminLock.Lock(); + // Iterate through the list of entries to flush the data read: + for (ReaderImplementation* entry : _resources) { + entry->Process(_event); + } + _adminLock.Unlock(); + } + return(Core::infinite); + } + + private: + Core::CriticalSection _adminLock; + Implementations _resources; + OVERLAPPED _event; + }; + + public: + ReaderImplementation() = delete; + ReaderImplementation(ReaderImplementation&&) = delete; + ReaderImplementation(const ReaderImplementation&) = delete; + ReaderImplementation& operator=(ReaderImplementation&&) = delete; + ReaderImplementation& operator=(const ReaderImplementation&) = delete; + + ReaderImplementation(ParentClass& parent, const Core::IResource::handle replacing) + : Reader(parent) + , _index(replacing) + , _copy(Core::IResource::INVALID) + , _handle(nullptr) { + ASSERT(replacing != Core::IResource::INVALID); + } + ~ReaderImplementation() { + Close(); + _index = Core::IResource::INVALID; + } + + public: + bool Open() { + + ASSERT(_index != Core::IResource::INVALID); + + Core::IResource::handle newDescriptor; + + if (CreateOverlappedPipe(_handle, newDescriptor) != false) { + _flushall(); + _copy = ::_dup(_index); + if (::_dup2(newDescriptor, _index) == -1) { + ::_close(newDescriptor); + ::CloseHandle(_handle); + _handle = nullptr; + } + else { + ResourceMonitor::Instance().Register(*this); + ::_close(newDescriptor); + } + } + return (_handle != nullptr); + } + bool Close() { + if (_handle != nullptr) { + _flushall(); + if (::_dup2(_copy, _index) != -1) { + ::_close(_copy); + ResourceMonitor::Instance().Unregister(*this); + ::CloseHandle(_handle); + _handle = nullptr; + } + Reader::Flush(); + } + return (_handle == nullptr); + } + + private: + bool CreateOverlappedPipe(HANDLE& readPipe, int& writePipe) + { + static volatile uint32_t sequenceNumber = 0; + uint32_t newIndex = Core::InterlockedIncrement(sequenceNumber); + + HANDLE pipeHandle, pipeOutput; + TCHAR PipeNameBuffer[MAX_PATH]; + + sprintf(PipeNameBuffer, + _T("\\\\.\\Pipe\\ThunderRedirectPipe.%08x.%08x"), Core::ProcessInfo().Id(), newIndex); + + pipeHandle = CreateNamedPipeA( + PipeNameBuffer, + PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_WAIT, + 1, + 4096, + 4096, + 120 * 1000, + nullptr + ); + + if (pipeHandle != INVALID_HANDLE_VALUE) { + pipeOutput = CreateFileA( + PipeNameBuffer, + GENERIC_WRITE, + 0, + nullptr, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, + nullptr); + + if (pipeOutput == INVALID_HANDLE_VALUE) { + ::CloseHandle(pipeHandle); + } + else { + writePipe = _open_osfhandle(reinterpret_cast(pipeOutput), 0); + if (writePipe == -1) { + ::CloseHandle(pipeHandle); + ::CloseHandle(pipeOutput); + } + else { + readPipe = pipeHandle; + return (true); + } + } + } + return(false); + } + void Read(OVERLAPPED& event) { + ::ReadFile(_handle, Reader::Buffer(), Reader::Length(), nullptr, &event); + } + void Cancel(OVERLAPPED& event) { + // Maybe some data was still flushed, first flush that... + DWORD bytesRead = 0; + + if ((::GetOverlappedResult(_handle, &event, &bytesRead, FALSE) != 0) && (bytesRead > 0)) { + Reader::ProcessBuffer(bytesRead); + } + + ::CancelIoEx(_handle, &event); + } + void Process(OVERLAPPED& event) { + DWORD bytesRead = 0; + + if ((::GetOverlappedResult(_handle, &event, &bytesRead, FALSE) != 0) && (bytesRead > 0)) { + Reader::ProcessBuffer(bytesRead); + Read(event); + } + } + + private: + Core::IResource::handle _index; + Core::IResource::handle _copy; + HANDLE _handle; + }; + + // friend class Core::SingletonType; + +#else + class ReaderImplementation : public Core::IResource, public Reader { + public: + ReaderImplementation() = delete; + ReaderImplementation(ReaderImplementation&&) = delete; + ReaderImplementation(const ReaderImplementation&) = delete; + ReaderImplementation& operator=(ReaderImplementation&&) = delete; + ReaderImplementation& operator=(const ReaderImplementation&) = delete; + + ReaderImplementation(ParentClass& parent, const Core::IResource::handle replacing) + : Reader(parent) + , _index(replacing) + , _copy(Core::IResource::INVALID) { + ASSERT(replacing != Core::IResource::INVALID); + _handle[0] = Core::IResource::INVALID; + _handle[1] = Core::IResource::INVALID; + } + ~ReaderImplementation() override { + Close(); + _index = Core::IResource::INVALID; + } + + public: + bool Open() { + + ASSERT(_index != Core::IResource::INVALID); + + if ((_handle[0] == Core::IResource::INVALID) && (::pipe(_handle) == 0)) { + _copy = ::dup(_index); + ::fsync(_index); + int flags = ::fcntl(_handle[0], F_GETFL); + ::fcntl(_handle[0], F_SETFL, (flags | O_NONBLOCK)); + ::dup2(_handle[1], _index); + ::close(_handle[1]); + _handle[1] = Core::IResource::INVALID; + Core::ResourceMonitor::Instance().Register(*this); + } + return (_handle[0] != Core::IResource::INVALID); + } + bool Close() { + if (_handle[0] != Core::IResource::INVALID) { + + ::fsync(_handle[0]); + + if (::dup2(_copy, _index) != -1) { + ::close(_handle[0]); + ::close(_copy); + Core::ResourceMonitor::Instance().Unregister(*this); + _handle[0] = Core::IResource::INVALID; + _copy = Core::IResource::INVALID; + Reader::Flush(); + } + } + return (_handle[0] == Core::IResource::INVALID); + } + Core::IResource::handle Origin() const { + return (_copy == Core::IResource::INVALID ? _index : _copy); + } + Core::IResource::handle Descriptor() const override { + return (_handle[0]); + } + uint16_t Events() override { + if (_handle[0] != Core::IResource::INVALID) { + return (POLLHUP | POLLRDHUP | POLLIN); + } + return (0); + } + void Handle(const uint16_t events) override { + // If we have an event, read and see if we have a full line.. + if ((events & POLLIN) != 0) { + int readBytes; + + do { + readBytes = read(_handle[0], Reader::Buffer(), Reader::Length()); + + if (readBytes > 0) { + Reader::ProcessBuffer(readBytes); + } + + } while (readBytes > 0); + } + } + + private: + Core::IResource::handle _index; + Core::IResource::handle _copy; + Core::IResource::handle _handle[2]; + }; +#endif + + public: + TextStreamRedirectType(TextStreamRedirectType&&) = delete; + TextStreamRedirectType(const TextStreamRedirectType&) = delete; + TextStreamRedirectType operator=(TextStreamRedirectType&&) = delete; + TextStreamRedirectType operator=(const TextStreamRedirectType&) = delete; + + TextStreamRedirectType(const Core::IResource::handle source) + : _channel(*this, source) { + } + ~TextStreamRedirectType() = default; + + public: + bool Open() { + return (_channel.Open()); + } + bool Close() { + return (_channel.Close()); + } + void Format(const TCHAR format[], ...) + { + string dst; + va_list ap; + va_start(ap, format); + int length; + va_list apStrLen; + va_copy(apStrLen, ap); + length = vsnprintf(nullptr, 0, format, apStrLen); + va_end(apStrLen); + + if (length > 0) { + dst.resize(length); + vsnprintf((char*)dst.data(), dst.size() + 1, format, ap); + write(_channel.Origin(), dst.c_str(), length); + } + else { + dst = "Format error! format: "; + dst.append(format); + } + + va_end(ap); + } + + private: + void Output(const uint16_t length, const TCHAR buffer[]) { + _metadata.Output(length, buffer); + } + + private: + ReaderImplementation _channel; + STREAMTYPE _metadata; + }; +} +} diff --git a/Source/core/core.h b/Source/core/core.h index 42749c1d8..e6e2501e7 100644 --- a/Source/core/core.h +++ b/Source/core/core.h @@ -87,6 +87,7 @@ #include "SystemInfo.h" #include "TextFragment.h" #include "TextReader.h" +#include "TextStreamRedirectType.h" #include "Thread.h" #include "ThreadPool.h" #include "Time.h" diff --git a/Source/messaging/CMakeLists.txt b/Source/messaging/CMakeLists.txt index 76a7d5729..beff54646 100644 --- a/Source/messaging/CMakeLists.txt +++ b/Source/messaging/CMakeLists.txt @@ -23,7 +23,8 @@ add_library(${TARGET} MessageUnit.cpp TraceCategories.cpp Logging.cpp - DirectOutput.cpp) + DirectOutput.cpp + OperationalCategories.cpp) set(PUBLIC_HEADERS Module.h @@ -42,6 +43,7 @@ set(PUBLIC_HEADERS TextMessage.h BaseCategory.h ConsoleStreamRedirect.h + OperationalCategories.h ) target_compile_definitions(${TARGET} PRIVATE MESSAGING_EXPORTS) @@ -100,5 +102,3 @@ install( DESTINATION ${CMAKE_INSTALL_PREFIX}/include/${NAMESPACE}/tracing/ ) endif() - - diff --git a/Source/messaging/ConsoleStreamRedirect.h b/Source/messaging/ConsoleStreamRedirect.h index b2a44abcc..d47f8ed59 100644 --- a/Source/messaging/ConsoleStreamRedirect.h +++ b/Source/messaging/ConsoleStreamRedirect.h @@ -1,456 +1,10 @@ #pragma once #include "Control.h" +#include "OperationalCategories.h" namespace WPEFramework { namespace Messaging { - template - class ConsoleStreamRedirectType { - private: - using ParentClass = ConsoleStreamRedirectType; - - class Reader { - private: - enum mode : uint8_t { - IDLE = 0x00, - SINGLE = 0x01, - DOUBLE = 0x02, - PENDING_RETURN = 0x11, - PENDING_LINEFEED = 0x21, - DROP_AND_RESET = 0x81 - }; - - public: - Reader() = delete; - Reader(Reader&&) = delete; - Reader(const Reader&) = delete; - Reader& operator=(Reader&&) = delete; - Reader& operator=(const Reader&) = delete; - - Reader(ParentClass& parent) - : _handler(parent) - , _offset(0) - , _delimiter(mode::IDLE) { - } - ~Reader() = default; - - public: - mode IsTermination(const mode lastMode, const int length, const TCHAR buffer[]) const { - mode result = mode::IDLE; - - ASSERT(length > 0); - - if (((lastMode == PENDING_RETURN) && (buffer[0] == '\r')) || - ((lastMode == PENDING_LINEFEED) && (buffer[0] == '\n'))) { - result = mode::DROP_AND_RESET; - } - else if ((lastMode & 0x30) == 0x00) { - // Nothing pending from the last time, start from scratch.. - if (buffer[0] == '\n') { - if (length == 1) { - result = mode::PENDING_RETURN; - } - else if (buffer[1] == '\r') { - result = mode::DOUBLE; - } - else { - result = mode::SINGLE; - } - } - else if (buffer[0] == '\r') { - if (length == 1) { - result = mode::PENDING_LINEFEED; - } - else if (buffer[1] == '\n') { - result = mode::DOUBLE; - } - else { - result = mode::SINGLE; - } - } - } - return (result); - } - void ProcessBuffer(const int readBytes) { - int index = 0; - int count = readBytes; - - ASSERT(readBytes > 0); - - // See if we have text termination in the new data.. - while (index < count) { - - _delimiter = IsTermination(_delimiter, (readBytes - index), &(_buffer[_offset])); - - // See if we need to skip characters... - if ((_delimiter & 0x03) == 0) { - index++; - _offset++; - } - else if (_delimiter == mode::DROP_AND_RESET) { - ASSERT(index == 0); - ASSERT(_offset == 0); - // This is a left over from the last check, just drop 1 character - ::memmove(_buffer, &(_buffer[1]), (readBytes - 1)); - count -= 1; - } - else { - // Time to send out a newline.. - _handler.Output(_offset, _buffer); - count -= (index + (_delimiter & 0x03)); - ::memmove(_buffer, &_buffer[_offset + (_delimiter & 0x03)], count); - _offset = 0; - index = 0; - } - } - - if (_offset == sizeof(_buffer)) { - _offset -= 32; - } - } - void Flush() { - if (_offset != 0) { - _handler.Output(_offset, _buffer); - _offset = 0; - } - } - TCHAR* Buffer() { - return (&(_buffer[_offset])); - } - uint16_t Length() { - return (sizeof(_buffer) - _offset); - } - - private: - ParentClass& _handler; - uint16_t _offset; - mode _delimiter; - TCHAR _buffer[1024]; - }; - -#ifdef __WINDOWS__ - class ReaderImplementation : public Reader { - private: - class ResourceMonitor : public Core::Thread { - private: - using Implementations = std::vector; - friend class Core::SingletonType; - - ResourceMonitor() - : Core::Thread(Core::Thread::DefaultStackSize(), _T("FileResourceMonitor")) - , _adminLock() - , _resources() { - ::memset(&_event, 0, sizeof(OVERLAPPED)); - _event.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); - } - public: - ResourceMonitor(ResourceMonitor&&) = delete; - ResourceMonitor(const ResourceMonitor&) = delete; - ResourceMonitor& operator= (ResourceMonitor&&) = delete; - ResourceMonitor& operator= (const ResourceMonitor&) = delete; - - static ResourceMonitor& Instance() { - static ResourceMonitor instance; - return (instance); - } - ~ResourceMonitor() override { - Core::Thread::Stop(); - Core::Thread::Wait(Core::Thread::BLOCKED | Core::Thread::STOPPED, Core::infinite); - } - - public: - void Register(ReaderImplementation& source) { - _adminLock.Lock(); - ASSERT(std::find(_resources.begin(), _resources.end(), &source) == _resources.end()); - _resources.push_back(&source); - source.Read(_event); - if (_resources.size() == 1) { - Core::Thread::Run(); - } - _adminLock.Unlock(); - } - void Unregister(ReaderImplementation& source) { - _adminLock.Lock(); - typename Implementations::iterator entry(std::find(_resources.begin(), _resources.end(), &source)); - ASSERT(entry != _resources.end()); - if (entry != _resources.end()) { - source.Cancel(_event); - _resources.erase(entry); - } - if (_resources.empty() == true) { - Core::Thread::Block(); - ::SetEvent(_event.hEvent); - } - _adminLock.Unlock(); - } - - uint32_t Worker() override { - if (::WaitForSingleObject(_event.hEvent, Core::infinite) == WAIT_OBJECT_0) { - ::ResetEvent(_event.hEvent); - _adminLock.Lock(); - // Iterate through the list of entries to flush the data read: - for (ReaderImplementation* entry : _resources) { - entry->Process(_event); - } - _adminLock.Unlock(); - } - return(Core::infinite); - } - - private: - Core::CriticalSection _adminLock; - Implementations _resources; - OVERLAPPED _event; - }; - - public: - ReaderImplementation() = delete; - ReaderImplementation(ReaderImplementation&&) = delete; - ReaderImplementation(const ReaderImplementation&) = delete; - ReaderImplementation& operator=(ReaderImplementation&&) = delete; - ReaderImplementation& operator=(const ReaderImplementation&) = delete; - - ReaderImplementation(ParentClass& parent) - : Reader(parent) - , _index(Core::IResource::INVALID) - , _copy(Core::IResource::INVALID) - , _handle(nullptr) { - } - ~ReaderImplementation() { - Close(); - } - - public: - bool Open(const Core::IResource::handle replacing) { - - ASSERT(_index == Core::IResource::INVALID); - ASSERT(replacing != Core::IResource::INVALID); - - if (replacing != Core::IResource::INVALID) { - Core::IResource::handle newDescriptor; - - if (CreateOverlappedPipe(_handle, newDescriptor) != false) { - _flushall(); - _copy = ::_dup(replacing); - if (::_dup2(newDescriptor, replacing) == -1) { - ::_close(newDescriptor); - ::CloseHandle(_handle); - } - else { - _index = replacing; - ResourceMonitor::Instance().Register(*this); - ::_close(newDescriptor); - } - } - } - return (_index != Core::IResource::INVALID); - } - bool Close() { - if (_index != Core::IResource::INVALID) { - _flushall(); - if (::_dup2(_copy, _index) != -1) { - ::_close(_copy); - ResourceMonitor::Instance().Unregister(*this); - _index = Core::IResource::INVALID; - ::CloseHandle(_handle); - } - Reader::Flush(); - } - return (_index == Core::IResource::INVALID); - } - - private: - bool CreateOverlappedPipe(HANDLE& readPipe, int& writePipe) - { - static volatile uint32_t sequenceNumber = 0; - uint32_t newIndex = Core::InterlockedIncrement(sequenceNumber); - - HANDLE pipeHandle, pipeOutput; - TCHAR PipeNameBuffer[MAX_PATH]; - - sprintf(PipeNameBuffer, - _T("\\\\.\\Pipe\\ThunderRedirectPipe.%08x.%08x"), Core::ProcessInfo().Id(), newIndex); - - pipeHandle = CreateNamedPipeA( - PipeNameBuffer, - PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_WAIT, - 1, - 4096, - 4096, - 120 * 1000, - nullptr - ); - - if (pipeHandle != INVALID_HANDLE_VALUE) { - pipeOutput = CreateFileA( - PipeNameBuffer, - GENERIC_WRITE, - 0, - nullptr, - OPEN_EXISTING, - FILE_ATTRIBUTE_NORMAL, - nullptr); - - if (pipeOutput == INVALID_HANDLE_VALUE) { - ::CloseHandle(pipeHandle); - } - else { - writePipe = _open_osfhandle(reinterpret_cast(pipeOutput), 0); - if (writePipe == -1) { - ::CloseHandle(pipeHandle); - ::CloseHandle(pipeOutput); - } - else { - readPipe = pipeHandle; - return (true); - } - } - } - return(false); - } - void Read(OVERLAPPED& event) { - ::ReadFile(_handle, Reader::Buffer(), Reader::Length(), nullptr, &event); - } - void Cancel(OVERLAPPED& event) { - // Maybe some data was still flushed, first flush that... - DWORD bytesRead = 0; - - if ((::GetOverlappedResult(_handle, &event, &bytesRead, FALSE) != 0) && (bytesRead > 0)) { - Reader::ProcessBuffer(bytesRead); - } - - ::CancelIoEx(_handle, &event); - } - void Process(OVERLAPPED& event) { - DWORD bytesRead = 0; - - if ((::GetOverlappedResult(_handle, &event, &bytesRead, FALSE) != 0) && (bytesRead > 0)) { - Reader::ProcessBuffer(bytesRead); - Read(event); - } - } - - private: - Core::IResource::handle _index; - Core::IResource::handle _copy; - HANDLE _handle; - }; - - // friend class Core::SingletonType; -#else - class ReaderImplementation : public Core::IResource, public Reader { - public: - ReaderImplementation() = delete; - ReaderImplementation(ReaderImplementation&&) = delete; - ReaderImplementation(const ReaderImplementation&) = delete; - ReaderImplementation& operator=(ReaderImplementation&&) = delete; - ReaderImplementation& operator=(const ReaderImplementation&) = delete; - - ReaderImplementation(ParentClass& parent) - : Reader(parent) - , _index(Core::IResource::INVALID) - , _copy(Core::IResource::INVALID) - , _handle(Core::IResource::INVALID) { - } - ~ReaderImplementation() override { - Close(); - } - - public: - -bool Open(const Core::IResource::handle replacing) { - - ASSERT(_index == Core::IResource::INVALID); - ASSERT(replacing != Core::IResource::INVALID); - - if (replacing != Core::IResource::INVALID) { - _handle = memfd_create(_T("RedirectReaderFile"), 0); - if (_handle != Core::IResource::INVALID) { - _index = replacing; - _copy = ::dup(replacing); - ::fsync(replacing); - ::dup2(_handle, _index); - ::close(_handle); - Core::ResourceMonitor::Instance().Register(*this); - } - } - return (_index != Core::IResource::INVALID); -} -bool Close() { - if (_index != Core::IResource::INVALID) { - ::fsync(_copy); - ::dup2(_copy, _index); - ::close(_copy); - Core::ResourceMonitor::Instance().Unregister(*this); - _index = Core::IResource::INVALID; - Reader::Flush(); - } - return (_index == -1); -} - -Core::IResource::handle Descriptor() const override { - return (_handle); -} -uint16_t Events() override { - if (_handle != Core::IResource::INVALID) { - return (POLLHUP | POLLRDHUP | POLLIN); - } - return (0); -} -void Handle(const uint16_t events) override { - // If we have an event, read and see if we have a full line.. - if ((events & POLLIN) != 0) { - int readBytes, freeSpace; - - do { - readBytes = read(_handle, Reader::Buffer(), Reader::Length()); - - if (readBytes > 0) { - Reader::ProcessBuffer(readBytes); - } - - } while (readBytes == freeSpace); - } -} - - private: - Core::IResource::handle _index; - Core::IResource::handle _copy; - Core::IResource::handle _handle; - bool _register; - }; -#endif - - public: - ConsoleStreamRedirectType(ConsoleStreamRedirectType&&) = delete; - ConsoleStreamRedirectType(const ConsoleStreamRedirectType&) = delete; - ConsoleStreamRedirectType operator=(ConsoleStreamRedirectType&&) = delete; - ConsoleStreamRedirectType operator=(const ConsoleStreamRedirectType&) = delete; - - ConsoleStreamRedirectType() - : _channel(*this) { - } - ~ConsoleStreamRedirectType() = default; - - public: - bool Open(const Core::IResource::handle fileDescriptor) { - return (_channel.Open(fileDescriptor)); - } - bool Close() { - return (_channel.Close()); - } - - private: - void Output(const uint16_t length, const TCHAR buffer[]) { - _metadata.Output(length, buffer); - } - - private: - ReaderImplementation _channel; - STREAMTYPE _metadata; - }; - class EXTERNAL StandardOut { public: StandardOut(StandardOut&&); @@ -462,12 +16,15 @@ void Handle(const uint16_t events) override { ~StandardOut() = default; public: - void Output(const uint16_t length, const TCHAR buffer[]) { - // if (LocalLifetimeType::IsEnabled() == true) { - - // } - string text(buffer, length); - fprintf(stderr, "Redirected: \"%s\"\n", text.c_str()); + void Output(const uint16_t length, const TCHAR buffer[]) + { + if (OperationalStream::StandardOut::IsEnabled() == true) { + Core::Messaging::MessageInfo messageInfo(OperationalStream::StandardOut::Metadata(), Core::Time::Now().Ticks()); + Core::Messaging::IStore::OperationalStream operationalStream(messageInfo); + string text(buffer, length); + TextMessage data(text); + MessageUnit::Instance().Push(operationalStream, &data); + } } }; @@ -483,16 +40,69 @@ void Handle(const uint16_t events) override { ~StandardError() = default; public: - void Output(const uint16_t length, const TCHAR buffer[]) { - // if (LocalLifetimeType::IsEnabled() == true) { + void Output(const uint16_t length, const TCHAR buffer[]) + { + if (OperationalStream::StandardError::IsEnabled() == true) { + Core::Messaging::MessageInfo messageInfo(OperationalStream::StandardError::Metadata(), Core::Time::Now().Ticks()); + Core::Messaging::IStore::OperationalStream operationalStream(messageInfo); + string text(buffer, length); + TextMessage data(text); + MessageUnit::Instance().Push(operationalStream, &data); + } + } + }; + + class EXTERNAL ConsoleStandardOut : public Core::TextStreamRedirectType { + public: + ConsoleStandardOut(ConsoleStandardOut&&) = delete; + ConsoleStandardOut(const ConsoleStandardOut&) = delete; + ConsoleStandardOut& operator=(ConsoleStandardOut&&) = delete; + ConsoleStandardOut& operator=(const ConsoleStandardOut&) = delete; - // } - string text(buffer, length); - fprintf(stderr, "Redirected: \"%s\"\n", text.c_str()); + private: + ConsoleStandardOut() +#ifdef __WINDOWS__ + : Core::TextStreamRedirectType(::_fileno(stdout)) { +#else + : Core::TextStreamRedirectType(STDOUT_FILENO) { +#endif + } + + public: + ~ConsoleStandardOut() = default; + + static ConsoleStandardOut& Instance() + { + static ConsoleStandardOut singleton; + + return (singleton); } }; - using ConsoleStandardOut = ConsoleStreamRedirectType; - using ConsoleStandardError = ConsoleStreamRedirectType; + class EXTERNAL ConsoleStandardError : public Core::TextStreamRedirectType { + public: + ConsoleStandardError(ConsoleStandardError&&) = delete; + ConsoleStandardError(const ConsoleStandardError&) = delete; + ConsoleStandardError& operator=(ConsoleStandardError&&) = delete; + ConsoleStandardError& operator=(const ConsoleStandardError&) = delete; + + private: + ConsoleStandardError() +#ifdef __WINDOWS__ + : Core::TextStreamRedirectType(::_fileno(stderr)) { +#else + : Core::TextStreamRedirectType(STDERR_FILENO) { +#endif + } + + public: + ~ConsoleStandardError() = default; + static ConsoleStandardError& Instance() + { + static ConsoleStandardError singleton; + + return (singleton); + } + }; } } diff --git a/Source/messaging/MessageDispatcher.h b/Source/messaging/MessageDispatcher.h index 9f85f0dcd..63cd091ab 100644 --- a/Source/messaging/MessageDispatcher.h +++ b/Source/messaging/MessageDispatcher.h @@ -79,9 +79,6 @@ namespace Messaging { while (cursor.Offset() < cursor.Size()) { uint16_t chunkSize = 0; cursor.Peek(chunkSize); - - TRACE_L1("Flushing buffer data!"); - cursor.Forward(chunkSize); } diff --git a/Source/messaging/OperationalCategories.cpp b/Source/messaging/OperationalCategories.cpp new file mode 100644 index 000000000..e825c706a --- /dev/null +++ b/Source/messaging/OperationalCategories.cpp @@ -0,0 +1,31 @@ +/* + * If not stated otherwise in this file or this component's LICENSE file the + * following copyright and licenses apply: + * + * Copyright 2022 Metrological + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "OperationalCategories.h" + +namespace WPEFramework { + +namespace OperationalStream { + + // Announce upfront all OperationalStream categories... + OPERATIONAL_STREAM_ANNOUNCE(StandardOut); + OPERATIONAL_STREAM_ANNOUNCE(StandardError); + +} +} diff --git a/Source/messaging/OperationalCategories.h b/Source/messaging/OperationalCategories.h new file mode 100644 index 000000000..a2496b697 --- /dev/null +++ b/Source/messaging/OperationalCategories.h @@ -0,0 +1,88 @@ +/* + * If not stated otherwise in this file or this component's LICENSE file the + * following copyright and licenses apply: + * + * Copyright 2022 Metrological + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "Module.h" +#include "BaseCategory.h" +#include "Control.h" + +#ifdef __WINDOWS__ + +#define DEFINE_OPERATIONAL_CATEGORY(CATEGORY) \ + DEFINE_MESSAGING_CATEGORY(WPEFramework::OperationalStream::BaseOperationalType, CATEGORY) + +#else + +#define DEFINE_OPERATIONAL_CATEGORY(CATEGORY) \ + DEFINE_MESSAGING_CATEGORY(WPEFramework::OperationalStream::BaseOperationalType, CATEGORY) \ + template<> \ + EXTERNAL typename WPEFramework::OperationalStream::BaseOperationalType::Control \ + WPEFramework::OperationalStream::BaseOperationalType::_control; + +#endif + +#define OPERATIONAL_STREAM_ANNOUNCE(CATEGORY) \ + template<> WPEFramework::OperationalStream::BaseOperationalType::Control \ + WPEFramework::OperationalStream::BaseOperationalType::_control(true) + +namespace WPEFramework { + +namespace OperationalStream { + + template + class BaseOperationalType : public Messaging::BaseCategoryType { + public: + using BaseClass = Messaging::BaseCategoryType; + using Control = Messaging::ControlType; + + BaseOperationalType(const BaseOperationalType&) = delete; + BaseOperationalType& operator=(const BaseOperationalType&) = delete; + + BaseOperationalType() = default; + ~BaseOperationalType() = default; + + using BaseClass::BaseClass; + + public: + inline static void Announce() { + IsEnabled(); + } + + inline static bool IsEnabled() { + return (_control.IsEnabled()); + } + + inline static void Enable(const bool enable) { + _control.Enable(enable); + } + + inline static const Core::Messaging::Metadata& Metadata() { + return (_control.Metadata()); + } + + private: + static Control _control; + }; + + DEFINE_OPERATIONAL_CATEGORY(StandardOut) + DEFINE_OPERATIONAL_CATEGORY(StandardError) + +} +} diff --git a/Source/messaging/messaging.h b/Source/messaging/messaging.h index d45d33f5e..411f51b2e 100644 --- a/Source/messaging/messaging.h +++ b/Source/messaging/messaging.h @@ -33,6 +33,7 @@ #include "TraceFactory.h" #include "TextMessage.h" #include "ConsoleStreamRedirect.h" +#include "OperationalCategories.h" #ifdef __WINDOWS__ #pragma comment(lib, "messaging.lib") diff --git a/Tests/CMakeLists.txt b/Tests/CMakeLists.txt index 9a82f9c6e..8ed17cf46 100644 --- a/Tests/CMakeLists.txt +++ b/Tests/CMakeLists.txt @@ -2,6 +2,7 @@ option(LOADER_TEST "Utility to load a plugin in isolation." OFF) option(HTTPSCLIENT_TEST "Example how to do https requests with Thunder." OFF) option(WORKERPOOL_TEST "WorkerPool stress test" OFF) option(FILE_UNLINK_TEST "File unlink test" OFF) +option(REDIRECT_TEST "Test stream redirection" OFF) if(BUILD_TESTS) add_subdirectory(unit) @@ -19,6 +20,10 @@ if(LOADER_TEST) add_subdirectory(loader) endif() +if(REDIRECT_TEST) + add_subdirectory(redirect) +endif() + if(WORKERPOOL_TEST) add_subdirectory(workerpool-test) endif() diff --git a/Tests/redirect/CMakeLists.txt b/Tests/redirect/CMakeLists.txt new file mode 100644 index 000000000..539bd02bf --- /dev/null +++ b/Tests/redirect/CMakeLists.txt @@ -0,0 +1,33 @@ +# If not stated otherwise in this file or this component's license file the +# following copyright and licenses apply: +# +# Copyright 2020 Metrological +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add_executable(redirect main.cpp) + +target_link_libraries(redirect + PRIVATE + ${NAMESPACE}Core::${NAMESPACE}Core + ) + +set_target_properties(redirect PROPERTIES + CXX_STANDARD 11 + CXX_STANDARD_REQUIRED YES + ) + +SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread") + +install(TARGETS redirect DESTINATION bin) + diff --git a/Tests/redirect/main.cpp b/Tests/redirect/main.cpp new file mode 100644 index 000000000..3568e8ff6 --- /dev/null +++ b/Tests/redirect/main.cpp @@ -0,0 +1,553 @@ +#include +#include +#include + +#define MODULE_NAME application_redirect + +#include + +using namespace WPEFramework; + +namespace WPEFramework { +namespace Test { + template + class TextStreamRedirectType { + private: + using ParentClass = TextStreamRedirectType; + + class Reader { + private: + enum mode : uint8_t { + IDLE = 0x00, + SINGLE = 0x01, + DOUBLE = 0x02, + PENDING_RETURN = 0x11, + PENDING_LINEFEED = 0x21, + DROP_AND_RESET = 0x81 + }; + + public: + Reader() = delete; + Reader(Reader&&) = delete; + Reader(const Reader&) = delete; + Reader& operator=(Reader&&) = delete; + Reader& operator=(const Reader&) = delete; + + Reader(ParentClass& parent) + : _handler(parent) + , _offset(0) + , _delimiter(mode::IDLE) { + } + ~Reader() = default; + + public: + mode IsTermination(const mode lastMode, const int length, const TCHAR buffer[]) const { + mode result = mode::IDLE; + + ASSERT(length > 0); + + if (((lastMode == PENDING_RETURN) && (buffer[0] == '\r')) || + ((lastMode == PENDING_LINEFEED) && (buffer[0] == '\n'))) { + result = mode::DROP_AND_RESET; + } + else if ((lastMode & 0x30) == 0x00) { + // Nothing pending from the last time, start from scratch.. + if (buffer[0] == '\n') { + if (length == 1) { + result = mode::PENDING_RETURN; + } + else if (buffer[1] == '\r') { + result = mode::DOUBLE; + } + else { + result = mode::SINGLE; + } + } + else if (buffer[0] == '\r') { + if (length == 1) { + result = mode::PENDING_LINEFEED; + } + else if (buffer[1] == '\n') { + result = mode::DOUBLE; + } + else { + result = mode::SINGLE; + } + } + } + return (result); + } + void ProcessBuffer(const int readBytes) { + int index = 0; + int count = readBytes; + + ASSERT(readBytes > 0); + + // See if we have text termination in the new data.. + while (index < count) { + + _delimiter = IsTermination(_delimiter, (readBytes - index), &(_buffer[_offset])); + + // See if we need to skip characters... + if ((_delimiter & 0x03) == 0) { + index++; + _offset++; + } + else if (_delimiter == mode::DROP_AND_RESET) { + ASSERT(index == 0); + ASSERT(_offset == 0); + // This is a left over from the last check, just drop 1 character + ::memmove(_buffer, &(_buffer[1]), (readBytes - 1)); + count -= 1; + } + else { + // Time to send out a newline.. + _handler.Output(_offset, _buffer); + count -= (index + (_delimiter & 0x03)); + ::memmove(_buffer, &_buffer[_offset + (_delimiter & 0x03)], count); + _offset = 0; + index = 0; + } + } + + if (_offset == sizeof(_buffer)) { + _offset -= 32; + } + } + void Flush() { + if (_offset != 0) { + _handler.Output(_offset, _buffer); + _offset = 0; + } + } + TCHAR* Buffer() { + return (&(_buffer[_offset])); + } + uint16_t Length() { + return (sizeof(_buffer) - _offset); + } + + private: + ParentClass& _handler; + uint16_t _offset; + mode _delimiter; + TCHAR _buffer[1024]; + }; + +#ifdef __WINDOWS__ + class ReaderImplementation : public Reader { + private: + class ResourceMonitor : public Core::Thread { + private: + using Implementations = std::vector; + friend class Core::SingletonType; + + ResourceMonitor() + : Core::Thread(Core::Thread::DefaultStackSize(), _T("FileResourceMonitor")) + , _adminLock() + , _resources() { + ::memset(&_event, 0, sizeof(OVERLAPPED)); + _event.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + } + public: + ResourceMonitor(ResourceMonitor&&) = delete; + ResourceMonitor(const ResourceMonitor&) = delete; + ResourceMonitor& operator= (ResourceMonitor&&) = delete; + ResourceMonitor& operator= (const ResourceMonitor&) = delete; + + static ResourceMonitor& Instance() { + static ResourceMonitor instance; + return (instance); + } + ~ResourceMonitor() override { + Core::Thread::Stop(); + Core::Thread::Wait(Core::Thread::BLOCKED | Core::Thread::STOPPED, Core::infinite); + } + + public: + void Register(ReaderImplementation& source) { + _adminLock.Lock(); + ASSERT(std::find(_resources.begin(), _resources.end(), &source) == _resources.end()); + _resources.push_back(&source); + source.Read(_event); + if (_resources.size() == 1) { + Core::Thread::Run(); + } + _adminLock.Unlock(); + } + void Unregister(ReaderImplementation& source) { + _adminLock.Lock(); + typename Implementations::iterator entry(std::find(_resources.begin(), _resources.end(), &source)); + ASSERT(entry != _resources.end()); + if (entry != _resources.end()) { + source.Cancel(_event); + _resources.erase(entry); + } + if (_resources.empty() == true) { + Core::Thread::Block(); + ::SetEvent(_event.hEvent); + } + _adminLock.Unlock(); + } + + uint32_t Worker() override { + if (::WaitForSingleObject(_event.hEvent, Core::infinite) == WAIT_OBJECT_0) { + ::ResetEvent(_event.hEvent); + _adminLock.Lock(); + // Iterate through the list of entries to flush the data read: + for (ReaderImplementation* entry : _resources) { + entry->Process(_event); + } + _adminLock.Unlock(); + } + return(Core::infinite); + } + + private: + Core::CriticalSection _adminLock; + Implementations _resources; + OVERLAPPED _event; + }; + + public: + ReaderImplementation() = delete; + ReaderImplementation(ReaderImplementation&&) = delete; + ReaderImplementation(const ReaderImplementation&) = delete; + ReaderImplementation& operator=(ReaderImplementation&&) = delete; + ReaderImplementation& operator=(const ReaderImplementation&) = delete; + + ReaderImplementation(ParentClass& parent) + : Reader(parent) + , _index(Core::IResource::INVALID) + , _copy(Core::IResource::INVALID) + , _handle(nullptr) { + } + ~ReaderImplementation() { + Close(); + } + + public: + bool Open(const Core::IResource::handle replacing) { + + ASSERT(_index == Core::IResource::INVALID); + ASSERT(replacing != Core::IResource::INVALID); + + if (replacing != Core::IResource::INVALID) { + Core::IResource::handle newDescriptor; + + if (CreateOverlappedPipe(_handle, newDescriptor) != false) { + _flushall(); + _copy = ::_dup(replacing); + if (::_dup2(newDescriptor, replacing) == -1) { + ::_close(newDescriptor); + ::CloseHandle(_handle); + } + else { + _index = replacing; + ResourceMonitor::Instance().Register(*this); + ::_close(newDescriptor); + } + } + } + return (_index != Core::IResource::INVALID); + } + bool Close() { + if (_index != Core::IResource::INVALID) { + _flushall(); + if (::_dup2(_copy, _index) != -1) { + ::_close(_copy); + ResourceMonitor::Instance().Unregister(*this); + _index = Core::IResource::INVALID; + ::CloseHandle(_handle); + } + Reader::Flush(); + } + return (_index == Core::IResource::INVALID); + } + + private: + bool CreateOverlappedPipe(HANDLE& readPipe, int& writePipe) + { + static volatile uint32_t sequenceNumber = 0; + uint32_t newIndex = Core::InterlockedIncrement(sequenceNumber); + + HANDLE pipeHandle, pipeOutput; + TCHAR PipeNameBuffer[MAX_PATH]; + + sprintf(PipeNameBuffer, + _T("\\\\.\\Pipe\\ThunderRedirectPipe.%08x.%08x"), Core::ProcessInfo().Id(), newIndex); + + pipeHandle = CreateNamedPipeA( + PipeNameBuffer, + PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_WAIT, + 1, + 4096, + 4096, + 120 * 1000, + nullptr + ); + + if (pipeHandle != INVALID_HANDLE_VALUE) { + pipeOutput = CreateFileA( + PipeNameBuffer, + GENERIC_WRITE, + 0, + nullptr, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, + nullptr); + + if (pipeOutput == INVALID_HANDLE_VALUE) { + ::CloseHandle(pipeHandle); + } + else { + writePipe = _open_osfhandle(reinterpret_cast(pipeOutput), 0); + if (writePipe == -1) { + ::CloseHandle(pipeHandle); + ::CloseHandle(pipeOutput); + } + else { + readPipe = pipeHandle; + return (true); + } + } + } + return(false); + } + void Read(OVERLAPPED& event) { + ::ReadFile(_handle, Reader::Buffer(), Reader::Length(), nullptr, &event); + } + void Cancel(OVERLAPPED& event) { + // Maybe some data was still flushed, first flush that... + DWORD bytesRead = 0; + + if ((::GetOverlappedResult(_handle, &event, &bytesRead, FALSE) != 0) && (bytesRead > 0)) { + Reader::ProcessBuffer(bytesRead); + } + + ::CancelIoEx(_handle, &event); + } + void Process(OVERLAPPED& event) { + DWORD bytesRead = 0; + + if ((::GetOverlappedResult(_handle, &event, &bytesRead, FALSE) != 0) && (bytesRead > 0)) { + Reader::ProcessBuffer(bytesRead); + Read(event); + } + } + + private: + Core::IResource::handle _index; + Core::IResource::handle _copy; + HANDLE _handle; + }; + + // friend class Core::SingletonType; + +#else + class ReaderImplementation : public Core::IResource, public Reader { + public: + ReaderImplementation() = delete; + ReaderImplementation(ReaderImplementation&&) = delete; + ReaderImplementation(const ReaderImplementation&) = delete; + ReaderImplementation& operator=(ReaderImplementation&&) = delete; + ReaderImplementation& operator=(const ReaderImplementation&) = delete; + + ReaderImplementation(ParentClass& parent, const Core::IResource::handle replacing) + : Reader(parent) + , _index(replacing) + , _copy(Core::IResource::INVALID) { + ASSERT(replacing != Core::IResource::INVALID); + _handle[0] = Core::IResource::INVALID; + _handle[1] = Core::IResource::INVALID; + } + ~ReaderImplementation() override { + Close(); + _index = Core::IResource::INVALID; + } + + public: + bool Open() { + + ASSERT(_index != Core::IResource::INVALID); + + if ((_handle[0] == Core::IResource::INVALID) && (::pipe(_handle) == 0)) { + _copy = ::dup(_index); + ::fsync(_index); + int flags = fcntl(_handle[0], F_GETFL); + fcntl(_handle[0], F_SETFL, (flags | O_NONBLOCK)); + ::dup2(_handle[1], _index); + close(_handle[1]); + _handle[1] = Core::IResource::INVALID; + Core::ResourceMonitor::Instance().Register(*this); + } + return (_handle[0] == Core::IResource::INVALID); + } + bool Close() { + if (_handle[0] != Core::IResource::INVALID) { + + ::fsync(_handle[0]); + ::dup2(_copy, _index); + ::close(_handle[0]); + ::close(_copy); + Core::ResourceMonitor::Instance().Unregister(*this); + _handle[0] = Core::IResource::INVALID; + _copy = Core::IResource::INVALID; + Reader::Flush(); + } + return (true); + } + Core::IResource::handle Origin() const { + return (_copy == Core::IResource::INVALID ? _index : _copy); + } + Core::IResource::handle Descriptor() const override { + return (_handle[0]); + } + uint16_t Events() override { + if (_handle[0] != Core::IResource::INVALID) { + return (POLLHUP | POLLRDHUP | POLLIN); + } + return (0); + } + void Handle(const uint16_t events) override { + // If we have an event, read and see if we have a full line.. + if ((events & POLLIN) != 0) { + int readBytes; + + do { + readBytes = read(_handle[0], Reader::Buffer(), Reader::Length()); + + if (readBytes > 0) { + Reader::ProcessBuffer(readBytes); + } + + } while (readBytes > 0); + } + } + + private: + Core::IResource::handle _index; + Core::IResource::handle _copy; + Core::IResource::handle _handle[2]; + }; +#endif + + public: + TextStreamRedirectType(TextStreamRedirectType&&) = delete; + TextStreamRedirectType(const TextStreamRedirectType&) = delete; + TextStreamRedirectType operator=(TextStreamRedirectType&&) = delete; + TextStreamRedirectType operator=(const TextStreamRedirectType&) = delete; + + TextStreamRedirectType(const Core::IResource::handle source) + : _channel(*this, source) { + } + ~TextStreamRedirectType() = default; + + public: + bool Open() { + return (_channel.Open()); + } + bool Close() { + return (_channel.Close()); + } + void Format(const TCHAR format[], ...) + { + string dst; + va_list ap; + va_start(ap, format); + int length; + va_list apStrLen; + va_copy(apStrLen, ap); + length = vsnprintf(nullptr, 0, format, apStrLen); + va_end(apStrLen); + + if (length > 0) { + dst.resize(length); + vsnprintf((char*)dst.data(), dst.size() + 1, format, ap); + write(_channel.Origin(), dst.c_str(), length); + } + else { + dst = "Format error! format: "; + dst.append(format); + } + + va_end(ap); + } + + private: + void Output(const uint16_t length, const TCHAR buffer[]) { + _metadata.Output(length, buffer); + } + + private: + ReaderImplementation _channel; + STREAMTYPE _metadata; + }; + + class RedirectedStream { + public: + RedirectedStream(RedirectedStream&&) = delete; + RedirectedStream(const RedirectedStream&) = delete; + RedirectedStream& operator=(RedirectedStream&&) = delete; + RedirectedStream& operator=(const RedirectedStream&) = delete; + + RedirectedStream() = default; + ~RedirectedStream() = default; + + public: + void Output(const uint16_t length, const TCHAR buffer[]) { + string text(buffer, length); + fprintf(stdout, "Redirected: \"%s\"\n", text.c_str()); + } + }; +} +} + + +void help() { + printf ("W -> Write some text to stderr\n"); + printf ("O -> Open the redirect from stderr\n"); + printf ("C -> Close the redirect from stderr\n"); + printf ("Q -> Quit\n>"); +} + +int main(int argc, char** argv) +{ + { + int element; + uint32_t counter = 0; + Test::TextStreamRedirectType redirector(fileno(stderr)); + + help(); + do { + element = toupper(getchar()); + + switch (element) { + case 'W': { + counter++; + fprintf(stdout, "Sending the number [%d] to a potentially redirected stream...\n", counter); + fprintf(stderr, "Sending: [%d] ...\n", counter); + break; + } + case 'O': { + fprintf(stdout, "Opening the redirection.\n"); + redirector.Open(); + break; + } + case 'C': { + fprintf(stdout, "Closing the redirection.\n"); + redirector.Close(); + break; + } + case 'Q': break; + default: { + } + } + } while (element != 'Q'); + } + + Core::Singleton::Dispose(); + + return (0); +} From ce43523baed13e922d1448ee7b1677e92e18e57c Mon Sep 17 00:00:00 2001 From: Pierre Wielders Date: Tue, 3 Oct 2023 13:39:31 +0200 Subject: [PATCH 2/2] [ToFromHex] Enahce the serialization to and from hex strings. (#1418) 1) Remove a strange feature where a hex byte array could be prefixed with characters a \\ and an X, I guess this is a check for the layer calling this functionality and should not be part of this. 2) allow serialization of bytestreams and Deserializeation of bytestreams to include a delimter between the diffreent bytes. This feature is backwards compatible. Default the '\0' character is passed as the delimiter which means that there will be no delimiting used in either step. --- Source/core/Serialization.cpp | 40 +++++++++++++++++++++++------------ Source/core/Serialization.h | 4 ++-- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/Source/core/Serialization.cpp b/Source/core/Serialization.cpp index 4dfd50069..3ae5d1626 100644 --- a/Source/core/Serialization.cpp +++ b/Source/core/Serialization.cpp @@ -158,28 +158,25 @@ POP_WARNING() static const TCHAR hex_chars[] = "0123456789abcdef"; - void EXTERNAL ToHexString(const uint8_t object[], const uint32_t length, string& result) + void EXTERNAL ToHexString(const uint8_t object[], const uint32_t length, string& result, const TCHAR delimiter) { ASSERT(object != nullptr); uint32_t index = static_cast(result.length()); - result.resize(index + (length * 2)); + result.resize(index + (length * 2) + (delimiter == '\0' ? 0 : (length - 1))); result[1] = hex_chars[object[0] & 0xF]; for (uint32_t i = 0, j = index; i < length; i++) { - if ((object[i] == '\\') && ((i + 3) < length) && (object[i + 1] == 'x')) { - result[j++] = object[i + 2]; - result[j++] = object[i + 3]; - i += 3; - } else { - result[j++] = hex_chars[object[i] >> 4]; - result[j++] = hex_chars[object[i] & 0xF]; + if ((delimiter != '\0') && (i > 0)) { + result[j++] = delimiter; } + result[j++] = hex_chars[object[i] >> 4]; + result[j++] = hex_chars[object[i] & 0xF]; } } - uint32_t EXTERNAL FromHexString(const string& hexString, uint8_t* object, const uint32_t maxLength) + uint32_t EXTERNAL FromHexString(const string& hexString, uint8_t* object, const uint32_t maxLength, const TCHAR delimiter) { ASSERT(object != nullptr || maxLength == 0); uint8_t highNibble; @@ -187,14 +184,31 @@ POP_WARNING() uint32_t bufferIndex = 0, strIndex = 0; // assume first character is 0 if length is odd. - if (hexString.length() % 2 == 1) { + if ((delimiter == '\0') && (hexString.length() % 2 == 1)) { lowNibble = FromHexDigits(hexString[strIndex++]); object[bufferIndex++] = lowNibble; } while ((bufferIndex < maxLength) && (strIndex < hexString.length())) { - highNibble = FromHexDigits(hexString[strIndex++]); - lowNibble = FromHexDigits(hexString[strIndex++]); + if (delimiter == '\0') { + highNibble = FromHexDigits(hexString[strIndex++]); + lowNibble = FromHexDigits(hexString[strIndex++]); + } + else { + uint8_t nibble = FromHexDigits(hexString[strIndex++]); + if (hexString[strIndex] == delimiter) { + highNibble = 0; + lowNibble = nibble; + ++strIndex; + } + else { + highNibble = nibble; + lowNibble = FromHexDigits(hexString[strIndex++]); + if (hexString[strIndex] == delimiter) { + ++strIndex; + } + } + } object[bufferIndex++] = (highNibble << 4) + lowNibble; } diff --git a/Source/core/Serialization.h b/Source/core/Serialization.h index 85dca41b5..27698a073 100644 --- a/Source/core/Serialization.h +++ b/Source/core/Serialization.h @@ -255,8 +255,8 @@ POP_WARNING() //------------------------------------------------------------------------ // Serialize: binary buffer //------------------------------------------------------------------------ - void EXTERNAL ToHexString(const uint8_t object[], const uint32_t length, string& result); - uint32_t EXTERNAL FromHexString(const string& hexString, uint8_t* object, const uint32_t maxLength); + void EXTERNAL ToHexString(const uint8_t object[], const uint32_t length, string& result, const TCHAR delimiter = '\0'); + uint32_t EXTERNAL FromHexString(const string& hexString, uint8_t* object, const uint32_t maxLength, const TCHAR delimiter = '\0'); //------------------------------------------------------------------------ // Serialize: Base64