Skip to content

Commit

Permalink
slight refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
pompon0 committed Sep 20, 2024
1 parent a298f50 commit 798962f
Show file tree
Hide file tree
Showing 17 changed files with 315 additions and 242 deletions.
9 changes: 4 additions & 5 deletions node/actors/executor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Library files for the executor. We have it separate from the binary so that we can use these files in the tools crate.
use crate::io::Dispatcher;
use anyhow::Context as _;
use network::http;
pub use network::{gossip::attestation, RpcConfig};
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -63,7 +62,7 @@ pub struct Config {

/// Http debug page configuration.
/// If None, debug page is disabled
pub debug_page: Option<http::DebugPageConfig>,
pub debug_page: Option<network::debug_page::Config>,

/// How often to poll the database looking for the batch commitment.
pub batch_poll_interval: time::Duration,
Expand Down Expand Up @@ -144,12 +143,12 @@ impl Executor {
net.register_metrics();
s.spawn(async { runner.run(ctx).await.context("Network stopped") });

if let Some(debug_config) = self.config.debug_page {
if let Some(cfg) = self.config.debug_page {
s.spawn(async {
http::DebugPageServer::new(debug_config, net)
network::debug_page::Server::new(cfg, net)
.run(ctx)
.await
.context("Http Server stopped")
.context("Debug page server stopped")
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,56 +24,131 @@ use tokio_rustls::{
pki_types::{CertificateDer, PrivateKeyDer},
ServerConfig,
},
server::TlsStream,
TlsAcceptor,
};
use zksync_concurrency::{ctx, scope};
use zksync_consensus_crypto::TextFmt as _;
use zksync_consensus_utils::debug_page;

const STYLE: &str = include_str!("style.css");

/// TLS certificate chain with a private key.
#[derive(Debug, PartialEq)]
pub struct TlsConfig {
/// TLS certificate chain.
pub cert_chain: Vec<CertificateDer<'static>>,
/// Private key for the leaf cert.
pub private_key: PrivateKeyDer<'static>,
}

/// Credentials.
#[derive(PartialEq, Clone)]
pub struct Credentials {
/// User for debug page
pub user: String,
/// Password for debug page
/// TODO: it should be treated as a secret: zeroize, etc.
pub password: String,
}

impl Credentials {
fn parse(value: String) -> anyhow::Result<Self> {
let [user, password] = value
.split(':')
.collect::<Vec<_>>()
.try_into()
.ok()
.context("bad format")?;
Ok(Self {
user: user.to_string(),
password: password.to_string(),
})
}
}

impl std::fmt::Debug for Credentials {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Credentials").finish_non_exhaustive()
}
}

/// Http debug page configuration.
#[derive(Debug, PartialEq)]
pub struct DebugPageConfig {
pub struct Config {
/// Public Http address to listen incoming http requests.
pub addr: SocketAddr,
/// Debug page credentials.
pub credentials: Option<debug_page::Credentials>,
/// Cert file path
pub certs: Vec<CertificateDer<'static>>,
/// Key file path
pub private_key: PrivateKeyDer<'static>,
pub credentials: Option<Credentials>,
/// TLS certificate to terminate the connections with.
pub tls: Option<TlsConfig>,
}

/// Http Server for debug page.
pub struct DebugPageServer {
config: DebugPageConfig,
pub struct Server {
config: Config,
network: Arc<Network>,
}

impl DebugPageServer {
#[async_trait::async_trait]
trait Listener: 'static + Send {
type Stream: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin;
async fn accept(&mut self) -> anyhow::Result<Self::Stream>;
}

#[async_trait::async_trait]
impl Listener for TcpListener {
type Stream = tokio::net::TcpStream;
async fn accept(&mut self) -> anyhow::Result<Self::Stream> {
Ok(TcpListener::accept(self).await?.0)
}
}

#[async_trait::async_trait]
impl Listener for TlsListener<TcpListener, TlsAcceptor> {
type Stream = TlsStream<tokio::net::TcpStream>;
async fn accept(&mut self) -> anyhow::Result<Self::Stream> {
Ok(TlsListener::accept(self).await?.0)
}
}

impl Server {
/// Creates a new Server
pub fn new(config: DebugPageConfig, network: Arc<Network>) -> DebugPageServer {
DebugPageServer { config, network }
pub fn new(config: Config, network: Arc<Network>) -> Self {
Self { config, network }
}

/// Runs the Server.
pub async fn run(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let listener = TcpListener::bind(self.config.addr)
.await
.context("TcpListener::bind()")?;
if let Some(tls) = &self.config.tls {
let cfg = ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(tls.cert_chain.clone(), tls.private_key.clone_key())
.context("with_signle_cert()")?;

Check warning on line 129 in node/actors/network/src/debug_page/mod.rs

View workflow job for this annotation

GitHub Actions / typos

"signle" should be "single" or "signal".
self.run_with_listener(ctx, TlsListener::new(Arc::new(cfg).into(), listener))
.await
} else {
self.run_with_listener(ctx, listener).await
}
}

async fn run_with_listener<L: Listener>(
&self,
ctx: &ctx::Ctx,
mut listener: L,
) -> anyhow::Result<()> {
// Start a watcher to shut down the server whenever ctx gets cancelled
let graceful = hyper_util::server::graceful::GracefulShutdown::new();

scope::run!(ctx, |ctx, s| async {
let mut listener = TlsListener::new(
self.tls_acceptor(),
TcpListener::bind(self.config.addr).await?,
);

let http = http1::Builder::new();

// Start a loop to accept incoming connections
while let Ok(res) = ctx.wait(listener.accept()).await {
match res {
Ok((stream, _)) => {
Ok(stream) => {
let io = TokioIo::new(stream);
let conn = http.serve_connection(io, service_fn(|req| self.handle(req)));
// watch this connection
Expand All @@ -86,10 +161,6 @@ impl DebugPageServer {
});
}
Err(err) => {
if let Some(remote_addr) = err.peer_addr() {
tracing::error!("[client {remote_addr}] ");
}

tracing::error!("Error accepting connection: {}", err);
continue;
}
Expand All @@ -106,46 +177,41 @@ impl DebugPageServer {
request: Request<hyper::body::Incoming>,
) -> anyhow::Result<Response<Full<Bytes>>> {
let mut response = Response::new(Full::default());
match self.basic_authentication(request.headers()) {
Ok(_) => *response.body_mut() = self.serve(request),
Err(e) => {
*response.status_mut() = StatusCode::UNAUTHORIZED;
*response.body_mut() = Full::new(Bytes::from(e.to_string()));
let header_value = HeaderValue::from_str(r#"Basic realm="debug""#).unwrap();
response
.headers_mut()
.insert(header::WWW_AUTHENTICATE, header_value);
}
if let Err(err) = self.authenticate(request.headers()) {
*response.status_mut() = StatusCode::UNAUTHORIZED;
*response.body_mut() = Full::new(Bytes::from(err.to_string()));
let header_value = HeaderValue::from_str(r#"Basic realm="debug""#).unwrap();
response
.headers_mut()
.insert(header::WWW_AUTHENTICATE, header_value);
}
*response.body_mut() = self.serve(request);
Ok(response)
}

fn basic_authentication(&self, headers: &HeaderMap) -> anyhow::Result<()> {
self.config
.credentials
.clone()
.map_or(Ok(()), |credentials| {
// The header value, if present, must be a valid UTF8 string
let header_value = headers
.get("Authorization")
.context("The 'Authorization' header was missing")?
.to_str()
.context("The 'Authorization' header was not a valid UTF8 string.")?;
let base64encoded_segment = header_value
.strip_prefix("Basic ")
.context("The authorization scheme was not 'Basic'.")?;
let decoded_bytes = base64::engine::general_purpose::STANDARD
.decode(base64encoded_segment)
.context("Failed to base64-decode 'Basic' credentials.")?;
let incoming_credentials = debug_page::Credentials::try_from(
String::from_utf8(decoded_bytes)
.context("The decoded credential string is not valid UTF8.")?,
)?;
if credentials != incoming_credentials {
anyhow::bail!("Invalid password.")
}
Ok(())
})
fn authenticate(&self, headers: &HeaderMap) -> anyhow::Result<()> {
let Some(want) = self.config.credentials.as_ref() else {
return Ok(());
};

// The header value, if present, must be a valid UTF8 string
let header_value = headers
.get("Authorization")
.context("The 'Authorization' header was missing")?
.to_str()
.context("The 'Authorization' header was not a valid UTF8 string.")?;
let base64encoded_segment = header_value
.strip_prefix("Basic ")
.context("Unsupported authorization scheme.")?;
let decoded_bytes = base64::engine::general_purpose::STANDARD
.decode(base64encoded_segment)
.context("Failed to base64-decode 'Basic' credentials.")?;
let got = Credentials::parse(
String::from_utf8(decoded_bytes)
.context("The decoded credential string is not valid UTF8.")?,
)?;
anyhow::ensure!(want == &got, "Invalid credentials.");
Ok(())
}

fn serve(&self, _request: Request<hyper::body::Incoming>) -> Full<Bytes> {
Expand Down Expand Up @@ -262,16 +328,4 @@ impl DebugPageServer {
format!("{}...{}", &key[..10], &key[len - 11..len])
})
}

fn tls_acceptor(&self) -> TlsAcceptor {
let cert_der = self.config.certs.clone();
let key_der = self.config.private_key.clone_key();
Arc::new(
ServerConfig::builder()
.with_no_client_auth()
.with_single_cert(cert_der, key_der)
.unwrap(),
)
.into()
}
}
File renamed without changes.
2 changes: 1 addition & 1 deletion node/actors/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use zksync_consensus_utils::pipe::ActorPipe;

mod config;
pub mod consensus;
pub mod debug_page;
mod frame;
pub mod gossip;
pub mod http;
pub mod io;
mod metrics;
mod mux;
Expand Down
2 changes: 1 addition & 1 deletion node/libs/protobuf/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod build;
pub mod proto;
mod proto_fmt;
pub mod repr;
pub use repr::{read_required_repr, ProtoRepr};
pub use repr::{read_optional_repr, read_required_repr, ProtoRepr};
pub mod serde;
mod std_conv;
pub mod testonly;
Expand Down
5 changes: 5 additions & 0 deletions node/libs/protobuf/src/repr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ pub fn read_required_repr<P: ProtoRepr>(field: &Option<P>) -> anyhow::Result<P::
field.as_ref().context("missing field")?.read()
}

/// Parses an optional proto field.
pub fn read_optional_repr<P: ProtoRepr>(field: &Option<P>) -> anyhow::Result<Option<P::Type>> {
field.as_ref().map(ProtoRepr::read).transpose()
}

/// Encodes a proto message.
/// Currently it outputs a canonical encoding, but `decode` accepts
/// non-canonical encoding as well.
Expand Down
40 changes: 0 additions & 40 deletions node/libs/utils/src/debug_page.rs

This file was deleted.

10 changes: 0 additions & 10 deletions node/libs/utils/src/encode.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
//! Utilities for testing encodings.
use crate::debug_page;
use rand::{
distributions::{Alphanumeric, DistString, Distribution},
Rng,
Expand Down Expand Up @@ -150,15 +149,6 @@ impl Distribution<PathBuf> for EncodeDist {
}
}

impl Distribution<debug_page::Credentials> for EncodeDist {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> debug_page::Credentials {
debug_page::Credentials {
user: self.sample(rng),
password: self.sample(rng),
}
}
}

impl Distribution<time::Duration> for EncodeDist {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> time::Duration {
time::Duration::new(self.sample(rng), self.sample(rng))
Expand Down
1 change: 0 additions & 1 deletion node/libs/utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! Crate that holds several small utilities and primitives.

pub mod debug_page;
mod encode;
pub mod enum_util;
pub mod pipe;
Expand Down
Loading

0 comments on commit 798962f

Please sign in to comment.