Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Julusian committed Dec 22, 2023
1 parent 0cb1ea7 commit 59eeda4
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 63 deletions.
2 changes: 1 addition & 1 deletion src/protocol/amcp/AMCPCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
namespace caspar { namespace protocol { namespace amcp {

std::future<std::wstring>
AMCPCommand::Execute(const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels)
AMCPCommand::Execute(const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels)
{
return command_(ctx_, channels);
}
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/amcp/AMCPCommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class AMCPCommand

using ptr_type = std::shared_ptr<AMCPCommand>;

std::future<std::wstring> Execute(const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels);
std::future<std::wstring> Execute(const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels);

void SendReply(const std::wstring& str, bool reply_without_req_id) const;

Expand Down
18 changes: 9 additions & 9 deletions src/protocol/amcp/AMCPCommandQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,18 @@

namespace caspar { namespace protocol { namespace amcp {

AMCPCommandQueue::AMCPCommandQueue(const std::wstring& name,
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels)
AMCPCommandQueue::AMCPCommandQueue(const std::wstring& name,
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels)
: executor_(L"AMCPCommandQueue " + name)
, channels_(channels)
{
}

AMCPCommandQueue::~AMCPCommandQueue() {}

std::future<bool> exec_cmd(std::shared_ptr<AMCPCommand> cmd,
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels,
bool reply_without_req_id)
std::future<bool> exec_cmd(std::shared_ptr<AMCPCommand> cmd,
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels,
bool reply_without_req_id)
{
try {
try {
Expand Down Expand Up @@ -126,10 +126,10 @@ void AMCPCommandQueue::Execute(std::shared_ptr<AMCPGroupCommand> cmd) const
caspar::timer timer;
CASPAR_LOG(warning) << "Executing batch: " << cmd->name() << L"(" << cmd->Commands().size() << L" commands)";

std::shared_ptr<std::map<int, protocol::amcp::channel_context>> delayed_channels;
std::vector<std::shared_ptr<core::stage_delayed>> delayed_stages;
std::vector<std::future<bool>> results;
std::vector<std::unique_lock<std::mutex>> channel_locks;
spl::shared_ptr<std::vector<protocol::amcp::channel_context>> delayed_channels;
std::vector<std::shared_ptr<core::stage_delayed>> delayed_stages;
std::vector<std::future<bool>> results;
std::vector<std::unique_lock<std::mutex>> channel_locks;

try {
for (auto& ch : *channels_) {
Expand Down
8 changes: 4 additions & 4 deletions src/protocol/amcp/AMCPCommandQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ class AMCPCommandQueue
public:
using ptr_type = spl::shared_ptr<AMCPCommandQueue>;

AMCPCommandQueue(const std::wstring& name,
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels);
AMCPCommandQueue(const std::wstring& name,
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels);
~AMCPCommandQueue();

void AddCommand(std::shared_ptr<AMCPGroupCommand> command);
void Execute(std::shared_ptr<AMCPGroupCommand> cmd) const;

private:
executor executor_;
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>> channels_;
executor executor_;
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>> channels_;
};

}}} // namespace caspar::protocol::amcp
26 changes: 13 additions & 13 deletions src/protocol/amcp/amcp_command_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,22 @@ struct amcp_command_static_context

struct command_context
{
std::shared_ptr<amcp_command_static_context> static_context;
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>> channels;
const IO::ClientInfoPtr client;
const channel_context channel;
const int channel_index;
const int layer_id;
std::vector<std::wstring> parameters;
std::shared_ptr<amcp_command_static_context> static_context;
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>> channels;
const IO::ClientInfoPtr client;
const channel_context channel;
const int channel_index;
const int layer_id;
std::vector<std::wstring> parameters;

int layer_index(int default_ = 0) const { return layer_id == -1 ? default_ : layer_id; }

command_context(std::shared_ptr<amcp_command_static_context> static_context,
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>> channels,
const IO::ClientInfoPtr& client,
channel_context channel,
int channel_index,
int layer_id)
command_context(std::shared_ptr<amcp_command_static_context> static_context,
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>> channels,
const IO::ClientInfoPtr& client,
channel_context channel,
int channel_index,
int layer_id)
: static_context(std::move(static_context))
, channels(std::move(channels))
, client(client)
Expand Down
8 changes: 4 additions & 4 deletions src/protocol/amcp/amcp_command_repository.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ parse_channel_id(std::list<std::wstring>& tokens, std::wstring& channel_spec, in

struct amcp_command_repository::impl
{
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>> channels_;
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>> channels_;

std::map<std::wstring, std::pair<amcp_command_func, int>> commands{};
std::map<std::wstring, std::pair<amcp_command_func, int>> channel_commands{};

impl(const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels)
impl(const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels)
: channels_(channels)
{
}
Expand Down Expand Up @@ -204,12 +204,12 @@ struct amcp_command_repository::impl
};

amcp_command_repository::amcp_command_repository(
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels)
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels)
: impl_(new impl(channels))
{
}

const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& amcp_command_repository::channels() const
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& amcp_command_repository::channels() const
{
return impl_->channels_;
}
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/amcp/amcp_command_repository.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ namespace caspar { namespace protocol { namespace amcp {
class amcp_command_repository
{
public:
amcp_command_repository(const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels);
amcp_command_repository(const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels);

std::shared_ptr<AMCPCommand>
parse_command(IO::ClientInfoPtr client, std::list<std::wstring> tokens, const std::wstring& request_id) const;
bool check_channel_lock(IO::ClientInfoPtr client, int channel_index) const;

const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels() const;
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels() const;

void register_command(std::wstring category, std::wstring name, amcp_command_func command, int min_num_params);

Expand Down
16 changes: 8 additions & 8 deletions src/protocol/amcp/amcp_command_repository_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ void amcp_command_repository_wrapper::register_command(std::wstring
{
std::weak_ptr<command_context_factory> weak_context_factory = context_factory_;
auto func = [weak_context_factory,
command](const command_context_simple& ctx,
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels) {
command](const command_context_simple& ctx,
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels) {
auto context_factory = weak_context_factory.lock();
if (!context_factory)
return make_ready_future<std::wstring>(L"");
Expand All @@ -52,8 +52,8 @@ void amcp_command_repository_wrapper::register_command(std::wstring ca
{
std::weak_ptr<command_context_factory> weak_context_factory = context_factory_;
auto func = [weak_context_factory,
command](const command_context_simple& ctx,
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels) {
command](const command_context_simple& ctx,
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels) {
auto context_factory = weak_context_factory.lock();
if (!context_factory)
return make_ready_future<std::wstring>(L"");
Expand All @@ -72,8 +72,8 @@ void amcp_command_repository_wrapper::register_channel_command(std::wstring
{
std::weak_ptr<command_context_factory> weak_context_factory = context_factory_;
auto func = [weak_context_factory,
command](const command_context_simple& ctx,
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels) {
command](const command_context_simple& ctx,
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels) {
auto context_factory = weak_context_factory.lock();
if (!context_factory)
return make_ready_future<std::wstring>(L"");
Expand All @@ -92,8 +92,8 @@ void amcp_command_repository_wrapper::register_channel_command(std::wstring
{
std::weak_ptr<command_context_factory> weak_context_factory = context_factory_;
auto func = [weak_context_factory,
command](const command_context_simple& ctx,
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels) {
command](const command_context_simple& ctx,
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels) {
auto context_factory = weak_context_factory.lock();
if (!context_factory)
return make_ready_future<std::wstring>(L"");
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/amcp/amcp_command_repository_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ class command_context_factory
{
}

command_context create(const command_context_simple& simple_ctx,
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels) const
command_context create(const command_context_simple& simple_ctx,
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels) const
{
const channel_context channel = simple_ctx.channel_index >= 0 && channels->count(simple_ctx.channel_index)
? channels->at(simple_ctx.channel_index)
Expand Down
4 changes: 2 additions & 2 deletions src/protocol/amcp/amcp_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ struct command_context_simple
};

typedef std::function<std::future<std::wstring>(
const command_context_simple& args,
const std::shared_ptr<std::map<int, protocol::amcp::channel_context>>& channels)>
const command_context_simple& args,
const spl::shared_ptr<std::vector<protocol::amcp::channel_context>>& channels)>
amcp_command_func;

}}} // namespace caspar::protocol::amcp
33 changes: 16 additions & 17 deletions src/shell/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,18 @@ struct server::impl
std::shared_ptr<IO::AsyncEventServer> primary_amcp_server_;
std::shared_ptr<osc::client> osc_client_ = std::make_shared<osc::client>(io_service_);
std::vector<std::shared_ptr<void>> predefined_osc_subscriptions_;
std::shared_ptr<std::map<int, protocol::amcp::channel_context>> channels_;
spl::shared_ptr<core::cg_producer_registry> cg_registry_;
spl::shared_ptr<core::frame_producer_registry> producer_registry_;
spl::shared_ptr<core::frame_consumer_registry> consumer_registry_;
std::function<void(bool)> shutdown_server_now_;
spl::shared_ptr<std::vector<protocol::amcp::channel_context>> channels_;
spl::shared_ptr<core::cg_producer_registry> cg_registry_;
spl::shared_ptr<core::frame_producer_registry> producer_registry_;
spl::shared_ptr<core::frame_consumer_registry> consumer_registry_;
std::function<void(bool)> shutdown_server_now_;

impl(const impl&) = delete;
impl& operator=(const impl&) = delete;

explicit impl(std::function<void(bool)> shutdown_server_now)
: video_format_repository_()
, accelerator_(video_format_repository_)
, channels_(std::make_shared<std::map<int, protocol::amcp::channel_context>>())
, producer_registry_(spl::make_shared<core::frame_producer_registry>())
, consumer_registry_(spl::make_shared<core::frame_consumer_registry>())
, shutdown_server_now_(std::move(shutdown_server_now))
Expand Down Expand Up @@ -167,7 +166,7 @@ struct server::impl

destroy_producers_synchronously();
destroy_consumers_synchronously();
channels_->clear();
channels_.clear();

while (weak_io_service.lock())
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Expand Down Expand Up @@ -260,7 +259,7 @@ struct server::impl
CASPAR_THROW_EXCEPTION(user_error() << msg_info(L"Invalid video-mode: " + format_desc_str));

auto weak_client = std::weak_ptr<osc::client>(osc_client_);
auto channel_id = static_cast<int>(channels_->size() + 1);
auto channel_id = static_cast<int>(channels_.size() + 1);
auto channel =
spl::make_shared<video_channel>(channel_id,
format_desc,
Expand All @@ -275,8 +274,7 @@ struct server::impl
});

const std::wstring lifecycle_key = L"lock" + std::to_wstring(channel_id);
channels_->insert(
std::make_pair(channel_id, protocol::amcp::channel_context(channel, channel->stage(), lifecycle_key)));
channels_.emplace_back(channel, channel->stage(), lifecycle_key);
}

return xml_channels;
Expand Down Expand Up @@ -325,15 +323,15 @@ struct server::impl
auto console_client = spl::make_shared<IO::ConsoleClientInfo>();

std::vector<spl::shared_ptr<core::video_channel>> channels_vec;
for (auto& cc : *channels_) {
channels_vec.emplace_back(cc.second.raw_channel);
for (auto& cc : channels_) {
channels_vec.emplace_back(cc.raw_channel);
}

for (auto& channel : *channels_) {
for (auto& channel : channels_) {
core::diagnostics::scoped_call_context save;
core::diagnostics::call_context::for_thread().video_channel = channel.first;
core::diagnostics::call_context::for_thread().video_channel = channel.raw_channel->index();

auto xml_channel = xml_channels.at(channel.first - 1);
auto xml_channel = xml_channels.at(channel.raw_channel->index() - 1);

// Consumers
if (xml_channel.get_child_optional(L"consumers")) {
Expand All @@ -342,7 +340,7 @@ struct server::impl

try {
if (name != L"<xmlcomment>")
channel.second.raw_channel->output().add(consumer_registry_->create_consumer(
channel.raw_channel->output().add(consumer_registry_->create_consumer(
name, xml_consumer.second, video_format_repository_, channels_vec));
} catch (...) {
CASPAR_LOG_CURRENT_EXCEPTION();
Expand All @@ -360,7 +358,8 @@ struct server::impl
const int id = attrs.get(L"id", -1);

try {
std::list<std::wstring> tokens{L"PLAY", (boost::wformat(L"%i-%i") % channel.first % id).str()};
std::list<std::wstring> tokens{
L"PLAY", (boost::wformat(L"%i-%i") % channel.raw_channel->index() % id).str()};
IO::tokenize(command, tokens);
auto cmd = amcp_command_repo_->parse_command(console_client, tokens, L"");

Expand Down

0 comments on commit 59eeda4

Please sign in to comment.