Skip to content

Commit

Permalink
Stopped things from going 💥
Browse files Browse the repository at this point in the history
Signed-off-by: Arjo Chakravarty <[email protected]>
  • Loading branch information
arjo129 committed Nov 9, 2023
1 parent 5c5b66d commit 950c622
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 13 deletions.
9 changes: 9 additions & 0 deletions src/Barrier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,12 @@ void Barrier::Cancel()
this->dataPtr->cancelled = true;
this->dataPtr->cv.notify_all();
}

#include <iostream>
//////////////////////////////////////////////////
void Barrier::Drop()
{
std::unique_lock<std::mutex> lock(this->dataPtr->mutex);
this->dataPtr->threadCount--;
this->dataPtr->cv.notify_all();
}
2 changes: 2 additions & 0 deletions src/Barrier.hh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ namespace gz
/// reflect that.
public: ExitStatus Wait();

public: void Drop();

/// \brief Cancel the barrier, causing all threads to unblock and
/// return CANCELLED
public: void Cancel();
Expand Down
19 changes: 17 additions & 2 deletions src/SimulationRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ void SimulationRunner::ProcessSystemQueue()

this->postUpdateThreads.push_back(std::thread([&, id]()
{
auto parentEntity = system.parent;
std::stringstream ss;
ss << "PostUpdateThread: " << id;
GZ_PROFILE_THREAD_NAME(ss.str().c_str());
Expand All @@ -524,11 +525,24 @@ void SimulationRunner::ProcessSystemQueue()
this->postUpdateStartBarrier->Wait();
if (this->postUpdateThreadsRunning)
{
auto terminate = this->threadsToTerminate.find(parentEntity);
if (terminate != this->threadsToTerminate.end()) {
terminate->second--;
if (terminate->second == 0) {
this->threadsToTerminate.erase(terminate);
}
gzerr << "Terminating thread " << id << ", " << parentEntity <<"\n";
this->postUpdateStartBarrier->Drop();
this->postUpdateStopBarrier->Wait();
this->postUpdateStopBarrier->Drop();
break;
}
system.system->PostUpdate(this->currentInfo, this->entityCompMgr);
}
this->postUpdateStopBarrier->Wait();
}
gzdbg << "Exiting postupdate worker thread ("

gzerr << "Exiting postupdate worker thread ("
<< id << ")" << std::endl;
}));
id++;
Expand Down Expand Up @@ -873,7 +887,8 @@ void SimulationRunner::Step(const UpdateInfo &_info)
this->ProcessRecreateEntitiesCreate();

// Process entity removals.
this->systemMgr->ProcessRemovedEntities(this->entityCompMgr);
this->systemMgr->ProcessRemovedEntities(this->entityCompMgr,
this->threadsToTerminate);
this->entityCompMgr.ProcessRemoveEntityRequests();

// Process components removals
Expand Down
3 changes: 3 additions & 0 deletions src/SimulationRunner.hh
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,9 @@ namespace gz
/// at the appropriate time.
private: std::unique_ptr<msgs::WorldControlState> newWorldControlState;


private: std::unordered_map<Entity, std::size_t> threadsToTerminate;

private: bool resetInitiated{false};
friend class LevelManager;
};
Expand Down
48 changes: 43 additions & 5 deletions src/SystemManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
*/

#include <list>
#include <mutex>
#include <set>

#include <gz/common/StringUtils.hh>

#include "SystemInternal.hh"
#include "gz/sim/components/SystemPluginInfo.hh"
#include "gz/sim/Conversions.hh"
#include "SystemManager.hh"
Expand Down Expand Up @@ -107,10 +109,14 @@ size_t SystemManager::ActivatePendingSystems()
this->systems.push_back(system);

if (system.configure)
this->systemsConfigure.push_back(system.configure);
this->systemsConfigure.emplace_back(
system.parentEntity,
system.configure);

if (system.configureParameters)
this->systemsConfigureParameters.push_back(system.configureParameters);
this->systemsConfigureParameters.emplace_back(
system.parentEntity,
system.configureParameters);

if (system.reset)
this->systemsReset.emplace_back(system.parentEntity,
Expand Down Expand Up @@ -281,12 +287,12 @@ void SystemManager::AddSystemImpl(
}

//////////////////////////////////////////////////
const std::vector<ISystemConfigure *>& SystemManager::SystemsConfigure()
const std::vector<SystemHolder<ISystemConfigure>>& SystemManager::SystemsConfigure()
{
return this->systemsConfigure;
}

const std::vector<ISystemConfigureParameters *>&
const std::vector<SystemHolder<ISystemConfigureParameters>>&
SystemManager::SystemsConfigureParameters()
{
return this->systemsConfigureParameters;
Expand Down Expand Up @@ -453,7 +459,8 @@ void RemoveFromVectorIf(std::vector<Tp>& vec,

//////////////////////////////////////////////////
void SystemManager::ProcessRemovedEntities(
const EntityComponentManager &_ecm)
const EntityComponentManager &_ecm,
std::unordered_map<Entity, std::size_t> &_threadsToTerminate)
{
if (!_ecm.HasEntitiesMarkedForRemoval())
{
Expand All @@ -474,6 +481,37 @@ void SystemManager::ProcessRemovedEntities(
});
RemoveFromVectorIf(this->systemsPostupdate,
[&](const SystemHolder<ISystemPostUpdate>& system) {
// Post update system. Make sure that the threadsToTerminate
if (_ecm.IsMarkedForRemoval(system.parent)) {
auto threads = _threadsToTerminate.find(system.parent);
if (threads == _threadsToTerminate.end()) {
_threadsToTerminate.emplace(system.parent, 1);
}
else {
threads->second++;
}
gzerr << "Terminating system for" << system.parent <<"\n";
return true;
}
return false;
});
RemoveFromVectorIf(this->systemsConfigure,
[&](const SystemHolder<ISystemConfigure>& system) {
return _ecm.IsMarkedForRemoval(system.parent);
});
RemoveFromVectorIf(this->systemsConfigureParameters,
[&](const SystemHolder<ISystemConfigureParameters>& system) {
return _ecm.IsMarkedForRemoval(system.parent);
});

/// NOTE: We can't do this because the pointers get messed up.
/*RemoveFromVectorIf(this->systems,
[&](const SystemInternal& system) {
return _ecm.IsMarkedForRemoval(system.parentEntity);
});*/
std::lock_guard lock(this->pendingSystemsMutex);
RemoveFromVectorIf(this->pendingSystems,
[&](const SystemInternal& system) {
return _ecm.IsMarkedForRemoval(system.parentEntity);
});
}
16 changes: 10 additions & 6 deletions src/SystemManager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,14 @@ namespace gz

/// \brief Get a vector of all systems implementing "Configure"
/// \return Vector of systems' configure interfaces.
public: const std::vector<ISystemConfigure *>& SystemsConfigure();
public: const std::vector<SystemHolder<ISystemConfigure>>&
SystemsConfigure();

/// \brief Get an vector of all active systems implementing
/// "ConfigureParameters"
/// \return Vector of systems's configure interfaces.
public: const std::vector<ISystemConfigureParameters *>&
SystemsConfigureParameters();
public: const std::vector<SystemHolder<ISystemConfigureParameters>>&
SystemsConfigureParameters();

/// \brief Get an vector of all active systems implementing "Reset"
/// \return Vector of systems' reset interfaces.
Expand All @@ -163,8 +164,11 @@ namespace gz
/// \brief Process system messages and add systems to entities
public: void ProcessPendingEntitySystems();

/// \brief Remove systems that are attached to removed entities
/// \param[in] _entityCompMgr - ECM with entities marked for removal
public: void ProcessRemovedEntities(
const EntityComponentManager &_entityCompMgr);
const EntityComponentManager &_entityCompMgr,
std::unordered_map<Entity, std::size_t> &_threadsToTerminate);

/// \brief Implementation for AddSystem functions that takes an SDF
/// element. This calls the AddSystemImpl that accepts an SDF Plugin.
Expand Down Expand Up @@ -208,10 +212,10 @@ namespace gz
private: mutable std::mutex pendingSystemsMutex;

/// \brief Systems implementing Configure
private: std::vector<ISystemConfigure *> systemsConfigure;
private: std::vector<SystemHolder<ISystemConfigure>> systemsConfigure;

/// \brief Systems implementing ConfigureParameters
private: std::vector<ISystemConfigureParameters *>
private: std::vector<SystemHolder<ISystemConfigureParameters>>
systemsConfigureParameters;

/// \brief Systems implementing Reset
Expand Down

0 comments on commit 950c622

Please sign in to comment.