diff --git a/benches/token_router.rs b/benches/token_router.rs
index 5b599cd604..7afda6b581 100644
--- a/benches/token_router.rs
+++ b/benches/token_router.rs
@@ -1,21 +1,17 @@
 use divan::Bencher;
-use quilkin::filters::token_router::{HashedTokenRouter, Router, TokenRouter};
+use quilkin::filters::token_router::TokenRouter;
 use rand::SeedableRng;
 
 mod shared;
 
-#[divan::bench(types = [TokenRouter, HashedTokenRouter], args = ["single:duplicates", "single:unique", "multi:2..128:duplicates", "multi:2..128:unique"])]
-fn token_router<T>(b: Bencher, token_kind: &str)
-where
-    T: Router + Sync,
-{
-    let filter = <T as Router>::new();
+#[divan::bench(args = ["single:duplicates", "single:unique", "multi:2..128:duplicates", "multi:2..128:unique"])]
+fn token_router(b: Bencher, token_kind: &str) {
+    let filter = TokenRouter::default();
     let gc = shared::gen_cluster_map::<42>(token_kind.parse().unwrap());
 
     let mut tokens = Vec::new();
 
     let cm = std::sync::Arc::new(gc.cm);
-    cm.build_token_maps();
 
     // Calculate the amount of bytes for all the tokens
     for eps in cm.iter() {
diff --git a/crates/test/tests/mesh.rs b/crates/test/tests/mesh.rs
index 14dfbf58f4..bcd064d8fb 100644
--- a/crates/test/tests/mesh.rs
+++ b/crates/test/tests/mesh.rs
@@ -52,7 +52,7 @@ trace_test!(relay_routing, {
                 }),
             })
             .unwrap(),
-            HashedTokenRouter::as_filter_config(None).unwrap(),
+            TokenRouter::as_filter_config(None).unwrap(),
         ])
         .unwrap(),
         ..Default::default()
@@ -226,7 +226,7 @@ trace_test!(filter_update, {
                 }),
             })
             .unwrap(),
-            HashedTokenRouter::as_filter_config(None).unwrap(),
+            TokenRouter::as_filter_config(None).unwrap(),
         ])
         .unwrap(),
         ..Default::default()
@@ -279,7 +279,7 @@ trace_test!(filter_update, {
                 token.len().to_string(),
             )
            .unwrap(),
-            HashedTokenRouter::as_filter_config(None).unwrap(),
+            TokenRouter::as_filter_config(None).unwrap(),
         ])
         .unwrap();
     });
diff --git a/src/filters.rs b/src/filters.rs
index 0efa40e343..938ade78bb 100644
--- a/src/filters.rs
+++ b/src/filters.rs
@@ -66,7 +66,7 @@ pub use self::{
     registry::FilterRegistry,
     set::{FilterMap, FilterSet},
     timestamp::Timestamp,
-    token_router::{HashedTokenRouter, TokenRouter},
+    token_router::TokenRouter,
     write::WriteContext,
 };
 
diff --git a/src/filters/set.rs b/src/filters/set.rs
index fbcb715777..cf88ab9d85 100644
--- a/src/filters/set.rs
+++ b/src/filters/set.rs
@@ -51,7 +51,6 @@ impl FilterSet {
             filters::Debug::factory(),
             filters::Drop::factory(),
             filters::Firewall::factory(),
-            filters::HashedTokenRouter::factory(),
             filters::LoadBalancer::factory(),
             filters::LocalRateLimit::factory(),
             filters::Match::factory(),
diff --git a/src/filters/token_router.rs b/src/filters/token_router.rs
index 9298475dad..dd631b030c 100644
--- a/src/filters/token_router.rs
+++ b/src/filters/token_router.rs
@@ -21,51 +21,24 @@ use crate::{
     net::endpoint::metadata,
 };
 
-use crate::generated::quilkin::filters::token_router::v1alpha1 as proto;
+use xds::generated::quilkin::filters::token_router::v1alpha1 as proto;
 
 /// Filter that only allows packets to be passed to Endpoints that have a matching
 /// connection_id to the token stored in the Filter's dynamic metadata.
+#[derive(Default)]
 pub struct TokenRouter {
     config: Config,
 }
 
 impl TokenRouter {
-    fn new(config: Config) -> Self {
-        Self { config }
-    }
-}
-
-impl StaticFilter for TokenRouter {
-    const NAME: &'static str = "quilkin.filters.token_router.v1alpha1.TokenRouter";
-    type Configuration = Config;
-    type BinaryConfiguration = proto::TokenRouter;
-
-    fn try_from_config(config: Option<Self::Configuration>) -> Result<Self, CreationError> {
-        Ok(TokenRouter::new(config.unwrap_or_default()))
-    }
-}
-
-#[async_trait::async_trait]
-impl Filter for TokenRouter {
-    async fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
-        self.sync_read(ctx)
-    }
-}
-
-impl Router for TokenRouter {
-    fn sync_read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
+    /// Non-async version of [`Filter::read`], as this filter does no actual async
+    /// operations. Used in benchmarking.
+    pub fn sync_read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
         match ctx.metadata.get(&self.config.metadata_key) {
             Some(metadata::Value::Bytes(token)) => {
-                let destinations = ctx.endpoints.filter_endpoints(|endpoint| {
-                    if endpoint.metadata.known.tokens.contains(&**token) {
-                        tracing::trace!(%endpoint.address, token = &*crate::codec::base64::encode(token), "Endpoint matched");
-                        true
-                    } else {
-                        false
-                    }
-                });
+                let tok = crate::net::cluster::Token::new(token);
 
-                ctx.destinations = destinations.into_iter().map(|ep| ep.address).collect();
+                ctx.destinations = ctx.endpoints.addresses_for_token(tok);
 
                 if ctx.destinations.is_empty() {
                     Err(FilterError::new(Error::NoEndpointMatch(
@@ -85,76 +58,27 @@ impl Router for TokenRouter {
             ))),
         }
     }
-
-    fn new() -> Self {
-        Self::from_config(None)
-    }
-}
-
-pub struct HashedTokenRouter {
-    config: Config,
-}
-
-impl HashedTokenRouter {
-    fn new(config: Config) -> Self {
-        Self { config }
-    }
 }
 
-impl StaticFilter for HashedTokenRouter {
-    const NAME: &'static str = "quilkin.filters.token_router.v1alpha1.HashedTokenRouter";
+impl StaticFilter for TokenRouter {
+    const NAME: &'static str = "quilkin.filters.token_router.v1alpha1.TokenRouter";
     type Configuration = Config;
     type BinaryConfiguration = proto::TokenRouter;
 
     fn try_from_config(config: Option<Self::Configuration>) -> Result<Self, CreationError> {
-        Ok(Self::new(config.unwrap_or_default()))
+        Ok(Self {
+            config: config.unwrap_or_default(),
+        })
     }
 }
 
 #[async_trait::async_trait]
-impl Filter for HashedTokenRouter {
+impl Filter for TokenRouter {
     async fn read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
         self.sync_read(ctx)
     }
 }
 
-impl Router for HashedTokenRouter {
-    fn sync_read(&self, ctx: &mut ReadContext) -> Result<(), FilterError> {
-        match ctx.metadata.get(&self.config.metadata_key) {
-            Some(metadata::Value::Bytes(token)) => {
-                let tok = crate::net::cluster::Token::new(token);
-
-                ctx.destinations = ctx.endpoints.addresses_for_token(tok);
-
-                if ctx.destinations.is_empty() {
-                    Err(FilterError::new(Error::NoEndpointMatch(
-                        self.config.metadata_key,
-                        crate::codec::base64::encode(token),
-                    )))
-                } else {
-                    Ok(())
-                }
-            }
-            Some(value) => Err(FilterError::new(Error::InvalidType(
-                self.config.metadata_key,
-                value.clone(),
-            ))),
-            None => Err(FilterError::new(Error::NoTokenFound(
-                self.config.metadata_key,
-            ))),
-        }
-    }
-
-    fn new() -> Self {
-        Self::from_config(None)
-    }
-}
-
-pub trait Router {
-    fn sync_read(&self, ctx: &mut ReadContext) -> Result<(), FilterError>;
-    fn new() -> Self;
-}
-
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
     #[error("no routing token found for `{0}`")]
diff --git a/src/net/cluster.rs b/src/net/cluster.rs
index fb14aa8c8f..c237fbfa30 100644
--- a/src/net/cluster.rs
+++ b/src/net/cluster.rs
@@ -505,21 +505,6 @@ where
         ret
     }
 
-    /// Builds token maps for every locality. Only used by testing/benching
-    #[doc(hidden)]
-    pub fn build_token_maps(&self) {
-        self.token_map.clear();
-
-        for mut eps in self.map.iter_mut() {
-            eps.build_token_map();
-
-            for (token_hash, addrs) in &eps.token_map {
-                self.token_map
-                    .insert(*token_hash, addrs.iter().cloned().collect());
-            }
-        }
-    }
-
     pub fn addresses_for_token(&self, token: Token) -> Vec<EndpointAddress> {
         self.token_map
             .get(&token.0)
diff --git a/tests/hashed_token_router.rs b/tests/hashed_token_router.rs
deleted file mode 100644
index 894481b9d9..0000000000
--- a/tests/hashed_token_router.rs
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright 2024 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-use std::net::{Ipv6Addr, SocketAddr};
-
-use tokio::time::{timeout, Duration};
-
-use quilkin::{
-    config::Filter,
-    filters::{Capture, HashedTokenRouter, StaticFilter},
-    net::endpoint::{metadata::MetadataView, Endpoint},
-    test::{AddressType, TestHelper},
-};
-
-/// This test covers both hashed_token_router and capture filters,
-/// since they work in concert together.
-#[tokio::test]
-async fn hashed_token_router() {
-    let mut t = TestHelper::default();
-    let mut echo = t.run_echo_server(AddressType::Ipv6).await;
-    quilkin::test::map_to_localhost(&mut echo).await;
-
-    let capture_yaml = "
-suffix:
-    size: 3
-    remove: true
-";
-    let endpoint_metadata = "
-quilkin.dev:
-    tokens:
-        - YWJj # abc
-    ";
-
-    let server_config = std::sync::Arc::new(quilkin::Config::default_non_agent());
-    server_config.clusters.modify(|clusters| {
-        clusters.insert_default(
-            [Endpoint::with_metadata(
-                echo.clone(),
-                serde_yaml::from_str::>(endpoint_metadata).unwrap(),
-            )]
-            .into(),
-        );
-
-        clusters.build_token_maps();
-    });
-
-    server_config.filters.store(
-        quilkin::filters::FilterChain::try_create([
-            Filter {
-                name: Capture::factory().name().into(),
-                label: None,
-                config: serde_yaml::from_str(capture_yaml).unwrap(),
-            },
-            Filter {
-                name: HashedTokenRouter::factory().name().into(),
-                label: None,
-                config: None,
-            },
-        ])
-        .map(std::sync::Arc::new)
-        .unwrap(),
-    );
-
-    let server_port = t.run_server(server_config, None, None).await;
-
-    // valid packet
-    let (mut recv_chan, socket) = t.open_socket_and_recv_multiple_packets().await;
-
-    let local_addr = SocketAddr::from((Ipv6Addr::LOCALHOST, server_port));
-    let msg = b"helloabc";
-    tracing::trace!(%local_addr, "sending echo packet");
-    socket.send_to(msg, &local_addr).await.unwrap();
-
-    tracing::trace!("awaiting echo packet");
-    assert_eq!(
-        "hello",
-        timeout(Duration::from_millis(500), recv_chan.recv())
-            .await
-            .expect("should have received a packet")
-            .unwrap()
-    );
-
-    // send an invalid packet
-    let msg = b"helloxyz";
-    socket.send_to(msg, &local_addr).await.unwrap();
-
-    let result = timeout(Duration::from_millis(500), recv_chan.recv()).await;
-    assert!(result.is_err(), "should not have received a packet");
-}
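
Note: with HashedTokenRouter folded into TokenRouter, the single registered name
quilkin.filters.token_router.v1alpha1.TokenRouter now performs the hashed lookup via
addresses_for_token. As a rough sketch, this is the static proxy configuration equivalent
to what the deleted integration test exercised; the filter names, capture settings, and
quilkin.dev token metadata are taken from this diff, while the top-level clusters/address
layout is an assumption and may differ between Quilkin versions:

    version: v1alpha1
    filters:
      - name: quilkin.filters.capture.v1alpha1.Capture
        config:
          suffix:
            size: 3
            remove: true
      - name: quilkin.filters.token_router.v1alpha1.TokenRouter  # no config required
    clusters:
      - endpoints:
          - address: "[::1]:8078"  # assumed placeholder game server address
            metadata:
              quilkin.dev:
                tokens:
                  - YWJj  # base64 of "abc"

A packet ending in the 3-byte suffix "abc" has that suffix captured into dynamic metadata
and matched against the endpoint's token, so it routes to the endpoint above; any other
suffix yields a NoEndpointMatch error, mirroring the valid/invalid packet checks in the
removed tests/hashed_token_router.rs test.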