Skip to content

Commit

Permalink
[WIP] Sender and receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
gavv committed May 6, 2024
1 parent 95cfcdb commit 8285b14
Show file tree
Hide file tree
Showing 10 changed files with 335 additions and 7 deletions.
3 changes: 3 additions & 0 deletions driver/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ add_library(${DRIVER_NAME} MODULE
log_manager.cpp
log_sender.cpp
log_syslog.cpp
receiver.cpp
rpc_serdes.cpp
sender.cpp
tracer.cpp
uid_generator.cpp
)
Expand All @@ -28,6 +30,7 @@ target_link_libraries(${DRIVER_NAME} PRIVATE
${RPC_LIB}

# third-party libs
roc
aspl::libASPL
gRPC::grpc++_unsecure
spdlog::spdlog
Expand Down
66 changes: 62 additions & 4 deletions driver/device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include "device.hpp"
#include "build_info.hpp"
#include "receiver.hpp"
#include "sender.hpp"

#include <fmt/core.h>
#include <spdlog/spdlog.h>
Expand Down Expand Up @@ -116,6 +118,7 @@ aspl::StreamParameters make_stream_params(const DeviceInfo& info)
} // namespace

Device::Device(std::shared_ptr<aspl::Plugin> hal_plugin,
std::shared_ptr<roc_context> network_context,
IndexAllocator& index_allocator,
UidGenerator& uid_generator,
const DeviceInfo& device_info)
Expand Down Expand Up @@ -151,7 +154,21 @@ Device::Device(std::shared_ptr<aspl::Plugin> hal_plugin,

hal_device_->AddStreamWithControlsAsync(make_stream_params(info_));

// TODO: roc_open
// we implement both interfaces
hal_device_->SetControlHandler(this);
hal_device_->SetIOHandler(this);

if (info_.type == DeviceType::Sender) {
network_transceiver_ = std::make_unique<Sender>(
network_context, info_.uid, info_.device_encoding, *info_.sender_config);
} else {
network_transceiver_ = std::make_unique<Receiver>(
network_context, info_.uid, info_.device_encoding, *info_.receiver_config);
}

if (!info_.enabled) {
network_transceiver_->pause();
}

sort_endpoints_();

Expand All @@ -176,7 +193,7 @@ Device::~Device()

toggle(false);

// TODO: roc_close
network_transceiver_.reset();

if (info_.index != 0) {
index_allocator_.release(info_.index);
Expand All @@ -197,6 +214,7 @@ void Device::toggle(bool enabled)
spdlog::debug("enabling device {}", info_.uid);

hal_plugin_->AddDevice(hal_device_);
network_transceiver_->resume();
} else {
spdlog::debug("device {} is already enabled", info_.uid);
}
Expand All @@ -205,6 +223,7 @@ void Device::toggle(bool enabled)
spdlog::debug("disabling device {}", info_.uid);

hal_plugin_->RemoveDevice(hal_device_);
network_transceiver_->pause();
} else {
spdlog::debug("device {} is already disabled", info_.uid);
}
Expand Down Expand Up @@ -240,7 +259,7 @@ void Device::bind_endpoint_(DeviceEndpointInfo& endpoint_info)
endpoint_info.slot,
endpoint_info.uri);

// TODO: roc_bind
network_transceiver_->bind(endpoint_info);
}

void Device::connect_endpoint_(DeviceEndpointInfo& endpoint_info)
Expand All @@ -250,7 +269,7 @@ void Device::connect_endpoint_(DeviceEndpointInfo& endpoint_info)
endpoint_info.slot,
endpoint_info.uri);

// TODO: roc_connect
network_transceiver_->connect(endpoint_info);
}

void Device::sort_endpoints_()
Expand All @@ -268,4 +287,43 @@ void Device::sort_endpoints_()
}
}

// aspl::ControlRequestHandler
OSStatus Device::OnStartIO()
{
spdlog::info("starting io on device {}", info_.uid);

network_transceiver_->resume();

return kAudioHardwareNoError;
}

void Device::OnStopIO()
{
spdlog::info("stopping io on device {}", info_.uid);

network_transceiver_->pause();
}

// aspl::IORequestHandler
void Device::OnReadClientInput(const std::shared_ptr<aspl::Client>& client,
const std::shared_ptr<aspl::Stream>& stream,
Float64 zero_timestamp,
Float64 timestamp,
void* bytes,
UInt32 bytes_count)
{
// TODO: ringbuf
network_transceiver_->read((uint64_t)timestamp, bytes, bytes_count);
}

void Device::OnWriteMixedOutput(const std::shared_ptr<aspl::Stream>& stream,
Float64 zero_timestamp,
Float64 timestamp,
const void* bytes,
UInt32 bytes_count)
{
// TODO: ringbuf
network_transceiver_->write((uint64_t)timestamp, bytes, bytes_count);
}

} // namespace rocvad
27 changes: 26 additions & 1 deletion driver/device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@

#include "device_defs.hpp"
#include "index_allocator.hpp"
#include "transceiver.hpp"
#include "uid_generator.hpp"

#include <roc/context.h>

#include <aspl/Device.hpp>
#include <aspl/Plugin.hpp>

Expand All @@ -20,10 +23,11 @@
namespace rocvad {

// Correspond to one virtual device.
class Device
class Device : private aspl::ControlRequestHandler, private aspl::IORequestHandler
{
public:
Device(std::shared_ptr<aspl::Plugin> hal_plugin,
std::shared_ptr<roc_context> network_context,
IndexAllocator& index_allocator,
UidGenerator& uid_generator,
const DeviceInfo& device_info);
Expand All @@ -40,6 +44,24 @@ class Device
DeviceEndpointInfo connect(DeviceEndpointInfo endpoint_info);

private:
// aspl::ControlRequestHandler
OSStatus OnStartIO() override;
void OnStopIO() override;

// aspl::IORequestHandler
void OnReadClientInput(const std::shared_ptr<aspl::Client>& client,
const std::shared_ptr<aspl::Stream>& stream,
Float64 zero_timestamp,
Float64 timestamp,
void* bytes,
UInt32 bytes_count) override;
void OnWriteMixedOutput(const std::shared_ptr<aspl::Stream>& stream,
Float64 zero_timestamp,
Float64 timestamp,
const void* bytes,
UInt32 bytes_count) override;

// endpoints
void bind_endpoint_(DeviceEndpointInfo& endpoint_info);
void connect_endpoint_(DeviceEndpointInfo& endpoint_info);
void sort_endpoints_();
Expand All @@ -54,6 +76,9 @@ class Device
std::shared_ptr<aspl::Plugin> hal_plugin_;
std::shared_ptr<aspl::Device> hal_device_;

// network sender or receiver
// which one is used depends on device type
std::unique_ptr<Transceiver> network_transceiver_;

// run-time device info
DeviceInfo info_;
Expand Down
30 changes: 28 additions & 2 deletions driver/device_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,39 @@

namespace rocvad {

namespace {

std::shared_ptr<roc_context> make_network_context()
{
roc_context_config config;
memset(&config, 0, sizeof(config));

roc_context* context = nullptr;
int err = roc_context_open(&config, &context);
if (err < 0) {
spdlog::critical("can't open network context: err={}", err);
return {};
}

return std::shared_ptr<roc_context>(context, [](roc_context* context) {
int err = roc_context_close(context);
if (err < 0) {
spdlog::critical("can't close network context: err={}", err);
}
});
}

} // namespace

DeviceManager::DeviceManager(std::shared_ptr<aspl::Plugin> hal_plugin,
std::shared_ptr<aspl::Storage> hal_storage)
: hal_plugin_(hal_plugin)
, device_storage_(hal_storage)
{
assert(hal_plugin_);

network_context_ = make_network_context();

load_devices_();
}

Expand Down Expand Up @@ -71,8 +97,8 @@ DeviceInfo DeviceManager::add_device(DeviceInfo info)
fmt::format("device with uid \"{}\" already exists", info.uid));
}

auto device =
std::make_shared<Device>(hal_plugin_, index_allocator_, uid_generator_, info);
auto device = std::make_shared<Device>(
hal_plugin_, network_context_, index_allocator_, uid_generator_, info);

info = device->info();

Expand Down
4 changes: 4 additions & 0 deletions driver/device_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include "index_allocator.hpp"
#include "uid_generator.hpp"

#include <roc/context.h>

#include <aspl/Plugin.hpp>
#include <aspl/Storage.hpp>

Expand Down Expand Up @@ -75,6 +77,8 @@ class DeviceManager
UidGenerator uid_generator_;

DeviceStorage device_storage_;

std::shared_ptr<roc_context> network_context_;
};

} // namespace rocvad
40 changes: 40 additions & 0 deletions driver/receiver.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include "receiver.hpp"

namespace rocvad {

Receiver::Receiver(std::shared_ptr<roc_context> network_context,
const std::string& uid,
const DeviceLocalEncoding& device_encoding,
const DeviceReceiverConfig& receiver_config)
{
}

void Receiver::bind(DeviceEndpointInfo& endpoint_info)
{
}

void Receiver::connect(DeviceEndpointInfo& endpoint_info)
{
}

void Receiver::pause() noexcept
{
}

void Receiver::resume() noexcept
{
}

void Receiver::read(uint64_t timestamp, void* bytes, size_t n_bytes) noexcept
{
}

} // namespace rocvad
41 changes: 41 additions & 0 deletions driver/receiver.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#pragma once

#include "transceiver.hpp"

#include <roc/context.h>
#include <roc/receiver.h>

namespace rocvad {

class Receiver : public Transceiver
{
public:
Receiver(std::shared_ptr<roc_context> network_context,
const std::string& uid,
const DeviceLocalEncoding& device_encoding,
const DeviceReceiverConfig& receiver_config);

void bind(DeviceEndpointInfo& endpoint_info) override;
void connect(DeviceEndpointInfo& endpoint_info) override;

void pause() noexcept override;
void resume() noexcept override;

void read(uint64_t timestamp, void* bytes, size_t n_bytes) noexcept override;

private:
std::string uid_;

std::shared_ptr<roc_context> net_context_;
std::shared_ptr<roc_receiver> net_receiver_;
};

} // namespace rocvad
40 changes: 40 additions & 0 deletions driver/sender.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) Roc Streaming authors
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*/

#include "sender.hpp"

namespace rocvad {

Sender::Sender(std::shared_ptr<roc_context> network_context,
const std::string& uid,
const DeviceLocalEncoding& device_encoding,
const DeviceSenderConfig& sender_config)
{
}

void Sender::bind(DeviceEndpointInfo& endpoint_info)
{
}

void Sender::connect(DeviceEndpointInfo& endpoint_info)
{
}

void Sender::pause() noexcept
{
}

void Sender::resume() noexcept
{
}

void Sender::write(uint64_t timestamp, const void* bytes, size_t n_bytes) noexcept
{
}

} // namespace rocvad
Loading

0 comments on commit 8285b14

Please sign in to comment.