Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support steam p2p sockets #346

Merged
merged 6 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions lightyear/src/connection/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::{Arc, RwLock};

use anyhow::Result;
use bevy::ecs::system::SystemParam;
Expand All @@ -13,7 +14,7 @@ use crate::connection::id::ClientId;
use crate::connection::netcode::ConnectToken;

#[cfg(all(feature = "steam", not(target_family = "wasm")))]
use crate::connection::steam::client::SteamConfig;
use crate::connection::steam::{client::SteamConfig, steamworks_client::SteamworksClient};
use crate::packet::packet::Packet;

use crate::prelude::client::ClientTransport;
Expand Down Expand Up @@ -87,6 +88,8 @@ pub enum NetConfig {
// TODO: for steam, we can use a pass-through io that just computes stats?
#[cfg(all(feature = "steam", not(target_family = "wasm")))]
Steam {
#[reflect(ignore)]
steamworks_client: Option<Arc<RwLock<SteamworksClient>>>,
#[reflect(ignore)]
config: SteamConfig,
conditioner: Option<LinkConditionerConfig>,
Expand Down Expand Up @@ -132,12 +135,19 @@ impl NetConfig {
}
#[cfg(all(feature = "steam", not(target_family = "wasm")))]
NetConfig::Steam {
steamworks_client,
config,
conditioner,
} => {
// TODO: handle errors
let client = super::steam::client::Client::new(config, conditioner)
.expect("could not create steam client");
let client = super::steam::client::Client::new(
steamworks_client.unwrap_or_else(|| {
Arc::new(RwLock::new(SteamworksClient::new(config.app_id)))
}),
config,
conditioner,
)
.expect("could not create steam client");
ClientConnection {
client: NetClientDispatch::Steam(client),
}
Expand Down
16 changes: 12 additions & 4 deletions lightyear/src/connection/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::{Arc, RwLock};

use crate::connection::id::ClientId;
#[cfg(all(feature = "steam", not(target_family = "wasm")))]
use crate::connection::steam::server::SteamConfig;
use crate::connection::steam::{server::SteamConfig, steamworks_client::SteamworksClient};
use crate::packet::packet::Packet;
use crate::prelude::client::ClientTransport;
use crate::prelude::server::ServerTransport;
Expand Down Expand Up @@ -104,6 +104,7 @@ pub enum NetConfig {
},
#[cfg(all(feature = "steam", not(target_family = "wasm")))]
Steam {
steamworks_client: Option<Arc<RwLock<SteamworksClient>>>,
config: SteamConfig,
conditioner: Option<LinkConditionerConfig>,
},
Expand Down Expand Up @@ -147,12 +148,19 @@ impl NetConfig {
// vs steam with p2p connections
#[cfg(all(feature = "steam", not(target_family = "wasm")))]
NetConfig::Steam {
steamworks_client,
config,
conditioner,
} => {
// TODO: handle errors
let server = super::steam::server::Server::new(config, conditioner)
.expect("could not create steam server");
let server = super::steam::server::Server::new(
steamworks_client.unwrap_or_else(|| {
Arc::new(RwLock::new(SteamworksClient::new(config.app_id)))
}),
config,
conditioner,
)
.expect("could not create steam server");
ServerConnection::Steam(server)
}
}
Expand Down
131 changes: 88 additions & 43 deletions lightyear/src/connection/steam/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,47 +7,59 @@ use crate::prelude::LinkConditionerConfig;
use crate::serialize::bitcode::reader::BufferPool;
use crate::transport::LOCAL_SOCKET;
use anyhow::{anyhow, Context, Result};
use bevy::reflect::Reflect;
use bevy::tasks::IoTaskPool;
use std::collections::VecDeque;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::{Arc, OnceLock, RwLock};
use steamworks::networking_sockets::{NetConnection, NetworkingSockets};
use steamworks::networking_types::{
NetConnectionEnd, NetConnectionInfo, NetworkingConfigEntry, NetworkingConfigValue,
NetworkingConnectionState, SendFlags,
NetworkingConnectionState, NetworkingIdentity, SendFlags,
};
use steamworks::{ClientManager, SingleClient};
use steamworks::{ClientManager, SingleClient, SteamId};
use tracing::{info, warn};

use super::{get_networking_options, SingleClientThreadSafe};
use super::get_networking_options;
use super::steamworks_client::SteamworksClient;

const MAX_MESSAGE_BATCH_SIZE: usize = 512;

#[derive(Debug, Clone)]
pub struct SteamConfig {
pub server_addr: SocketAddr,
pub socket_config: SocketConfig,
pub app_id: u32,
}

impl Default for SteamConfig {
fn default() -> Self {
Self {
server_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 27015)),
// app id of the public Space Wars demo app
socket_config: Default::default(),
app_id: 480,
}
}
}

// My initial plan was to have the `steamworks::Client` and `SingleClient` be fields of the [`Client`] struct.
// Everytime we try to reconnect, we could just drop the previous Client (which shutdowns the steam API)
// and call `init_app` again. However apparently the Shutdown api does nothing, the api only shuts down
// when the program ends.
// Instead, we will initialize lazy a static Client only once per program.
pub static CLIENT: OnceLock<(steamworks::Client<ClientManager>, SingleClientThreadSafe)> =
OnceLock::new();
/// Steam socket configuration for clients
#[derive(Debug, Clone)]
pub enum SocketConfig {
/// Connect to a server by IP address. Suitable for dedicated servers.
Ip { server_addr: SocketAddr },
/// Connect to another Steam user hosting a server. Suitable for
/// peer-to-peer games.
P2P { virtual_port: i32, steam_id: u64 },
}

impl Default for SocketConfig {
fn default() -> Self {
SocketConfig::Ip {
server_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 27015)),
}
}
}

/// Steam networking client wrapper
pub struct Client {
steamworks_client: Arc<RwLock<SteamworksClient>>,
config: SteamConfig,
connection: Option<NetConnection<ClientManager>>,
packet_queue: VecDeque<Packet>,
Expand All @@ -56,14 +68,13 @@ pub struct Client {
}

impl Client {
pub fn new(config: SteamConfig, conditioner: Option<LinkConditionerConfig>) -> Result<Self> {
CLIENT.get_or_init(|| {
info!("Creating new steamworks api client.");
let (client, single) = steamworks::Client::init_app(config.app_id).unwrap();
(client, SingleClientThreadSafe(single))
});

pub fn new(
steamworks_client: Arc<RwLock<SteamworksClient>>,
config: SteamConfig,
conditioner: Option<LinkConditionerConfig>,
) -> Result<Self> {
Ok(Self {
steamworks_client,
config,
connection: None,
packet_queue: VecDeque::new(),
Expand All @@ -74,7 +85,10 @@ impl Client {

fn connection_info(&self) -> Option<Result<NetConnectionInfo>> {
self.connection.as_ref().map(|connection| {
Self::client()
self.steamworks_client
.read()
.expect("could not get steamworks client")
.get_client()
.networking_sockets()
.get_connection_info(connection)
.map_err(|err| anyhow!("could not get connection info"))
Expand All @@ -87,29 +101,48 @@ impl Client {
.map_or(Ok(NetworkingConnectionState::None), |info| info.state())
.context("could not get connection state")
}

fn client() -> &'static steamworks::Client<ClientManager> {
&CLIENT.get().unwrap().0
}

fn single_client() -> &'static SingleClient {
&CLIENT.get().unwrap().1 .0
}
}

impl NetClient for Client {
fn connect(&mut self) -> Result<()> {
let options = get_networking_options(&self.conditioner);
self.connection = Some(
Self::client()
.networking_sockets()
.connect_by_ip_address(self.config.server_addr, vec![])
.context("failed to create connection")?,
);
info!(
"Opened steam connection to server at address: {}",
self.config.server_addr
);
// TODO: using the NetworkingConfigEntry options seems to cause an issue. See: https://github.com/Noxime/steamworks-rs/issues/169
// let options = get_networking_options(&self.conditioner);

match self.config.socket_config {
SocketConfig::Ip { server_addr } => {
self.connection = Some(
self.steamworks_client
.read()
.expect("could not get steamworks client")
.get_client()
.networking_sockets()
.connect_by_ip_address(server_addr, vec![])
.context("failed to create ip connection")?,
);
info!(
"Opened steam connection to server at address: {}",
server_addr
);
}
SocketConfig::P2P {
virtual_port,
steam_id,
} => {
self.connection = Some(
self.steamworks_client
.read()
.expect("could not get steamworks client")
.get_client()
.networking_sockets()
.connect_p2p(
NetworkingIdentity::new_steam_id(SteamId::from_raw(steam_id)),
virtual_port,
vec![],
)
.context("failed to create p2p connection")?,
);
}
}
Ok(())
}

Expand All @@ -134,7 +167,11 @@ impl NetClient for Client {
}

fn try_update(&mut self, delta_ms: f64) -> Result<()> {
Self::single_client().run_callbacks();
self.steamworks_client
.write()
.expect("could not get steamworks single client")
.get_single()
.run_callbacks();

// TODO: should I maintain an internal state for the connection? or just rely on `connection_state()` ?
// update connection state
Expand Down Expand Up @@ -180,7 +217,15 @@ impl NetClient for Client {
}

fn id(&self) -> ClientId {
ClientId::Steam(Self::client().user().steam_id().raw())
ClientId::Steam(
self.steamworks_client
.read()
.expect("could not get steamworks client")
.get_client()
.user()
.steam_id()
.raw(),
)
}

fn local_addr(&self) -> SocketAddr {
Expand Down
8 changes: 1 addition & 7 deletions lightyear/src/connection/steam/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@ use steamworks::networking_types::{NetworkingConfigEntry, NetworkingConfigValue}

pub(crate) mod client;
pub(crate) mod server;

// NOTE: it looks like there's SingleClient can actually be called on multiple threads
// - https://partner.steamgames.com/doc/api/steam_api#SteamAPI_RunCallbacks
pub(crate) struct SingleClientThreadSafe(steamworks::SingleClient);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So do you think it would be possible to use this as a wrapper to the user's provided SteamworksClient to avoid the Arc<RwLock>?


unsafe impl Sync for SingleClientThreadSafe {}
unsafe impl Send for SingleClientThreadSafe {}
pub(crate) mod steamworks_client;

pub(crate) fn get_networking_options(
conditioner: &Option<LinkConditionerConfig>,
Expand Down
Loading