Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IPv6 support for traffic stealing - WIP #2976

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,25 @@ For example, a test which only tests sanity of the ephemeral container feature s

On Linux, running tests may exhaust a large amount of RAM and crash the machine. To prevent this, limit the number of concurrent jobs by running the command with e.g. `-j 4`

### IPv6

Some tests create a single-stack IPv6 service. They can only be run on clusters with IPv6 enabled.
In order to test IPv6 on a local cluster on macOS, you can use Kind:

1. `brew install kind`
2. ```shell
cat >kind-config.yaml <<EOF
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
networking:
ipFamily: ipv6
apiServerAddress: 127.0.0.1
EOF
```
3. `kind create cluster --config kind-config.yaml`
4. When you run `kubectl get svc -o wide --all-namespaces` you should see IPv6 addresses.


### Cleanup

The Kubernetes resources created by the E2E tests are automatically deleted when the test exits. However, you can preserve resources from failed tests for debugging. To do this, set the `MIRRORD_E2E_PRESERVE_FAILED` variable to any value.
Expand Down
11 changes: 10 additions & 1 deletion mirrord/agent/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#![deny(missing_docs)]

use clap::{Parser, Subcommand};
use mirrord_protocol::{MeshVendor, AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV};
use mirrord_protocol::{
MeshVendor, AGENT_IPV6_ENV, AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV,
};

const DEFAULT_RUNTIME: &str = "containerd";

Expand Down Expand Up @@ -50,6 +52,13 @@ pub struct Args {
env = "MIRRORD_AGENT_IN_SERVICE_MESH"
)]
pub is_mesh: bool,

/// Enable support for IPv6-only clusters
///
/// Only when this option is set will take the needed steps to run on an IPv6 single stack
/// cluster.
#[arg(long, default_value_t = false, env = AGENT_IPV6_ENV)]
pub ipv6: bool,
}

impl Args {
Expand Down
44 changes: 34 additions & 10 deletions mirrord/agent/src/entrypoint.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
collections::HashMap,
mem,
net::{Ipv4Addr, SocketAddrV4},
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
path::PathBuf,
sync::{
atomic::{AtomicU32, Ordering},
Expand Down Expand Up @@ -492,11 +492,33 @@ impl ClientConnectionHandler {
async fn start_agent(args: Args) -> Result<()> {
trace!("start_agent -> Starting agent with args: {args:?}");

let listener = TcpListener::bind(SocketAddrV4::new(
// listen for client connections
let ipv4_listener_result = TcpListener::bind(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
args.communicate_port,
))
.await?;
.await;

let listener = if args.ipv6 && ipv4_listener_result.is_err() {
debug!("IPv6 Support enabled, and IPv4 bind failed, binding IPv6 listener");
TcpListener::bind(SocketAddrV6::new(
Ipv6Addr::UNSPECIFIED,
args.communicate_port,
0,
0,
))
.await
} else {
ipv4_listener_result
}?;

match listener.local_addr() {
Ok(addr) => debug!(
client_listener_address = addr.to_string(),
"Created listener."
),
Err(err) => error!(%err, "listener local address error"),
}

let state = State::new(&args).await?;

Expand Down Expand Up @@ -566,13 +588,15 @@ async fn start_agent(args: Args) -> Result<()> {
let cancellation_token = cancellation_token.clone();
let watched_task = WatchedTask::new(
TcpConnectionStealer::TASK_NAME,
TcpConnectionStealer::new(stealer_command_rx).and_then(|stealer| async move {
let res = stealer.start(cancellation_token).await;
if let Err(err) = res.as_ref() {
error!("Stealer failed: {err}");
}
res
}),
TcpConnectionStealer::new(stealer_command_rx, args.ipv6).and_then(
|stealer| async move {
let res = stealer.start(cancellation_token).await;
if let Err(err) = res.as_ref() {
error!("Stealer failed: {err}");
}
res
},
),
);
let status = watched_task.status();
let task = run_thread_in_namespace(
Expand Down
4 changes: 4 additions & 0 deletions mirrord/agent/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ pub(crate) enum AgentError {
/// Temporary error for vpn feature
#[error("Generic error in vpn: {0}")]
VpnError(String),

/// When we neither create a redirector for IPv4, nor for IPv6
#[error("Could not create a listener for stolen connections")]
CannotListenForStolenConnections,
}

impl From<mpsc::error::SendError<StealerCommand>> for AgentError {
Expand Down
26 changes: 21 additions & 5 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
collections::{HashMap, HashSet},
net::{IpAddr, Ipv4Addr, SocketAddr},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
};

use fancy_regex::Regex;
Expand Down Expand Up @@ -289,6 +289,9 @@ pub(crate) struct TcpConnectionStealer {

/// Set of active connections stolen by [`Self::port_subscriptions`].
connections: StolenConnections,

/// Shen set, the stealer will use IPv6 if needed.
support_ipv6: bool,
}

impl TcpConnectionStealer {
Expand All @@ -297,14 +300,21 @@ impl TcpConnectionStealer {
/// Initializes a new [`TcpConnectionStealer`], but doesn't start the actual work.
/// You need to call [`TcpConnectionStealer::start`] to do so.
#[tracing::instrument(level = "trace")]
pub(crate) async fn new(command_rx: Receiver<StealerCommand>) -> Result<Self, AgentError> {
pub(crate) async fn new(
command_rx: Receiver<StealerCommand>,
support_ipv6: bool,
) -> Result<Self, AgentError> {
let config = envy::prefixed("MIRRORD_AGENT_")
.from_env::<TcpStealerConfig>()
.unwrap_or_default();

let port_subscriptions = {
let redirector =
IpTablesRedirector::new(config.stealer_flush_connections, config.pod_ips).await?;
let redirector = IpTablesRedirector::new(
config.stealer_flush_connections,
config.pod_ips,
support_ipv6,
)
.await?;

PortSubscriptions::new(redirector, 4)
};
Expand All @@ -315,6 +325,7 @@ impl TcpConnectionStealer {
clients: HashMap::with_capacity(8),
clients_closed: Default::default(),
connections: StolenConnections::with_capacity(8),
support_ipv6,
})
}

Expand Down Expand Up @@ -371,9 +382,14 @@ impl TcpConnectionStealer {
#[tracing::instrument(level = "trace", skip(self))]
async fn incoming_connection(&mut self, stream: TcpStream, peer: SocketAddr) -> Result<()> {
let mut real_address = orig_dst::orig_dst_addr(&stream)?;
let localhost = if self.support_ipv6 && real_address.is_ipv6() {
IpAddr::V6(Ipv6Addr::LOCALHOST)
} else {
IpAddr::V4(Ipv4Addr::LOCALHOST)
};
// If we use the original IP we would go through prerouting and hit a loop.
// localhost should always work.
real_address.set_ip(IpAddr::V4(Ipv4Addr::LOCALHOST));
real_address.set_ip(localhost);

let Some(port_subscription) = self.port_subscriptions.get(real_address.port()).cloned()
else {
Expand Down
14 changes: 14 additions & 0 deletions mirrord/agent/src/steal/ip_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,20 @@ pub fn new_iptables() -> iptables::IPTables {
.expect("IPTables initialization may not fail!")
}

/// wrapper around iptables::new that uses nft or legacy based on env
pub fn new_ip6tables() -> iptables::IPTables {
if let Ok(val) = std::env::var("MIRRORD_AGENT_NFTABLES")
&& val.to_lowercase() == "true"
{
// TODO: check if there is such a binary.
iptables::new_with_cmd("/usr/sbin/ip6tables-nft")
} else {
// TODO: check if there is such a binary.
iptables::new_with_cmd("/usr/sbin/ip6tables-legacy")
}
.expect("IPTables initialization may not fail!")
}

impl Debug for IPTablesWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IPTablesWrapper")
Expand Down
Loading
Loading