From 7a382d9e9bf5d71b6a0066f40e2ad3c380a6dbb6 Mon Sep 17 00:00:00 2001 From: Mateusz Daniluk <121170681+VeithMetro@users.noreply.github.com> Date: Fri, 8 Nov 2024 15:16:20 +0100 Subject: [PATCH] [MessageControl] Open a socket in UDPOutput only when Network is active (#320) * Make sure to only open a channel if the network subsystem is already active * Delete move constructor and operator * Updated is called automatically after Register * Add a new method UpdateChannel and call it in the Notification * Add an implementation of UpdateChannel * Remove unnecessary check whether the channel is open before closing --- MessageControl/MessageControl.cpp | 2 +- MessageControl/MessageOutput.cpp | 31 ++++++++++++++++++++--- MessageControl/MessageOutput.h | 42 +++++++++++++++++++++++++++++-- 3 files changed, 68 insertions(+), 7 deletions(-) diff --git a/MessageControl/MessageControl.cpp b/MessageControl/MessageControl.cpp index 02774b3..0f0cf43 100644 --- a/MessageControl/MessageControl.cpp +++ b/MessageControl/MessageControl.cpp @@ -114,7 +114,7 @@ namespace Thunder { Announce(new Publishers::FileOutput(abbreviate, _config.FileName.Value())); } if ((_config.Remote.Binding.Value().empty() == false) && (_config.Remote.Port.Value() != 0)) { - Announce(new Publishers::UDPOutput(abbreviate, Core::NodeId(_config.Remote.NodeId()))); + Announce(new Publishers::UDPOutput(abbreviate, Core::NodeId(_config.Remote.NodeId()), _service)); } _webSocketExporter.Initialize(service, _config.MaxExportConnections.Value()); diff --git a/MessageControl/MessageOutput.cpp b/MessageControl/MessageOutput.cpp index 8cf00a0..0b153fc 100644 --- a/MessageControl/MessageOutput.cpp +++ b/MessageControl/MessageOutput.cpp @@ -116,7 +116,8 @@ namespace Publishers { { ::memset(_sendBuffer, 0, sizeof(_sendBuffer)); } - UDPOutput::Channel::~Channel() { + UDPOutput::Channel::~Channel() + { Close(Core::infinite); } @@ -160,16 +161,38 @@ namespace Publishers { Trigger(); } - UDPOutput::UDPOutput(const Core::Messaging::MessageInfo::abbreviate abbreviate, const Core::NodeId& nodeId) + UDPOutput::UDPOutput(const Core::Messaging::MessageInfo::abbreviate abbreviate, const Core::NodeId& nodeId, PluginHost::IShell* service) : _convertor(abbreviate) , _output(nodeId) + , _notification(*this) + , _subSystem(service->SubSystems()) + { + ASSERT(_subSystem != nullptr); + + if (_subSystem != nullptr) { + _subSystem->AddRef(); + _subSystem->Register(&_notification); + } + } + + void UDPOutput::UpdateChannel() { - _output.Open(0); + if (_subSystem->IsActive(PluginHost::ISubSystem::NETWORK)) { + if (_output.IsOpen() == false) { + _output.Open(0); + } + ASSERT(_output.IsOpen() == true); + } + else { + _output.Close(Core::infinite); + } } void UDPOutput::Message(const Core::Messaging::MessageInfo& metadata, const string& text) /* override */ { - _output.Output(_convertor.Convert(metadata, text)); + if (_output.IsOpen() == true) { + _output.Output(_convertor.Convert(metadata, text)); + } } } // namespace Publishers diff --git a/MessageControl/MessageOutput.h b/MessageControl/MessageOutput.h index b414f8d..2e80f52 100644 --- a/MessageControl/MessageOutput.h +++ b/MessageControl/MessageOutput.h @@ -333,19 +333,57 @@ namespace Publishers { Core::CriticalSection _adminLock; }; + class Notification : public PluginHost::ISubSystem::INotification { + public: + Notification() = delete; + Notification(const Notification&) = delete; + Notification(Notification&&) = delete; + Notification& operator=(const Notification&) = delete; + Notification& operator=(Notification&&) = delete; + + explicit Notification(UDPOutput& parent) + : _parent(parent) { + } + ~Notification() = default; + + public: + void Updated() override + { + _parent.UpdateChannel(); + } + + BEGIN_INTERFACE_MAP(Notification) + INTERFACE_ENTRY(PluginHost::ISubSystem::INotification) + END_INTERFACE_MAP + + private: + UDPOutput& _parent; + }; + public: UDPOutput() = delete; UDPOutput(const UDPOutput&) = delete; UDPOutput& operator=(const UDPOutput&) = delete; - explicit UDPOutput(const Core::Messaging::MessageInfo::abbreviate abbreviate, const Core::NodeId& nodeId); - ~UDPOutput() = default; + explicit UDPOutput(const Core::Messaging::MessageInfo::abbreviate abbreviate, const Core::NodeId& nodeId, PluginHost::IShell* service); + + ~UDPOutput() override + { + if (_subSystem != nullptr) { + _subSystem->Unregister(&_notification); + _subSystem->Release(); + _subSystem = nullptr; + } + } + void UpdateChannel(); void Message(const Core::Messaging::MessageInfo& metadata, const string& text); private: Text _convertor; Channel _output; + Core::SinkType _notification; + PluginHost::ISubSystem* _subSystem; }; class WebSocketOutput : public IPublish {