Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[METADATA] Expose all COMRPC metadata like Channels and Proxies (also… #1775

Merged
merged 16 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions Source/Thunder/Controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1216,22 +1216,27 @@ namespace Plugin {
return (Core::ERROR_NONE);
}

Core::hresult Controller::Proxies(const uint32_t linkId, IMetadata::Data::IProxiesIterator*& outProxies) const
Core::hresult Controller::Proxies(const Core::OptionalType<string>& linkId, IMetadata::Data::IProxiesIterator*& outProxies) const
{
Core::hresult result = Core::ERROR_UNKNOWN_KEY;

std::vector<IMetadata::Data::Proxy> collection;
bool proxySearch = RPC::Administrator::Instance().Allocations(linkId, [&collection](const std::vector<ProxyStub::UnknownProxy*>& proxies) {
bool proxySearch = RPC::Administrator::Instance().Allocations(linkId.IsSet() ? linkId.Value() : EMPTY_STRING, [&collection, &linkId](const string& origin, const std::vector<ProxyStub::UnknownProxy*>& proxies) {
for (const auto& proxy : proxies) {
IMetadata::Data::Proxy data;
data.Count = proxy->ReferenceCount();
data.Instance = proxy->Implementation();
data.Interface = proxy->InterfaceId();
data.Name = proxy->Name();
if (linkId.IsSet() == false) {
data.Origin = Core::NumberType<uint32_t>(proxy->ChannelId()).Text() + '@' + origin;
}
collection.emplace_back(std::move(data));
}
});

TRACE(Trace::Information, (_T("Found %d proxies to be listed and the search = [%s]"), collection.size(), proxySearch ? _T("true") : _T("false")));

if (proxySearch == true) {
using Iterator = IMetadata::Data::IProxiesIterator;

Expand Down Expand Up @@ -1440,6 +1445,8 @@ namespace Plugin {
buildInfo.ThreadPoolCount = THREADPOOL_COUNT;
#endif

buildInfo.COMRPCTimeOut = RPC::CommunicationTimeOut;

return (Core::ERROR_NONE);
}
}
Expand Down
2 changes: 1 addition & 1 deletion Source/Thunder/Controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ namespace Plugin {

// IMetadata overrides
Core::hresult Links(IMetadata::Data::ILinksIterator*& links) const override;
Core::hresult Proxies(const uint32_t linkId, IMetadata::Data::IProxiesIterator*& proxies) const override;
Core::hresult Proxies(const Core::OptionalType<string>& linkId, IMetadata::Data::IProxiesIterator*& proxies) const override;
Core::hresult Services(const Core::OptionalType<string>& callsign, IMetadata::Data::IServicesIterator*& services) const override;
Core::hresult CallStack(const uint8_t threadId, IMetadata::Data::ICallStackIterator*& callstack) const override;
Core::hresult Threads(IMetadata::Data::IThreadsIterator*& threads) const override;
Expand Down
30 changes: 12 additions & 18 deletions Source/Thunder/ExampleConfigWindows.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
"binding": "127.0.0.1",
"idletime": 60,
"ipv6": false,
"persistentpath": "C:/ThunderWin/artifacts/Persistent",
"volatilepath": "C:/ThunderWin/artifacts/temp",
"datapath": "C:/ThunderWin/artifacts/Debug/Plugins",
"systempath": "C:/ThunderWin/artifacts/Debug",
"proxystubpath": "C:/ThunderWin/artifacts/ProxyStubs/Debug",
"persistentpath": "D:/domotica/artifacts/Persistent",
"volatilepath": "D:/domotica//artifacts/temp",
"datapath": "D:/domotica/artifacts/Debug/Plugins",
"systempath": "D:/domotica/artifacts/Debug",
"proxystubpath": "D:/domotica/artifacts/ProxyStubs/Debug",
"communicator": "127.0.0.1:62000",
"redirect": "Service/Controller/UI",
"observe": {
"proxystubpath": "C:/ThunderWin/artifacts/dynamic/proxystubs",
"configpath": "C:/ThunderWin/artifacts/dynamic/config"
"proxystubpath": "D:/domotica/artifacts/dynamic/proxystubs",
"configpath": "D:/domotica/artifacts/dynamic/config"
},
"messaging": {
"port": 63000,
Expand Down Expand Up @@ -137,7 +137,7 @@
"callsign": "Butler",
"locator": "libButler.so",
"classname": "Butler",
"startmode": "Activated"
"startmode": "Deactivated"
},
{
"callsign": "ZigbeeControl",
Expand All @@ -154,10 +154,10 @@
"callsign": "ZWaveControl",
"locator": "libZWaveControl.so",
"classname": "ZWaveControl",
"startmode": "Activated",
"startmode": "Deactivated",
"configuration": {
"port": "\\\\.\\COM4",
"key": "ba:09:87:65:43:21:de:ad:be:ef:12:34:56:78:90:ab"
"key": "ba:09:87:65:43:21:de:ad:be:ef:12:34:56:78:90:ab"
}
},
{
Expand Down Expand Up @@ -401,7 +401,7 @@
"sleep": "5",
"single": false,
"crash": true,
"leak": true,
"leak": true,
"root": {
"mode": "Local"
}
Expand Down Expand Up @@ -458,7 +458,7 @@
"callsign": "WebServer",
"locator": "libWebServer.so",
"classname": "WebServer",
"startmode": "Deactivated",
"startmode": "Activated",
"communicator": "127.0.0.1:2349",
"configuration": {
"port": 8080,
Expand Down Expand Up @@ -571,12 +571,6 @@
"classname": "MessageControl",
"startmode": "Activated"
},
{
"callsign": "TraceControl",
"locator": "libtracecontrol.so",
"classname": "TraceControl",
"startmode": "Deactivated"
},
{
"callsign": "RemoteControl",
"locator": "libremotecontrol.so",
Expand Down
4 changes: 2 additions & 2 deletions Source/Thunder/PluginHost.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,10 +712,10 @@ POP_WARNING()
printf("Link: %s\n", index.Current().Remote.Value().c_str());
printf("------------------------------------------------------------\n");

RPC::Administrator::Instance().Allocations(index.Current().ID.Value(), [](const std::vector<ProxyStub::UnknownProxy*>& proxies) {
RPC::Administrator::Instance().Allocations(index.Current().Name.Value(), [](const string& origin, const std::vector<ProxyStub::UnknownProxy*>& proxies) {
for (const auto& proxy: proxies) {
Core::instance_id instanceId = proxy->Implementation();
printf("[%s] InstanceId: 0x%" PRIx64 ", RefCount: %d, InterfaceId %d [0x%X]\n", proxy->Name().c_str(), static_cast<uint64_t>(instanceId), proxy->ReferenceCount(), proxy->InterfaceId(), proxy->InterfaceId());
printf("[%s] InstanceId: 0x%" PRIx64 ", RefCount: %d, InterfaceId %d [0x%X], Origin: %s\n", proxy->Name().c_str(), static_cast<uint64_t>(instanceId), proxy->ReferenceCount(), proxy->InterfaceId(), proxy->InterfaceId(), origin.c_str());
}
printf("\n");
});
Expand Down
32 changes: 25 additions & 7 deletions Source/Thunder/PluginServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -532,10 +532,11 @@ namespace PluginHost {
ExternalAccess& operator=(const ExternalAccess&) = delete;

ExternalAccess(
const Core::NodeId& source,
const Core::NodeId& sourceNode,
const string& proxyStubPath,
const Core::ProxyType<RPC::InvokeServer>& handler)
: RPC::Communicator(source, proxyStubPath, Core::ProxyType<Core::IIPCServer>(handler))
const Core::ProxyType<RPC::InvokeServer>& handler,
const string& sourceName)
: RPC::Communicator(sourceNode, proxyStubPath, Core::ProxyType<Core::IIPCServer>(handler), sourceName.c_str())
, _plugin(nullptr) {
}
~ExternalAccess() override = default;
Expand Down Expand Up @@ -822,7 +823,7 @@ namespace PluginHost {
, _lastId(0)
, _metadata(plugin.MaxRequests.Value())
, _library()
, _external(PluginNodeId(server, plugin), server.ProxyStubPath(), handler)
, _external(PluginNodeId(server, plugin), server.ProxyStubPath(), handler, '/' + Callsign())
, _administrator(administrator)
, _composit(*this)
, _jobs(administrator)
Expand Down Expand Up @@ -855,6 +856,9 @@ namespace PluginHost {
}

public:
inline const RPC::Communicator& COMServer() const {
return (_external);
}
inline void Submit(Core::ProxyType<Core::IDispatch>&& job) {
_jobs.Push(std::move(job));
}
Expand Down Expand Up @@ -2205,7 +2209,7 @@ namespace PluginHost {
const uint8_t hardKillCheckWaitTime,
const bool delegatedReleases,
const Core::ProxyType<RPC::InvokeServer>& handler)
: RPC::Communicator(node, ProxyStubPathCreator(proxyStubPath, observableProxyStubPath), Core::ProxyType<Core::IIPCServer>(handler))
: RPC::Communicator(node, ProxyStubPathCreator(proxyStubPath, observableProxyStubPath), Core::ProxyType<Core::IIPCServer>(handler), _T("/"))
, _parent(parent)
, _persistentPath(persistentPath)
, _systemPath(systemPath)
Expand Down Expand Up @@ -3203,10 +3207,24 @@ namespace PluginHost {

entry.Activity = element.Source().IsOpen();
entry.State = Metadata::Channel::state::COMRPC;
entry.Name = string("/" EXPAND_AND_QUOTE(APPLICATION_NAME) "/Communicator");
entry.Name = element.Extension().Origin();
entry.Remote = element.Source().RemoteId();
});
_adminLock.Unlock();

for (const auto& entry : _services) {
entry.second->COMServer().Visit([&](const RPC::Communicator::Client& element)
{
Metadata::Channel& entry = metaData.Add();
entry.ID = element.Extension().Id();

entry.Activity = element.Source().IsOpen();
entry.State = Metadata::Channel::state::COMRPC;
entry.Name = element.Extension().Origin();
entry.Remote = element.Source().RemoteId();
});
}

_adminLock.Unlock();
}
uint32_t FromIdentifier(const string& callSign, Core::ProxyType<IShell>& service)
{
Expand Down
44 changes: 29 additions & 15 deletions Source/com/Administrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@

#include "Administrator.h"
#include "IUnknown.h"
#include "Communicator.h"

namespace Thunder {
namespace RPC {

/* static */ const string Administrator::DanglingId("/Dangling");

Administrator::Administrator()
: _adminLock()
, _stubs()
Expand Down Expand Up @@ -122,17 +125,17 @@ namespace RPC {
ChannelMap::iterator index(_channelProxyMap.find(proxy.Id()));

if (index != _channelProxyMap.end()) {
Proxies::iterator entry(index->second.begin());
while ((entry != index->second.end()) && ((*entry) != &proxy)) {
Proxies::iterator entry(index->second.second.begin());
while ((entry != index->second.second.end()) && ((*entry) != &proxy)) {
entry++;
}

ASSERT(entry != index->second.end());
ASSERT(entry != index->second.second.end());

if (entry != index->second.end()) {
index->second.erase(entry);
if (entry != index->second.second.end()) {
index->second.second.erase(entry);
removed = true;
if (index->second.size() == 0) {
if (index->second.second.size() == 0) {
_channelProxyMap.erase(index);
}
}
Expand Down Expand Up @@ -228,11 +231,11 @@ namespace RPC {
ChannelMap::iterator index(_channelProxyMap.find(channel->Id()));

if (index != _channelProxyMap.end()) {
Proxies::iterator entry(index->second.begin());
while ((entry != index->second.end()) && (((*entry)->InterfaceId() != id) || ((*entry)->Implementation() != impl))) {
Proxies::iterator entry(index->second.second.begin());
while ((entry != index->second.second.end()) && (((*entry)->InterfaceId() != id) || ((*entry)->Implementation() != impl))) {
entry++;
}
if (entry != index->second.end()) {
if (entry != index->second.second.end()) {
interface = (*entry)->QueryInterface(id);
if (interface != nullptr) {
result = (*entry);
Expand Down Expand Up @@ -261,11 +264,11 @@ namespace RPC {
ChannelMap::iterator index(_channelProxyMap.find(channelId));

if (index != _channelProxyMap.end()) {
Proxies::iterator entry(index->second.begin());
while ((entry != index->second.end()) && (((*entry)->InterfaceId() != id) || ((*entry)->Implementation() != impl))) {
Proxies::iterator entry(index->second.second.begin());
while ((entry != index->second.second.end()) && (((*entry)->InterfaceId() != id) || ((*entry)->Implementation() != impl))) {
entry++;
}
if (entry != index->second.end()) {
if (entry != index->second.second.end()) {
interface = (*entry)->Acquire(outbound, id);

// The implementation could be found, but the current implemented proxy is not
Expand All @@ -287,7 +290,18 @@ namespace RPC {
ASSERT(result != nullptr);

// Register it as it is remotely registered :-)
_channelProxyMap[channelId].push_back(result);
ChannelMap::iterator channelIndex(_channelProxyMap.find(channelId));

if (channelIndex != _channelProxyMap.end()) {
channelIndex->second.second.push_back(result);
}
else {
Proxies baseList;
baseList.emplace_back(result);
_channelProxyMap.emplace(std::piecewise_construct,
std::forward_as_tuple(channelId),
std::forward_as_tuple(std::pair<string, Proxies>(channel->Origin(), baseList)));
}

// This will increment the reference count to 2 (one in the ChannelProxyMap and one in the QueryInterface ).
interface = result->QueryInterface(id);
Expand Down Expand Up @@ -431,7 +445,7 @@ namespace RPC {
ChannelMap::iterator index(_channelProxyMap.find(channelId));

if (index != _channelProxyMap.end()) {
for (auto entry : index->second) {
for (auto entry : index->second.second) {
entry->Invalidate();
_danglingProxies.emplace_back(entry);

Expand All @@ -444,7 +458,7 @@ namespace RPC {
// the pendingProxies. The receiver of pendingProxies has to take
// care of releasing the last reference we, as administration layer
// hold upon this..
pendingProxies = std::move(index->second);
pendingProxies = std::move(index->second.second);
_channelProxyMap.erase(index);
}

Expand Down
22 changes: 15 additions & 7 deletions Source/com/Administrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ namespace RPC {
};

public:
static const string DanglingId;

using Proxies = std::vector<ProxyStub::UnknownProxy*>;
using ChannelMap = std::unordered_map<uint32_t, Proxies>;
using ChannelMap = std::unordered_map<uint32_t, std::pair<string, Proxies > >;
using ReferenceMap = std::unordered_map<uint32_t, std::list< RecoverySet > >;
using Stubs = std::unordered_map<uint32_t, ProxyStub::UnknownStub*>;
using Factories = std::unordered_map<uint32_t, IMetadata*>;
Expand All @@ -149,25 +151,31 @@ namespace RPC {
}

template<typename ACTION>
bool Allocations(const uint32_t id, ACTION&& action) const {
bool Allocations(const string& linkId, ACTION&& action) const {
bool found = false;
_adminLock.Lock();
if (id == 0) {
if (linkId.empty() == true) {
for (const auto& proxy : _channelProxyMap) {
action(proxy.second);
action(proxy.second.first, proxy.second.second);
}
action(_danglingProxies);
action(DanglingId, _danglingProxies);
found = true;
}
else if (linkId == DanglingId) {
action(DanglingId, _danglingProxies);
found = true;
}
else {
ChannelMap::const_iterator index(_channelProxyMap.begin());
while ((found == false) && (index != _channelProxyMap.end())) {
if (index->first != id) {
ASSERT(index->second.second.size() != 0);

if (index->second.first != linkId) {
index++;
}
else {
found = true;
action(index->second);
action(index->second.first, index->second.second);
}
}
}
Expand Down
Loading
Loading