Skip to content

Commit

Permalink
Refactor Instance/Communicator/PortManager in C++
Browse files Browse the repository at this point in the history
Also align Python and C++ tests some more and improve MPI error handling
  • Loading branch information
LourensVeen committed Mar 16, 2024
1 parent c387442 commit 298062c
Show file tree
Hide file tree
Showing 32 changed files with 2,331 additions and 1,875 deletions.
357 changes: 108 additions & 249 deletions libmuscle/cpp/src/libmuscle/communicator.cpp

Large diffs are not rendered by default.

135 changes: 56 additions & 79 deletions libmuscle/cpp/src/libmuscle/communicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <libmuscle/namespace.hpp>
#include <libmuscle/peer_info.hpp>
#include <libmuscle/port.hpp>
#include <libmuscle/port_manager.hpp>
#include <libmuscle/ports_description.hpp>
#include <libmuscle/post_office.hpp>
#include <libmuscle/profiler.hpp>
Expand All @@ -20,6 +21,7 @@
#include <ymmsl/ymmsl.hpp>

#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>

Expand All @@ -44,17 +46,16 @@ class Communicator {
*
* @param kernel The kernel this is the Communicator for.
* @param index The index for this instance.
* @param declared_ports The declared ports for this instance.
* @param port_manager The PortManager to use.
* @param logger The logger for this instance.
* @param profiler The profiler to use for recording sends and receives.
*/
Communicator(
ymmsl::Reference const & kernel,
std::vector<int> const & index,
Optional<PortsDescription> const & declared_ports,
PortManager & port_manager,
Logger & logger, Profiler & profiler);


/** Returns a list of locations that we can be reached at.
*
* These locations are of the form 'protocol:location', where the
Expand All @@ -65,54 +66,14 @@ class Communicator {
*/
std::vector<std::string> get_locations() const;

/** Connect this Communicator to its peers.
*
* This is the second stage in the simulation wiring process.
*
* Peers here are instances, and peer_dims and peer_locations are
* indexed by a Reference to an instance. Instance sets are multi-
* dimensional arrays with sizes given by peer_dims.
*
* @param conduits A list of conduits attached to this component,
* as received from the manager.
* @param peer_dims For each peer we share a conduit with, the
* dimensions of the instance set.
* @param peer_locations A list of locations for each peer instance we
* share a conduit with.
*/
void connect(
std::vector<ymmsl::Conduit> const & conduits,
PeerDims const & peer_dims,
PeerLocations const & peer_locations);

/** Returns true iff muscle_settings_in is connected.
*/
bool settings_in_connected() const;

/** Returns a description of the ports this Communicator has.
*
* @return A map, indexed by Operator, containing lists of port names.
* Operators with no associated ports are not included.
*/
PortsDescription list_ports() const;

/** Returns whether a port with the given name exists.
/** Inform this Communicator about its peers.
*
* @param port_name Port name to check.
*/
bool port_exists(std::string const & port_name) const;

/** Returns a Port object describing a port with the given name.
*
* @param port The port to retrieve.
*/
Port const & get_port(std::string const & port_name) const;

/** Returns a Port object describing a port with the given name.
* This tells the Communicator about its peers, so that it can route
* messages accordingly.
*
* @param port The port to retrieve.
* @param peer_info Information about the peers.
*/
Port & get_port(std::string const & port_name);
void set_peer_info(PeerInfo const & peer_info);

/** Send a message and settings to the outside world.
*
Expand All @@ -124,11 +85,14 @@ class Communicator {
* @param port_name The port on which this message is to be sent.
* @param message The message to send.
* @param slot The slot to send the message on.
* @param checkpoints_considered_until When we last checked if we
* should save a snapshot (wallclock time).
*/
void send_message(
std::string const & port_name,
Message const & message,
Optional<int> slot = {});
Optional<int> slot = {},
double checkpoints_considered_until = -std::numeric_limits<double>::infinity());

/** Receive a message and attached settings overlay.
*
Expand All @@ -149,71 +113,84 @@ class Communicator {
*
* @return A tuple holding, first, the received message, with
* message.settings holding the settings overlay (the settings attribute
* is guaranteed to be set), and second, the saved_until metadata field
* from the received message.
*
* @throws std::runtime_error if no default was given and the port is
* not connected.
*/
Message receive_message(
std::tuple<Message, double> receive_message(
std::string const & port_name,
Optional<int> slot = {},
Optional<Message> const & default_msg = {}
);

/** Closes the given port.
*
* This signals to any connected instance that no more messages will
* be sent on this port, which it can use to decide whether to shut
* down or continue running.
*
* @param port_name The name of the port to close.
*/
void close_port(std::string const & port_name, Optional<int> slot = {});

/** Shuts down the Communicator, closing connections.
*/
void shutdown();

/** Get message counts for all ports on the communicator.
*/
PortMessageCounts get_message_counts();

/** Restore message counts on all ports.
*/
void restore_message_counts(PortMessageCounts const & port_message_counts);

PRIVATE:
using Ports_ = std::unordered_map<std::string, Port>;

ymmsl::Reference instance_id_() const;
Ports_ ports_from_declared_();
Ports_ ports_from_conduits_(
std::vector<ymmsl::Conduit> const & conduits) const;
Port settings_in_port_(std::vector<ymmsl::Conduit> const & conduits) const;
MPPClient & get_client_(ymmsl::Reference const & instance);

Endpoint get_endpoint_(
std::string const & port_name,
std::vector<int> const & slot
) const;

std::tuple<std::string, bool> split_port_desc_(
std::string const & port_desc) const;

std::tuple<std::vector<char>, mcp::ProfileData> try_receive_(
MPPClient & client, ymmsl::Reference const & receiver,
ymmsl::Reference const & peer);

void close_port_(std::string const & port_name, Optional<int> slot = {});

/* Closes outgoing ports.
*
* This sends a close port message on all slots of all outgoing ports.
*/
void close_outgoing_ports_();

/* Receives messages until a ClosePort is received.
*
* Receives at least once.
*
* @param port_name Port to drain.
*/
void drain_incoming_port_(std::string const & port_name);

/* Receives messages until a ClosePort is received.
*
* Works with (resizable) vector ports.
*
* @param port_name Port to drain.
*/
void drain_incoming_vector_port_(std::string const & port_name);

/* Closes incoming ports.
*
* This receives on all incoming ports until a ClosePort is received on them,
* signaling that there will be no more messages, and allowing the sending
* instance to shut down cleanly.
*/
void close_incoming_ports_();

/* Closes all ports.
*
* This sends a close port message on all slots of all outgoing ports, then
* receives one on all incoming ports.
*/
void close_ports_();

ymmsl::Reference kernel_;
std::vector<int> index_;
Optional<PortsDescription> declared_ports_;
PortManager & port_manager_;
PostOffice post_office_;
Logger & logger_;
Profiler & profiler_;
std::vector<std::unique_ptr<mcp::TransportServer>> servers_;
std::unordered_map<ymmsl::Reference, std::unique_ptr<MPPClient>> clients_;
Ports_ ports_;
std::unique_ptr<PeerInfo> peer_info_;
Optional<Port> muscle_settings_in_;
Optional<PeerInfo> peer_info_;
};

} }
Expand Down
Loading

0 comments on commit 298062c

Please sign in to comment.