Skip to content

Commit

Permalink
🗺 Rudimentary routing logic (#9)
Browse files Browse the repository at this point in the history
This change adds Orchestrator logic directing Ingest nodes to relay streams to Edge nodes that are subscribed, plus a simple unit test to validate this logic.

Relay nodes are not yet considered when constructing routes.
  • Loading branch information
haydenmc authored Nov 25, 2020
1 parent 36236b3 commit d2f30a8
Show file tree
Hide file tree
Showing 11 changed files with 440 additions and 343 deletions.
4 changes: 0 additions & 4 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ sources = files([
'src/FtlConnection.cpp',
'src/main.cpp',
'src/Orchestrator.cpp',
'src/StreamStore.cpp',
'src/SubscriptionStore.cpp',
'src/TlsConnectionManager.cpp',
'src/TlsConnectionTransport.cpp',
])
Expand Down Expand Up @@ -37,8 +35,6 @@ testsources = files([
# Project sources
'src/FtlConnection.cpp',
'src/Orchestrator.cpp',
'src/StreamStore.cpp',
'src/SubscriptionStore.cpp',
])

executable(
Expand Down
25 changes: 25 additions & 0 deletions src/ChannelSubscription.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/**
* @file ChannelSubscription.h
* @author Hayden McAfee ([email protected])
* @date 2020-11-23
* @copyright Copyright (c) 2020 Hayden McAfee
*/

#pragma once

#include "FtlTypes.h"
#include "IConnection.h"

#include <memory>
#include <vector>

/**
* @brief Represents a subscription a node holds to a particular channel
*/
template <class TConnection>
struct ChannelSubscription
{
std::shared_ptr<TConnection> SubscribedConnection;
ftl_channel_id_t ChannelId;
std::vector<std::byte> StreamKey;
};
100 changes: 77 additions & 23 deletions src/Orchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,51 @@ template <class TConnection>
std::set<ftl_channel_id_t> Orchestrator<TConnection>::GetSubscribedChannels(
std::shared_ptr<TConnection> connection)
{
return subscriptions.GetSubscribedChannels(connection);
std::set<ftl_channel_id_t> returnVal;
for (const auto& sub : subscriptions.GetSubscriptions(connection))
{
returnVal.insert(sub.ChannelId);
}
return returnVal;
}
#pragma endregion

#pragma region Private methods
template <class TConnection>
void Orchestrator<TConnection>::openRoute(
Stream<TConnection> stream,
std::shared_ptr<TConnection> edgeConnection,
std::vector<std::byte> streamKey)
{
// TODO: Handle relays, generate real routes.
// for now, we just tell the ingest to start relaying directly to the edge node.
stream.IngestConnection->SendStreamRelay(ConnectionRelayPayload
{
.IsStartRelay = true,
.ChannelId = stream.ChannelId,
.StreamId = stream.StreamId,
.TargetHostname = edgeConnection->GetHostname(),
.StreamKey = streamKey,
});
}

template <class TConnection>
void Orchestrator<TConnection>::closeRoute(
Stream<TConnection> stream,
std::shared_ptr<TConnection> edgeConnection)
{
// TODO: Handle relays, generate real routes.
// for now, we just tell the ingest to stop relaying directly to the edge node.
stream.IngestConnection->SendStreamRelay(ConnectionRelayPayload
{
.IsStartRelay = false,
.ChannelId = stream.ChannelId,
.StreamId = stream.StreamId,
.TargetHostname = edgeConnection->GetHostname(),
.StreamKey = std::vector<std::byte>(),
});
}

template <class TConnection>
void Orchestrator<TConnection>::newConnection(std::shared_ptr<TConnection> connection)
{
Expand Down Expand Up @@ -109,26 +149,21 @@ void Orchestrator<TConnection>::connectionClosed(std::weak_ptr<TConnection> conn
if (auto strongConnection = connection.lock())
{
spdlog::info("Connection closed to {}", strongConnection->GetHostname());
// Remove all streams associated with this connection
if (auto streams = streamStore.RemoveAllConnectionStreams(strongConnection))

// First, clear any active routes to this connection
for (const auto& sub : subscriptions.GetSubscriptions(strongConnection))
{
// Tell subscribers for these channels that the streams have been removed
for (const auto& stream : streams.value())
if (auto stream = streamStore.GetStreamByChannelId(sub.ChannelId))
{
std::set<std::shared_ptr<IConnection>> subConnections =
subscriptions.GetSubscribedConnections(stream.ChannelId);
for (const auto& subConnection : subConnections)
{
subConnection->SendStreamPublish(
{
.IsPublish = false,
.ChannelId = stream.ChannelId,
.StreamId = stream.StreamId,
});
}
closeRoute(stream.value(), strongConnection);
}
}

// Remove all streams associated with this connection
streamStore.RemoveAllConnectionStreams(strongConnection);
// Remove all subscriptions associated with this connetion
subscriptions.ClearSubscriptions(strongConnection);

std::lock_guard<std::mutex> lock(connectionsMutex);
pendingConnections.erase(strongConnection);
connections.erase(strongConnection);
Expand Down Expand Up @@ -217,13 +252,23 @@ ConnectionResult Orchestrator<TConnection>::connectionChannelSubscription(
payload.ChannelId);

// Add the subscription
// TODO: store stream key
bool addResult = subscriptions.AddSubscription(strongConnection, payload.ChannelId);
bool addResult = subscriptions.AddSubscription(
strongConnection,
payload.ChannelId,
payload.StreamKey);
if (!addResult)
{
return ConnectionResult
{
.IsSuccess = false
};
}

// Check if this stream is already active
if (auto stream = streamStore.GetStreamByChannelId(payload.ChannelId))
{
// TODO: Relay strategy
// Establish a route to this edge node
openRoute(stream.value(), strongConnection, payload.StreamKey);
}

return ConnectionResult
Expand All @@ -238,7 +283,12 @@ ConnectionResult Orchestrator<TConnection>::connectionChannelSubscription(
strongConnection->GetHostname(),
payload.ChannelId);

// TODO: Halt existing relays
// Check if this stream is currently active
if (auto stream = streamStore.GetStreamByChannelId(payload.ChannelId))
{
// Close any existing route
closeRoute(stream.value(), strongConnection);
}

// Remove the subscription
bool removeResult =
Expand Down Expand Up @@ -278,7 +328,13 @@ ConnectionResult Orchestrator<TConnection>::connectionStreamPublish(
};
streamStore.AddStream(newStream);

// TODO: Relay strategy
// Start opening relays to any subscribed connections
std::vector<ChannelSubscription<TConnection>> channelSubs =
subscriptions.GetSubscriptions(payload.ChannelId);
for (const auto& subscription : channelSubs)
{
openRoute(newStream, subscription.SubscribedConnection, subscription.StreamKey);
}

return ConnectionResult
{
Expand All @@ -296,8 +352,6 @@ ConnectionResult Orchestrator<TConnection>::connectionStreamPublish(
// Attempt to remove it if it exists
if (auto removedStream = streamStore.RemoveStream(payload.ChannelId, payload.StreamId))
{
// TODO: Relay strategy

return ConnectionResult
{
.IsSuccess = true
Expand Down
9 changes: 7 additions & 2 deletions src/Orchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,19 @@ class Orchestrator
private:
/* Private members */
const std::shared_ptr<IConnectionManager<TConnection>> connectionManager;
StreamStore streamStore;
StreamStore<TConnection> streamStore;
std::mutex connectionsMutex;
std::set<std::shared_ptr<TConnection>> pendingConnections;
std::set<std::shared_ptr<TConnection>> connections;
std::mutex streamsMutex;
SubscriptionStore subscriptions;
SubscriptionStore<TConnection> subscriptions;

/* Private methods */
void openRoute(
Stream<TConnection> stream,
std::shared_ptr<TConnection> edgeConnection,
std::vector<std::byte> streamKey);
void closeRoute(Stream<TConnection> stream, std::shared_ptr<TConnection> edgeConnection);
/* ConnectionManager callback handlers */
void newConnection(std::shared_ptr<TConnection> connection);
/* Connection callback handlers */
Expand Down
5 changes: 2 additions & 3 deletions src/Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,13 @@

#include <memory>

class IConnection;

/**
* @brief Describes an FTL stream
*/
template <class TConnection>
struct Stream
{
std::shared_ptr<IConnection> IngestConnection;
std::shared_ptr<TConnection> IngestConnection;
ftl_channel_id_t ChannelId;
ftl_stream_id_t StreamId;
};
99 changes: 0 additions & 99 deletions src/StreamStore.cpp

This file was deleted.

Loading

0 comments on commit d2f30a8

Please sign in to comment.